Lately, like everyone else, I've been reading a lot about IoT and how this could change the world.
Even if it's been a while since I first heard that (1995 was the year when I first heard about the concepts of smart home), this time, I've got a feeling that it will soon be truly available to everyone.
So I read about the different protocols that are available today.
Bluetooth LE, ZigBee, Z-Wave and EnOcean are among the most used / discussed, from my point of view.
I've been seduced by EnOcean, since its technology is indeed well-suited to IoT : very low energy consumption and for most sensors, the ability to work without batteries !
In this article, as depicted in the first picture I will demonstrate a simple scenario : an EnOcean sensor will send data to Jeedom, which will feed a MQTT broker and then a synchronizer will send each received message to a Kafka topic. Then the latest value will be exposed to be displayed in a Grafana dashboard.
Before we start ...
For this configuration, you will need :
- a Raspberry Pi (with everything it needs to run)
- a EnOcean sensor (wall switch from NodOn)
- a EnOcean physical bridge (USB stick)
- Jeedom, MQTT plugin and the EnOcean plugin (6€)
- SoapUI Open Source
- A Java IDE (I'm using Netbeans)
- Kafka and ZooKeeper (I'm using the Confluent Open Source)
- Prometheus
- Grafana
Installing Jeedom
Well, there are plenty of ways to do so, so I won't guide you on this one, especially since I've installed it on a existing RPi.
Please refer to the installation guide, from the official documentation (I think the translation has been started but not finished yet ...).
(Note that you can install it on Synology NAS, which can also be interesting)
Configuring the EnOcean sensor
Once Jeedom is installed (as well as the plugins) and the EnOcean USB stick has been plugged in, it's time to bind the sensor.
First of all, we will add a group, which will allow an easier administration. You can think of it as the room your sensor is in.
Go to the menu Tools > Objects and click on Add
Give it the name Room :
Then to add the sensor, go to the menu : Plugins > Automation protocols > EnOcean
Click on Inclusion Mode :
And then choose the automatic mode on the next window :
Click on any button, then your device should show up in the EnOcean plugin homepage.
If you want to see the first physical interactions or if you just want to make sure your device works properly, you can go to the Health section of the EnOcean plugin menu and you will see the current status of your sensor.
Configuring Mosquitto (MQTT Broker)
First, make sure that the MQTT plugin has been correctly installed. If not, please do so.
Normally, when you install the MQTT plugin, Mosquitto is automatically added. If not, install Mosquitto, following the official documentation.
Configure the MQTT plugin by specifying the default configuration (the default password being "jeedom") :
The # sign means the root, or all topics if you prefer. If you don't know how the MQTT protocol works : read that.
Like Kafka, if the message is sent to a non-existing topic, the Mosquitto broker will automatically create it.
Using SoapUI to test a scenario
SoapUI is an excellent tool I've been using for many years now. I mostly use it to test functionalities built upon webservices.
If you are not familiar with it yet, I strongly encourage you to try it.
SmartBear is also behind Swagger, which is part of the excellent Eclipse MicroProfile.
Create a TestSuite and add a Publish using MQTT step :
Configure the MQTT broker and make sure you have the same information you have in the Jeedom MQTT plugin.
Send a sample value to the topic sensor/counter :
If you go to your Room dashboard, in Jeedom, you should see the value reflected on the gauge, as well as the state of your buttons (each one has a on/off status) :
Of course, in our simple case, you may also use the Mosquitto client, which is far simpler.
mosquitto_pub -h localhost -t sensor/counter -m 25
Creating a Jeedom scenario
All right, everything is set up, but for now, we've not used our EnOcean sensor yet !
The wall switch can be used in two configurations : 2-button configuration or 4-button configuration. We will use the first one.
The principle is pretty simple : when I click the upper button, I want the counter to increase and when I click the down button, I want it to decrease.
Since you have sent a message to the topic sensor/counter (whether it was through SoapUI or Mosquitto client), it has already been created.
On the MQTT plugin configuration page, you should then have your topic displayed.
Bind it to the Room object (choose it as the parent).
If you go to the Commands tab, you should see the subtopic.
Tick the checkbox Historiser (not translated !) which will allow you to track the value changes and to display a time series in the Analysis > History menu.
You can also check the current value by clicking Tester.
We will add two actions to that component. One for increasing the value, and the other one to decrease the value.
Name them as you like, as long as it stays meaningful to you.
What's important is the Payload field : in the Up action, it will take the value #[Room][sensor][counter]#+1 which is the current value of the topic (represented by the final #) plus one.
And in the Down section, you can guess :)
Now we need to create scenarios and decide when to execute them.
To do so, go to the Tools > Scenario menu and then click Add.
Create two scenarios, one for the increase, the other one for the decrease.
For a scenario to be properly defined, you need to decide when it's triggered and what it will do.
For the trigger, it will be the click on a EnOcean sensor button. (Button 1 is the one which will be used to the increase action).
For the actions, go to the Scenario pane and click on +Add block button on the right side of the screen.
Chose Action :
Once the action block has been created, we will call one of the action that we previously added on the MQTT component.
Repeat the process for the Down button (whose number is 3) and invoke the Down action.
Coding the MQTT / Kafka synchronizer
I haven't had the time to code a Kafka connector yet, that's why I wrote a very small piece of code.
As usual, I used Maven to create my project. Three dependencies are needed, here's an excerpt of my POM :
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.13</version> </dependency>
package fr.mbutton.kafka.sync; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.logging.Level; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MQTT2Kafka { private final transient Logger logger = LoggerFactory.getLogger(this.getClass()); public static String MQTT_BROKER_URL = "tcp://192.168.0.232:1883"; public static String MQTT_BROKER_USER = "jeedom"; public static String MQTT_BROKER_PASSWORD = "jeedom"; public static String KAFKA_BOOTSTRAP = "localhost:9092"; public static String KAFKA_TOPIC = "sensor.counter"; public void synchronize() { final Properties props = new Properties(); props.put("bootstrap.servers", KAFKA_BOOTSTRAP); props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer"); MemoryPersistence persistence = new MemoryPersistence(); MqttClient mqttClient = null; // not auto-closeable try { mqttClient = new MqttClient(MQTT_BROKER_URL, "Synchronizer", persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { logger.warn("Connection lost ¯\\_(ツ)_/¯ : please restart me !"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { logger.debug("Message: " + message.toString()); sendMessageOverKafka(topic, message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { logger.debug("Job's done."); } private void sendMessageOverKafka(String topic, MqttMessage message) { logger.debug("Sending the message to my Kafka cluster."); List<Header> headers = new ArrayList<>(); headers.add(new RecordHeader("mqtt-topic", topic.getBytes())); try (KafkaProducerproducer = new KafkaProducer<>(props)) { ProducerRecord record = new ProducerRecord<>(KAFKA_TOPIC, 0, "1", message.toString(), headers); producer.send(record); } } }); mqttClient.connect(connOpts); mqttClient.subscribe("#"); } catch (MqttException e) { logger.error("Little problem while trying to synchronize MQTT gateway with Kafka broker.", e); } finally { if (mqttClient != null) { try { mqttClient.close(); } catch (MqttException ex) { logger.error("Error while liberating the resources.", ex); } } } } public static void main(String[] args) { MQTT2Kafka bridge = new MQTT2Kafka(); bridge.synchronize(); } }
And that's it.
Trying to build a native image thanks to GraalVM
As this lib seems to be a perfect use case, I tried to use GraalVM, just out of curiosity, to generate a native image.
Unfortunately, I encountered some generation problems that seem unresolvable for now.
On my Maven project, I first copied the dependencies to make the compilation easier :
mvn dependency:copy-dependencies [INFO] --- maven-dependency-plugin:2.6:copy-dependencies (default-cli) @ KafkaMQTTSynchronizer --- [INFO] Copying slf4j-api-1.7.25.jar to sources/KafkaMQTTSynchronizer/target/dependency/slf4j-api-1.7.25.jar [INFO] Copying org.eclipse.paho.client.mqttv3-1.2.0.jar to sources/KafkaMQTTSynchronizer/target/dependency/org.eclipse.paho.client.mqttv3-1.2.0.jar [INFO] Copying snappy-java-1.1.7.1.jar to sources/KafkaMQTTSynchronizer/target/dependency/snappy-java-1.1.7.1.jar [INFO] Copying logback-core-1.0.13.jar to sources/KafkaMQTTSynchronizer/target/dependency/logback-core-1.0.13.jar [INFO] Copying javaee-web-api-7.0.jar to sources/KafkaMQTTSynchronizer/target/dependency/javaee-web-api-7.0.jar [INFO] Copying lz4-java-1.4.1.jar to sources/KafkaMQTTSynchronizer/target/dependency/lz4-java-1.4.1.jar [INFO] Copying logback-classic-1.0.13.jar to sources/KafkaMQTTSynchronizer/target/dependency/logback-classic-1.0.13.jar [INFO] Copying kafka-clients-1.1.1.jar to sources/KafkaMQTTSynchronizer/target/dependency/kafka-clients-1.1.1.jar
Compiled the source :
graalvm-ce-1.0.0-rc5/bin/javac -cp "../../../target/dependency/*" fr/mbutton/kafka/sync/MQTT2Kafka.java
Tried to generate the native image :
graalvm-ce-1.0.0-rc5/bin/native-image -cp ".:../../../target/dependency/*" fr.mbutton.kafka.sync.MQTT2Kafka Build on Server(pid: 47807, port: 52391) classlist: 827.05 ms (cap): 1,428.57 ms setup: 2,142.86 ms Detected unnecessary RecomputeFieldValue.ArrayBaseOffset com.oracle.svm.core.jdk.Target_java_nio_DirectByteBuffer.arrayBaseOffset substitution field for java.nio.DirectByteBuffer.arrayBaseOffset. The annotated field can be removed. This ArrayBaseOffset computation can be detected automatically. Use option -H:+UnsafeAutomaticSubstitutionsLogLevel=2 to print all automatically detected substitutions. analysis: 8,346.29 ms error: unsupported features in 3 methods Detailed message: Error: com.oracle.graal.pointsto.constraints.UnsupportedFeatureException: Invoke with MethodHandle argument could not be reduced to at most a single call: java.lang.invoke.LambdaForm$MH.2127087555.invoke_MT(Object, Object, Object) Trace: at parsing org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:90) [...]
I'm disappointed. Maybe next time !
Setting up Kafka
Thanks to the Confluent Suite, starting Kafka for testing can be done in a blink of an eye.
Download Confluent Open Source and start it with the following command :
bin/confluent start
bin/kafka-topics --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1 --topic sensor.counter
Click on your buttons, to see if the changes are correctly reflected in Kafka.
bash-4.4$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic sensor.counter 26 27 28 29 28 27 26 25 24 25
Exposing the information to Prometheus
Coding the exporter
To expose the data to Prometheus, you only have to expose text composed of a name and a value.
In our case, what I'm interested in, is to expose the current (or last known) value of a topic.
As Kafka stores the values in a log, I thought it would be pretty simple to take a quick peek. But it's not and the product has clearly not been made to be queryable as a database.
(Even if it's possible to see the current state of your topics in the Kafka Control Center)
I then thought about coding a consumer that would write the change in a database. But it sounded too oversized ...
Same thing about writing in a file ...
I then found a neat in-memory solution.
It's really simple : I used Payara Micro (I love it) to deploy a simple app with a WebListener that launches a thread, in charge of listening for new Kafka messages.
Then on each received message, it updates a static variable, on a JAXRS service, whose value is returned when a GET query is made.
JAX-RS Service :
package fr.mbutton.prometheus; import javax.ws.rs.GET; import javax.ws.rs.Path; @Path("metrics") public class KafkaCounterExporter { static String counterValue = "0"; @GET public String getCounter() { return "counter " + counterValue; } }
And as for the WebListener :
package fr.mbutton.prometheus; import java.util.Arrays; import java.util.Properties; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @WebListener public class KafkaListener implements ServletContextListener { private transient Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void contextInitialized(ServletContextEvent sce) { logger.info("Starting Kafka Listener"); Runnable listener = () -> { Properties props = new Properties(); props.put("bootstrap.servers" , "localhost:9092"); props.put("group.id" , "confluent"); props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); try (KafkaConsumerconsumer = new KafkaConsumer<>(props)) { consumer.subscribe(Arrays.asList("sensor.counter")); while (true) { ConsumerRecords records = consumer.poll(10); for (ConsumerRecord record : records) { KafkaCounterExporter.counterValue = record.value(); } } } }; Thread t = new Thread(listener); t.start(); } @Override public void contextDestroyed(ServletContextEvent sce) { logger.info("Disconnecting"); } }
Once it's built, run it with Payara Micro with the following line :
java -jar payara-micro-5.183.jar --deploy /<path>/KafkaPrometheusExporter-1.0-SNAPSHOT.war --port 8090
Configuring Prometheus
Now we have a MicroProfile application running, the endpoint is something like http://localhost:8090/KafkaPrometheusExporter-1.0-SNAPSHOT/prometheus/metrics
But by default, you just can't use a random endpoint : it has to be /metrics
There are several ways to fix that. The simplest way is to change the metrics path.
- job_name: kafka-mqtt metrics_path: KafkaPrometheusExporter-1.0-SNAPSHOT/prometheus/metrics static_configs: - targets: ['localhost:8090']
But if like me, you're using NGinx as a reverse proxy, you may configure it to perform URL rewriting.
location /metrics { proxy_pass http://localhost:8090/KafkaPrometheusExporter-1.0-SNAPSHOT/prometheus/metrics; }
To be sure, open Prometheus and try to display the counter stat.
Displaying it via Grafana
I won't spend too much time on setting up Grafana, which is not the topic of this post.
Just as a reminder, you'll have to define a DataSource, pointing at your Prometheus.
Then, look for the stat counter that we exposed.
And create a new dashboard, based on a single stat, make it a gauge, configure it more or less like that :
And when you're satisfied, you can live-test it (with the auto-refresh option).
No comments:
Post a Comment