2018-11-17

Building a real-time dashboard using Kafka & ElasticStack



In this article I will demonstrate how to build a real-time dashboard thanks to the ElasticStack, fed by a CSV and a continuous flow coming from Kafka.
The CSV contains the description of some hotels, including their names and their GPS coordinates.
The Kafka flow simulates comments and ratings about the hotels, coming from imaginary customers.
Like Kafka Streams, the principle is to enrich data coming from this Kafka flow with the data contained in the CSV file.

About what you need


For that article, I used :
  • Confluent OpenSource 5.0.1
  • ElasticStack 6.5.0
  • Mockaroo
  • cURL

Generating a consistent dataset


To generate meaningful data, I'm using Mockaroo (a big thank to Robin Moffatt's article for pointing out that excellent website).
I strongly encourage you to register : you'll be able to save your work and to consume your data exposed via a REST endpoint (which will prove to be very useful).
I created two schemas, one for the CSV, containing the hotel information and another one for the comments/ratings.


Nothing complex about the hotel schema : id / name / latitude / longitude.
I created 50 sample rows and then exported them to a CSV file.
The ratings schema requires a bit more explanation.

To generate credible comments, such as :
My wife and I were delighted about the restaurant
or
The kids were disappointed about the pool
I used Mockaroo functionality called "Custom List" to create some parts of my sentence and I then assembled all of them into a comment.


That did the trick. But I was facing a little problem about the rating : as I first selected a random value between 1 and 10, it wasn't exactly related to the comment : a person who says he/she is amazed wouldn't normally rate the hotel a 1 or 2 out of 10 !
Mockaroo has another very cool feature called "Scenario" which will allow you to set a data according to the context. In my case, I chose to set some ratings based on the adjective.


I generated 1000 comments which were about the 50 hotels that we created earlier.
Make sure you have chosen the JSON format and that you disabled the "array" checkbox.
You should have comments like this one :
{
    "first_name": "Kaycee",
    "last_name": "Doring",
    "email": "kdoringrd@hhs.gov",
    "gender": "F",
    "age": 61,
    "hotel_id": 37,
    "subject": "The children were",
    "level": "pretty",
    "satisfaction": "satisfied",
    "place": "activities",
    "rating": 6,
    "comment": "The children were pretty satisfied about the activities"
}
Okay, now our data is ready to be used.

Sending CSV data to ElasticSearch


To send our CSV data into ElasticSearch, I used the powerful Logstash and its CSV module.
input { 
    file {
        path => "20181115_HotelRatings/data/Hotels.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }    
}

filter {
    csv {
        skip_header => "true"
        columns => ["id", "name", "latitude", "longitude"]
    }
    mutate {
        rename => {
            "latitude"  => "[location][lat]"
            "longitude" => "[location][lon]"
        }
        convert => {
            "[location][lat]" => "float"
            "[location][lon]" => "float"
        }
        remove_field => [
            "host",
            "@version",
            "message",
            "path"
        ]
    }
}

output {
    elasticsearch { 
        template => "20181115_HotelRatings/esk/hotels_mapping.json"
        template_overwrite => "true"
        index => "logstash-ratings-hotels"
        hosts => ["localhost:9200"] 
    }
    stdout { codec => rubydebug }
}
To be able to use the location as a GeoPoint, you have to use a custom mapping because it's an ElasticSearch concept, completely unknown to Logtash.
The super trick to keep in mind is to use the tag :
        template_overwrite => "true"
Else you could get stuck, wondering why your custom mapping is not taken into account. Super easy when you know it, but before that, it could be a bit foggy.
To create your custom mapping, just follow the excellent (and only) tutorial provided by Aaron Mildenstein

To summarize, you first have to run the CSV import, so that ElasticSearch creates a default mapping. Then you get the mapping, update it and reference it in your logstash conf file.
Then, you can relaunch the CSV injection.
{
  "template": "logstash-ratings-hotels*",
  "version": 1,
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "properties": {
        "id": {
          "type": "text",
          "norms": false,
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "location": {
          "type": "geo_point"
        },
        "name": {
          "type": "text",
          "norms": false,
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}
If everything went fine, you should see your updated mapping with the location as type "geo_point". To check if everything is in order, create a Coordinate Map in Kibana, mapped on your index and you should have some data.


Sending Kafka stream of events to ElasticSearch


My first idea was to use the Kafka Connect for ElasticSearch. But after I got it working, I wasn't really happy with several facts :
  • It's quite complex
  • It uses Avro
  • No filtering before the sending
I then took a look at the Kafka input login of Logstash and all the points described above vanished in an instant ! Super easy, works well and fast setup !
Moreover, I feel more comfortable with the fact that my central nervous system (Kafka) knows nothing about the producers / consumers.
To me, it's a basic principle of loose coupling.
input { 
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => [
            "ratings"
        ]
    }    
}

filter {
    json {
        source => "message"
    }
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "logstash-ratings-hotels"
        query => "id:%{hotel_id}"
        fields => { 
            "location" => "location"
            "name" => "hotel_name"
        }
    }
    mutate {
        remove_field => [
            "host",
            "message",
            "@version",
            "path",
            "subject",
            "satisfaction",
            "level",
            "place",
            "hotel_id"
        ]
    }
}

output {
    elasticsearch { 
        template => "20181115_HotelRatings/esk/comments_mapping.json"
        index => "logstash-ratings-comments"
        template_overwrite => "true"
        hosts => ["localhost:9200"] 
    }
    stdout { codec => rubydebug }
}
I have to explain a bit what's going on here.
The input section is simple enough. Then in the filter section, I describe the content of the message as JSON, which will index each pair of key/value.
The elasticsearch plugin is used to retrieve data previously indexed in my elastic cluster. I just have to define which index I want to get data from and in the query field, the join condition. And once the join is done, I have to specify the fields I want to get copied in my record.

To be more concrete, each time a new comment record is received from Kafka, we isolate the hotel_id and look for the corresponding id in the index logstash-ratings-hotels. Once found, we copy the field location and name, to location and hotel_name, respectively.

The mutate filter is used to remove all the unnecessary fields (all the fields used to create the final comment for instance).

The mechanism to transform a classical data into a geo_point is exactly the same as the one described above : through a custom mapping.
Here's the custom mapping for the comments.
{
  "template": "logstash-ratings-comments*",
  "version": 1,
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "age": {
          "type": "long"
        },
        "comment": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "email": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "first_name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "gender": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "location": {
          "type": "geo_point"
        },
        "hotel_name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "last_name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "rating": {
          "type": "long"
        }
      }
    }
  }
}

Designing our dashboard


Before sending our stream of data, it's interesting to send only one record. That way, ElasticSearch will create an index, which will be used for creating a Kibana index.
From that Kibana Index, we will be able to design our visualizations and create a dashboard. First of all, we've got to summarize the information we have.
When we sent the sample record, the output Ruby plugin wrote it on the Logstash standard out.
{
    "first_name" => "Kaycee",
         "email" => "kdoringrd@hhs.gov",
      "location" => {
        "lat" => 21.938596,
        "lon" => 110.552977
    },
     "last_name" => "Doring",
        "rating" => 6,
    "@timestamp" => 2018-11-17T13:16:22.120Z,
           "age" => 61,
        "gender" => "F",
    "hotel_name" => "Konopelski Inc",
       "comment" => "The children were pretty satisfied about the activities"
}
I'll exploit 4 elements :
  • Hotel location => as a map
  • Hotel Name & rating => Top 20 of the best rated hotels
  • Age of the person => Age donut
  • Gender => Gender pie

Now the dashboard is ready, it's time to send some data !

Streaming Mockaroo JSON data to Kafka


I created a simple topic ratings, with only one partition. This time, I'm not interested in scaling (that will be the subject of another blogpost)
kafka-topics --create --zookeeper localhost:2181 --topic ratings --partitions 1 --replication-factor 1
I could use Kafkacat to stream the data from the REST endpoint to my Kafka topic, but I prefer to use the platform, as Adam Bien would say :)
In order to see a progressive update of the dashboard, I used awk to send the messages at a slower rate.
curl -s "https://api.mockaroo.com/api/fb43c4e0?count=1000&key=0673f940" \
| awk '{print $0; system("sleep 1");}' \
| kafka-console-producer --broker-list localhost:9092 --topic ratings
That way, it will send a message every second.

Results & Live demonstration


You can see a sample of what it will be like.

And here's the dashboard, once the 1000 comments have been processed.

Source and references


2018-09-09

IoT / SmartHome - Send EnOcean sensor data to Kafka


Lately, like everyone else, I've been reading a lot about IoT and how this could change the world.
Even if it's been a while since I first heard that (1995 was the year when I first heard about the concepts of smart home), this time, I've got a feeling that it will soon be truly available to everyone.

So I read about the different protocols that are available today.
Bluetooth LE, ZigBee, Z-Wave and EnOcean are among the most used / discussed, from my point of view.

I've been seduced by EnOcean, since its technology is indeed well-suited to IoT : very low energy consumption and for most sensors, the ability to work without batteries !
In this article, as depicted in the first picture I will demonstrate a simple scenario : an EnOcean sensor will send data to Jeedom, which will feed a MQTT broker and then a synchronizer will send each received message to a Kafka topic. Then the latest value will be exposed to be displayed in a Grafana dashboard.

Before we start ...


For this configuration, you will need :

Installing Jeedom


Well, there are plenty of ways to do so, so I won't guide you on this one, especially since I've installed it on a existing RPi.
Please refer to the installation guide, from the official documentation (I think the translation has been started but not finished yet ...).
(Note that you can install it on Synology NAS, which can also be interesting)

Configuring the EnOcean sensor


Once Jeedom is installed (as well as the plugins) and the EnOcean USB stick has been plugged in, it's time to bind the sensor.
First of all, we will add a group, which will allow an easier administration. You can think of it as the room your sensor is in.
Go to the menu Tools > Objects and click on Add



Give it the name Room :


Then to add the sensor, go to the menu : Plugins > Automation protocols > EnOcean



Click on Inclusion Mode :



And then choose the automatic mode on the next window :



Click on any button, then your device should show up in the EnOcean plugin homepage.



If you want to see the first physical interactions or if you just want to make sure your device works properly, you can go to the Health section of the EnOcean plugin menu and you will see the current status of your sensor.


Configuring Mosquitto (MQTT Broker)


First, make sure that the MQTT plugin has been correctly installed. If not, please do so.
Normally, when you install the MQTT plugin, Mosquitto is automatically added. If not, install Mosquitto, following the official documentation.
Configure the MQTT plugin by specifying the default configuration (the default password being "jeedom") :



The # sign means the root, or all topics if you prefer. If you don't know how the MQTT protocol works : read that.
Like Kafka, if the message is sent to a non-existing topic, the Mosquitto broker will automatically create it.

Using SoapUI to test a scenario


SoapUI is an excellent tool I've been using for many years now. I mostly use it to test functionalities built upon webservices.
If you are not familiar with it yet, I strongly encourage you to try it.
SmartBear is also behind Swagger, which is part of the excellent Eclipse MicroProfile.

Create a TestSuite and add a Publish using MQTT step :



Configure the MQTT broker and make sure you have the same information you have in the Jeedom MQTT plugin.



Send a sample value to the topic sensor/counter :



If you go to your Room dashboard, in Jeedom, you should see the value reflected on the gauge, as well as the state of your buttons (each one has a on/off status) :



Of course, in our simple case, you may also use the Mosquitto client, which is far simpler.
mosquitto_pub -h localhost -t sensor/counter -m 25
But keep in mind that to test a real-world scenario, with multi steps with correlation, pause, etc, SoapUI is, by far, the best solution to me.

Creating a Jeedom scenario


All right, everything is set up, but for now, we've not used our EnOcean sensor yet !
The wall switch can be used in two configurations : 2-button configuration or 4-button configuration. We will use the first one.
The principle is pretty simple : when I click the upper button, I want the counter to increase and when I click the down button, I want it to decrease.

Since you have sent a message to the topic sensor/counter (whether it was through SoapUI or Mosquitto client), it has already been created.
On the MQTT plugin configuration page, you should then have your topic displayed.
Bind it to the Room object (choose it as the parent).

If you go to the Commands tab, you should see the subtopic.




Tick the checkbox Historiser (not translated !) which will allow you to track the value changes and to display a time series in the Analysis > History menu.
You can also check the current value by clicking Tester.

We will add two actions to that component. One for increasing the value, and the other one to decrease the value.



Name them as you like, as long as it stays meaningful to you.
What's important is the Payload field : in the Up action, it will take the value #[Room][sensor][counter]#+1 which is the current value of the topic (represented by the final #) plus one.
And in the Down section, you can guess :)

Now we need to create scenarios and decide when to execute them.
To do so, go to the Tools > Scenario menu and then click Add.

Create two scenarios, one for the increase, the other one for the decrease.



For a scenario to be properly defined, you need to decide when it's triggered and what it will do.
For the trigger, it will be the click on a EnOcean sensor button. (Button 1 is the one which will be used to the increase action).



For the actions, go to the Scenario pane and click on +Add block button on the right side of the screen.
Chose Action :


Once the action block has been created, we will call one of the action that we previously added on the MQTT component.



Repeat the process for the Down button (whose number is 3) and invoke the Down action.

Coding the MQTT / Kafka synchronizer


I haven't had the time to code a Kafka connector yet, that's why I wrote a very small piece of code.

As usual, I used Maven to create my project. Three dependencies are needed, here's an excerpt of my POM :
    <dependency>
       <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
       <version>1.2.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>1.1.1</version>
    </dependency>
    <dependency> 
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
       <version>1.0.13</version>
    </dependency>
And here's the code :
package fr.mbutton.kafka.sync;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQTT2Kafka {

    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());

    public static String MQTT_BROKER_URL      = "tcp://192.168.0.232:1883";
    public static String MQTT_BROKER_USER     = "jeedom";
    public static String MQTT_BROKER_PASSWORD = "jeedom";

    public static String KAFKA_BOOTSTRAP      = "localhost:9092";
    public static String KAFKA_TOPIC          = "sensor.counter";

    public void synchronize() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BOOTSTRAP);
        props.put("key.serializer"   , "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer");

        MemoryPersistence persistence = new MemoryPersistence();

        MqttClient mqttClient = null; // not auto-closeable
        try {
            mqttClient = new MqttClient(MQTT_BROKER_URL, "Synchronizer", persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                logger.warn("Connection lost ¯\\_(ツ)_/¯ : please restart me !");
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                logger.debug("Message: " + message.toString());
                sendMessageOverKafka(topic, message);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                logger.debug("Job's done.");
            }

            private void sendMessageOverKafka(String topic, MqttMessage message) {
                    logger.debug("Sending the message to my Kafka cluster.");
                    List<Header> headers = new ArrayList<>();
                    headers.add(new RecordHeader("mqtt-topic", topic.getBytes()));
                    try (KafkaProducer producer = new KafkaProducer<>(props)) {
                        ProducerRecord record = new ProducerRecord<>(KAFKA_TOPIC, 0, "1", message.toString(), headers);
                        producer.send(record);
                    }
                }
            });

            mqttClient.connect(connOpts);
            mqttClient.subscribe("#");

        } catch (MqttException e) {
            logger.error("Little problem while trying to synchronize MQTT gateway with Kafka broker.", e);
        } finally {
            if (mqttClient != null) {
                try {
                    mqttClient.close();
                } catch (MqttException ex) {
                    logger.error("Error while liberating the resources.", ex);
                }
            }
        }
    }

    public static void main(String[] args) {
        MQTT2Kafka bridge = new MQTT2Kafka();
        bridge.synchronize();
    }
}

And that's it.

Trying to build a native image thanks to GraalVM


As this lib seems to be a perfect use case, I tried to use GraalVM, just out of curiosity, to generate a native image.
Unfortunately, I encountered some generation problems that seem unresolvable for now.
On my Maven project, I first copied the dependencies to make the compilation easier :

mvn dependency:copy-dependencies

[INFO] --- maven-dependency-plugin:2.6:copy-dependencies (default-cli) @ KafkaMQTTSynchronizer ---
[INFO] Copying slf4j-api-1.7.25.jar to sources/KafkaMQTTSynchronizer/target/dependency/slf4j-api-1.7.25.jar
[INFO] Copying org.eclipse.paho.client.mqttv3-1.2.0.jar to sources/KafkaMQTTSynchronizer/target/dependency/org.eclipse.paho.client.mqttv3-1.2.0.jar
[INFO] Copying snappy-java-1.1.7.1.jar to sources/KafkaMQTTSynchronizer/target/dependency/snappy-java-1.1.7.1.jar
[INFO] Copying logback-core-1.0.13.jar to sources/KafkaMQTTSynchronizer/target/dependency/logback-core-1.0.13.jar
[INFO] Copying javaee-web-api-7.0.jar to sources/KafkaMQTTSynchronizer/target/dependency/javaee-web-api-7.0.jar
[INFO] Copying lz4-java-1.4.1.jar to sources/KafkaMQTTSynchronizer/target/dependency/lz4-java-1.4.1.jar
[INFO] Copying logback-classic-1.0.13.jar to sources/KafkaMQTTSynchronizer/target/dependency/logback-classic-1.0.13.jar
[INFO] Copying kafka-clients-1.1.1.jar to sources/KafkaMQTTSynchronizer/target/dependency/kafka-clients-1.1.1.jar

Compiled the source :
graalvm-ce-1.0.0-rc5/bin/javac -cp "../../../target/dependency/*" fr/mbutton/kafka/sync/MQTT2Kafka.java

Tried to generate the native image :
graalvm-ce-1.0.0-rc5/bin/native-image -cp ".:../../../target/dependency/*" fr.mbutton.kafka.sync.MQTT2Kafka

Build on Server(pid: 47807, port: 52391)
   classlist:     827.05 ms
       (cap):   1,428.57 ms
       setup:   2,142.86 ms
Detected unnecessary RecomputeFieldValue.ArrayBaseOffset 
  com.oracle.svm.core.jdk.Target_java_nio_DirectByteBuffer.arrayBaseOffset substitution field for java.nio.DirectByteBuffer.arrayBaseOffset. 
  The annotated field can be removed. This ArrayBaseOffset computation can be detected automatically. 
  Use option -H:+UnsafeAutomaticSubstitutionsLogLevel=2 to print all automatically detected substitutions. 
    analysis:   8,346.29 ms
error: unsupported features in 3 methods
Detailed message:
Error: com.oracle.graal.pointsto.constraints.UnsupportedFeatureException: 
   Invoke with MethodHandle argument could not be reduced to at most a single call: 
   java.lang.invoke.LambdaForm$MH.2127087555.invoke_MT(Object, Object, Object)
Trace: 
 at parsing org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:90)
        [...]

I'm disappointed. Maybe next time !

Setting up Kafka


Thanks to the Confluent Suite, starting Kafka for testing can be done in a blink of an eye.
Download Confluent Open Source and start it with the following command :
bin/confluent start
Then create a mirror topic that will receive messages from the MQTT broker.
bin/kafka-topics --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic sensor.counter
Run the KafkaSync (Java bridge) either via the CLI or via your IDE

Click on your buttons, to see if the changes are correctly reflected in Kafka.
bash-4.4$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic sensor.counter
26
27
28
29
28
27
26
25
24
25


Exposing the information to Prometheus



Coding the exporter


To expose the data to Prometheus, you only have to expose text composed of a name and a value.
In our case, what I'm interested in, is to expose the current (or last known) value of a topic.
As Kafka stores the values in a log, I thought it would be pretty simple to take a quick peek. But it's not and the product has clearly not been made to be queryable as a database.
(Even if it's possible to see the current state of your topics in the Kafka Control Center)

I then thought about coding a consumer that would write the change in a database. But it sounded too oversized ...
Same thing about writing in a file ...
I then found a neat in-memory solution.

It's really simple : I used Payara Micro (I love it) to deploy a simple app with a WebListener that launches a thread, in charge of listening for new Kafka messages.
Then on each received message, it updates a static variable, on a JAXRS service, whose value is returned when a GET query is made.

JAX-RS Service :
package fr.mbutton.prometheus;

import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("metrics")
public class KafkaCounterExporter {

 static String counterValue = "0";

 @GET
 public String getCounter() {
  return "counter " + counterValue;
 }
}
Can it be simpler ?
And as for the WebListener :
package fr.mbutton.prometheus;

import java.util.Arrays;
import java.util.Properties;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@WebListener
public class KafkaListener implements ServletContextListener {

 private transient Logger logger = LoggerFactory.getLogger(this.getClass());

 @Override
 public void contextInitialized(ServletContextEvent sce) {
  logger.info("Starting Kafka Listener");
  Runnable listener = () -> {
   Properties props = new Properties();
   props.put("bootstrap.servers" , "localhost:9092");
   props.put("group.id"          , "confluent");
   props.put("key.deserializer"  , "org.apache.kafka.common.serialization.StringDeserializer");
   props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

   try (KafkaConsumer consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("sensor.counter"));

    while (true) {
     ConsumerRecords records = consumer.poll(10);
     for (ConsumerRecord record : records) {
      KafkaCounterExporter.counterValue = record.value();
     }
    }
   }
  };

  Thread t = new Thread(listener);
  t.start();
 }

 @Override
 public void contextDestroyed(ServletContextEvent sce) {
  logger.info("Disconnecting");
 }
}
Then use your favorite IDE (NetBeans for me) to build your project.
Once it's built, run it with Payara Micro with the following line :
java -jar payara-micro-5.183.jar --deploy /<path>/KafkaPrometheusExporter-1.0-SNAPSHOT.war --port 8090

Configuring Prometheus


Now we have a MicroProfile application running, the endpoint is something like http://localhost:8090/KafkaPrometheusExporter-1.0-SNAPSHOT/prometheus/metrics
But by default, you just can't use a random endpoint : it has to be /metrics

There are several ways to fix that. The simplest way is to change the metrics path.
  - job_name: kafka-mqtt
    metrics_path: KafkaPrometheusExporter-1.0-SNAPSHOT/prometheus/metrics
    static_configs:
      - targets: ['localhost:8090']

But if like me, you're using NGinx as a reverse proxy, you may configure it to perform URL rewriting.
        location /metrics {
            proxy_pass   http://localhost:8090/KafkaPrometheusExporter-1.0-SNAPSHOT/prometheus/metrics;
        }
The following URL (http://localhost/metrics) is then available and ready to be scraped.

To be sure, open Prometheus and try to display the counter stat.



Displaying it via Grafana


I won't spend too much time on setting up Grafana, which is not the topic of this post.
Just as a reminder, you'll have to define a DataSource, pointing at your Prometheus.
Then, look for the stat counter that we exposed.
And create a new dashboard, based on a single stat, make it a gauge, configure it more or less like that :


And when you're satisfied, you can live-test it (with the auto-refresh option).



References


2018-04-22

KSQL, a step beyond Kafka Streams



I'm currently learning Kafka Streams and I found a very interesting article (in French) from William Conti.

In his article, William shows how to exploit the power of Kafka Streams to get the number of meetups per city, using a KTable.

I first took his repo, adjusted the sources to be compliant with the latest version of Kafka Streams, and then I thought it could be a good candidate to test KSQL, which has been officially certified for production not so long ago and whose presentation by Robin Moffatt, during a Paris Kafka Meetup made me want to play with it.

That's what I'm going to demonstrate in this article.

Start Confluent Platform


First, download Confluent Platform 4.1 to be able to play with KSQL.
Follow the instructions to start it (if you're using defaults, it should be up in seconds).

Feed a Kafka topic with RVSPs


To have the full project, check Williams' GitHub out.
But I will only use the class "Producer".
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {

  public static void main (String... args) throws MalformedURLException, IOException {

    URL url = new URL("http://stream.meetup.com/2/rsvps");
    URLConnection connection = url.openConnection();

    JsonFactory jsonFactory = new JsonFactory(new ObjectMapper());
    JsonParser parser = jsonFactory.createParser(connection.getInputStream());

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG     , "localhost:9092");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG  , StringSerializer.class.getName());

    try (KafkaProducer kafkaProducer = new KafkaProducer<>(props)) {
      while (parser.nextToken() != null) {
        String record = parser.readValueAsTree().toString();
        String updatedRecord = record.replace("\"group\"", "\"rvsp_group\"");
        ProducerRecord producerRecord = new ProducerRecord<>("meetups", updatedRecord);
        kafkaProducer.send(producerRecord);
      }
    }
  }
}
Note the instruction in bold : I had to replace the "group" element as it is a keyword in KSQL (as it is in SQL).
A bug has been opened on this subject.

Once you've run this class, you can verify that your topic "meetups" is well fed, by running the console consumer.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic meetups
You should have some RVSPs popping.
For instance :
{
  "venue": {
    "venue_name": "HSBXL (Hackerspace Brussels)",
    "lon": 4.328797,
    "lat": 50.848236,
    "venue_id": 24593226
  },
  "visibility": "public",
  "response": "yes",
  "guests": 0,
  "member": {
    "member_id": 35492902,
    "photo": "https://secure.meetupstatic.com/photos/member/b/9/9/1/thumb_243587505.jpeg",
    "member_name": "Jo"
  },
  "rsvp_id": 1724491047,
  "mtime": 1524412889175,
  "event": {
    "event_name": "DIY home-automation v2",
    "event_id": "249274076",
    "time": 1524916800000,
    "event_url": "https://www.meetup.com/hackerspace-Brussels-hsbxl/events/249274076/"
  },
  "rvsp_group": {
    "group_topics": [{
      "urlkey": "opensource",
      "topic_name": "Open Source"
    }, {
      "urlkey": "robotics",
      "topic_name": "Robotics"
    }, {
      "urlkey": "gameprog",
      "topic_name": "Game Programming"
    }, {
      "urlkey": "softwaredev",
      "topic_name": "Software Development"
    }, {
      "urlkey": "electronics",
      "topic_name": "Electronics"
    }, {
      "urlkey": "microcontroller",
      "topic_name": "Microcontrollers"
    }, {
      "urlkey": "web-development",
      "topic_name": "Web Development"
    }, {
      "urlkey": "hacking",
      "topic_name": "Hacking"
    }, {
      "urlkey": "arduino",
      "topic_name": "Arduino"
    }, {
      "urlkey": "computer-programming",
      "topic_name": "Computer programming"
    }, {
      "urlkey": "makerspaces",
      "topic_name": "Makerspaces"
    }, {
      "urlkey": "3d-printing",
      "topic_name": "3D Printing"
    }],
    "group_city": "Brussels",
    "group_country": "be",
    "group_id": 20346523,
    "group_name": "Hackerspace Brussels - HSBXL",
    "group_lon": 4.33,
    "group_urlname": "hackerspace-Brussels-hsbxl",
    "group_lat": 50.83
  }
}
We can see that the "group" has been successfully renamed to "rvsp_group".

Time to bring KSQL in the game


My goal is to count the number of meetups per city, as a KTable would allow, thanks to its state store (RocksDB).
First of all, launch the KSQL CLI.

To be able to query, we first have to create a stream (in KSQL terminology, not to be mixed up with KStreams).
CREATE STREAM meetups ( \
                         venue      VARCHAR, \
                         visibility VARCHAR, \
                         response   VARCHAR, \
                         guests     BIGINT,  \
                         member     VARCHAR, \
                         rsvp_id    BIGINT,  \
                         mtime      BIGINT,  \
                         event      VARCHAR, \
                         rvsp_group VARCHAR  \
                      ) \
WITH (KAFKA_TOPIC='meetups', VALUE_FORMAT='JSON');
I guess it's pretty self-explanatory : simply the definition of our first-level elements.
But note that you have to specify that our records are in JSON and are in the "meetups" topic.

Then, we create another stream, which extracts only the relevant information, that is to say, cities.
CREATE STREAM meetup_cities AS SELECT EXTRACTJSONFIELD(rvsp_group,'$.group_city') AS city FROM meetups;
Now we can perform some aggregation ...
SELECT city, COUNT(*) FROM meetup_cities GROUP BY city;
And here is the result I was hoping for :)
Longmont | 3
Zürich | 16
Calgary | 44
Helsinki | 9
Essex Fells | 1
London | 392
Philadelphia | 35
Brooklyn | 19
Columbus | 17
Las Vegas | 34
Note : if you want the first records to be taken into account, just update the following parameter and re-run the query.
ksql> SET 'auto.offset.reset' = 'earliest';  
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

References


2018-03-18

Simple example of using Avro in Kafka





Kafka has been designed to reach the best performance possible, as it is very well explained in the official documentation.
(If you haven't read it yet, I strongly encourage you to do so).

It has come pretty clear that to stay on the path of performance, some exchange formats were to be excluded, such as XML and JSON, as nicely exposed by Criteo.

I then read about Protobuff, Thrift and Avro. But the article from Confluent about Avro for Kafka Data made the choice clear to me : it was going to be Avro.

So, here I am, sending Avro messages to a Kafka topic.

Setting up the project


For that article, I will use NetBeans and Maven, as well as my Raspberry Pi cluster on which I deployed my Kafka cluster.
Just create a Java Maven project.

Edit the pom.xml so it's like the description below (or get it from my GitHub) :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" 
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>fr.mbutton</groupId>
    <artifactId>Kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <repositories>
        <repository>
            <id>apache-repo</id>
            <name>Apache Repository</name>
            <url>https://repository.apache.org/content/repositories/releases</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
   
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
      
    <dependencies>
        <dependency> 
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>        
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>    
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
       
</project>

We're using four dependencies : logback-classic (logback for SL4J), kafka-clients + avro, obviously, and kafka-avro-serializer from Confluent.

We're also configuring the Avro build plugin that will generate the Java stubs from the Avro schema.

Manipulating Avro


Schema definition


First of all, you have to define a schema "avsc" which is gonna be your contract (OK, I may have worked a little bit too much with WSDL and XML).
You can also work with dynamic schema (which are only defined in your code), but that's not what I'm interested in : to me, a schema has to be shared / exposed.

In our case, it's going to be a simple message.

{"namespace": "fr.mbutton.avro",
 "type": "record",
 "name": "Message",
 "fields": [
     {"name": "code"    , "type": "string"},
     {"name": "priority", "type": ["int"   , "null"]},
     {"name": "extra"   , "type": ["string", "null"]}
 ]
}
I quote the Avro official documentation :
    At minimum, a record definition must include its type ("type": "record"), a name ("name": "Message") [...]. 
    Fields are defined via an array of objects, each of which defines a name and type, which is another schema object, primitive or complex type
    (record, enum, array, map, union, and fixed).
    unions are a complex type that can be any of the types listed in the array.
    e.g., priority can either be an int or null, essentially making it an optional field.

Then, just build your Maven project and it should generate the Java source for your object. (It reminds me a lot of the dead but yet excellent XMLBeans project, by the way).

Testing the serialization / deserialization


Following the Avro documentation, I wrote a sample Java class, AvroManipulator, to understand how the serialization / deserialization process works.

package fr.mbutton.kafka;

import fr.mbutton.avro.Message;
import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroManipulator {

    public static void main(String... args) {

        Logger logger = LoggerFactory.getLogger(AvroManipulator.class);

        Message message1 = new Message("Test1", 1, "This");
        Message message2 = new Message("Test2", 1, "is");
        Message message3 = new Message("Test3", 1, "super");
        Message message4 = new Message("Test4", 1, "dope");

        try {
            File file = new File("messages.avro");

            // Serialization
            DatumWriter datumWriter = new SpecificDatumWriter<>(Message.class);
            try (DataFileWriter dfWriter = new DataFileWriter<>(datumWriter)) {
                dfWriter.create(message1.getSchema(), file);
                dfWriter.append(message1);
                dfWriter.append(message2);
                dfWriter.append(message3);
                dfWriter.append(message4);
            }

            // Deserialization
            DatumReader messageDatumReader = new SpecificDatumReader<>(Message.class);
            DataFileReader dfReader = new DataFileReader<>(file, messageDatumReader);
            dfReader.forEach(readMessage -> logger.info(readMessage.toString()));

        } catch (IOException e) {
            logger.error("Something obviously went wrong !", e);
        }
    }
}
Perfect ! I can now manipulate my Avro objects.

Note : If you take a look at the file messages.avro, which contains the serialized version of your messages, you can see the schema description in JSON, and then your message in a binary format.

Sending Avro object to a Kafka topic


Based on Alexis Seigneurin's article, I tweaked his classes to send my Avro object instead. I just updated the bootstrap servers to reflect my Kafka/Rpi cluster and replaced the original StringSerializer by Confluent Avro serializer / deserializer.
But when you want those Confluent objects, you also need to setup a schema registry, whose definition follows :
    Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas. 
    It stores a versioned history of all schemas, provides multiple compatibility settings and allows evolution of schemas according to the configured
    compatibility setting. It provides serializers that plug into Kafka clients that handle schema storage and retrieval for Kafka messages 
    that are sent in the Avro format.
Source : https://docs.confluent.io/current/schema-registry/docs/intro.html

Then, I configured Confluent platform, updated the schema-registry.properties to have my Kafka cluster configuration instead of localhost ...
listeners=http://0.0.0.0:8081
kafkastore.connection.url=192.168.0.231:2181,192.168.0.232:2181,192.168.0.233:2181,192.168.0.234:2181
kafkastore.topic=_schemas
debug=false
... and simply launched the schema registry.
bin/schema-registry-start etc/schema-registry/schema-registry.properties
(By the way, I never could get it started from the GitHub repo).

Be sure that the bootstrap.servers and schema.registry.url properties are correctly set up for your Java consumer and producer :
props.put("bootstrap.servers"  , "192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092");
props.put("schema.registry.url", "http://localhost:8081");
And that the serializer / deserializer objects are those from Confluent (io.confluent.kafka.serializers), so that your consumer looks like :
package fr.mbutton.kafka;

import fr.mbutton.avro.Message;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092");
        props.put("group.id", "mygroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("testopic"));
            
            while (true) {
                ConsumerRecords records = consumer.poll(100);
                for (ConsumerRecord record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}
And your producer :
package fr.mbutton.kafka;

import fr.mbutton.avro.Message;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");


        try (KafkaProducer producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord record = new ProducerRecord<>("testopic", new Message("Message-" + i, 1, "extra"));
                producer.send(record);
            }
        }
    }    
}

Running the project


Simply run both Java files, first the AvroConsumer and then the AvroProducer.

Then, observe the console output.

From the producer:
16:30:24.633 [main] DEBUG i.c.k.s.client.rest.RestService - Sending POST with input 
{"schema":
  "{\"type\":\"record\",
    \"name\":\"Message\",
    \"namespace\":\"fr.mbutton.avro\",
    \"fields\":[
      {\"name\":\"code\",\"type\":\"string\"},
      {\"name\":\"priority\",\"type\":[\"int\",\"null\"]},
      {\"name\":\"extra\",\"type\":[\"string\",\"null\"]}]}"
} to http://localhost:8081/subjects/testopic-value/versions
------------------------------------------------------------------------
BUILD SUCCESS
------------------------------------------------------------------------
From the schema registry:
[2018-03-18 16:30:24,647] INFO 127.0.0.1 - "POST /subjects/testopic-value/versions HTTP/1.1" 200 9  2 (io.confluent.rest-utils.requests:77)
[2018-03-18 16:30:25,100] INFO 127.0.0.1 - "GET /schemas/ids/21 HTTP/1.1" 200 246  4 (io.confluent.rest-utils.requests:77)
From the consumer :
16:30:25.084 [main] DEBUG i.c.k.s.client.rest.RestService - Sending GET with input null to http://localhost:8081/schemas/ids/21
{"code": "Message-0", "priority": 1, "extra": "extra"}
{"code": "Message-1", "priority": 1, "extra": "extra"}
{"code": "Message-2", "priority": 1, "extra": "extra"}
[...]
{"code": "Message-97", "priority": 1, "extra": "extra"}
{"code": "Message-98", "priority": 1, "extra": "extra"}
{"code": "Message-99", "priority": 1, "extra": "extra"}
All the code is available on my GitHub.

References

  1. https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
  2. http://aseigneurin.github.io/2016/03/02/kafka-spark-avro-kafka-101.html
  3. https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
  4. https://github.com/gwenshap/kafka-examples/tree/master/AvroProducerExample

2018-02-21

Setting up Kafka on a Raspberry Pi cluster via Ansible




After installing Docker and Kubernetes on my RPi cluster, I wanted to go on, with Kafka.


Prerequisites



First of all, I assume that you have a RPi cluster already configured to work with Ansible. If not, please refer to my previous article on the subject.

I could have refer to online archives, but as you need Java 8 and as Oracles forces you to acknowledge their license, I decided to start with archives previously downloaded.
You need :
  1. Kafka 1.0.0
  2. Java 8 (recent update) for ARM
You will then have to fill the var section in the playbook with the values matching your environment.


Ansible Playbook



The goal of this playbook is to provide a way to learn about using Kafka in a cluster, without having to cope with the installation. Therefore, you can decide whether it suits your needs or not.
And if it's the case, you still can learn about the installation process by simply reading the playbook.

The playbook is here for information, but if it were to be modified, it would be on my github.

Zookeeper and Kafka cluster configuration



One thing to have in mind when reading the Ansible playbook is that my nodes have fixed IPs that are following each other.
[pi-cluster]
192.168.0.23[1:4]

[masters]
192.168.0.231

[slaves]
192.168.0.232
192.168.0.233
192.168.0.234
As you can see, my first node IP last number is 1. The second one is 2 and so on. I took that as a fact when I wrote my playbook. If you have a different strategy, you'll have to modify the playbook accordingly.
This playbook is not that hard, but there is a little tricky part : the configuration of the ZooKeeper and Kafka clusters : I had to compose the cluster configuration from the information available through the Ansible inventory (= Ansible hosts).

For instance, the zookeeper cluster config looks like :
server.1=192.168.0.231:2888:3888
server.2=192.168.0.232:2888:3888
server.3=192.168.0.233:2888:3888
server.4=192.168.0.234:2888:3888
To reach that result, here is what I used :
- name: (ZOOKEEPER CONFIG) Adding cluster nodes to the zookeeper configuration
  lineinfile:
    path: "{{ zookeper_config }}"
    line: "server.{{ item[-1] }}={{ item + ':2888:3888' }}"
    insertafter: EOF
  with_items:
    - "{{ groups['pi-cluster'] }}"
And regarding the Kafka config, it looks like :
192.168.0.231:2181,192.168.0.232:2181,192.168.0.233:2181,192.168.0.234:2181
That was done by a simple var declaration :
"{{ groups['pi-cluster'] | join(':2181,') }}:2181"


Idempotent, yes but how ?



One of Ansible great strengths is the fact that most of its command are idempotent. Yet, in certain cases, I did not use lineinfile because I wanted to keep the original files intact to be able to start the configuration all over again without having to go to the process of copying archives and installing them from scratch. Maybe there's a better way to do it.
If so, leave a comment or better, open a PR !

The playbook




---
- name: Set Kafka up on the RPi cluster
  hosts: pi-cluster
  remote_user: pi

  tasks:
    - block:
      - name: (INSTALL) Checking if Java is already installed
        stat:
          path: "/opt/{{ java_version }}"
        register: javadir
      - name: (INSTALL) Checking if Kafka is already installed
        stat:
          path: "/opt/{{ kafka_version }}"
        register: kafkadir
      - name: (INSTALL) Unarchiving Java and Kafka
        unarchive:
          src: "{{ item }}"
          dest: /opt
          owner: pi
          group: pi
          mode: 0755
        with_items:
          - "{{ java_installer_path }}"
          - "{{ kafka_installer_path }}"
        when: javadir.stat.exists == false or kafkadir.stat.exists == false
      - name: (INSTALL) Fixing permissions for Java (unarchive user/group modification does not work with that one)
        file:
          path: /opt/{{ java_version }}
          owner: pi
          group: pi
          mode: 0755
          recurse: yes
        when: javadir.stat.exists == false
      - name: (INSTALL) Adding symbolic link for Java
        file:
          src: "/opt/{{ java_version }}/bin/java"
          dest: /usr/bin/java
          owner: pi
          group: pi
          state: link
        when: javadir.stat.exists == false
      - name: (INSTALL) Removing Kafka "windows" directory
        file:
          path: "/opt/{{ kafka_version }}/bin/windows"
          state: absent
        when: kafkadir.stat.exists == false
      - name: (BACKUP) Checking if previous config backups already exist
        stat:
          path: "{{ item }}"
        register: backup
        with_items:
          - "{{ zookeper_config }}.original"
          - "{{ kafka_config }}.original"
      - debug:
          var: backup
      - name: (BACKUP) Making backup copies of the zookeper and kafka config files, if never been backed up before
        copy:
          src: "{{ item }}"
          dest: "{{ item }}.original"
          owner: pi
          group: pi
          mode: 0755
          remote_src: yes
        with_items:
          - "{{ zookeper_config }}"
          - "{{ kafka_config }}"
        when: backup.results[0].stat.exists == false
      - name: (BACKUP) Restoring original file to be truly idempotent
        copy:
          src: "{{ item }}.original"
          dest: "{{ item }}"
          remote_src: true
        with_items:
          - "{{ zookeper_config }}"
          - "{{ kafka_config }}"
        when: backup.results[0].stat.exists == true
      - name: (ZOOKEEPER CONFIG) Creating zookeeper work directory
        file:
          path: /var/zookeeper
          owner: pi
          group: pi
          state: directory
          mode: 0755
      - name: (ZOOKEEPER CONFIG) Replacing the default config which sets the zookeeper workdir under var
        lineinfile:
          path: "{{ zookeper_config }}"
          regexp: '^dataDir=.*$'
          line: 'dataDir={{ zookeeper_workdir }}'
      - name: (ZOOKEEPER CONFIG) Adding useful configuration
        lineinfile:
          path: "{{ zookeper_config }}"
          line: "{{ item }}"
          insertafter: EOF
        with_items:
          - "tickTime=2000"
          - "initLimit=10"
          - "syncLimit=5"
      - name: (ZOOKEEPER CONFIG) Adding cluster nodes to the zookeeper configuration
        lineinfile:
          path: "{{ zookeper_config }}"
          line: "server.{{ item[-1] }}={{ item + ':2888:3888' }}"
          insertafter: EOF
        with_items:
          - "{{ groups['pi-cluster'] }}"
      - name: (ZOOKEEPER CONFIG) Removing a previous idFile
        file:
          path: "{{ zookeeper_workdir }}/myid"
          state: absent
      - name: (ZOOKEEPER CONFIG) Creating zookeeper id file
        file:
          path: "{{ zookeeper_workdir }}/myid"
          state: touch
          owner: pi
          group: pi
          mode: 0755
      - name: (ZOOKEEPER CONFIG) Filling id file with respecting id
        lineinfile:
          path: "{{ zookeeper_workdir }}/myid"
          line: "{{ inventory_hostname[-1] }}"
          insertafter: EOF
      - name: (KAFKA CONFIG) Defining the broker ID
        lineinfile:
          path: "{{ kafka_config }}"
          regexp: '^broker.id=.*$'
          line: 'broker.id={{ inventory_hostname[-1] }}'
      - name: (KAFKA CONFIG) Setting the listen address
        lineinfile:
          path: "{{ kafka_config }}"
          regexp: '^#listeners=.*$'
          line: 'listeners=PLAINTEXT://{{ inventory_hostname }}:9092'
      - name: (KAFKA CONFIG) Setting the zookeeper cluster address
        lineinfile:
          path: "{{ kafka_config }}"
          regexp: '^zookeeper.connect=.*$'
          line: 'zookeeper.connect={{ zookeeper_cluster_address }}'
      - name: (STARTUP) Starting ZooKeeper
        shell: "nohup /opt/{{ kafka_version }}/bin/zookeeper-server-start.sh {{ zookeper_config }} &"
        async: 10
        poll: 0
      - name: (STARTUP) Starting Kafka
        shell: "nohup /opt/{{ kafka_version }}/bin/kafka-server-start.sh {{ kafka_config }} &"
        async: 10
        poll: 0

      become: true
      vars:
        installer_dir: "YourPathToTheDownloadedArchives"
        java_version: "jdk1.8.0_162"
        kafka_version: "kafka_2.11-1.0.0"
        java_installer_path: "{{ installer_dir }}/jdk-8u162-linux-arm32-vfp-hflt.tar.gz"
        kafka_installer_path: "{{ installer_dir }}/{{ kafka_version }}.tgz"
        zookeper_config: "/opt/{{ kafka_version }}/config/zookeeper.properties"
        kafka_config: "/opt/{{ kafka_version }}/config/server.properties"
        zookeeper_workdir: "/var/zookeeper"
        zookeeper_cluster_address: "{{ groups['pi-cluster'] | join(':2181,') }}:2181"

Then to run it, use the following command :
ansible-playbook nameOfYourFile.yml --ask-become-pass 
Then you will be prompted the password (that's what the option --ask-become-pass do) for issuing commands as root.

Testing the cluster


Now, it's time to check that everything went smoothly. To do so, I'm going to use the test command lines shipped with the distribution.

First, start a producer from any of the nodes (or even on another machine, as long as Kafka is installed) :
/opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh \
           --broker-list 192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092 --topic testopic
This will get you a command prompt in which anything you type will be sent across the cluster.

Then, start as many consumers as you want and just observe what comes up in the terminal. (You can use something like Terminator to handle multi dynamic display, even if I never managed to get it working on my Mac)
/opt/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh \
           --zookeeper 192.168.0.231 --topic testopic from-beginning
Then type something in the producer prompt, and it should be displayed on all consumer terminals.


Troubleshooting


But if something does not work, I strongly suggest that you refer to these commands :
echo dump | nc localhost 2181
/opt/kafka_2.11-1.0.0/bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/1"
Source : https://stackoverflow.com/questions/46158296/kafka-broker-not-available-at-starting

References