Exporting SQL Data to Elasticsearch with Logstash

I recently found myself needing to continually export SQL data to my Elasticsearch cluster. All of my logging data (webserver access logs, syslog, security logs, etc…) is shipped through an ELK stack (Elasticsearch, Logstash, Kibana), but I still had data that was relevant to my logs being stored by applications that I work with in various databases. Googling this subject led me down a number of paths, none of which provided the results I was looking for. I needed the ability to run a query, and insert the results into Elasticsearch, without having duplicate documents when running the query on a schedule. After reading the docs a bit, I came up with a solution.

TLDR: Set the document_id in your Logstash elasticsearch output

Walkthrough

There are a few things to this to build something fairly robust. Here are the details, followed by a sketch of how they fit together:

  1. Logstash JDBC input plugin
  2. Logstash filter plugins to process your data
  3. Logstash Elasticsearch output plugin with the document_id option set
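
These three pieces live in a single logstash.conf (or as separate files under conf.d). The overall shape looks like this, and the rest of the walkthrough fills in each block:

input {
  jdbc { ... }            # pull rows from the database on a schedule
}

filter {
  ...                     # rename, parse, and enrich the row data
}

output {
  elasticsearch { ... }   # index into Elasticsearch with a stable document_id
}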

For this example, I exported some NGINX logs to a MySQL database. The data looks like this:

mysql> select * from access LIMIT 1 \G
*************************** 1. row ***************************
     Host: 88.198.22.8
timestamp: 2017-02-07 06:45:07
 timezone: GMT-0500
     verb: GET
  request: /robots.txt
 response: 301
    bytes: 178
  referer: -
    agent: Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET4.0C; Media Center PC 6.0; InfoPath.2; .NET4.0E)
       id: 1
1 row in set (0.00 sec)
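
For reference, a table along these lines would hold the rows above. This is only a sketch, not the schema I actually used; the column types are assumptions based on the sample row:

CREATE TABLE access (
  id        INT AUTO_INCREMENT PRIMARY KEY, -- used later for the Elasticsearch document_id
  Host      VARCHAR(45),                    -- client IP address
  timestamp DATETIME,
  timezone  VARCHAR(16),
  verb      VARCHAR(16),
  request   VARCHAR(2048),
  response  SMALLINT,
  bytes     INT,
  referer   VARCHAR(2048),
  agent     VARCHAR(1024)
);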

ELK setup

Usually when I’m testing Logstash filters, I’ll set up an ELK stack using Docker containers; you can replicate this setup using the following commands:

  1. docker pull elasticsearch
  2. docker pull logstash
  3. docker pull kibana
  4. docker run -d --name elasticsearch elasticsearch
  5. docker run -d -t --name logstash --link elasticsearch -v $(pwd):/etc/logstash logstash logstash -f /etc/logstash/logstash.conf
  6. docker run -d --name kibana --link elasticsearch -p 5601:5601 -e ELASTICSEARCH_URL=http://elasticsearch:9200 kibana

When running your logstash container, you can view the logs with docker logs logstash. This is handy if you’re testing with the stdout plugin.
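
One gap in the setup above: the JDBC input below connects to a MySQL host named mysql, which isn’t one of the containers we started. If you want the test environment to be fully self-contained, you could run the official mysql image alongside the others and link it into the Logstash container; a sketch along these lines (the credentials and database name simply match the connection string used below):

docker pull mysql
docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=secret -e MYSQL_DATABASE=logs mysql

You’d then add --link mysql to the logstash run command in step 5 so the hostname mysql resolves inside that container.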

Logstash input

We’ll start with the JDBC input to get the logs from the database. The input configuration looks like this:

input {
  jdbc {
    jdbc_driver_library => "/etc/logstash/mysql-connector-java-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/logs?zeroDateTimeBehavior=convertToNull"
    jdbc_user => "root"
    jdbc_password => "secret"
    schedule => "* * * * *"
    statement => "SELECT * from access;"
    type => "db-logs-access"
  }
}

There are a couple of things worth noting here:

  1. You need to grab a copy of the JDBC driver .jar file for your database platform; in this case I’m using the MySQL driver, which can be found on Oracle’s website.
  2. You’ll notice this at the end of my connection string: ?zeroDateTimeBehavior=convertToNull. This is a cool query parameter that handles null/zero DATETIME fields by setting the value to null rather than throwing an exception (the default behavior).
  3. schedule: This is a cron-formatted string that specifies how often this input runs; in this example it is set to every minute.
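
One more thing worth mentioning: the statement above re-reads the whole table every minute and relies on the document_id set in the output (covered below) to keep that idempotent. If the table gets large, the JDBC input can instead keep track of how far it has read via its sql_last_value mechanism; a minimal sketch, reusing the auto-incrementing id column from the sample data:

input {
  jdbc {
    # ... same driver and connection settings as above ...
    schedule => "* * * * *"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    statement => "SELECT * FROM access WHERE id > :sql_last_value"
    type => "db-logs-access"
  }
}

The trade-off is that this only picks up rows with a higher id than the last run, so it misses updates to existing rows; the simple full-table query is still the easier choice if you care about those.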

Logstash filter

The filter for this should look familiar if you have experience with Logstash; if not, I recommend checking out the docs.

filter {
  if [type] == "db-logs-access" {

    # Set logstash timestamp to the timestamp in the row and rename host column
    mutate {
      rename => {
        "timestamp" => "@timestamp"
        "host" => "clientip"
      }
    }
    useragent {
      source => "agent"
    }
    geoip {
      source => "clientip"
    }
  }
}
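
The useragent and geoip filters add their parsed fields to the event using their default layout. If you’d rather group them explicitly, both filters accept a target option; a small variant sketch (the target names useragent and geoip here are just my choice):

filter {
  if [type] == "db-logs-access" {
    useragent {
      source => "agent"
      target => "useragent"   # nest browser/OS fields under useragent
    }
    geoip {
      source => "clientip"
      target => "geoip"       # nest location fields under geoip
    }
  }
}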

Logstash output

Finally, we have the Elasticsearch output.

output {
  if [type] == "db-logs-access" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      document_id => "db-logs-access-%{id}"
    }
  }
}

The key thing to note here is the document_id. This option ensures that we can ship the same rows to Elasticsearch repeatedly without creating duplicate events; a row with the same id simply overwrites the existing document. This is helpful if you want to pick up updates to rows, or if you’re following inserts to the DB with an overlapping time window so that you don’t miss any.
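
While testing, it can also help to add a stdout output next to the elasticsearch one so each event shows up in docker logs logstash, as mentioned earlier:

output {
  if [type] == "db-logs-access" {
    # ... elasticsearch output from above ...
    stdout {
      codec => rubydebug   # print the full event for debugging
    }
  }
}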

The results

After combining the Logstash input, filter, and output in our logstash.conf, or adding them as separate configs to the conf.d directory, we should be ready to rock. Here are some examples of the data in Kibana:

[Kibana Discover view]
[Kibana Dashboard view]