In this article, I will demonstrate how to build a real-time dashboard with the Elastic Stack, fed by a CSV file and a continuous flow of events coming from Kafka.
The CSV contains the description of some hotels, including their names and their GPS coordinates.
The Kafka flow simulates comments and ratings about the hotels, coming from imaginary customers.
Much like what you would do with Kafka Streams, the principle is to enrich the data coming from this Kafka flow with the data contained in the CSV file.
What you need
For this article, I used:
- Confluent OpenSource 5.0.1
- Elastic Stack 6.5.0
- Mockaroo
- cURL
Generating a consistent dataset
To generate meaningful data, I'm using Mockaroo (a big thank-you to Robin Moffatt's article for pointing out that excellent website).
I strongly encourage you to register: you'll be able to save your work and to consume your data via a REST endpoint (which will prove to be very useful).
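For reference, once a schema is saved, consuming it boils down to a single curl call like the one below (the schema id and API key are placeholders; the real call I used appears at the end of this post):

curl -s "https://api.mockaroo.com/api/YOUR_SCHEMA_ID?count=10&key=YOUR_API_KEY"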
I created two schemas: one for the CSV, containing the hotel information, and another one for the comments/ratings.
Nothing complex about the hotel schema: id / name / latitude / longitude.
I created 50 sample rows and then exported them to a CSV file.
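For illustration, the exported file looks something like this (header plus one row, reusing the sample hotel that shows up later in this post):

id,name,latitude,longitude
37,Konopelski Inc,21.938596,110.552977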
The ratings schema requires a bit more explanation.
To generate credible comments, such as "My wife and I were delighted about the restaurant" or "The kids were disappointed about the pool", I composed each comment from several generated fields (a subject, a level, a satisfaction adjective and a place).
That did the trick. But I was facing a little problem with the rating: since I first selected a random value between 1 and 10, it wasn't really related to the comment: a person who says he or she is amazed wouldn't normally rate the hotel 1 or 2 out of 10!
Mockaroo has another very cool feature called "Scenario", which allows you to set a value according to the context. In my case, I chose to set the ratings based on the satisfaction adjective.
I generated 1000 comments, all referring to the 50 hotels created earlier.
Make sure you have chosen the JSON format and that you disabled the "array" checkbox.
You should get comments like this one:
{ "first_name": "Kaycee", "last_name": "Doring", "email": "kdoringrd@hhs.gov", "gender": "F", "age": 61, "hotel_id": 37, "subject": "The children were", "level": "pretty", "satisfaction": "satisfied", "place": "activities", "rating": 6, "comment": "The children were pretty satisfied about the activities" }
Sending CSV data to Elasticsearch
To send our CSV data to Elasticsearch, I used the powerful Logstash and its csv filter plugin.
input {
  file {
    path => "20181115_HotelRatings/data/Hotels.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  csv {
    skip_header => "true"
    columns => ["id", "name", "latitude", "longitude"]
  }
  mutate {
    rename => {
      "latitude"  => "[location][lat]"
      "longitude" => "[location][lon]"
    }
    convert => {
      "[location][lat]" => "float"
      "[location][lon]" => "float"
    }
    remove_field => [ "host", "@version", "message", "path" ]
  }
}

output {
  elasticsearch {
    template => "20181115_HotelRatings/esk/hotels_mapping.json"
    template_overwrite => "true"
    index => "logstash-ratings-hotels"
    hosts => ["localhost:9200"]
  }
  stdout {
    codec => rubydebug
  }
}
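To launch the import, simply point Logstash at this configuration file (the file name below is an assumption on my side; adapt the path to wherever you saved the config):

logstash -f 20181115_HotelRatings/logstash/hotels.conf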
The key trick to keep in mind is to use the setting:
template_overwrite => "true"
To create your custom mapping, just follow the excellent (and only) tutorial provided by Aaron Mildenstein.
To summarize, you first run the CSV import so that Elasticsearch creates a default mapping. Then you retrieve that mapping, adapt it, and reference it in your Logstash configuration file.
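Retrieving the default mapping generated by Elasticsearch is a single call, shown here for the hotels index:

curl -s "localhost:9200/logstash-ratings-hotels/_mapping?pretty"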
Then, you can relaunch the CSV injection.
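Since the mapping of existing fields can't be modified in place, I find it simpler to delete the index before relaunching the injection (then run the same logstash command as before):

curl -X DELETE "localhost:9200/logstash-ratings-hotels"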
{ "template": "logstash-ratings-hotels*", "version": 1, "settings": { "index.refresh_interval": "5s" }, "mappings": { "_default_": { "properties": { "id": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "location": { "type": "geo_point" }, "name": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } } }
Sending the Kafka stream of events to Elasticsearch
My first idea was to use Kafka Connect for Elasticsearch. But after I got it working, I wasn't really happy with it, for several reasons:
- It's quite complex
- It uses Avro
- No filtering before sending the data
Moreover, I feel more comfortable when my central nervous system (Kafka) knows nothing about its producers and consumers.
To me, it's a basic principle of loose coupling.
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => [ "ratings" ]
  }
}

filter {
  json {
    source => "message"
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-ratings-hotels"
    query => "id:%{hotel_id}"
    fields => {
      "location" => "location"
      "name"     => "hotel_name"
    }
  }
  mutate {
    remove_field => [ "host", "message", "@version", "path", "subject", "satisfaction", "level", "place", "hotel_id" ]
  }
}

output {
  elasticsearch {
    template => "20181115_HotelRatings/esk/comments_mapping.json"
    index => "logstash-ratings-comments"
    template_overwrite => "true"
    hosts => ["localhost:9200"]
  }
  stdout {
    codec => rubydebug
  }
}
The input section is simple enough. Then, in the filter section, I parse the content of the message as JSON, so that each key/value pair gets indexed.
The elasticsearch filter plugin is used to retrieve data previously indexed in my Elasticsearch cluster. I just have to define which index I want to query and, in the query field, the join condition. Once the join is done, I specify the fields I want copied into my record.
To be more concrete: each time a new comment record is received from Kafka, we take its hotel_id and look for the corresponding id in the index logstash-ratings-hotels. Once found, the hotel's location and name fields are copied to the comment's location and hotel_name fields, respectively.
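To illustrate with the sample comment shown earlier (hotel_id 37), the lookup performed by the elasticsearch filter is roughly equivalent to this query:

curl -s "localhost:9200/logstash-ratings-hotels/_search?q=id:37&pretty"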
The mutate filter is used to remove all the unnecessary fields (for instance, the intermediate fields used to build the final comment).
The mechanism to turn a plain lat/lon field into a geo_point is exactly the same as the one described above: a custom mapping.
Here's the custom mapping for the comments.
{ "template": "logstash-ratings-comments*", "version": 1, "settings": { "index.refresh_interval": "5s" }, "mappings": { "_default_": { "properties": { "@timestamp": { "type": "date" }, "age": { "type": "long" }, "comment": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "email": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "first_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "gender": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "location": { "type": "geo_point" }, "hotel_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "last_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "rating": { "type": "long" } } } } }
Designing our dashboard
Before sending our whole stream of data, it's interesting to send a single record. That way, Elasticsearch creates the index, which we can then use to create a Kibana index pattern.
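A simple way to do that, assuming the ratings topic already exists (it's created further down in this post) and the Kafka pipeline above is running, is to ask Mockaroo for a single record and pipe it to the console producer:

curl -s "https://api.mockaroo.com/api/fb43c4e0?count=1&key=0673f940" \
  | kafka-console-producer --broker-list localhost:9092 --topic ratings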
From that index pattern, we will be able to design our visualizations and create a dashboard. First of all, we've got to summarize the information we have.
When we sent the sample record, the rubydebug output codec wrote it to Logstash's standard output.
{ "first_name" => "Kaycee", "email" => "kdoringrd@hhs.gov", "location" => { "lat" => 21.938596, "lon" => 110.552977 }, "last_name" => "Doring", "rating" => 6, "@timestamp" => 2018-11-17T13:16:22.120Z, "age" => 61, "gender" => "F", "hotel_name" => "Konopelski Inc", "comment" => "The children were pretty satisfied about the activities" }
From those fields, here are the visualizations I created:
- Hotel location => a map
- Hotel Name & rating => Top 20 of the best rated hotels
- Age of the person => Age donut
- Gender => Gender pie
Now that the dashboard is ready, it's time to send some data!
Streaming Mockaroo JSON data to Kafka
I created a simple topic named ratings, with only one partition. This time, I'm not interested in scaling (that will be the subject of another blog post).
kafka-topics --create --zookeeper localhost:2181 --topic ratings --partitions 1 --replication-factor 1
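If you want to double-check the topic configuration, you can describe it:

kafka-topics --describe --zookeeper localhost:2181 --topic ratings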
In order to see a progressive update of the dashboard, I used awk to send the messages at a slower rate (one per second).
curl -s "https://api.mockaroo.com/api/fb43c4e0?count=1000&key=0673f940" \ | awk '{print $0; system("sleep 1");}' \ | kafka-console-producer --broker-list localhost:9092 --topic ratings
Results & Live demonstration
You can see a sample of what the live updates look like.
And here's the dashboard, once the 1000 comments have been processed.