How to Transfer Data from MQTT to Kafka

Open Automation Software (OAS) can be used to transfer data from MQTT devices to a Kafka cluster, locally or over a network. This guide walks you through downloading and installing OAS, configuring an MQTT data source, Tags, and a Kafka producer, and finally publishing a Tag through the Kafka producer.

To follow this guide you will need:

  • MQTT Explorer or a similar tool for publishing messages to, and subscribing to messages from, an MQTT broker
  • Docker installed on Linux or Windows, including the docker-compose utility

1 - Download and Install OAS

If you have not already done so, you will need to download and install the OAS platform.

Fully functional trial versions of the software are available for Windows, Windows IoT Core, Linux, Raspberry Pi and Docker on our downloads page.

On Windows, run the downloaded setup.exe file to install the Open Automation Software platform. For a default installation, agree to the End User License Agreement and then click the Next button on each installation step until the installation has completed.

If you'd like to customize your installation or learn more, use the following instructions:

The OAS Service Control application will appear when the installation finishes on Windows.

OAS Service Control

Click on each START SERVICE button to start each of the three OAS services.

2 - Configure OAS

Configure OAS is the main application used to configure local and remote OAS instances.

  1. From your operating system start menu, open the Configure OAS application.

  2. Select the Configure > Tags screen.

    Important

    If this is the first time you have installed OAS, the AdminCreate utility will run when you select a screen in the Configure menu. This will ask you to create a username and password for the admin user. This user will have full permissions in the OAS platform.

    For further information see Getting Started - Security.

  3. If this is the first time you are logging in, you will see the AdminCreate utility. Follow the prompts to set up your admin account. Otherwise, select the Log In menu button and provide the Network Node, username and password.

    Log In Menu

    Log In Dialog

Info

In this guide you will use the Configure OAS application to configure the local Network Node which by default is localhost.

If you have installed OAS on a remote instance you can also connect to the remote instance by setting the relevant IP address or host name in the Network Node field.

3 - Create a Kafka test cluster using Docker

In this step you will create a local Kafka cluster. To simplify the deployment you will need to have Docker installed including the docker-compose utility. This method allows you to create a cluster very quickly for testing purposes and remove it again when you are done.

  1. Create a new folder such as Kafka and inside the folder create a new docker-compose.yml file with the following definition.

    services:
        zookeeper:
            image: confluentinc/cp-zookeeper:latest
            container_name: zookeeper
            ports:
                - "2181:2181"
            environment:
                ZOOKEEPER_CLIENT_PORT: 2181
                ZOOKEEPER_TICK_TIME: 2000
    
        kafka:
            image: confluentinc/cp-kafka:latest
            container_name: kafka
            ports:
                - "9092:9092"
            environment:
                KAFKA_BROKER_ID: 1
                KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
                KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
                KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
                KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    
    
  2. Open a command line or bash terminal in the folder that contains the docker-compose.yml file and use the following command to start the containers. This will download the images and all their dependencies and may take a few minutes.

    docker-compose up -d

  3. Create a new topic called temperature using the following command.

    docker exec kafka kafka-topics --create --topic temperature --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
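
Before moving on, it can help to confirm that the broker port is reachable from the machine where OAS runs. The following is a minimal sketch using only the Python standard library. The helper name is_port_open is an assumption made for illustration, and the host and port are taken from the docker-compose.yml file in this section. A successful TCP connection only shows that the port is published; it does not prove that Kafka has finished starting.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


if __name__ == "__main__":
    # localhost:9092 is the listener published by the kafka service.
    print("Kafka port reachable:", is_port_open("localhost", 9092))
```

If the check reports False, make sure the containers are running and that nothing else on the machine is already using port 9092.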


Info

Once you are done testing, you can stop and remove the containers that you created by using the following command:

docker-compose down

4 - Configure MQTT Data Source Connector

In the following steps you will create and configure an MQTT Connector that connects to a third-party broker so that Tags can subscribe to a topic. For the purposes of this guide, you will connect to the public HiveMQ broker.

  1. Select Configure > Drivers from the top menu.

    Configure drivers menu

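  2. Enter a meaningful Driver Interface Name to give this driver interface instance a unique name. This guide uses MQTT Data Source, which is the name selected when assigning the Tag data source in a later section.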

  3. Ensure the following parameters are configured:

    • Driver: MQTT
    • IP Address: broker.hivemq.com
    • Port: 1883
    • Client ID: OAS_Client_123

    MQTT connector configuration

    Tips

    If you have trouble reading a value in later steps, try changing the Client ID to something else, because each client connecting to the broker must use a unique Client ID.

  4. Click on the ADD DRIVER button on the left hand side to add this driver configuration. Once added, the driver interface name should appear in the list of drivers.

    Add MQTT driver button
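
Because the public broker is shared, a fixed Client ID such as OAS_Client_123 may already be in use by someone else. One way to reduce the chance of a clash is to append a random suffix. The sketch below is only an illustration: the function name make_client_id and the suffix length are assumptions, and the resulting value still has to be typed into the Client ID field by hand.

```python
import uuid


def make_client_id(prefix: str = "OAS_Client_") -> str:
    """Return the prefix followed by 8 random hexadecimal characters."""
    # With the default prefix the result is 19 characters long, which stays
    # within the 23 bytes that every MQTT 3.1.1 broker is required to accept.
    return prefix + uuid.uuid4().hex[:8]


if __name__ == "__main__":
    print(make_client_id())
```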

5 - Add Tags

In this step you will add two Tags: one to hold the JSON payload coming from an external data source, and a second to hold the sensor value extracted from that payload.

  1. Select Configure > Tags from the top menu.

    Configure tags menu

  2. If you want to add a Tag to the root Tags group make sure the Tags node is selected in the tag list and click on the ADD TAG button.

    Add tag button

    If you want to add a Tag to a Tag Group, select the Tag Group first and then click on the ADD TAG button.

    You can also add Tag Groups by using the ADD GROUP button.

  3. Enter TemperatureSensorJSON as the Tag name and click the OK button.

    Add JSON tag to root node dialog

  4. Repeat the above two steps and add another Tag with the name TemperatureSensor.

    Add tag to root node dialog

6 - Assign MQTT as Tag Data Source

You will now set the Tag's data source to the MQTT driver interface that you created previously.

At the end of this section you will need to use a tool like MQTT Explorer to publish a value to your Tag.

  1. Select the Tag that will source data from the MQTT data source.

    Tag

  2. Set the following properties:

    • Data Type: JSON
    • Data Source: MQTT
    • Select Driver Interface: MQTT Data Source
    • Topic: oas/temperature

    MQTT tag configuration

  3. Click on the Apply Changes button to apply the changes.

  4. Using an MQTT testing tool such as MQTT Explorer, publish the following JSON message to the oas/temperature topic.

    {
        "temperature": 24.9
    }
    

    MQTT explorer connection

    MQTT explorer connection advanced

    MQTT explorer connection publish

  5. Check that the quality status is Good Quality and you can see the JSON value.

    MQTT tag quality
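
If you prefer to generate test payloads rather than type them, the message can be built programmatically. This is a minimal sketch using the Python standard library; the function name build_payload and the TOPIC constant are assumptions made for illustration. It only produces the JSON text, which you can then paste into MQTT Explorer or hand to an MQTT client library of your choice.

```python
import json

# Topic configured on the TemperatureSensorJSON Tag in this section.
TOPIC = "oas/temperature"


def build_payload(temperature: float) -> str:
    """Serialize a temperature reading as the JSON object used in this guide."""
    return json.dumps({"temperature": temperature})


if __name__ == "__main__":
    print(TOPIC, build_payload(24.9))
```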

7 - Parse JSON Payload

In this step you will extract the sensor value from the JSON payload using a Calculation Tag and JSON functions.

  1. Select the Tag that will represent the temperature value.

    Tag

  2. Set the Data Source value to Calculation.

  3. Next to the Calculation field click on the EDIT button.

  4. In the CALCULATION EDITOR window enter the following function.

    JSONQUERY([TemperatureSensorJSON.Value], "$.temperature")
    

    This will extract the temperature field from the TemperatureSensorJSON Tag value.

    Click the OK button to save the calculation.

    JSON calculation

  5. Click on the Apply Changes button to apply the changes.

  6. You should now see the sensor value in the Value field.
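
To make clear what the "$.temperature" path selects, here is a rough Python equivalent. It is an illustration only and not how OAS implements JSONQUERY: the helper name json_query is an assumption, and it handles nothing beyond plain object keys separated by dots.

```python
import json


def json_query(document: str, path: str):
    """Resolve a simple '$.a.b' style path against a JSON document."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    value = json.loads(document)
    for key in path[1:].split("."):
        if key:  # skip the empty segment before the first '.'
            value = value[key]
    return value


if __name__ == "__main__":
    payload = '{"temperature": 24.9}'
    print(json_query(payload, "$.temperature"))
```

The "$" stands for the root of the document, so "$.temperature" reads as "the temperature field of the top-level object".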

Tips

In this guide we used a separate calculation Tag in order to extract the sensor value. This is good practice when you want each Tag to have a single purpose and when you want to use the sensor value for other purposes like alarm thresholds and trending.

An alternative is to access the JSON content directly through the JSON property of a Tag. Instead of creating a separate TemperatureSensor calculation Tag, a field of the TemperatureSensorJSON Tag can be accessed directly using the following syntax:

TemperatureSensorJSON.JSON-temperature

This avoids having to create separate tags for each JSON property that you want to extract.

For more information on JSON handling, calculation and data access see the following:

8 - Configure Kafka Producer

In the following steps you will create and configure a Kafka Connector connecting to a local Kafka cluster on port 9092. This connector will act as a Producer where Tags are published to a topic in the cluster.

  1. Select Configure > Drivers from the top menu.

    Configure drivers menu

  2. Set the Driver Interface Name to Kafka Producer to give this driver interface instance a unique name.

  3. Ensure the following parameters are configured:

    • Driver: Kafka
    • Bootstrap Servers: localhost:9092
    • Security Protocol: Plaintext
    • Client Id: oas-kafka-producer

    Kafka producer configuration

  4. Click on the ADD DRIVER button on the left hand side to add this driver configuration. Once added, the driver interface name should appear in the list of drivers.

    Add Kafka producer driver button

9 - Publish Selected Tags in Kafka Producer

In this step you will select the Tags that you want to publish to the Kafka cluster.

  1. In the Configure > Drivers screen, select the Kafka driver instance that you created in the previous section (for example Kafka Producer).

  2. Make sure the Publish Selected Tags checkbox is ticked.

    Publish selected tags

  3. In the table at the bottom click on the ADD button.

    Publish selected tags ADD button

  4. Select the Tag you want to add in the left hand panel and then ensure the Value property is selected. By default the name of the property will be the full Tag path (e.g. TemperatureSensor.Value). If you want to set your own property name, you can change the Id field to your own custom value.

    Tag browser

  5. The Tag has now been added to the list. You can add other Tags by repeating steps 3 and 4.

    Tag browser

  6. Click on the Apply Changes button.

10 - Verify Messages are Published to Kafka Cluster

In this step you will confirm that OAS is successfully publishing your selected Tags to the Kafka cluster.

  1. Run the following command in a terminal or command line window to start listening for new messages on the oas_tags topic, which is the default topic for the Publish Selected Tags configuration.

    docker exec kafka kafka-console-consumer --topic oas_tags --bootstrap-server localhost:9092
    

    Info

    You may receive a warning message, because the oas_tags topic doesn't exist yet. You can safely ignore this as it will be created automatically.

  2. Any change to your TemperatureSensor Tag value should appear in the terminal window within 10 seconds.

    {
        "values": [
            {
                "id": "TemperatureSensor",
                "value": 24.902344,
                "quality": true,
                "timestamp": "2025-05-20T09:23:41.332Z"
            }
        ]
    }
    
  3. To stop listening for new messages press the Ctrl-C shortcut combination on your keyboard.
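
A downstream consumer will usually want the individual readings rather than the raw message. The sketch below parses the sample message shown above with the Python standard library. The function name parse_readings is an assumption, and the field names are taken from that sample only, so treat the structure as an example rather than a complete description of the OAS message format.

```python
import json
from datetime import datetime

SAMPLE = """
{
    "values": [
        {
            "id": "TemperatureSensor",
            "value": 24.902344,
            "quality": true,
            "timestamp": "2025-05-20T09:23:41.332Z"
        }
    ]
}
"""


def parse_readings(message: str):
    """Return (id, value, quality, timestamp) tuples from a published message."""
    readings = []
    for item in json.loads(message)["values"]:
        # Swap the trailing 'Z' for an explicit offset so that fromisoformat
        # also accepts the timestamp on Python versions before 3.11.
        ts = datetime.fromisoformat(item["timestamp"].replace("Z", "+00:00"))
        readings.append((item["id"], item["value"], item["quality"], ts))
    return readings


if __name__ == "__main__":
    for tag_id, value, quality, ts in parse_readings(SAMPLE):
        print(tag_id, value, quality, ts.isoformat())
```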

11 - Save Changes

Once you have successfully configured your OAS instances, make sure you save your configuration.

On each configuration page, click on the Save button.

If this is the first time you are saving the configuration, or if you are changing the name of the configuration file, OAS will ask you if you want to change the default configuration file.

If you select Yes, OAS will make this configuration file the default, and it will be loaded on start-up whenever the OAS service is restarted.

If you select No then OAS will still save your configuration file, but it will not be the default file that is loaded on start-up.

Change Default Configuration Files dialog

Important

Each configuration screen has an independent configuration file except for the Tags and Drivers configurations, which share the same configuration file. It is still important to click on the Save button whenever you make any changes.

For more information see: Save and Load Configuration

Info

  • On Windows the configuration files are stored in C:\ProgramData\OpenAutomationSoftware\ConfigFiles.
  • On Linux the configuration files are stored in the ConfigFiles subfolder of the OAS installation path.