After installing Docker and Kubernetes on my RPi cluster, I wanted to keep going and set up Kafka.
Prerequisites
First of all, I assume that you have an RPi cluster already configured to work with Ansible. If not, please refer to my previous article on the subject.
I could have referred to online archives, but since you need Java 8 and Oracle forces you to acknowledge their license before downloading it, I decided to start from archives downloaded beforehand.
You need the Java 8 JDK for ARM (jdk-8u162-linux-arm32-vfp-hflt.tar.gz) and the Kafka archive (kafka_2.11-1.0.0.tgz) available locally. You will then have to fill the vars section of the playbook with the values matching your environment.
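Concretely, these are the variables at the bottom of the playbook that must match your setup (the values shown are simply the ones from my environment):

vars:
  installer_dir: "YourPathToTheDownloadedArchives"
  java_version: "jdk1.8.0_162"
  kafka_version: "kafka_2.11-1.0.0"
  java_installer_path: "{{ installer_dir }}/jdk-8u162-linux-arm32-vfp-hflt.tar.gz"
  kafka_installer_path: "{{ installer_dir }}/{{ kafka_version }}.tgz"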
Ansible Playbook
The goal of this playbook is to provide a way to learn about using Kafka on a cluster without having to cope with the installation first, so you can decide whether it suits your needs or not.
And if it does, you can still learn about the installation process simply by reading the playbook.
The playbook is included here for reference, but any future modifications will be published on my github.
Zookeeper and Kafka cluster configuration
One thing to keep in mind when reading the Ansible playbook is that my nodes have fixed IP addresses that follow each other, which the inventory expresses with Ansible's range syntax (192.168.0.23[1:4] expands to .231 through .234):
[pi-cluster]
192.168.0.23[1:4]

[masters]
192.168.0.231

[slaves]
192.168.0.232
192.168.0.233
192.168.0.234
This playbook is not that hard, but there is one slightly tricky part: the configuration of the ZooKeeper and Kafka clusters. I had to compose the cluster configuration from the information available in the Ansible inventory (i.e. the Ansible hosts file).
For instance, the ZooKeeper cluster configuration looks like this:
server.1=192.168.0.231:2888:3888
server.2=192.168.0.232:2888:3888
server.3=192.168.0.233:2888:3888
server.4=192.168.0.234:2888:3888
It is generated by iterating over the pi-cluster group of the inventory:

- name: (ZOOKEEPER CONFIG) Adding cluster nodes to the zookeeper configuration
  lineinfile:
    path: "{{ zookeper_config }}"
    line: "server.{{ item[-1] }}={{ item + ':2888:3888' }}"
    insertafter: EOF
  with_items:
    - "{{ groups['pi-cluster'] }}"

Note that server.{{ item[-1] }} simply uses the last character of each IP address as the ZooKeeper id, which only works because my addresses end in 1 to 4.
On the Kafka side, the zookeeper.connect property must list every ZooKeeper node:

192.168.0.231:2181,192.168.0.232:2181,192.168.0.233:2181,192.168.0.234:2181

That string is built from the inventory with a single Jinja2 expression:

"{{ groups['pi-cluster'] | join(':2181,') }}:2181"
Idempotent, yes, but how?
One of Ansible's great strengths is that most of its modules are idempotent. Yet, in certain cases, I did not use lineinfile, because I wanted to keep the original files intact and be able to start the configuration all over again without going through the whole process of copying the archives and installing them from scratch. Maybe there's a better way to do it.
If so, leave a comment or, better, open a PR!
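For what it's worth, one alternative I can think of (just a sketch, not what the playbook below actually does) would be to render zookeeper.properties entirely from a Jinja2 template with the template module, which is idempotent by nature and would remove the backup/restore dance. The zookeeper.properties.j2 file name and its content are hypothetical:

- name: (ZOOKEEPER CONFIG) Rendering zookeeper.properties from a template
  template:
    src: zookeeper.properties.j2   # hypothetical template stored next to the playbook
    dest: "{{ zookeper_config }}"
    owner: pi
    group: pi
    mode: 0755

# zookeeper.properties.j2 (sketch)
dataDir={{ zookeeper_workdir }}
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
{% for host in groups['pi-cluster'] %}
server.{{ host[-1] }}={{ host }}:2888:3888
{% endfor %}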
The playbook
---
- name: Set Kafka up on the RPi cluster
  hosts: pi-cluster
  remote_user: pi
  tasks:
    - block:
        - name: (INSTALL) Checking if Java is already installed
          stat:
            path: "/opt/{{ java_version }}"
          register: javadir

        - name: (INSTALL) Checking if Kafka is already installed
          stat:
            path: "/opt/{{ kafka_version }}"
          register: kafkadir

        - name: (INSTALL) Unarchiving Java and Kafka
          unarchive:
            src: "{{ item }}"
            dest: /opt
            owner: pi
            group: pi
            mode: 0755
          with_items:
            - "{{ java_installer_path }}"
            - "{{ kafka_installer_path }}"
          when: javadir.stat.exists == false or kafkadir.stat.exists == false

        - name: (INSTALL) Fixing permissions for Java (unarchive user/group modification does not work with that one)
          file:
            path: /opt/{{ java_version }}
            owner: pi
            group: pi
            mode: 0755
            recurse: yes
          when: javadir.stat.exists == false

        - name: (INSTALL) Adding symbolic link for Java
          file:
            src: "/opt/{{ java_version }}/bin/java"
            dest: /usr/bin/java
            owner: pi
            group: pi
            state: link
          when: javadir.stat.exists == false

        - name: (INSTALL) Removing Kafka "windows" directory
          file:
            path: "/opt/{{ kafka_version }}/bin/windows"
            state: absent
          when: kafkadir.stat.exists == false

        - name: (BACKUP) Checking if previous config backups already exist
          stat:
            path: "{{ item }}"
          register: backup
          with_items:
            - "{{ zookeper_config }}.original"
            - "{{ kafka_config }}.original"

        - debug:
            var: backup

        - name: (BACKUP) Making backup copies of the zookeper and kafka config files, if never been backed up before
          copy:
            src: "{{ item }}"
            dest: "{{ item }}.original"
            owner: pi
            group: pi
            mode: 0755
            remote_src: yes
          with_items:
            - "{{ zookeper_config }}"
            - "{{ kafka_config }}"
          when: backup.results[0].stat.exists == false

        - name: (BACKUP) Restoring original file to be truly idempotent
          copy:
            src: "{{ item }}.original"
            dest: "{{ item }}"
            remote_src: true
          with_items:
            - "{{ zookeper_config }}"
            - "{{ kafka_config }}"
          when: backup.results[0].stat.exists == true

        - name: (ZOOKEEPER CONFIG) Creating zookeeper work directory
          file:
            path: /var/zookeeper
            owner: pi
            group: pi
            state: directory
            mode: 0755

        - name: (ZOOKEEPER CONFIG) Replacing the default config which sets the zookeeper workdir under var
          lineinfile:
            path: "{{ zookeper_config }}"
            regexp: '^dataDir=.*$'
            line: 'dataDir={{ zookeeper_workdir }}'

        - name: (ZOOKEEPER CONFIG) Adding useful configuration
          lineinfile:
            path: "{{ zookeper_config }}"
            line: "{{ item }}"
            insertafter: EOF
          with_items:
            - "tickTime=2000"
            - "initLimit=10"
            - "syncLimit=5"

        - name: (ZOOKEEPER CONFIG) Adding cluster nodes to the zookeeper configuration
          lineinfile:
            path: "{{ zookeper_config }}"
            line: "server.{{ item[-1] }}={{ item + ':2888:3888' }}"
            insertafter: EOF
          with_items:
            - "{{ groups['pi-cluster'] }}"

        - name: (ZOOKEEPER CONFIG) Removing a previous idFile
          file:
            path: "{{ zookeeper_workdir }}/myid"
            state: absent

        - name: (ZOOKEEPER CONFIG) Creating zookeeper id file
          file:
            path: "{{ zookeeper_workdir }}/myid"
            state: touch
            owner: pi
            group: pi
            mode: 0755

        - name: (ZOOKEEPER CONFIG) Filling id file with respecting id
          lineinfile:
            path: "{{ zookeeper_workdir }}/myid"
            line: "{{ inventory_hostname[-1] }}"
            insertafter: EOF

        - name: (KAFKA CONFIG) Defining the broker ID
          lineinfile:
            path: "{{ kafka_config }}"
            regexp: '^broker.id=.*$'
            line: 'broker.id={{ inventory_hostname[-1] }}'

        - name: (KAFKA CONFIG) Setting the listen address
          lineinfile:
            path: "{{ kafka_config }}"
            regexp: '^#listeners=.*$'
            line: 'listeners=PLAINTEXT://{{ inventory_hostname }}:9092'

        - name: (KAFKA CONFIG) Setting the zookeeper cluster address
          lineinfile:
            path: "{{ kafka_config }}"
            regexp: '^zookeeper.connect=.*$'
            line: 'zookeeper.connect={{ zookeeper_cluster_address }}'

        - name: (STARTUP) Starting ZooKeeper
          shell: "nohup /opt/{{ kafka_version }}/bin/zookeeper-server-start.sh {{ zookeper_config }} &"
          async: 10
          poll: 0

        - name: (STARTUP) Starting Kafka
          shell: "nohup /opt/{{ kafka_version }}/bin/kafka-server-start.sh {{ kafka_config }} &"
          async: 10
          poll: 0
      become: true
  vars:
    installer_dir: "YourPathToTheDownloadedArchives"
    java_version: "jdk1.8.0_162"
    kafka_version: "kafka_2.11-1.0.0"
    java_installer_path: "{{ installer_dir }}/jdk-8u162-linux-arm32-vfp-hflt.tar.gz"
    kafka_installer_path: "{{ installer_dir }}/{{ kafka_version }}.tgz"
    zookeper_config: "/opt/{{ kafka_version }}/config/zookeeper.properties"
    kafka_config: "/opt/{{ kafka_version }}/config/server.properties"
    zookeeper_workdir: "/var/zookeeper"
    zookeeper_cluster_address: "{{ groups['pi-cluster'] | join(':2181,') }}:2181"
You can then run the playbook with:

ansible-playbook nameOfYourFile.yml --ask-become-pass
Testing the cluster
Now it's time to check that everything went smoothly. To do so, I'm going to use the console producer and consumer scripts shipped with the Kafka distribution.
First, start a producer from any of the nodes (or even from another machine, as long as Kafka is installed on it):
/opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh \
  --broker-list 192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092 \
  --topic testopic
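Note that the console producer auto-creates testopic with the broker defaults the first time you publish to it (as long as auto.create.topics.enable has not been turned off). If you would rather control the partition count and replication factor yourself, you can create the topic beforehand; the numbers below are just an example:

/opt/kafka_2.11-1.0.0/bin/kafka-topics.sh --create \
  --zookeeper 192.168.0.231:2181 \
  --replication-factor 3 --partitions 4 --topic testopic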
Then, start as many consumers as you want and just watch what comes up in each terminal. (You can use something like Terminator to manage several terminals in one window, even though I never managed to get it working on my Mac.)
/opt/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh \
  --zookeeper 192.168.0.231:2181 --topic testopic --from-beginning
Troubleshooting
If something does not work, I strongly suggest starting with these commands:

# ZooKeeper "dump" four-letter command: lists sessions and ephemeral nodes (run it against the ensemble leader)
echo dump | nc localhost 2181

# Check that broker 1 has registered itself in ZooKeeper
/opt/kafka_2.11-1.0.0/bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/1"
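If messages still do not flow, it may also help to look at how the topic itself is laid out, for instance with something like:

/opt/kafka_2.11-1.0.0/bin/kafka-topics.sh --describe \
  --zookeeper 192.168.0.231:2181 --topic testopic

which shows, for each partition, the leader broker and the in-sync replicas.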