Tutorial: Using Kafka with MQTT
In this tutorial we will learn how to connect a Kafka cluster to an MQTT cluster using a third-party MQTT connector.
You will need:
- An Instaclustr Account
- An AWS Account
- An MQTT connector
Install Mosquitto Client
We use the Eclipse Mosquitto client to read test messages, which lets us verify that the messages we send via Kafka are received by our MQTT cluster.
If you are on a macOS computer run:
    brew install mosquitto
On Debian or Ubuntu run:
    sudo apt update
    sudo apt install mosquitto-clients
You can also download mosquitto clients here: https://mosquitto.org/download/
Download Third Party Custom Plug-In
We now download the third party custom plug-in we will be using for MQTT.
We used a third party plug-in from lenses.io, but any functional MQTT connector should work.
Once downloaded, extract the archive and copy the contents of the plug-in to somewhere accessible, such as your desktop.
In the next step we will copy the contents of the plug-in into the S3 bucket.
Set up S3 Bucket for Custom Connector
It is now time to set up an S3 bucket, which will allow us to configure a new Kafka Connect cluster using the plug-in we downloaded in the previous step.
Follow the steps here to configure your S3 bucket:
Once the bucket has been created by following the steps in the above article, drop the “kafka-connect-mqtt” folder into the root directory of the bucket. Wait until the upload has completed successfully.
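If you prefer the command line to the S3 console, the upload can be sketched with the AWS CLI. This is a minimal sketch, assuming the AWS CLI is installed and configured with credentials that can write to the bucket. The helper name upload_plugin, the local path, and the bucket name are placeholders and not part of the original steps.

```shell
#!/bin/sh
# Sketch: copy the extracted plug-in folder to the bucket root, so its
# files end up under s3://<bucket>/kafka-connect-mqtt/.
# upload_plugin is a hypothetical helper name.
upload_plugin() {
  plugin_dir="$1"   # local path to the extracted kafka-connect-mqtt folder
  bucket="$2"       # name of the S3 bucket created above
  aws s3 cp "$plugin_dir" "s3://$bucket/kafka-connect-mqtt/" --recursive
}
```

For example, upload_plugin "$HOME/Desktop/kafka-connect-mqtt" followed by your bucket name performs the upload, and aws s3 ls on the same s3:// prefix lists the uploaded files.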
Create Kafka Cluster
Using the Instaclustr Console, set up a new Kafka cluster with the following properties:
Name: Kafka_MQTT_Test_Cluster
Security: Add IP to cluster firewall allowed addresses
WARNING: Ensure that you set the firewall rules of the Kafka cluster so that your local machine has access. Later in the process we will use the CLI to create the test topic in our Kafka cluster, which requires access to the cluster.
Create Test Topic
For Kafka Connect to work, sources and sinks must refer to specific Kafka topics. Create a new Kafka topic named “MQTT_Test_Topic” by following the instructions found here: Kafka Topic Management | Instaclustr.
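As a rough illustration of this step, the topic could be created with the standard Apache Kafka command line tool. This is only a sketch under several assumptions: that kafka-topics.sh is on your PATH, that the broker listens on port 9092, and that a client properties file with the security settings for your cluster exists at the path you pass in. The helper name, partition count, and replication factor are illustrative values, so the linked Instaclustr instructions remain the authoritative reference.

```shell
#!/bin/sh
# Sketch using the standard Apache Kafka CLI (assumed to be on PATH).
# KAFKA_TOPICS_BIN lets you point at a different install location.
KAFKA_TOPICS_BIN="${KAFKA_TOPICS_BIN:-kafka-topics.sh}"

# create_test_topic is a hypothetical helper name.
create_test_topic() {
  broker="$1"       # public IP of one Kafka broker
  props_file="$2"   # client properties file (path is a placeholder)
  "$KAFKA_TOPICS_BIN" --bootstrap-server "$broker:9092" \
    --command-config "$props_file" \
    --create --topic MQTT_Test_Topic \
    --partitions 3 --replication-factor 3   # assumed values
}
```

A call such as create_test_topic with a broker IP and the path to your properties file creates the topic that the connector configuration refers to later.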
Create Kafka Connect Cluster
Now create a Kafka Connect cluster using the Instaclustr console with the following properties:
Name: Kafka_Connect_MQTT_Test_Cluster
Security: Add IP to cluster firewall allowed addresses
Target Kafka Cluster: Instaclustr Managed Kafka Cluster
Kafka Cluster Name: Kafka_MQTT_Test_Cluster
VPC Configuration: Kafka Cluster VPC
Custom Connectors: “Use custom connectors” is checked
Custom Connector Configuration:
    aws.access.key.id=<AWS_ACCESS_KEY_ID>
    aws.secret.access.key=<AWS_SECRET_ACCESS_KEY>
    s3.bucket.name=<S3_BUCKET_NAME>
These three properties allow us to reference the S3 bucket, created in an earlier step, that contains the kafka-connect-mqtt plug-in. <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> correspond to an AWS account which has read and write access to that S3 bucket. <S3_BUCKET_NAME> is the name of the S3 bucket containing the MQTT plug-in. These three values should be found under the outputs tab for the custom stack you created earlier.
WARNING: Ensure that you check the “use custom connectors” field and enter the credentials of the bucket you created previously in the “Custom Connector Configuration” box.
Configure the Custom Plug-In
Now that we have our clusters, we need to configure our custom MQTT plug-in to listen to the Kafka “MQTT_Test_Topic” and send messages to our MQTT cluster. We do this through the Kafka Connect REST API, posting a configuration for the Stream Reactor MQTT sink connector. Execute the following curl command:
    curl https://<ip-address-kafka-connect-node>:8083/connectors -X POST -H 'Content-Type: application/json' -d '{
      "name":"sink",
      "config":{
        "tasks.max":"1",
        "topics":"MQTT_Test_Topic",
        "connector.class":"com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector",
        "connect.mqtt.hosts":"tcp://test.mosquitto.org:1883",
        "connect.mqtt.clean":"true",
        "connect.mqtt.timeout":"1000",
        "connect.mqtt.keep.alive":"1000",
        "connect.mqtt.service.quality":"1",
        "connect.mqtt.kcql":"INSERT INTO /<your-prefix-here>/test SELECT * FROM MQTT_Test_Topic",
        "connect.progress.enabled":"true"
      }
    }' -k -u ic_kc_user:<ic-kc-password>
WARNING: Ensure that you set the firewall rules of the Kafka Connect cluster so that your local machine has access. Otherwise, when you run this curl command on your machine, it won’t be able to reach the Kafka Connect cluster.
WARNING: The ic-kc-password which you need to supply at the end of the command is not the same as the password you used previously for the Kafka cluster. It can be found in the Connection Info tab of your Kafka Connect cluster.
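After posting the configuration, it can be useful to confirm that the connector actually started. The sketch below queries the status endpoint of the Kafka Connect REST API for the connector named "sink", reusing the same user, port, and -k flag as the command above. The helper name connector_status is hypothetical, and the host and password arguments are placeholders.

```shell
#!/bin/sh
# Sketch: ask Kafka Connect for the state of the connector named "sink".
# connector_status is a hypothetical helper name.
connector_status() {
  kc_host="$1"       # IP address of a Kafka Connect node
  kc_password="$2"   # password from the Connection Info tab
  # -k skips certificate verification, as in the command above.
  curl -sS -k -u "ic_kc_user:$kc_password" \
    "https://$kc_host:8083/connectors/sink/status"
}
```

A healthy connector reports the state RUNNING for both the connector and its task. A FAILED task includes a trace field that usually points at the cause, such as an unreachable MQTT host.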
Test Set-Up
To test our set-up, we run the mosquitto client in one terminal and a Kafka console producer in another terminal window. When we type messages into the Kafka console producer, we should see them appear in the mosquitto terminal with a slight delay.
To open the mosquitto client run the following command:
    mosquitto_sub -h test.mosquitto.org -t /<your-prefix-here>/test -q 1
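Before involving Kafka, it can help to confirm that the subscriber itself works by publishing directly to the same MQTT topic from a second terminal. This is a minimal sketch using mosquitto_pub, which is installed alongside mosquitto_sub. The helper name and the message text are placeholders, and the prefix must match the one used in the KCQL statement.

```shell
#!/bin/sh
# Sketch: publish one message straight to the MQTT topic, bypassing Kafka.
# publish_test_message is a hypothetical helper name.
publish_test_message() {
  prefix="$1"    # the same prefix used in the KCQL statement
  message="$2"   # any test payload
  mosquitto_pub -h test.mosquitto.org -t "/$prefix/test" -m "$message" -q 1
}
```

If the message shows up in the mosquitto_sub terminal, the MQTT side is working, and any later problem is more likely to be in the connector or the Kafka cluster.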
To open a console producer follow the steps here:
Now test your set-up by typing some messages into the kafka console.
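For orientation, a console producer session with the standard Apache Kafka CLI might look like the sketch below. It assumes kafka-console-producer.sh is on your PATH, that the broker listens on port 9092, and that a client properties file with your cluster's security settings exists. The helper name and the file path are placeholders.

```shell
#!/bin/sh
# Sketch using the standard Apache Kafka CLI (assumed to be on PATH).
KAFKA_PRODUCER_BIN="${KAFKA_PRODUCER_BIN:-kafka-console-producer.sh}"

# open_console_producer is a hypothetical helper name.
open_console_producer() {
  broker="$1"       # public IP of one Kafka broker
  props_file="$2"   # client properties file (path is a placeholder)
  "$KAFKA_PRODUCER_BIN" --bootstrap-server "$broker:9092" \
    --producer.config "$props_file" \
    --topic MQTT_Test_Topic
}
```

Each line typed into the producer is sent as one message to MQTT_Test_Topic. Older Kafka releases expect --broker-list in place of --bootstrap-server for this tool.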
Congratulations, you have successfully connected a Kafka cluster to an MQTT cluster using Kafka Connect and a custom plug-in.