2018-11-17

Building a real-time dashboard using Kafka & ElasticStack



In this article, I will demonstrate how to build a real-time dashboard with the ElasticStack, fed by a CSV file and a continuous flow of events coming from Kafka.
The CSV contains the description of some hotels, including their names and their GPS coordinates.
The Kafka flow simulates comments and ratings about the hotels, coming from imaginary customers.
Much like what you would do with Kafka Streams, the principle is to enrich the data coming from this Kafka flow with the data contained in the CSV file.

About what you need


For this article, I used:
  • Confluent OpenSource 5.0.1
  • ElasticStack 6.5.0
  • Mockaroo
  • cURL

Generating a consistent dataset


To generate meaningful data, I'm using Mockaroo (a big thank you to Robin Moffatt's article for pointing out that excellent website).
I strongly encourage you to register: you'll be able to save your work and consume your data via a REST endpoint (which will prove to be very useful).
I created two schemas: one for the CSV containing the hotel information, and another one for the comments/ratings.


Nothing complex about the hotel schema: id / name / latitude / longitude.
I created 50 sample rows and then exported them to a CSV file.
The ratings schema requires a bit more explanation.

To generate credible comments, such as:
My wife and I were delighted about the restaurant
or
The kids were disappointed about the pool
I used a Mockaroo feature called "Custom List" to create the different parts of my sentences, and then assembled them into a comment.


That did the trick. But I was facing a little problem with the rating: since I first picked a random value between 1 and 10, it wasn't really related to the comment: a person who says he/she is amazed wouldn't normally rate the hotel 1 or 2 out of 10!
Mockaroo has another very cool feature called "Scenario", which allows you to set a value according to the context. In my case, I chose to set the rating based on the adjective used in the comment.


I generated 1000 comments about the 50 hotels created earlier.
Make sure you choose the JSON format and disable the "array" checkbox.
You should get comments like this one:
{
    "first_name": "Kaycee",
    "last_name": "Doring",
    "email": "kdoringrd@hhs.gov",
    "gender": "F",
    "age": 61,
    "hotel_id": 37,
    "subject": "The children were",
    "level": "pretty",
    "satisfaction": "satisfied",
    "place": "activities",
    "rating": 6,
    "comment": "The children were pretty satisfied about the activities"
}
Okay, now our data is ready to be used.

Sending CSV data to ElasticSearch


To send our CSV data to ElasticSearch, I used the powerful Logstash and its CSV filter plugin.
input { 
    file {
        path => "20181115_HotelRatings/data/Hotels.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }    
}

filter {
    csv {
        skip_header => "true"
        columns => ["id", "name", "latitude", "longitude"]
    }
    mutate {
        rename => {
            "latitude"  => "[location][lat]"
            "longitude" => "[location][lon]"
        }
        convert => {
            "[location][lat]" => "float"
            "[location][lon]" => "float"
        }
        remove_field => [
            "host",
            "@version",
            "message",
            "path"
        ]
    }
}

output {
    elasticsearch { 
        template => "20181115_HotelRatings/esk/hotels_mapping.json"
        template_overwrite => "true"
        index => "logstash-ratings-hotels"
        hosts => ["localhost:9200"] 
    }
    stdout { codec => rubydebug }
}
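To launch this ingestion, you just point Logstash at the configuration file. A minimal sketch, assuming the config above is saved as 20181115_HotelRatings/esk/hotels.conf (the file name is my assumption):

# run the CSV ingestion pipeline (config file name is an assumption)
bin/logstash -f 20181115_HotelRatings/esk/hotels.conf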
To be able to use the location as a geo_point, you have to provide a custom mapping, because it's an ElasticSearch concept that is completely unknown to Logstash.
The super trick to keep in mind is to use the setting:
        template_overwrite => "true"
Otherwise you could get stuck wondering why your custom mapping is not taken into account. Super easy once you know it, but until then, it can be a bit foggy.
To create your custom mapping, just follow the excellent (and only) tutorial on the subject, by Aaron Mildenstein.

To summarize, you first have to run the CSV import so that ElasticSearch creates a default mapping. Then you retrieve that mapping, update it, and reference it in your Logstash configuration file.
Finally, you can relaunch the CSV injection.
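To retrieve the default mapping that ElasticSearch generated after the first import, a single call is enough (the index name comes from the Logstash output section above):

# fetch the default mapping generated for the hotels index
curl -s "localhost:9200/logstash-ratings-hotels/_mapping?pretty"

Here is the custom mapping I ended up with for the hotels, referenced as hotels_mapping.json in the output section: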
{
  "template": "logstash-ratings-hotels*",
  "version": 1,
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "properties": {
        "id": {
          "type": "text",
          "norms": false,
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "location": {
          "type": "geo_point"
        },
        "name": {
          "type": "text",
          "norms": false,
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}
If everything went fine, you should see your updated mapping with the location field typed as "geo_point". To check that everything is in order, create a Coordinate Map in Kibana on your index: you should see some data.
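If you prefer the command line for a quick sanity check, two calls give you the essentials:

# the 50 hotels should be indexed
curl -s "localhost:9200/logstash-ratings-hotels/_count?pretty"
# and each document should carry a location object with lat/lon
curl -s "localhost:9200/logstash-ratings-hotels/_search?size=1&pretty"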


Sending the Kafka stream of events to ElasticSearch


My first idea was to use the Kafka Connect ElasticSearch connector. But after I got it working, I wasn't really happy with it, for several reasons:
  • It's quite complex
  • It uses Avro
  • No filtering before sending the data
I then took a look at the Kafka input plugin of Logstash, and all the points described above vanished in an instant! Super easy, works well, and fast to set up!
Moreover, I feel more comfortable with the fact that my central nervous system (Kafka) knows nothing about the producers / consumers.
To me, it's a basic principle of loose coupling.
input { 
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => [
            "ratings"
        ]
    }    
}

filter {
    json {
        source => "message"
    }
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "logstash-ratings-hotels"
        query => "id:%{hotel_id}"
        fields => { 
            "location" => "location"
            "name" => "hotel_name"
        }
    }
    mutate {
        remove_field => [
            "host",
            "message",
            "@version",
            "path",
            "subject",
            "satisfaction",
            "level",
            "place",
            "hotel_id"
        ]
    }
}

output {
    elasticsearch { 
        template => "20181115_HotelRatings/esk/comments_mapping.json"
        index => "logstash-ratings-comments"
        template_overwrite => "true"
        hosts => ["localhost:9200"] 
    }
    stdout { codec => rubydebug }
}
I have to explain a bit what's going on here.
The input section is simple enough. Then, in the filter section, I parse the content of the message as JSON, which turns each key/value pair into a field of the event.
The elasticsearch filter plugin is used to retrieve data previously indexed in my Elastic cluster. I just have to define which index I want to get data from and, in the query field, the join condition. Once the join is done, I specify the fields I want copied into my record.

To be more concrete: each time a new comment record arrives from Kafka, we isolate the hotel_id and look for the corresponding id in the logstash-ratings-hotels index. Once found, we copy the fields location and name to location and hotel_name, respectively.
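To make that lookup concrete, here is the equivalent query you can run by hand, using the hotel_id 37 from the sample comment shown earlier:

# manual version of the join performed by the elasticsearch filter
curl -s "localhost:9200/logstash-ratings-hotels/_search?q=id:37&pretty"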

The mutate filter is used to remove all the unnecessary fields (for instance, all the intermediate fields used to build the final comment).

The mechanism to turn a regular field into a geo_point is exactly the same as the one described above: a custom mapping.
Here's the custom mapping for the comments.
{
  "template": "logstash-ratings-comments*",
  "version": 1,
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "age": {
          "type": "long"
        },
        "comment": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "email": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "first_name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "gender": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "location": {
          "type": "geo_point"
        },
        "hotel_name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "last_name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "rating": {
          "type": "long"
        }
      }
    }
  }
}
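With the comments pipeline and its mapping ready, you can start it alongside the hotels pipeline. A sketch, assuming the config is saved as comments.conf; note that a second Logstash instance on the same machine needs its own data directory:

# run the comments pipeline next to the hotels one (file name is an assumption)
bin/logstash -f 20181115_HotelRatings/esk/comments.conf --path.data /tmp/logstash-comments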

Designing our dashboard


Before sending our whole stream of data, it's worth sending a single record. That way, ElasticSearch will create the index, which will then be used to create a Kibana index pattern.
From that Kibana index pattern, we will be able to design our visualizations and create a dashboard. First of all, we've got to summarize the information we have.
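To push that single test record, the simplest way is to reuse the Mockaroo REST endpoint from the next section with count=1 (assuming the ratings topic already exists):

# send one sample rating to the ratings topic
curl -s "https://api.mockaroo.com/api/fb43c4e0?count=1&key=0673f940" \
| kafka-console-producer --broker-list localhost:9092 --topic ratings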
When we sent the sample record, the stdout output (with the rubydebug codec) wrote it to the Logstash standard output.
{
    "first_name" => "Kaycee",
         "email" => "kdoringrd@hhs.gov",
      "location" => {
        "lat" => 21.938596,
        "lon" => 110.552977
    },
     "last_name" => "Doring",
        "rating" => 6,
    "@timestamp" => 2018-11-17T13:16:22.120Z,
           "age" => 61,
        "gender" => "F",
    "hotel_name" => "Konopelski Inc",
       "comment" => "The children were pretty satisfied about the activities"
}
I'll use 4 elements:
  • Hotel location => as a map
  • Hotel Name & rating => Top 20 of the best rated hotels
  • Age of the person => Age donut
  • Gender => Gender pie

Now that the dashboard is ready, it's time to send some data!

Streaming Mockaroo JSON data to Kafka


I created a simple ratings topic, with only one partition. This time, I'm not interested in scaling (that will be the subject of another blog post).
kafka-topics --create --zookeeper localhost:2181 --topic ratings --partitions 1 --replication-factor 1
I could use Kafkacat to stream the data from the REST endpoint to my Kafka topic, but I prefer to use the platform, as Adam Bien would say :)
In order to see a progressive update of the dashboard, I used awk to send the messages at a slower rate.
curl -s "https://api.mockaroo.com/api/fb43c4e0?count=1000&key=0673f940" \
| awk '{print $0; system("sleep 1");}' \
| kafka-console-producer --broker-list localhost:9092 --topic ratings
That way, it will send a message every second.
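For reference, the kafkacat variant mentioned above would look something like this (a sketch, not what I used here):

# same pipeline, with kafkacat in producer mode instead of kafka-console-producer
curl -s "https://api.mockaroo.com/api/fb43c4e0?count=1000&key=0673f940" \
| awk '{print $0; system("sleep 1");}' \
| kafkacat -b localhost:9092 -t ratings -P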

Results & Live demonstration


You can see below a sample of what the dashboard looks like while the data is streaming in.

And here's the dashboard, once the 1000 comments have been processed.

Source and references

