Context
I wanted to play with Quarkus the day it came out. Unfortunately, I had other matters to deal with and today is the first day I can sit and concentrate :)
In this article, I will demonstrate a use case: dynamically displaying on a web page the events received on a Kafka topic.
Configuring the components
Kafka
That part is really easy.
You will need to run Kafka. If you already have a setup, that's perfect: launch Kafka the way you usually do.
Otherwise, I strongly encourage you to use the Confluent Platform (the most recent version, at the time of writing, is 5.2.1).
Then, just type bin/confluent start and Kafka (plus the whole stack around it) will be up in seconds.
Once Kafka is started, we will create a topic named reactive:
./kafka-topics --create --zookeeper localhost:2181 --partitions 3 --replication-factor 1 --topic reactive
Quarkus
To run Quarkus, you will have to meet the prerequisites listed on this page:
https://quarkus.io/guides/getting-started-guide
Quarkus doesn't have to be installed stricto sensu: the whole configuration is done through the application (POM, Maven plugin and application.properties).
NetBeans
I'm also using the excellent NetBeans 11 (which is now a top-level Apache project!), available at Apache:
https://netbeans.apache.org/download/nb110/nb110.html
But there are a few things to know about NetBeans to be able to run this app smoothly.
Make sure that the Maven version you're using is not the bundled one (3.3.9): to be compatible with Quarkus, you'll have to run Maven 3.5.3 or later.
You can define it in Options > Java > Maven > Maven Home
You will also have to tell NetBeans how to run Quarkus.
Right click on your project and select "Properties", then select "Actions", click "Add Custom" and name the new action "Quarkus Run".
In "Execute Goals", type "compile quarkus:dev" and click "OK".
Now, when you want to launch Quarkus, right click on your project and select "Run Maven > Quarkus Run".
Designing the application
One thing I like about Quarkus is that it has been designed by developers who know what they are talking about.
Some very nice improvements, for instance:
- you don't have to ship an empty beans.xml to activate CDI
- there is no need to declare an empty class extending javax.ws.rs.core.Application to expose JAX-RS web services
- the time when you had to write an infinite loop to get messages is over
To create a starter project, just follow the instructions provided on the Quarkus "Getting started" page.
mvn io.quarkus:quarkus-maven-plugin:0.14.0:create \
    -DprojectGroupId=fr.mbutton.blog \
    -DprojectArtifactId=quarkus-kafka \
    -DclassName="fr.mbutton.blog.KafkaResource" \
    -Dpath="/consume"
The POM declares the following dependencies:
<dependencies>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-resteasy</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-vertx</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-arc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
</dependencies>
The rest of the configuration lives in application.properties. There you will find, for instance, the log configuration as well as the Kafka bootstrap properties.
smallrye.messaging.source.events.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.events.topic=reactive
smallrye.messaging.source.events.bootstrap.servers=localhost:9092
smallrye.messaging.source.events.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.events.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.events.group.id=shipment-service-quarkus

quarkus.log.console.enable=true
quarkus.log.console.format=%d{HH:mm:ss} %-5p [%c{2.}] (%t) %s%e%n
quarkus.log.console.level=DEBUG
quarkus.log.console.color=false
quarkus.log.category."fr.mbutton.blog".level=DEBUG
package fr.mbutton.blog;

import io.smallrye.reactive.messaging.kafka.KafkaMessage;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalTime;
import java.util.concurrent.CompletionStage;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/consume")
public class KafkaResource {

    private OutboundSseEvent.Builder eventBuilder;
    private Sse sse;
    // Only one sink is kept: the last client that called /consume.
    private SseEventSink sseEventSink = null;

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Context
    public void setSse (Sse sse) {
        this.sse = sse;
        this.eventBuilder = sse.newEventBuilder();
    }

    // Registers the caller as the destination of the server-sent events.
    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void consume (@Context SseEventSink sseEventSink) {
        this.sseEventSink = sseEventSink;
    }

    // Called for every message received on the "events" channel.
    @Incoming("events")
    public CompletionStage<Void> onMessage (KafkaMessage<String, String> message) throws IOException {
        logger.debug("Message with content \"{}\" has been received.", message.getPayload());

        if (sseEventSink != null) {
            String display = LocalTime.now() + " | " + getHost() + " | " + message.getPayload();

            OutboundSseEvent sseEvent = this.eventBuilder
                    .name("message")
                    .id(message.getKey())
                    .mediaType(MediaType.TEXT_PLAIN_TYPE)
                    .data(display)
                    .reconnectDelay(3000)
                    .comment(message.getPayload())
                    .build();
            sseEventSink.send(sseEvent);
        }

        // Acknowledge the message whether or not a client is listening.
        return message.ack();
    }

    private String getHost () {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException ex) {
            logger.error("Problem while trying to get the hostname", ex);
            return "Unknown";
        }
    }
}
The @Incoming("events") annotation is very important, because this is where the link between the destination known by the application and the Kafka topic is made.
[•••] smallrye.messaging.source.events.topic=reactive [•••]
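To make that link concrete, here is a minimal sketch in plain Java of how a channel name such as "events" can be matched against the property keys above. It is only an illustration of the naming convention, not SmallRye's actual implementation; the class ChannelConfigDemo and the method settingsFor are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustration only: this is NOT how SmallRye is implemented internally.
class ChannelConfigDemo {

    // Prefix used by the configuration shown in this article.
    static final String PREFIX = "smallrye.messaging.source.";

    // Collects every "smallrye.messaging.source.<channel>.<key>" entry of one channel,
    // keyed by the part that follows the channel name.
    static Map<String, String> settingsFor(Properties props, String channel) {
        String channelPrefix = PREFIX + channel + ".";
        Map<String, String> settings = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(channelPrefix)) {
                settings.put(name.substring(channelPrefix.length()), props.getProperty(name));
            }
        }
        return settings;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
              "smallrye.messaging.source.events.topic=reactive\n"
            + "smallrye.messaging.source.events.bootstrap.servers=localhost:9092\n"));

        // The name given to @Incoming selects the block of properties to use.
        Map<String, String> events = settingsFor(props, "events");
        System.out.println("@Incoming(\"events\") reads topic: " + events.get("topic"));
    }
}
```

Renaming the channel in the annotation without renaming it in application.properties would leave the method with no matching block of properties, which is why the two names have to agree.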
Later, when Kafka messages are received, they are sent to the SseEventSink registered by the last call to /consume.
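What travels to the browser is plain text in the text/event-stream format. The sketch below renders the fields set on the event builder (comment, name, id, reconnect delay, data) the way the SSE format describes them. It is an approximation written for this article: the class SseWireFormat and its render method are hypothetical, and the exact field order produced by the JAX-RS implementation may differ.

```java
import java.util.StringJoiner;

// Illustration only: a hand-written rendering of the SSE text format.
class SseWireFormat {

    // Renders one event in the text/event-stream format; null fields are skipped.
    static String render(String comment, String name, String id, Long retryMillis, String data) {
        // Lines are separated by "\n" and a blank line marks the end of the event.
        StringJoiner lines = new StringJoiner("\n", "", "\n\n");
        if (comment != null) lines.add(": " + comment);
        if (name != null) lines.add("event: " + name);
        if (id != null) lines.add("id: " + id);
        if (retryMillis != null) lines.add("retry: " + retryMillis);
        // A multi-line payload is sent as one "data:" line per line of text.
        for (String part : data.split("\n", -1)) {
            lines.add("data: " + part);
        }
        return lines.toString();
    }

    public static void main(String[] args) {
        // Sample values; the console producer sends messages without a key, hence the null id.
        System.out.print(render("test", "message", null, 3000L, "10:15:30 | myhost | test"));
    }
}
```

The "retry" field corresponds to reconnectDelay(3000): it tells the client how many milliseconds to wait before reconnecting if the stream is interrupted.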
Sending events
Just use one of the Kafka CLI tools to send messages:
./kafka-console-producer --broker-list localhost:9092 --topic reactive
>test
>yet another test
To see the events, you can either open index.html, which shows the last received message, or consume directly from the web service (on path /consume), which stacks all the messages.
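If you would rather consume the web service from code than from a browser, something like the following sketch should work with the HTTP client shipped with Java 11. It assumes Quarkus is listening on its default port 8080 on the local machine; the class SseTail and the helper dataOf are hypothetical names, and only the "data:" lines of the stream are printed.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;

// Illustration only: a tiny command-line reader for the /consume stream.
class SseTail {

    // Extracts the payload of a "data:" line; any other line yields an empty Optional.
    static Optional<String> dataOf(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        String value = line.substring("data:".length());
        // The SSE format allows one optional space after the colon.
        return Optional.of(value.startsWith(" ") ? value.substring(1) : value);
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: the application runs locally on the default Quarkus port.
        URI uri = URI.create("http://localhost:8080/consume");
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", "text/event-stream")
                .build();
        try {
            // ofLines() exposes the response body as a stream of lines, read as they arrive.
            HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofLines())
                    .body()
                    .forEach(line -> dataOf(line).ifPresent(System.out::println));
        } catch (IOException ex) {
            System.err.println("Could not read from " + uri + ": " + ex.getMessage());
        }
    }
}
```

Keep in mind that the resource stores a single SseEventSink, so the most recently connected client is the one that receives the events.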
References
- https://debezium.io/blog/2019/03/14/debezium-meets-quarkus/ by Jiri Pechanec
- https://smallrye.io/smallrye-reactive-messaging/ by Clément Escoffier
- https://quarkus.io/get-started/
- Sources on GitHub