2018-04-22

KSQL, a step beyond Kafka Streams



I'm currently learning Kafka Streams, and I found a very interesting article (in French) by William Conti.

In his article, William shows how to exploit the power of Kafka Streams to get the number of meetups per city, using a KTable.

I first took his repo and adjusted the sources to work with the latest version of Kafka Streams. I then thought it would be a good candidate to try KSQL, which was declared production-ready not so long ago, and whose presentation by Robin Moffatt at a Paris Kafka Meetup made me want to play with it.

That's what I'm going to demonstrate in this article.

Start Confluent Platform


First, download Confluent Platform 4.1 to be able to play with KSQL.
Follow the instructions to start it (if you're using defaults, it should be up in seconds).
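If you're using the Confluent CLI bundled with the platform, the whole stack (ZooKeeper, Kafka, Schema Registry, KSQL server, ...) can be brought up with a single command; the exact path depends on where you unpacked the distribution.
# from the Confluent Platform 4.1 installation directory
./bin/confluent start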

Feed a Kafka topic with RSVPs


To get the full project, check out William's GitHub repo.
But here I will only use the "Producer" class.
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Properties;

public class Producer {

  public static void main(String... args) throws IOException {

    // Open the Meetup RSVP streaming endpoint
    URL url = new URL("http://stream.meetup.com/2/rsvps");
    URLConnection connection = url.openConnection();

    // Parse the incoming stream as a sequence of JSON documents
    JsonFactory jsonFactory = new JsonFactory(new ObjectMapper());
    JsonParser parser = jsonFactory.createParser(connection.getInputStream());

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props)) {
      while (parser.nextToken() != null) {
        String record = parser.readValueAsTree().toString();
        // Rename the "group" element: GROUP is a reserved keyword in KSQL
        String updatedRecord = record.replace("\"group\"", "\"rvsp_group\"");
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("meetups", updatedRecord);
        kafkaProducer.send(producerRecord);
      }
    }
  }
}
Note the line that replaces "group" with "rvsp_group": I had to rename this element because GROUP is a reserved keyword in KSQL (as it is in SQL).
A bug has been opened on this subject.

Once you've run this class, you can verify that the "meetups" topic is properly fed by running the console consumer.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic meetups
You should see some RSVPs popping up.
For instance:
{
  "venue": {
    "venue_name": "HSBXL (Hackerspace Brussels)",
    "lon": 4.328797,
    "lat": 50.848236,
    "venue_id": 24593226
  },
  "visibility": "public",
  "response": "yes",
  "guests": 0,
  "member": {
    "member_id": 35492902,
    "photo": "https://secure.meetupstatic.com/photos/member/b/9/9/1/thumb_243587505.jpeg",
    "member_name": "Jo"
  },
  "rsvp_id": 1724491047,
  "mtime": 1524412889175,
  "event": {
    "event_name": "DIY home-automation v2",
    "event_id": "249274076",
    "time": 1524916800000,
    "event_url": "https://www.meetup.com/hackerspace-Brussels-hsbxl/events/249274076/"
  },
  "rvsp_group": {
    "group_topics": [{
      "urlkey": "opensource",
      "topic_name": "Open Source"
    }, {
      "urlkey": "robotics",
      "topic_name": "Robotics"
    }, {
      "urlkey": "gameprog",
      "topic_name": "Game Programming"
    }, {
      "urlkey": "softwaredev",
      "topic_name": "Software Development"
    }, {
      "urlkey": "electronics",
      "topic_name": "Electronics"
    }, {
      "urlkey": "microcontroller",
      "topic_name": "Microcontrollers"
    }, {
      "urlkey": "web-development",
      "topic_name": "Web Development"
    }, {
      "urlkey": "hacking",
      "topic_name": "Hacking"
    }, {
      "urlkey": "arduino",
      "topic_name": "Arduino"
    }, {
      "urlkey": "computer-programming",
      "topic_name": "Computer programming"
    }, {
      "urlkey": "makerspaces",
      "topic_name": "Makerspaces"
    }, {
      "urlkey": "3d-printing",
      "topic_name": "3D Printing"
    }],
    "group_city": "Brussels",
    "group_country": "be",
    "group_id": 20346523,
    "group_name": "Hackerspace Brussels - HSBXL",
    "group_lon": 4.33,
    "group_urlname": "hackerspace-Brussels-hsbxl",
    "group_lat": 50.83
  }
}
We can see that the "group" has been successfully renamed to "rvsp_group".

Time to bring KSQL into the game


My goal is to count the number of meetups per city, just as a KTable allows thanks to its state store (RocksDB).
First of all, launch the KSQL CLI.
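Assuming the KSQL server is running locally on its default port (8088), starting the CLI looks like this:
# connect the KSQL CLI to the local KSQL server
./bin/ksql http://localhost:8088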

To be able to query the data, we first have to create a stream (in the KSQL sense of the term, not to be confused with a KStream).
CREATE STREAM meetups ( \
                         venue      VARCHAR, \
                         visibility VARCHAR, \
                         response   VARCHAR, \
                         guests     BIGINT,  \
                         member     VARCHAR, \
                         rsvp_id    BIGINT,  \
                         mtime      BIGINT,  \
                         event      VARCHAR, \
                         rvsp_group VARCHAR  \
                      ) \
WITH (KAFKA_TOPIC='meetups', VALUE_FORMAT='JSON');
I guess it's pretty self-explanatory: it simply declares our first-level JSON elements as columns.
Note that you have to specify that the records are in JSON format and come from the "meetups" topic.
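If you want to double-check that the stream has been registered, a couple of sanity checks from the CLI will do (the exact output depends on your setup):
-- list the streams known to KSQL, then show the schema of ours
SHOW STREAMS;
DESCRIBE meetups;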

Then we create another stream that extracts only the relevant information, namely the city of each RSVP.
CREATE STREAM meetup_cities AS SELECT EXTRACTJSONFIELD(rvsp_group,'$.group_city') AS city FROM meetups;
Now we can perform some aggregation...
SELECT city, COUNT(*) FROM meetup_cities GROUP BY city;
And here is the result I was hoping for :)
Longmont | 3
Zürich | 16
Calgary | 44
Helsinki | 9
Essex Fells | 1
London | 392
Philadelphia | 35
Brooklyn | 19
Columbus | 17
Las Vegas | 34
Note: if you want the records that were already in the topic to be taken into account, just set the following property and re-run the query.
ksql> SET 'auto.offset.reset' = 'earliest';  
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
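As a side note, if you want the KSQL counterpart of a materialized KTable rather than a one-off interactive query, you can turn the aggregation into a persistent query with CREATE TABLE ... AS SELECT. The table and column names below are just an illustration:
-- continuously maintained count of meetups per city, backed by a Kafka topic
CREATE TABLE meetup_cities_counts AS SELECT city, COUNT(*) AS total FROM meetup_cities GROUP BY city;
The resulting table is continuously updated as new RSVPs arrive, which is essentially what the KTable and its RocksDB state store gave us in Kafka Streams.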

References