2018-09-09

IoT / SmartHome - Send EnOcean sensor data to Kafka


Lately, like everyone else, I've been reading a lot about IoT and how it could change the world.
The idea is not new to me (I first heard about the smart home concept back in 1995), but this time I have a feeling that it will soon be truly available to everyone.

So I read about the different protocols that are available today.
Bluetooth LE, ZigBee, Z-Wave and EnOcean are among the most used / discussed, from my point of view.

EnOcean won me over, since its technology is well suited to IoT : very low energy consumption and, for most sensors, the ability to work without batteries !
In this article, as depicted in the first picture, I will demonstrate a simple scenario : an EnOcean sensor will send data to Jeedom, which will feed an MQTT broker, and a synchronizer will then forward each received message to a Kafka topic. Finally, the latest value will be exposed to Prometheus and displayed in a Grafana dashboard.

Before we start ...


For this configuration, you will need :

Installing Jeedom


Well, there are plenty of ways to do so, so I won't guide you on this one, especially since I've installed it on an existing RPi.
Please refer to the installation guide, from the official documentation (I think the translation has been started but not finished yet ...).
(Note that you can also install it on a Synology NAS, which can be interesting)

Configuring the EnOcean sensor


Once Jeedom is installed (as well as the plugins) and the EnOcean USB stick has been plugged in, it's time to bind the sensor.
First of all, we will add a group, which will make administration easier. You can think of it as the room your sensor is in.
Go to the menu Tools > Objects and click on Add



Give it the name Room :


Then to add the sensor, go to the menu : Plugins > Automation protocols > EnOcean



Click on Inclusion Mode :



And then choose the automatic mode on the next window :



Click on any button, then your device should show up in the EnOcean plugin homepage.



If you want to see the first physical interactions or if you just want to make sure your device works properly, you can go to the Health section of the EnOcean plugin menu and you will see the current status of your sensor.


Configuring Mosquitto (MQTT Broker)


First, make sure that the MQTT plugin has been correctly installed. If not, please do so.
Normally, when you install the MQTT plugin, Mosquitto is automatically added. If not, install Mosquitto, following the official documentation.
Configure the MQTT plugin by specifying the default configuration (the default password being "jeedom") :



The # sign is the MQTT multi-level wildcard : used alone, it matches all topics. If you don't know how the MQTT protocol works : read that.
As with Kafka, if a message is sent to a non-existing topic, the Mosquitto broker will automatically create it.
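
To make the wildcard rule more concrete, here is a minimal Java sketch of how a subscription filter such as # or sensor/+ can be matched against a topic name. The names TopicFilterSketch and matches are made up for illustration, and the sketch deliberately ignores corner cases (for instance the special handling of topics starting with $) : it is not how Mosquitto is implemented.

```java
// Illustrative sketch only: simplified MQTT topic-filter matching.
final class TopicFilterSketch {

    // Returns true when the topic name is covered by the subscription filter.
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if (level.equals("#")) {
                // Multi-level wildcard: covers the parent level and everything below it.
                return true;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!level.equals("+") && !level.equals(topicLevels[i])) {
                return false; // "+" accepts any single level, other levels must be equal
            }
        }
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("#", "sensor/counter"));
        System.out.println(matches("sensor/+", "sensor/counter"));
        System.out.println(matches("sensor/+", "sensor/counter/raw"));
    }
}
```

With the # filter used in the plugin configuration, every topic published on the broker reaches Jeedom, which is convenient for a test setup but probably too broad for a busy broker.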

Using SoapUI to test a scenario


SoapUI is an excellent tool I've been using for many years now. I mostly use it to test functionalities built upon webservices.
If you are not familiar with it yet, I strongly encourage you to try it.
SmartBear is also behind Swagger, whose specification became OpenAPI, which the excellent Eclipse MicroProfile builds on (MicroProfile OpenAPI).

Create a TestSuite and add a Publish using MQTT step :



Configure the MQTT broker and make sure you have the same information you have in the Jeedom MQTT plugin.



Send a sample value to the topic sensor/counter :



If you go to your Room dashboard, in Jeedom, you should see the value reflected on the gauge, as well as the state of your buttons (each one has an on/off status) :



Of course, in our simple case, you may also use the Mosquitto client, which is far simpler.
mosquitto_pub -h localhost -t sensor/counter -m 25
But keep in mind that to test a real-world scenario, with multiple steps, correlation, pauses, etc., SoapUI is, by far, the best solution in my opinion.

Creating a Jeedom scenario


All right, everything is set up, but so far we haven't used our EnOcean sensor !
The wall switch can be used in two configurations : 2-button configuration or 4-button configuration. We will use the first one.
The principle is pretty simple : when I click the upper button, I want the counter to increase and when I click the lower button, I want it to decrease.

Since you have sent a message to the topic sensor/counter (whether it was through SoapUI or Mosquitto client), it has already been created.
On the MQTT plugin configuration page, you should then have your topic displayed.
Bind it to the Room object (choose it as the parent).

If you go to the Commands tab, you should see the subtopic.




Tick the checkbox Historiser (not translated !) which will allow you to track the value changes and to display a time series in the Analysis > History menu.
You can also check the current value by clicking Tester.

We will add two actions to that component : one to increase the value and the other one to decrease it.



Name them as you like, as long as it stays meaningful to you.
What's important is the Payload field : in the Up action, it will take the value #[Room][sensor][counter]#+1 which is the current value of the topic (represented by the final #) plus one.
And in the Down section, you can guess :)
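
Conceptually, the payload is a tiny expression : the #[Room][sensor][counter]# reference is replaced by the current value of the command, then the arithmetic is applied. The Java sketch below only mimics that idea for the +n / -n case. PayloadSketch and evaluate are hypothetical names, and this is in no way Jeedom's actual expression engine.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only: mimics "substitute the reference, then add or subtract".
final class PayloadSketch {

    // A command reference such as #[Room][sensor][counter]#
    private static final Pattern REFERENCE = Pattern.compile("#(?:\\[[^\\]]+\\]){3}#");
    // What is left once references are substituted, e.g. "25+1" or "25-1"
    private static final Pattern OPERATION = Pattern.compile("(-?\\d+)([+-])(\\d+)");

    static int evaluate(String payload, Map<String, Integer> currentValues) {
        // Step 1: replace every reference with the current value of the command
        Matcher reference = REFERENCE.matcher(payload);
        StringBuffer resolved = new StringBuffer();
        while (reference.find()) {
            Integer value = currentValues.get(reference.group());
            if (value == null) {
                throw new IllegalArgumentException("Unknown command: " + reference.group());
            }
            reference.appendReplacement(resolved, value.toString());
        }
        reference.appendTail(resolved);

        // Step 2: apply the addition or subtraction
        Matcher operation = OPERATION.matcher(resolved);
        if (!operation.matches()) {
            throw new IllegalArgumentException("Unsupported payload: " + payload);
        }
        int left = Integer.parseInt(operation.group(1));
        int right = Integer.parseInt(operation.group(3));
        return operation.group(2).equals("+") ? left + right : left - right;
    }

    public static void main(String[] args) {
        Map<String, Integer> values =
                Collections.singletonMap("#[Room][sensor][counter]#", 25);
        System.out.println(evaluate("#[Room][sensor][counter]#+1", values)); // Up action
        System.out.println(evaluate("#[Room][sensor][counter]#-1", values)); // Down action
    }
}
```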

Now we need to create scenarios and decide when to execute them.
To do so, go to the Tools > Scenario menu and then click Add.

Create two scenarios, one for the increase, the other one for the decrease.



For a scenario to be properly defined, you need to decide when it's triggered and what it will do.
For the trigger, it will be a click on an EnOcean sensor button (Button 1 is the one that will be used for the increase action).



For the actions, go to the Scenario pane and click on the +Add block button on the right side of the screen.
Choose Action :


Once the action block has been created, we will call one of the actions that we previously added on the MQTT component.



Repeat the process for the Down button (whose number is 3) and invoke the Down action.

Coding the MQTT / Kafka synchronizer


I haven't had the time to code a Kafka connector yet, which is why I wrote a very small piece of code instead.

As usual, I used Maven to create my project. Three dependencies are needed, here's an excerpt of my POM :
    <dependency>
       <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
       <version>1.2.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>1.1.1</version>
    </dependency>
    <dependency> 
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
       <version>1.0.13</version>
    </dependency>
And here's the code :
package fr.mbutton.kafka.sync;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQTT2Kafka {

    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());

    public static String MQTT_BROKER_URL      = "tcp://192.168.0.232:1883";
    public static String MQTT_BROKER_USER     = "jeedom";
    public static String MQTT_BROKER_PASSWORD = "jeedom";

    public static String KAFKA_BOOTSTRAP      = "localhost:9092";
    public static String KAFKA_TOPIC          = "sensor.counter";

    public void synchronize() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BOOTSTRAP);
        props.put("key.serializer"   , "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer");

        MemoryPersistence persistence = new MemoryPersistence();

        MqttClient mqttClient = null; // not auto-closeable
        try {
            mqttClient = new MqttClient(MQTT_BROKER_URL, "Synchronizer", persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    logger.warn("Connection lost ¯\\_(ツ)_/¯ : please restart me !");
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    logger.debug("Message: " + message.toString());
                    sendMessageOverKafka(topic, message);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    logger.debug("Job's done.");
                }

                private void sendMessageOverKafka(String topic, MqttMessage message) {
                    logger.debug("Sending the message to my Kafka cluster.");
                    // Keep track of the originating MQTT topic in a record header
                    List<Header> headers = new ArrayList<>();
                    headers.add(new RecordHeader("mqtt-topic", topic.getBytes()));
                    // One producer per message keeps the example short; a real bridge would reuse a single instance
                    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                        ProducerRecord<String, String> record = new ProducerRecord<>(KAFKA_TOPIC, 0, "1", message.toString(), headers);
                        producer.send(record);
                    }
                }
            });

            mqttClient.connect(connOpts);
            mqttClient.subscribe("#");

        } catch (MqttException e) {
            logger.error("Little problem while trying to synchronize MQTT gateway with Kafka broker.", e);
            // Release the client only on failure: after a successful subscribe it must
            // stay open, otherwise the callback would never receive any message.
            if (mqttClient != null) {
                try {
                    mqttClient.close();
                } catch (MqttException ex) {
                    logger.error("Error while liberating the resources.", ex);
                }
            }
        }
    }

    public static void main(String[] args) {
        MQTT2Kafka bridge = new MQTT2Kafka();
        bridge.synchronize();
    }
}

And that's it.
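
The bridge above forwards every MQTT message to a single Kafka topic and keeps the original topic in the mqtt-topic header. If you would rather have one Kafka topic per MQTT topic (sensor/counter becoming sensor.counter), the names need to be mapped, because Kafka topic names only accept ASCII letters, digits, '.', '_' and '-', up to 249 characters. Here is a minimal sketch of such a mapping. TopicNameMapper and toKafkaTopic are hypothetical names that are not part of the code above.

```java
// Illustrative sketch only: derives a legal Kafka topic name from an MQTT topic.
final class TopicNameMapper {

    private static final int MAX_LENGTH = 249; // Kafka's limit for topic names

    static String toKafkaTopic(String mqttTopic) {
        if (mqttTopic == null || mqttTopic.isEmpty()) {
            throw new IllegalArgumentException("Empty MQTT topic");
        }
        StringBuilder name = new StringBuilder(mqttTopic.length());
        for (int i = 0; i < mqttTopic.length(); i++) {
            char c = mqttTopic.charAt(i);
            if (c == '/') {
                name.append('.');   // MQTT level separator becomes a dot
            } else if (isLegal(c)) {
                name.append(c);
            } else {
                name.append('_');   // anything Kafka would reject is replaced
            }
        }
        String result = name.toString();
        if (result.length() > MAX_LENGTH || result.equals(".") || result.equals("..")) {
            throw new IllegalArgumentException("Not a valid Kafka topic name: " + result);
        }
        return result;
    }

    private static boolean isLegal(char c) {
        return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                || (c >= '0' && c <= '9') || c == '.' || c == '_' || c == '-';
    }

    public static void main(String[] args) {
        System.out.println(toKafkaTopic("sensor/counter"));
    }
}
```

Note that the mapping is not reversible (sensor/counter and sensor.counter end up in the same Kafka topic), which is one more reason to keep the mqtt-topic header.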

Trying to build a native image thanks to GraalVM


As this small program seemed to be a perfect use case, I tried GraalVM, just out of curiosity, to generate a native image.
Unfortunately, I encountered some generation problems that seem unresolvable for now.
On my Maven project, I first copied the dependencies to make the compilation easier :

mvn dependency:copy-dependencies

[INFO] --- maven-dependency-plugin:2.6:copy-dependencies (default-cli) @ KafkaMQTTSynchronizer ---
[INFO] Copying slf4j-api-1.7.25.jar to sources/KafkaMQTTSynchronizer/target/dependency/slf4j-api-1.7.25.jar
[INFO] Copying org.eclipse.paho.client.mqttv3-1.2.0.jar to sources/KafkaMQTTSynchronizer/target/dependency/org.eclipse.paho.client.mqttv3-1.2.0.jar
[INFO] Copying snappy-java-1.1.7.1.jar to sources/KafkaMQTTSynchronizer/target/dependency/snappy-java-1.1.7.1.jar
[INFO] Copying logback-core-1.0.13.jar to sources/KafkaMQTTSynchronizer/target/dependency/logback-core-1.0.13.jar
[INFO] Copying javaee-web-api-7.0.jar to sources/KafkaMQTTSynchronizer/target/dependency/javaee-web-api-7.0.jar
[INFO] Copying lz4-java-1.4.1.jar to sources/KafkaMQTTSynchronizer/target/dependency/lz4-java-1.4.1.jar
[INFO] Copying logback-classic-1.0.13.jar to sources/KafkaMQTTSynchronizer/target/dependency/logback-classic-1.0.13.jar
[INFO] Copying kafka-clients-1.1.1.jar to sources/KafkaMQTTSynchronizer/target/dependency/kafka-clients-1.1.1.jar

Compiled the source :
graalvm-ce-1.0.0-rc5/bin/javac -cp "../../../target/dependency/*" fr/mbutton/kafka/sync/MQTT2Kafka.java

Tried to generate the native image :
graalvm-ce-1.0.0-rc5/bin/native-image -cp ".:../../../target/dependency/*" fr.mbutton.kafka.sync.MQTT2Kafka

Build on Server(pid: 47807, port: 52391)
   classlist:     827.05 ms
       (cap):   1,428.57 ms
       setup:   2,142.86 ms
Detected unnecessary RecomputeFieldValue.ArrayBaseOffset 
  com.oracle.svm.core.jdk.Target_java_nio_DirectByteBuffer.arrayBaseOffset substitution field for java.nio.DirectByteBuffer.arrayBaseOffset. 
  The annotated field can be removed. This ArrayBaseOffset computation can be detected automatically. 
  Use option -H:+UnsafeAutomaticSubstitutionsLogLevel=2 to print all automatically detected substitutions. 
    analysis:   8,346.29 ms
error: unsupported features in 3 methods
Detailed message:
Error: com.oracle.graal.pointsto.constraints.UnsupportedFeatureException: 
   Invoke with MethodHandle argument could not be reduced to at most a single call: 
   java.lang.invoke.LambdaForm$MH.2127087555.invoke_MT(Object, Object, Object)
Trace: 
 at parsing org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:90)
        [...]

I'm disappointed. Maybe next time !

Setting up Kafka


Thanks to the Confluent Suite, starting Kafka for testing can be done in the blink of an eye.
Download Confluent Open Source and start it with the following command :
bin/confluent start
Then create a mirror topic that will receive messages from the MQTT broker.
bin/kafka-topics --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic sensor.counter
Run the Java bridge (the MQTT2Kafka class above) either via the CLI or via your IDE.

Click on your buttons, to see if the changes are correctly reflected in Kafka.
bash-4.4$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic sensor.counter
26
27
28
29
28
27
26
25
24
25


Exposing the information to Prometheus



Coding the exporter


To expose the data to Prometheus, you only have to expose text composed of a name and a value.
In our case, what I'm interested in, is to expose the current (or last known) value of a topic.
As Kafka stores the values in a log, I thought it would be pretty simple to take a quick peek. But it's not : the product has clearly not been designed to be queried like a database.
(Even if it's possible to see the current state of your topics in the Kafka Control Center)

I then thought about coding a consumer that would write each change to a database. But it sounded like overkill ...
Same thing for writing to a file ...
I then found a neat in-memory solution.

It's really simple : I used Payara Micro (I love it) to deploy a simple app with a WebListener that launches a thread, in charge of listening for new Kafka messages.
Then, on each received message, it updates a static variable in a JAX-RS service, whose value is returned when a GET query is made.
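
The "latest value" idea does not depend on Kafka or JAX-RS : one background thread overwrites a shared reference, and readers simply return whatever is there. The JDK-only sketch below uses a BlockingQueue as a stand-in for the Kafka topic (an assumption made so that it runs on its own), and LatestValueSketch is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only: a background thread keeps the last received value in memory.
final class LatestValueSketch {

    private final AtomicReference<String> latest = new AtomicReference<>("0");
    // Stand-in for the Kafka topic, so that the sketch has no external dependency
    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private final Thread worker = new Thread(this::consume, "latest-value-listener");

    void start() {
        worker.start();
    }

    // Simulates a new message arriving on the topic
    void publish(String value) {
        incoming.add(value);
    }

    // What the GET endpoint would return
    String current() {
        return latest.get();
    }

    // Drains what is left, then stops the background thread
    void stop() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void consume() {
        try {
            while (running || !incoming.isEmpty()) {
                String value = incoming.poll(10, TimeUnit.MILLISECONDS);
                if (value != null) {
                    latest.set(value); // only the last value matters
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        LatestValueSketch sketch = new LatestValueSketch();
        sketch.start();
        sketch.publish("26");
        sketch.publish("27");
        sketch.stop();
        System.out.println(sketch.current());
    }
}
```

The trade-off is the same as in the real exporter : nothing is persisted, so the value falls back to its default after a restart until a new message arrives.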

JAX-RS Service :
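package fr.mbutton.prometheus;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("metrics")
public class KafkaCounterExporter {

    // Written by the Kafka listener thread, read by the HTTP request threads
    static volatile String counterValue = "0";

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String getCounter() {
        // Prometheus text format: metric name, a space, the value, then a line feed
        return "counter " + counterValue + "\n";
    }
}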
Can it be simpler ?
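
For reference, a sample in the Prometheus text exposition format is a metric name, a space and a value on its own line, optionally preceded by # HELP and # TYPE comment lines. The sketch below renders such a block for a gauge. PrometheusTextSketch and gauge are hypothetical helper names used for illustration only, not something the exporter needs.

```java
// Illustrative sketch only: renders one gauge in the Prometheus text exposition format.
final class PrometheusTextSketch {

    static String gauge(String name, String help, double value) {
        if (!name.matches("[a-zA-Z_:][a-zA-Z0-9_:]*")) {
            throw new IllegalArgumentException("Invalid metric name: " + name);
        }
        // Backslashes and line feeds must be escaped in the HELP text
        String escapedHelp = help.replace("\\", "\\\\").replace("\n", "\\n");
        return "# HELP " + name + " " + escapedHelp + "\n"
             + "# TYPE " + name + " gauge\n"
             + name + " " + value + "\n"; // every line ends with a line feed
    }

    public static void main(String[] args) {
        System.out.print(gauge("counter", "Last value of sensor/counter", 25));
    }
}
```

A gauge is used here because the value goes both up and down, despite the metric being named counter.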
And as for the WebListener :
package fr.mbutton.prometheus;

import java.util.Arrays;
import java.util.Properties;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@WebListener
public class KafkaListener implements ServletContextListener {

    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());

    // Kept so that contextDestroyed() can interrupt the polling loop
    private volatile KafkaConsumer<String, String> consumer;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        logger.info("Starting Kafka Listener");
        Runnable listener = () -> {
            Properties props = new Properties();
            props.put("bootstrap.servers" , "localhost:9092");
            props.put("group.id"          , "confluent");
            props.put("key.deserializer"  , "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
                consumer = kafkaConsumer;
                kafkaConsumer.subscribe(Arrays.asList("sensor.counter"));

                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(10);
                    for (ConsumerRecord<String, String> record : records) {
                        // Only the last known value is kept
                        KafkaCounterExporter.counterValue = record.value();
                    }
                }
            } catch (WakeupException e) {
                logger.info("Kafka Listener stopped");
            }
        };

        Thread t = new Thread(listener);
        t.start();
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        logger.info("Disconnecting");
        if (consumer != null) {
            // wakeup() is thread-safe and makes the pending poll() throw a WakeupException
            consumer.wakeup();
        }
    }
}
Then use your favorite IDE (NetBeans for me) to build your project.
Once it's built, run it with Payara Micro with the following line :
java -jar payara-micro-5.183.jar --deploy /<path>/KafkaPrometheusExporter-1.0-SNAPSHOT.war --port 8090

Configuring Prometheus


Now that we have a MicroProfile application running, the endpoint is something like http://localhost:8090/KafkaPrometheusExporter-1.0-SNAPSHOT/prometheus/metrics
But by default, Prometheus doesn't scrape an arbitrary endpoint : it expects /metrics

There are several ways to fix that. The simplest way is to change the metrics path.
  - job_name: kafka-mqtt
    metrics_path: /KafkaPrometheusExporter-1.0-SNAPSHOT/prometheus/metrics
    static_configs:
      - targets: ['localhost:8090']

But if, like me, you're using NGINX as a reverse proxy, you can configure it to rewrite the URL.
        location /metrics {
            proxy_pass   http://localhost:8090/KafkaPrometheusExporter-1.0-SNAPSHOT/prometheus/metrics;
        }
The following URL (http://localhost/metrics) is then available and ready to be scraped.

To be sure, open Prometheus and try to display the counter stat.



Displaying it via Grafana


I won't spend too much time on setting up Grafana, which is not the topic of this post.
Just as a reminder, you'll have to define a DataSource pointing at your Prometheus.
Then, look for the counter stat that we exposed.
Finally, create a new dashboard based on a single stat, make it a gauge and configure it more or less like this :


And when you're satisfied, you can live-test it (with the auto-refresh option).


