I recently found myself needing to continually export SQL data to my Elasticsearch cluster. All of my logging data (webserver access logs, syslog, security logs, etc…) is shipped through an ELK stack (Elasticsearch, Logstash, Kibana), but I still had data that was relevant to my logs being stored by applications that I work with in various databases. Googling this subject led me down a number of paths, none of which provided the results I was looking for. I needed the ability to run a query, and insert the results into Elasticsearch, without having duplicate documents when running the query on a schedule. After reading the docs a bit, I came up with a solution.
TLDR: Set the document_id in your Logstash elasticsearch output
Walkthrough
There are a few pieces to this to build something fairly robust; here are the details, with a bare-bones skeleton after the list:
- Logstash JDBC input plugin
- Logstash filter plugins to process your data
- Logstash Elasticsearch output plugin with the document_id option set
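These three pieces live in a single pipeline config; the sections below fill in each block. As a rough sketch of the overall shape (everything here is detailed later in the post):
input {
  jdbc {
    # connection details, schedule, and SQL statement (covered below)
  }
}
filter {
  # parse and enrich each row
}
output {
  elasticsearch {
    # document_id is the piece that prevents duplicates on repeated runs
    document_id => "db-logs-access-%{id}"
  }
}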
For this example, I exported some NGINX logs to a MySQL database. The data looks like this:
mysql> select * from access LIMIT 1 \G
*************************** 1. row ***************************
Host: 88.198.22.8
timestamp: 2017-02-07 06:45:07
timezone: GMT-0500
verb: GET
request: /robots.txt
response: 301
bytes: 178
referer: -
agent: Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET4.0C; Media Center PC 6.0; InfoPath.2; .NET4.0E)
id: 1
1 row in set (0.00 sec)
ELK setup
Usually when I’m testing Logstash filters, I’ll set up an ELK stack using Docker containers. You can replicate this setup using the following commands:
docker pull elasticsearch
docker pull logstash
docker pull kibana
docker run -d --name elasticsearch elasticsearch
docker run -d -t --name logstash --link elasticsearch -v $(pwd):/etc/logstash logstash logstash -f /etc/logstash/logstash.conf
docker run -d --name kibana --link elasticsearch -p 5601:5601 -e ELASTICSEARCH_URL=http://elasticsearch:9200 kibana
When running your Logstash container, you can view the logs with docker logs logstash. This is handy if you’re testing with the stdout plugin.
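For that kind of debugging, a temporary stdout output paired with docker logs -f logstash works well. This is just a sketch to drop in while testing, not part of the final pipeline:
output {
  stdout {
    # Pretty-print each event so you can inspect the fields Logstash produced
    codec => rubydebug
  }
}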
Logstash input
We’ll start with the JDBC input to get the logs from the database. The input configuration looks like this:
input {
  jdbc {
    jdbc_driver_library => "/etc/logstash/mysql-connector-java-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/logs?zeroDateTimeBehavior=convertToNull"
    jdbc_user => "root"
    jdbc_password => "secret"
    schedule => "* * * * *"
    statement => "SELECT * from access;"
    type => "db-logs-access"
  }
}
There are a few things worth noting here:
- You need to grab a copy of the JDBC driver .jar file for your database platform; in this case I’m using the MySQL driver, which can be found on Oracle’s website.
- You’ll notice ?zeroDateTimeBehavior=convertToNull at the end of my connection string. This is a cool query parameter that handles null/zero DATETIME fields by setting the value to null rather than throwing an exception (the default behavior).
- schedule: a cron-formatted string that specifies how often this input runs. In this example it is set to every minute, re-running the full query each time; an incremental variant is sketched after this list.
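The statement above re-reads the entire access table on every run and leans on the document_id in the output (below) to keep that idempotent. If the table grows large, the JDBC input can instead track a column and fetch only new rows. Here is a sketch of that variant, assuming the auto-increment id column is a reliable high-water mark:
input {
  jdbc {
    jdbc_driver_library => "/etc/logstash/mysql-connector-java-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/logs?zeroDateTimeBehavior=convertToNull"
    jdbc_user => "root"
    jdbc_password => "secret"
    schedule => "* * * * *"
    # Remember the highest id seen so far and only select newer rows
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    statement => "SELECT * FROM access WHERE id > :sql_last_value ORDER BY id ASC"
    type => "db-logs-access"
  }
}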
Logstash filter
The filter for this should look familiar if you have experience with Logstash; if not, I recommend checking out the docs.
filter {
  if [type] == "db-logs-access" {
    # Set logstash timestamp to the timestamp in the row and rename host column
    mutate {
      rename => {
        "timestamp" => "@timestamp"
        "host" => "clientip"
      }
    }
    useragent {
      source => "agent"
    }
    geoip {
      source => "clientip"
    }
  }
}
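Depending on how the JDBC driver hands back the timestamp column, renaming it straight to @timestamp may be enough. If the column arrives as a plain string instead, the date filter is the usual way to parse it; this is a sketch assuming the format shown in the sample row and a fixed GMT-0500 offset:
filter {
  if [type] == "db-logs-access" {
    date {
      # Parse the raw column into @timestamp, then drop the original field
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
      timezone => "-05:00"
      remove_field => ["timestamp"]
    }
  }
}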
Logstash output
Finally, we have the Elasticsearch output.
output {
  if [type] == "db-logs-access" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      document_id => "db-logs-access-%{id}"
    }
  }
}
The key thing to note here is the document_id option. It ensures that we can ship the same rows to Elasticsearch without creating duplicate events. This is helpful if you want to pick up updates to rows, or if you’re following inserts to the DB with overlapping query windows so that you don’t miss anything.
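One related detail: document_id is only unique within an index, and without an index option the output writes to Logstash’s default daily logstash-%{+YYYY.MM.dd} indices. If you’d rather keep all of these rows in one place so a re-run always overwrites the same document, you can pin the index name; a sketch:
output {
  if [type] == "db-logs-access" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      # Pin the index so the same row always maps to the same index + _id
      index => "db-logs-access"
      document_id => "db-logs-access-%{id}"
    }
  }
}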
The results
After combining the Logstash input, filter, and output in our logstash.conf, or adding them as separate configs in the conf.d directory, we should be ready to rock. Here are some examples of the data in Kibana: