2018-03-18

Simple example of using Avro in Kafka





Kafka has been designed to achieve the best possible performance, as is very well explained in the official documentation.
(If you haven't read it yet, I strongly encourage you to do so).

It became pretty clear that, to stay on the path of performance, some exchange formats had to be excluded, such as XML and JSON, as nicely explained by Criteo.

I then read about Protobuf, Thrift and Avro. But the article from Confluent about Avro for Kafka Data made the choice clear to me : it was going to be Avro.

So, here I am, sending Avro messages to a Kafka topic.

Setting up the project


For that article, I will use NetBeans and Maven, as well as my Raspberry Pi cluster on which I deployed my Kafka cluster.
Just create a Java Maven project.

Edit the pom.xml so it's like the description below (or get it from my GitHub) :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" 
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>fr.mbutton</groupId>
    <artifactId>Kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <repositories>
        <repository>
            <id>apache-repo</id>
            <name>Apache Repository</name>
            <url>https://repository.apache.org/content/repositories/releases</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
   
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
      
    <dependencies>
        <dependency> 
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>        
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>    
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
       
</project>

We're using four dependencies : logback-classic (Logback for SLF4J), kafka-clients and avro, obviously, and kafka-avro-serializer from Confluent (which is why the Confluent repository is declared in the pom).

We're also configuring the Avro build plugin that will generate the Java stubs from the Avro schema.
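
With this configuration, any *.avsc file placed under src/main/avro/ should be picked up automatically : running mvn generate-sources (or any later phase, like compile) triggers the schema goal and writes the generated classes under src/main/java (for the schema below, that would be src/main/java/fr/mbutton/avro/Message.java).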

Manipulating Avro


Schema definition


First of all, you have to define an "avsc" schema, which is going to be your contract (OK, I may have worked a little bit too much with WSDL and XML).
You can also work with dynamic schemas, which are only defined in your code (there is a quick sketch of that a bit further down), but that's not what I'm interested in : to me, a schema has to be shared / exposed.

In our case, it's going to be a simple message.

{"namespace": "fr.mbutton.avro",
 "type": "record",
 "name": "Message",
 "fields": [
     {"name": "code"    , "type": "string"},
     {"name": "priority", "type": ["int"   , "null"]},
     {"name": "extra"   , "type": ["string", "null"]}
 ]
}
I quote the Avro official documentation :
    At minimum, a record definition must include its type ("type": "record"), a name ("name": "Message") [...]. 
    Fields are defined via an array of objects, each of which defines a name and type, which is another schema object, primitive or complex type
    (record, enum, array, map, union, and fixed).
    Unions are a complex type that can be any of the types listed in the array:
    e.g., priority can either be an int or null, essentially making it an optional field.

Then, just build your Maven project and it should generate the Java source for your object. (It reminds me a lot of the dead yet excellent XMLBeans project, by the way.)
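
For reference, here is roughly what the dynamic approach mentioned earlier would look like, using Avro's GenericRecord instead of a generated class. This is just a minimal sketch (the class name is mine, and the schema is simply inlined as a string) :

package fr.mbutton.kafka;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class DynamicSchemaExample {

    public static void main(String... args) {
        // The schema is parsed at runtime instead of being compiled into a Java class
        Schema schema = new Schema.Parser().parse(
              "{\"namespace\": \"fr.mbutton.avro\", \"type\": \"record\", \"name\": \"Message\","
            + " \"fields\": ["
            + "   {\"name\": \"code\"    , \"type\": \"string\"},"
            + "   {\"name\": \"priority\", \"type\": [\"int\"   , \"null\"]},"
            + "   {\"name\": \"extra\"   , \"type\": [\"string\", \"null\"]}]}");

        GenericRecord message = new GenericData.Record(schema);
        message.put("code", "Test0");
        message.put("priority", 1);
        message.put("extra", null); // the union with "null" makes this field optional

        System.out.println(message); // {"code": "Test0", "priority": 1, "extra": null}
    }
}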

Testing the serialization / deserialization


Following the Avro documentation, I wrote a sample Java class, AvroManipulator, to understand how the serialization / deserialization process works.

package fr.mbutton.kafka;

import fr.mbutton.avro.Message;
import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroManipulator {

    public static void main(String... args) {

        Logger logger = LoggerFactory.getLogger(AvroManipulator.class);

        Message message1 = new Message("Test1", 1, "This");
        Message message2 = new Message("Test2", 1, "is");
        Message message3 = new Message("Test3", 1, "super");
        Message message4 = new Message("Test4", 1, "dope");

        try {
            File file = new File("messages.avro");

            // Serialization
            DatumWriter<Message> datumWriter = new SpecificDatumWriter<>(Message.class);
            try (DataFileWriter<Message> dfWriter = new DataFileWriter<>(datumWriter)) {
                dfWriter.create(message1.getSchema(), file);
                dfWriter.append(message1);
                dfWriter.append(message2);
                dfWriter.append(message3);
                dfWriter.append(message4);
            }

            // Deserialization
            DatumReader<Message> messageDatumReader = new SpecificDatumReader<>(Message.class);
            try (DataFileReader<Message> dfReader = new DataFileReader<>(file, messageDatumReader)) {
                dfReader.forEach(readMessage -> logger.info(readMessage.toString()));
            }

        } catch (IOException e) {
            logger.error("Something obviously went wrong !", e);
        }
    }
}
Perfect ! I can now manipulate my Avro objects.

Note : If you take a look at the file messages.avro, which contains the serialized version of your messages, you can see the schema description in JSON, and then your messages in a binary format.
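
If you want to check that without staring at the binary, the reader also exposes the embedded schema. Here is a quick sketch, using the generic API this time (to be dropped in a method that handles the IOException) :

// Peek at the schema embedded in messages.avro
try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(new File("messages.avro"), new GenericDatumReader<>())) {
    System.out.println(reader.getSchema().toString(true)); // pretty-printed JSON version of the schema
}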

Sending Avro object to a Kafka topic


Based on Alexis Seigneurin's article, I tweaked his classes to send my Avro object instead. I just updated the bootstrap servers to reflect my Kafka/RPi cluster and replaced the original StringSerializer with the Confluent Avro serializer / deserializer.
But when you use those Confluent serializers, you also need to set up a Schema Registry, whose definition follows :
    Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas. 
    It stores a versioned history of all schemas, provides multiple compatibility settings and allows evolution of schemas according to the configured
    compatibility setting. It provides serializers that plug into Kafka clients that handle schema storage and retrieval for Kafka messages 
    that are sent in the Avro format.
Source : https://docs.confluent.io/current/schema-registry/docs/intro.html

Then, I configured the Confluent Platform, updating schema-registry.properties so that it points to my Kafka cluster instead of localhost ...
listeners=http://0.0.0.0:8081
kafkastore.connection.url=192.168.0.231:2181,192.168.0.232:2181,192.168.0.233:2181,192.168.0.234:2181
kafkastore.topic=_schemas
debug=false
... and simply launched the schema registry.
bin/schema-registry-start etc/schema-registry/schema-registry.properties
(By the way, I never could get it started from the GitHub repo).
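
To quickly check that the registry is up (and, later on, that the schema has been registered), you can hit its REST API :
curl http://localhost:8081/subjects
On a fresh install it should return an empty JSON array, and ["testopic-value"] once the producer below has sent its first message.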

Be sure that the bootstrap.servers and schema.registry.url properties are correctly set up for your Java consumer and producer :
props.put("bootstrap.servers"  , "192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092");
props.put("schema.registry.url", "http://localhost:8081");
And that the serializer / deserializer objects are those from Confluent (io.confluent.kafka.serializers), so that your consumer looks like :
package fr.mbutton.kafka;

import fr.mbutton.avro.Message;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092");
        props.put("group.id", "mygroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("testopic"));

            while (true) {
                ConsumerRecords<String, Object> records = consumer.poll(100);
                for (ConsumerRecord<String, Object> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}
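A small note on the consumer : as configured, the Confluent deserializer hands back generic records (hence the Object value type above). If you would rather get your generated Message objects back directly, adding the specific.avro.reader property should do the trick :
props.put("specific.avro.reader", "true");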
And your producer :
package fr.mbutton.kafka;

import fr.mbutton.avro.Message;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");


        try (KafkaProducer<String, Message> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, Message> record = new ProducerRecord<>("testopic", new Message("Message-" + i, 1, "extra"));
                producer.send(record);
            }
        }
    }    
}
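Note that the records are sent without a key, which is perfectly fine for this example. Since the key serializer is already a StringSerializer, sending keyed records (to control which partition each message lands on, for instance) would simply look like :
ProducerRecord<String, Message> record = new ProducerRecord<>("testopic", "Message-" + i, new Message("Message-" + i, 1, "extra"));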

Running the project


Simply run both Java classes : first the AvroConsumer, then the AvroProducer.

Then, observe the console output.

From the producer (notice the Avro serializer registering the schema with the registry on the first send):
16:30:24.633 [main] DEBUG i.c.k.s.client.rest.RestService - Sending POST with input 
{"schema":
  "{\"type\":\"record\",
    \"name\":\"Message\",
    \"namespace\":\"fr.mbutton.avro\",
    \"fields\":[
      {\"name\":\"code\",\"type\":\"string\"},
      {\"name\":\"priority\",\"type\":[\"int\",\"null\"]},
      {\"name\":\"extra\",\"type\":[\"string\",\"null\"]}]}"
} to http://localhost:8081/subjects/testopic-value/versions
------------------------------------------------------------------------
BUILD SUCCESS
------------------------------------------------------------------------
From the schema registry:
[2018-03-18 16:30:24,647] INFO 127.0.0.1 - "POST /subjects/testopic-value/versions HTTP/1.1" 200 9  2 (io.confluent.rest-utils.requests:77)
[2018-03-18 16:30:25,100] INFO 127.0.0.1 - "GET /schemas/ids/21 HTTP/1.1" 200 246  4 (io.confluent.rest-utils.requests:77)
From the consumer :
16:30:25.084 [main] DEBUG i.c.k.s.client.rest.RestService - Sending GET with input null to http://localhost:8081/schemas/ids/21
{"code": "Message-0", "priority": 1, "extra": "extra"}
{"code": "Message-1", "priority": 1, "extra": "extra"}
{"code": "Message-2", "priority": 1, "extra": "extra"}
[...]
{"code": "Message-97", "priority": 1, "extra": "extra"}
{"code": "Message-98", "priority": 1, "extra": "extra"}
{"code": "Message-99", "priority": 1, "extra": "extra"}
All the code is available on my GitHub.

References

  1. https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
  2. http://aseigneurin.github.io/2016/03/02/kafka-spark-avro-kafka-101.html
  3. https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
  4. https://github.com/gwenshap/kafka-examples/tree/master/AvroProducerExample