How to Transfer Data from MQTT to Kafka
Open Automation Software can be used to transfer data from MQTT devices to a Kafka cluster, locally or over a network. This guide walks you through downloading and installing OAS, configuring a MQTT data source, a tag, and a Kafka publisher, and finally publishing the tag using the Kafka publisher.
For this guide on how to transfer data from an MQTT source to a Kafka cluster you will need:
- To install MQTT Explorer or a similar tool for publishing messages to a MQTT broker and subscribing to messages in a MQTT broker
- Docker installed in Linux or Windows including the docker-compose utility
1 - Download and Install OAS
If you have not already done so, you will need to download and install the OAS platform.
Fully functional trial versions of the software are available for Windows, Windows IoT Core, Linux, Raspberry Pi and Docker on our downloads page.
On Windows, run the downloaded setup.exe file to install the Open Automation Software platform. For a default installation, Agree to the End User License Agreement and then click the Next button on each of the installation steps until it has completed.
If you'd like to customize your installation or learn more, use the following instructions:
The OAS Service Control application will appear when the installation finishes on Windows.
Click on each START SERVICE button to start each of the three OAS services.
2 - Configure OAS
Configure OAS is the main application used to configure local and remote OAS instances.
From your operating system start menu, open the Configure OAS application.
Select the Configure > Tags screen.
Important
If this is the first time you have installed OAS, the AdminCreate utility will run when you select a screen in the Configure menu. This will ask you to create a username and password for the admin user. This user will have full permissions in the OAS platform.
For further information see Getting Started - Security.
If this is the first time you are logging in, you will see the AdminCreate utility. Follow the prompts to set up your admin account. Otherwise, select the Log In menu button and provide the Network Node, username and password.
Info
In this guide you will use the Configure OAS application to configure the local Network Node which by default is localhost.
If you have installed OAS on a remote instance you can also connect to the remote instance by setting the relevant IP address or host name in the Network Node field.
3 - Create a Kafka test cluster using Docker
In this step you will create a local Kafka cluster. To simplify the deployment you will need to have Docker installed including the docker-compose utility. This method allows you to create a cluster very quickly for testing purposes and remove it again when you are done.
Create a new folder such as Kafka and inside the folder create a new
docker-compose.yml
file with the following definition.services: zookeeper: image: confluentinc/cp-zookeeper:latest container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: image: confluentinc/cp-kafka:latest container_name: kafka ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Open a command line terminal or bash terminal and use the following command to start a new container. This will download the image and all its dependencies and may take a few minutes.
docker-compose up -d
Create a new topic called temperature using the following command.
docker exec kafka kafka-topics --create --topic temperature --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Info
Once you are done testing, you can stop and remove the containers that you created by using the following command:
docker-compose down
4 - Configure MQTT Data Source Connector
In the following steps you will create and configure a MQTT Connector to connect to a third party broker so that tags will be able to subscribe to a topic. For the purposes of this guide, you will connect to the public HiveMQ broker.
Select Configure > Drivers from the top menu.
Enter a meaningful Driver Interface Name to give this driver interface instance a unique name.
Ensure the following parameters are configured:
- Driver: MQTT
- IP Address: broker.hivemq.com
- Port: 1883
- Client ID: OAS_Client_123
Tips
If you are having trouble reading a value in later steps, try to change the Client ID to something else as each client ID connecting to the broker must be unique.
Click on the ADD DRIVER button on the left hand side to add this driver configuration. Once added, the driver interface name should appear in the list of drivers.
5 - Add Tags
In this step you will add two tags. One tag to represent a JSON payload coming from an external data source and a second Tag to represent the sensor value.
Select Configure > Tags from the top menu.
If you want to add a Tag to the root Tags group make sure the Tags node is selected in the tag list and click on the ADD TAG button.
If you want to add a Tag to a Tag Group, select the Tag Group first and then click on the ADD TAG button.
You can also add Tag Groups by using the ADD GROUP button.
Enter TemperatureSensorJSON as the Tag name and click the OK button.
Repeat the above two steps and add another Tag with the name TemperatureSensor.
6 - Assign MQTT as Tag Data Source
You will now set the Tag's data source to the MQTT driver interface that you created previously.
At the end of this section you will need to use a tool like MQTT Explorer to publish a value to your Tag.
Select the Tag that will source data from the MQTT data source.
Set the following properties:
- Data Type: JSON
- Data Source: MQTT
- Select Driver Interface: MQTT Data Source
- Topic: oas/temperature
Click on the Apply Changes button to apply the changes.
Using an MQTT testing tool to publish messages, such as MQTT Explorer, publish a JSON message to the oas/temperature topic.
{ "temperature": 24.9 }
Check that the quality status is Good Quality and you can see the JSON value.
7 - Parse JSON Payload
In this step you will extract the sensor value from the JSON payload using a Calculation Tag and JSON functions.
Select the Tag that will represent the temperature value.
Set the Data Source value to Calculation.
Next to the Calculation field click on the EDIT button.
In the CALCULATION EDITOR window enter the following function.
JSONQUERY([TemperatureSensorJSON.Value], "$.temperature")
This will extract the temperature field from the TemperatureSensorJSON Tag value.
Click the OK button to save the calculation.
Click on the Apply Changes button to apply the changes.
You should now see the sensor value in the Value field.
Tips
In this guide we used a separate calculation Tag in order to extract the sensor value. This is good practice when you want each Tag to have a single purpose and when you want to use the sensor value for other purposes like alarm thresholds and trending.
There is an alternative method using direct JSON access using the JSON property of a Tag. Instead of creating a separate TemperatureSensor calculation tag, the TemperatureSensorJSON tag can be accessed directly using the following syntax:
TemperatureJSON.JSON-temperature
This avoids having to create separate tags for each JSON property that you want to extract.
For more information on JSON handing, calculation and data access see the following:
8 - Configure Kafka Producer
In the following steps you will create and configure a Kafka Connector connecting to a local Kafka cluster on port 9092. This connector will act as a Producer where Tags are published to a topic in the cluster.
Select Configure > Drivers from the top menu.
Set the Driver Interface Name to Kafka Producer to give this driver interface instance a unique name.
Ensure the following parameters are configured:
- Driver: Kafka
- Bootstrap Servers: localhost:9092
- Security Protocol: Plaintext
- Client Id: oas-kafka-producer
Click on the ADD DRIVER button on the left hand side to add this driver configuration. Once added, the driver interface name should appear in the list of drivers.
9 - Publish Selected Tags in Kafka Producer
In this step you will select the Tags that you want to publish to the Kafka cluster.
In the Configure > Drivers screen, select the Kafka driver instance that you created in the previous section (for example Kafka Producer).
Make sure the Publish Selected Tags checkbox is ticked.
In the table at the bottom click on the ADD button.
Select the Tag you want to add in the left hand panel and then ensure the Value property is selected. By default the name of the property will be the full Tag path (e.g. TemperatureSensor.Value). If you want to set your own property name, you can change the Id field to your own custom value.
The Tag has now been added to the list. You can add other Tags by repeating steps 3 and 4.
Click on the Apply Changes button.
10 - Verify Messages are Published to Kafka Cluster
In this step you will confirm that OAS is successfully publishing your selected Tags to the Kafka cluster.
Run the following command in a terminal or command line window to start listening for new messages on the oas_tags topic which is the default topic for the Publish Selected Tags configuration.
docker exec kafka kafka-console-consumer --topic oas_tags --bootstrap-server localhost:9092
Info
You may receive a warning message, because the oas_tags topic doesn't exist yet. You can safely ignore this as it will be created automatically.
Any changes your TemperatureSensor tag value should be published within 10 seconds to the terminal window.
{ "values": [ { "id": "TemperatureSensor", "value": 24.902344, "quality": true, "timestamp": "2025-05-20T09:23:41.332Z" } ] }
To stop listening for new messages press the Ctrl-C shortcut combination on your keyboard.
11 - Save Changes
Once you have successfully configured your OAS instances, make sure you save your configuration.
On each configuration page, click on the Save button.
If this is the first time you are saving the configuration, or if you are changing the name of the configuration file, OAS will ask you if you want to change the default configuration file.
If you select Yes then OAS will make this configuration file the default and if the OAS service is restarted then this file will be loaded on start-up.
If you select No then OAS will still save your configuration file, but it will not be the default file that is loaded on start-up.
Important
Each configuration screen has an independent configuration file except for the Tags and Drivers configurations, which share the same configuration file. It is still important to click on the Save button whenever you make any changes.
For more information see: Save and Load Configuration
Info
- On Windows the configuration files are stored in C:\ProgramData\OpenAutomationSoftware\ConfigFiles.
- On Linux the configuration files are stored in the ConfigFiles subfolder of the OAS installation path.