2019-04-28

Server-Sent Events with Quarkus and Kafka



Context


I wanted to play with Quarkus the day it came out. Unfortunately, I had other matters to deal with, and today is the first day I can sit down and concentrate :)
In this article, I will walk through a use case: dynamically displaying on a web page the events received on a Kafka topic.

Configuring the components


Kafka


That part is really easy.
You will need a running Kafka. If you already have a setup, perfect: launch Kafka the way you usually do.
Otherwise, I strongly encourage you to use the Confluent Platform (the most recent version at the time of writing is 5.2.1).
Then just type bin/confluent start, and Kafka (plus the whole stack around it) will be up in seconds.

Once Kafka is started, create a topic named reactive:
./kafka-topics --create --zookeeper localhost:2181 --partitions 3 --replication-factor 1 --topic reactive

Quarkus


To run Quarkus, you need to satisfy the prerequisites listed on this page:
https://quarkus.io/guides/getting-started-guide

Strictly speaking, Quarkus doesn't have to be installed: all the configuration is done within the application itself (POM, Maven plugin and application.properties).

NetBeans


I'm also using the excellent NetBeans 11 (which is now a top-level Apache project!), available from Apache:
https://netbeans.apache.org/download/nb110/nb110.html

There are a few things to know about NetBeans to run this app smoothly.
Make sure that the Maven version you're using is not the bundled one (3.3.9): Quarkus requires Maven 3.5.3 or later.
You can set it in Options > Java > Maven > Maven Home.

To run Quarkus from NetBeans, you also have to tell the IDE how to do so.
Right-click on your project and select "Properties", then "Actions", click "Add Custom" and name the action "Quarkus Run".
In "Execute Goals", type "compile quarkus:dev" and click "OK".



Now, when you want to launch Quarkus, right-click on your project and select "Run Maven > Quarkus Run".




Designing the application


One thing I like about Quarkus is that it has been designed by developers who know what they are talking about.
For instance, here are some very nice improvements:
  • you don't have to ship an empty beans.xml to activate CDI
  • you don't need to declare an empty class extending javax.ws.rs.core.Application to write JAX-RS web services
  • you no longer have to write an infinite polling loop to receive messages

To create a starter project, just follow the instructions provided on the Quarkus "Getting started" page.
mvn io.quarkus:quarkus-maven-plugin:0.14.0:create \
    -DprojectGroupId=fr.mbutton.blog \
    -DprojectArtifactId=quarkus-kafka \
    -DclassName="fr.mbutton.blog.KafkaResource" \
    -Dpath="/consume"
The generated project lacks some of the dependencies needed here. This is the dependency section of my POM.
<dependencies>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-resteasy</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-vertx</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-arc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
</dependencies>
The file application.properties is where the application is actually configured.
It holds, for instance, the log configuration as well as the Kafka bootstrap properties.
        
smallrye.messaging.source.events.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.events.topic=reactive
smallrye.messaging.source.events.bootstrap.servers=localhost:9092
smallrye.messaging.source.events.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.events.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.events.group.id=shipment-service-quarkus

quarkus.log.console.enable=true
quarkus.log.console.format=%d{HH:mm:ss} %-5p [%c{2.}] (%t) %s%e%n
quarkus.log.console.level=DEBUG
quarkus.log.console.color=false

quarkus.log.category."fr.mbutton.blog".level=DEBUG
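To make the naming scheme of the smallrye.messaging.source.events.* keys easier to see, here is a minimal sketch in plain Java (no Quarkus involved) that splits such keys into a channel name and the settings attached to it. The class ChannelConfigSketch and its method settingsFor are hypothetical helpers written for this illustration; this is not how SmallRye reads its configuration internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class ChannelConfigSketch {

    // Prefix used by the properties of this article; what follows is "<channel>.<setting>".
    static final String PREFIX = "smallrye.messaging.source.";

    /** Returns the settings declared for one channel, with prefix and channel name stripped. */
    static Map<String, String> settingsFor(Properties props, String channel) {
        String channelPrefix = PREFIX + channel + ".";
        Map<String, String> settings = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(channelPrefix)) {
                settings.put(key.substring(channelPrefix.length()), props.getProperty(key));
            }
        }
        return settings;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(String.join("\n",
                "smallrye.messaging.source.events.topic=reactive",
                "smallrye.messaging.source.events.bootstrap.servers=localhost:9092",
                "quarkus.log.console.level=DEBUG")));
        // Prints the settings attached to the channel named "events"; the log key is left out.
        System.out.println(settingsFor(props, "events"));
    }
}
```

The channel name ("events") is the part that the application code refers to, while the topic setting ("reactive") is what Kafka knows about.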
And here is the code of the web service:
package fr.mbutton.blog;

import io.smallrye.reactive.messaging.kafka.KafkaMessage;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalTime;
import java.util.concurrent.CompletionStage;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/consume")
public class KafkaResource {

 private OutboundSseEvent.Builder eventBuilder;
 private Sse sse;
 // Written by the HTTP thread, read by the messaging thread, hence volatile.
 private volatile SseEventSink sseEventSink = null;
 private final Logger logger = LoggerFactory.getLogger(this.getClass());

 @Context
 public void setSse (Sse sse) {
  this.sse = sse;
  this.eventBuilder = sse.newEventBuilder();
 }

 @GET
 @Produces(MediaType.SERVER_SENT_EVENTS)
 public void consume (@Context SseEventSink sseEventSink) {
  this.sseEventSink = sseEventSink;
 }

 // Invoked for every record received on the "events" destination.
 @Incoming("events")
 public CompletionStage<Void> onMessage (KafkaMessage<String, String> message) {
  logger.debug("Message with content \"{}\" has been received.", message.getPayload());
  // Read the field once: it is set by the HTTP thread in consume().
  SseEventSink sink = this.sseEventSink;
  if (sink != null && !sink.isClosed()) {
   String display = LocalTime.now() + " | " + getHost() + " | " + message.getPayload();
   OutboundSseEvent sseEvent = this.eventBuilder
     .name("message")
     .id(message.getKey())
     .mediaType(MediaType.TEXT_PLAIN_TYPE)
     .data(display)
     .reconnectDelay(3000)
     .comment(message.getPayload())
     .build();
   sink.send(sseEvent);
  }
  // Acknowledge the record whether or not a client is connected.
  return message.ack();
 }

 private String getHost () {
  try {
   return InetAddress.getLocalHost().getHostName();
  } catch (UnknownHostException ex) {
   logger.error("Problem while trying to get the hostname", ex);
   return "Unknown";
  }
 }
}
The mechanism is fairly simple: when the class is loaded, the @Incoming annotation binds it to the destination "events", whose configuration lies in application.properties.
This matters, because this is where the link between the destination known to the application and the Kafka topic is made.
[•••]
smallrye.messaging.source.events.topic=reactive
[•••]
Then, when a GET request is received on the path /consume, the resource stores the SseEventSink of that request.
Later, when Kafka messages are received, they are sent to that sink.
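To see what travels to the browser, here is a rough sketch of the text/event-stream framing that corresponds to the fields set on the builder above (name, id, reconnect delay, comment and data). SseFrameSketch is a hypothetical helper written for this article. In the real application the JAX-RS implementation does the serialization for you, and its field order may differ. The sketch also assumes that name, id and comment contain no line breaks.

```java
final class SseFrameSketch {

    /** Serializes one event using the field names of the Server-Sent Events format. */
    static String frame(String name, String id, long retryMillis, String comment, String data) {
        StringBuilder sb = new StringBuilder();
        sb.append(": ").append(comment).append('\n');          // comment line, ignored by clients
        sb.append("event: ").append(name).append('\n');        // event type
        sb.append("id: ").append(id).append('\n');             // last event id
        sb.append("retry: ").append(retryMillis).append('\n'); // reconnection delay in ms
        // A payload spanning several lines needs one "data:" field per line.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n'); // a blank line terminates the event
        return sb.toString();
    }

    public static void main(String[] args) {
        // Sample values only; the host name and the time are made up.
        System.out.print(frame("message", "key-1", 3000, "test", "10:15:30 | myhost | test"));
    }
}
```

A browser-side EventSource only hands the data field to its listener; the comment line is there for humans reading the raw stream.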

Sending events


Just use one of the Kafka CLI tools to send messages:
./kafka-console-producer --broker-list localhost:9092 --topic reactive
>test
>yet another test
From a consumer's point of view, you have two ways of observing the incoming messages.
Either you simply open index.html, which shows the last received message.


Or you can consume directly from the web service (on the path /consume), which stacks all the messages.
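If you would rather consume the endpoint from code than from a browser, the following is a minimal sketch using the JDK 11 java.net.http.HttpClient. SseClientSketch and its methods are names made up for this example. The parsing is deliberately simplified: only the data: fields are handled, and ids, retries and comments are skipped.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class SseClientSketch {

    /** Collects "data:" lines and hands the joined payload to the callback at each blank line. */
    static void dispatch(Stream<String> lines, Consumer<String> onEvent) {
        StringBuilder data = new StringBuilder();
        lines.forEach(line -> {
            if (line.isEmpty()) {                      // blank line: end of one event
                if (data.length() > 0) {
                    onEvent.accept(data.toString());
                    data.setLength(0);
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) {
                    value = value.substring(1);        // drop the single optional space
                }
                if (data.length() > 0) {
                    data.append('\n');                 // several data lines form one payload
                }
                data.append(value);
            }
            // Other fields (event:, id:, retry:) and comment lines are ignored in this sketch.
        });
    }

    /** Opens the stream and blocks for as long as the server keeps the connection open. */
    static void follow(URI uri, Consumer<String> onEvent) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        HttpResponse<Stream<String>> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            dispatch(lines, onEvent);
        }
    }
}
```

Assuming Quarkus listens on its default port 8080, a call such as SseClientSketch.follow(URI.create("http://localhost:8080/consume"), System.out::println) should print each message as it arrives on the topic.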
