I'm currently learning Kafka Streams and I found a very interesting article (in French) from William Conti.
In his article, William shows how to exploit the power of Kafka Streams to get the number of meetups per city, using a KTable.
I first took his repo and adjusted the sources to work with the latest version of Kafka Streams. I then thought it would be a good candidate to test KSQL, which was officially certified for production not so long ago, and which Robin Moffatt's presentation at a Paris Kafka Meetup had made me want to play with.
That's what I'm going to demonstrate in this article.
Start Confluent Platform
First, download Confluent Platform 4.1 to be able to play with KSQL.
Follow the instructions to start it (if you're using defaults, it should be up in seconds).
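If you use the Confluent CLI that ships with the platform, a single command should bring the whole local stack up, including the KSQL server (I'm assuming a default local installation here):

confluent start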
Feed a Kafka topic with RSVPs
For the full project, check out William's GitHub. Here, I will only use the "Producer" class.
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Properties;

public class Producer {

    public static void main(String... args) throws IOException {
        // Open a streaming connection to the Meetup RSVP feed
        URL url = new URL("http://stream.meetup.com/2/rsvps");
        URLConnection connection = url.openConnection();
        JsonFactory jsonFactory = new JsonFactory(new ObjectMapper());
        JsonParser parser = jsonFactory.createParser(connection.getInputStream());

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props)) {
            while (parser.nextToken() != null) {
                // Read one RSVP as a JSON tree and rename the "group" field,
                // which is a reserved word in KSQL
                String record = parser.readValueAsTree().toString();
                String updatedRecord = record.replace("\"group\"", "\"rvsp_group\"");
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("meetups", updatedRecord);
                kafkaProducer.send(producerRecord);
            }
        }
    }
}
As you can see, the "group" field is renamed to "rvsp_group" before the record is sent, since "group" is a reserved word in KSQL and causes problems when used as a column name. A bug has been opened on this subject.
Once you've run this class, you can check that your "meetups" topic is being fed by running the console consumer:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic meetups
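If you start the consumer after the producer has already been running for a while, you can add --from-beginning to replay the records produced so far:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic meetups --from-beginning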
For instance:

{
  "venue": {
    "venue_name": "HSBXL (Hackerspace Brussels)",
    "lon": 4.328797,
    "lat": 50.848236,
    "venue_id": 24593226
  },
  "visibility": "public",
  "response": "yes",
  "guests": 0,
  "member": {
    "member_id": 35492902,
    "photo": "https://secure.meetupstatic.com/photos/member/b/9/9/1/thumb_243587505.jpeg",
    "member_name": "Jo"
  },
  "rsvp_id": 1724491047,
  "mtime": 1524412889175,
  "event": {
    "event_name": "DIY home-automation v2",
    "event_id": "249274076",
    "time": 1524916800000,
    "event_url": "https://www.meetup.com/hackerspace-Brussels-hsbxl/events/249274076/"
  },
  "rvsp_group": {
    "group_topics": [
      { "urlkey": "opensource", "topic_name": "Open Source" },
      { "urlkey": "robotics", "topic_name": "Robotics" },
      { "urlkey": "gameprog", "topic_name": "Game Programming" },
      { "urlkey": "softwaredev", "topic_name": "Software Development" },
      { "urlkey": "electronics", "topic_name": "Electronics" },
      { "urlkey": "microcontroller", "topic_name": "Microcontrollers" },
      { "urlkey": "web-development", "topic_name": "Web Development" },
      { "urlkey": "hacking", "topic_name": "Hacking" },
      { "urlkey": "arduino", "topic_name": "Arduino" },
      { "urlkey": "computer-programming", "topic_name": "Computer programming" },
      { "urlkey": "makerspaces", "topic_name": "Makerspaces" },
      { "urlkey": "3d-printing", "topic_name": "3D Printing" }
    ],
    "group_city": "Brussels",
    "group_country": "be",
    "group_id": 20346523,
    "group_name": "Hackerspace Brussels - HSBXL",
    "group_lon": 4.33,
    "group_urlname": "hackerspace-Brussels-hsbxl",
    "group_lat": 50.83
  }
}
Time to bring KSQL into the game
My goal is to count the number of meetups per city, as a KTable would allow, thanks to its state store (RocksDB). First of all, launch the KSQL CLI.
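Assuming the KSQL server was started locally on its default port (8088), the CLI can be launched like this:

ksql http://localhost:8088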
To be able to query the data, we first have to create a stream (in the KSQL sense of the term, not to be confused with a KStream).
CREATE STREAM meetups ( \
  venue VARCHAR, \
  visibility VARCHAR, \
  response VARCHAR, \
  guests BIGINT, \
  member VARCHAR, \
  rsvp_id BIGINT, \
  mtime BIGINT, \
  event VARCHAR, \
  rvsp_group VARCHAR \
) \
WITH (KAFKA_TOPIC='meetups', VALUE_FORMAT='JSON');
Note that you have to specify that the records are in JSON format and live in the "meetups" topic.
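To check that the stream was created and that KSQL sees the expected columns, you can list the streams and describe the new one:

ksql> SHOW STREAMS;
ksql> DESCRIBE meetups;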
Then we create another stream that extracts only the relevant information, that is to say, the city of each RSVP.
CREATE STREAM meetup_cities AS SELECT EXTRACTJSONFIELD(rvsp_group,'$.group_city') AS city FROM meetups;
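At this point you can already query the new stream and watch cities flow in as RSVPs arrive (this is a continuous query; stop it with Ctrl-C):

SELECT city FROM meetup_cities;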
By default, KSQL queries only process the records that arrive after the query is started. To take into account the RSVPs already sitting in the topic, first set the offset reset property to earliest:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

We can now count the number of meetups per city:

SELECT city, COUNT(*) FROM meetup_cities GROUP BY city;

Which gives, for instance:

Longmont | 3
Zürich | 16
Calgary | 44
Helsinki | 9
Essex Fells | 1
London | 392
Philadelphia | 35
Brooklyn | 19
Columbus | 17
Las Vegas | 34
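To get the real equivalent of a KTable, backed by a state store and a changelog topic, the same aggregation can also be persisted as a table. This variant is not in William's original example, but it uses standard KSQL syntax:

CREATE TABLE meetup_cities_count AS \
  SELECT city, COUNT(*) AS total \
  FROM meetup_cities \
  GROUP BY city;

The resulting table is continuously updated as new RSVPs arrive, much like the KTable of the original Kafka Streams application.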
References
- http://blog.ippon.fr/2017/04/18/kafka-streams-101/
- https://docs.confluent.io/4.1.0/control-center/docs/quickstart.html
- https://www.confluent.io/blog/using-ksql-to-analyse-query-and-transform-data-in-kafka (Excellent article by Robin Moffatt, as usual.)
- https://github.com/confluentinc/ksql/blob/0.1.x/docs/syntax-reference.md