Kafka has been designed to reach the best performance possible, as is very well explained in the official documentation.
(If you haven't read it yet, I strongly encourage you to do so).
It quickly became pretty clear that to stay on the path of performance, some exchange formats had to be excluded, such as XML and JSON, as nicely explained by Criteo.
I then read about Protobuf, Thrift and Avro. But the article from Confluent about Avro for Kafka Data made the choice clear to me: it was going to be Avro.
So, here I am, sending Avro messages to a Kafka topic.
Setting up the project
For this article, I will use NetBeans and Maven, as well as my Raspberry Pi cluster, on which I deployed my Kafka cluster.
Just create a Java Maven project.
Edit the pom.xml so it's like the description below (or get it from my GitHub) :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>fr.mbutton</groupId>
    <artifactId>Kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <repositories>
        <repository>
            <id>apache-repo</id>
            <name>Apache Repository</name>
            <url>https://repository.apache.org/content/repositories/releases</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
</project>
We're also configuring the Avro Maven plugin, which generates the Java stubs from the Avro schema: it picks up every .avsc file under src/main/avro (the sourceDirectory configured above) and writes the generated classes to src/main/java.
Manipulating Avro
Schema definition
First of all, you have to define an "avsc" schema, which is going to be your contract (OK, I may have worked a little bit too much with WSDL and XML).
You can also work with dynamic schemas (which are only defined in your code), but that's not what I'm interested in: to me, a schema has to be shared / exposed.
In our case, it's going to be a simple message.
{"namespace": "fr.mbutton.avro", "type": "record", "name": "Message", "fields": [ {"name": "code" , "type": "string"}, {"name": "priority", "type": ["int" , "null"]}, {"name": "extra" , "type": ["string", "null"]} ] }
At minimum, a record definition must include its type ("type": "record") and a name ("name": "Message") [...]. Fields are defined via an array of objects, each of which defines a name and a type; the type is another schema object, either a primitive or a complex type (record, enum, array, map, union or fixed). Unions are a complex type that can be any of the types listed in the array: e.g., priority can either be an int or null, essentially making it an optional field.
Then, just build your Maven project and it should generate the Java source for your object. (It reminds me a lot of the dead but excellent XMLBeans project, by the way.)
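Just to see what the generated stub looks like in action, here is a minimal sketch (the class name GeneratedStubCheck is mine, and I'm assuming the default output of avro-maven-plugin 1.8.2): the unions with null simply become fields you can leave empty.

package fr.mbutton.kafka;

import fr.mbutton.avro.Message;

public class GeneratedStubCheck {

    public static void main(String... args) {
        // The all-args constructor follows the field order of the schema:
        // code (string), priority (int or null), extra (string or null).
        Message full = new Message("Test0", 1, "hello");

        // Since priority and extra are unions with null, they can simply be left unset.
        Message minimal = new Message("Test0", null, null);

        // toString() renders the record as JSON, handy for quick debugging.
        System.out.println(full);
        System.out.println(minimal);
    }
}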
Testing the serialization / deserialization
Following the Avro documentation, I wrote a sample Java class, AvroManipulator, to understand how the serialization / deserialization process works.
package fr.mbutton.kafka;

import fr.mbutton.avro.Message;
import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroManipulator {

    public static void main(String... args) {
        Logger logger = LoggerFactory.getLogger(AvroManipulator.class);

        Message message1 = new Message("Test1", 1, "This");
        Message message2 = new Message("Test2", 1, "is");
        Message message3 = new Message("Test3", 1, "super");
        Message message4 = new Message("Test4", 1, "dope");

        try {
            File file = new File("messages.avro");

            // Serialization
            DatumWriter<Message> datumWriter = new SpecificDatumWriter<>(Message.class);
            try (DataFileWriter<Message> dfWriter = new DataFileWriter<>(datumWriter)) {
                dfWriter.create(message1.getSchema(), file);
                dfWriter.append(message1);
                dfWriter.append(message2);
                dfWriter.append(message3);
                dfWriter.append(message4);
            }

            // Deserialization
            DatumReader<Message> messageDatumReader = new SpecificDatumReader<>(Message.class);
            DataFileReader<Message> dfReader = new DataFileReader<>(file, messageDatumReader);
            dfReader.forEach(readMessage -> logger.info(readMessage.toString()));

        } catch (IOException e) {
            logger.error("Something obviously went wrong !", e);
        }
    }
}
Note : If you take a look at the file messages.avro, which contains the serialized version of your messages, you can see the schema description in JSON, and then your message in a binary format.
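If you'd rather not open the file in a hex editor, here is a small sketch (again, the class name EmbeddedSchemaCheck is mine) that reads the schema back from the file header, using a GenericDatumReader so the generated Message class isn't even needed:

package fr.mbutton.kafka;

import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class EmbeddedSchemaCheck {

    public static void main(String... args) throws IOException {
        File file = new File("messages.avro");

        // A GenericDatumReader does not need the generated Message class:
        // the schema stored in the file header is enough to decode the records.
        try (DataFileReader<GenericRecord> reader =
                new DataFileReader<>(file, new GenericDatumReader<>())) {
            System.out.println(reader.getSchema().toString(true)); // the embedded JSON schema
            reader.forEach(record -> System.out.println(record));  // the decoded messages
        }
    }
}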
Sending Avro objects to a Kafka topic
Based on Alexis Seigneurin's article, I tweaked his classes to send my Avro object instead. I just updated the bootstrap servers to reflect my Kafka / Raspberry Pi cluster and replaced the original StringSerializer with Confluent's Avro serializer / deserializer.
But to use those Confluent serializers, you also need to set up a Schema Registry, whose definition follows:
Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas. It stores a versioned history of all schemas, provides multiple compatibility settings and allows evolution of schemas according to the configured compatibility setting. It provides serializers that plug into Kafka clients that handle schema storage and retrieval for Kafka messages that are sent in the Avro format.
Source: https://docs.confluent.io/current/schema-registry/docs/intro.html
Then, I configured the Confluent Platform and updated schema-registry.properties to point to my Kafka cluster instead of localhost, before starting the registry:
listeners=http://0.0.0.0:8081
kafkastore.connection.url=192.168.0.231:2181,192.168.0.232:2181,192.168.0.233:2181,192.168.0.234:2181
kafkastore.topic=_schemas
debug=false
bin/schema-registry-start etc/schema-registry/schema-registry.properties
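Once the registry is up, you can quickly check that it answers. This is purely an illustrative sketch of mine, using plain JDK HTTP calls against the /subjects endpoint of the Schema Registry REST API; right after startup the list is empty, and once the producer below has run, it should contain testopic-value.

package fr.mbutton.kafka;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SchemaRegistryCheck {

    public static void main(String... args) throws Exception {
        // Lists the subjects known to the Schema Registry.
        URL url = new URL("http://localhost:8081/subjects");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream()))) {
            reader.lines().forEach(System.out::println);
        }
    }
}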
Be sure that the bootstrap.servers and schema.registry.url properties are correctly set up for your Java consumer and producer :
props.put("bootstrap.servers" , "192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092"); props.put("schema.registry.url", "http://localhost:8081");
package fr.mbutton.kafka;

import fr.mbutton.avro.Message;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092");
        props.put("group.id", "mygroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer<String, Message> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("testopic"));
            while (true) {
                ConsumerRecords<String, Message> records = consumer.poll(100);
                for (ConsumerRecord<String, Message> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}
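One detail worth mentioning, and this is my understanding of the Confluent deserializer rather than something covered by the original article: by default, KafkaAvroDeserializer hands back GenericRecord instances, not the generated Message class. If you want typed objects back, the deserializer can be switched to its specific mode with an extra property, added to the consumer's Properties just like the lines above:

// Assumption: this flag makes KafkaAvroDeserializer deserialize into the generated
// SpecificRecord classes (fr.mbutton.avro.Message) instead of GenericRecord.
props.put("specific.avro.reader", "true");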
package fr.mbutton.kafka;

import fr.mbutton.avro.Message;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaProducer<String, Message> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, Message> record =
                        new ProducerRecord<>("testopic", new Message("Message-" + i, 1, "extra"));
                producer.send(record);
            }
        }
    }
}
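Note that producer.send() is asynchronous: closing the producer (which the try-with-resources does) flushes the buffer, but a broker-side failure would go unnoticed. A small variation of the loop above, my own addition, passes a callback so that errors at least get logged:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Something went wrong on the way to the broker (or the Schema Registry).
        exception.printStackTrace();
    } else {
        System.out.println("Sent to " + metadata.topic()
                + " / partition " + metadata.partition()
                + " / offset " + metadata.offset());
    }
});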
Running the project
Simply run both Java files, first the AvroConsumer and then the AvroProducer.
Then, observe the console output.
From the producer:
16:30:24.633 [main] DEBUG i.c.k.s.client.rest.RestService - Sending POST with input {"schema": "{\"type\":\"record\",\"name\":\"Message\",\"namespace\":\"fr.mbutton.avro\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"priority\",\"type\":[\"int\",\"null\"]},{\"name\":\"extra\",\"type\":[\"string\",\"null\"]}]}"} to http://localhost:8081/subjects/testopic-value/versions
------------------------------------------------------------------------
BUILD SUCCESS
------------------------------------------------------------------------
From the Schema Registry:
[2018-03-18 16:30:24,647] INFO 127.0.0.1 - "POST /subjects/testopic-value/versions HTTP/1.1" 200 9 2 (io.confluent.rest-utils.requests:77)
[2018-03-18 16:30:25,100] INFO 127.0.0.1 - "GET /schemas/ids/21 HTTP/1.1" 200 246 4 (io.confluent.rest-utils.requests:77)
And from the consumer:
16:30:25.084 [main] DEBUG i.c.k.s.client.rest.RestService - Sending GET with input null to http://localhost:8081/schemas/ids/21
{"code": "Message-0", "priority": 1, "extra": "extra"}
{"code": "Message-1", "priority": 1, "extra": "extra"}
{"code": "Message-2", "priority": 1, "extra": "extra"}
[...]
{"code": "Message-97", "priority": 1, "extra": "extra"}
{"code": "Message-98", "priority": 1, "extra": "extra"}
{"code": "Message-99", "priority": 1, "extra": "extra"}
As you can see, the producer registered the schema under the testopic-value subject (which received id 21), and the consumer then fetched schema 21 from the registry to decode the incoming messages.