1. Home
  2. /
  3. Docs
  4. /
  5. Documentation
  6. /
  7. User Guide
  8. /
  9. MQTT Integration

MQTT Integration

MQTT Integration allows to connect to external MQTT brokers, subscribe to data streams from those brokers and convert any type of payload from your devices to NexAI message format. Its typical use is whenever your devices are already connected to external MQTT broker or any other IoT platform or connectivity provider with MQTT based back-end.

Please review the integration diagram to learn more.

NexAI MQTT Integration acts as an MQTT client. It subscribes to topics and converts the data into telemetry and attribute updates. In case of downlink message, MQTT integration converts it to the device-suitable format and pushes to external MQTT broker. Pay attention: MQTT broker should be either co-located with NexAI instance or deployed in the cloud and have a valid DNS name or static IP address. NexAI instance that is running in the cloud can’t connect to the MQTT broker deployed in local area network.

MQTT Integration Configuration

In this tutorial, we will configure MQTT Integration to provide devices connection to the Platform and ability to send RPC commands to devices.

Prerequisites

In this tutorial, we will use:

  • The instance of NexAI
  • MQTT broker, accessible by NexAI instance — broker.hivemq.com (port 1883);
  • mosquitto_pub and mosquitto_sub MQTT clients to send and receive messages;
  • an advanced device simulator for RPC simulation example.

Let’s assume that we have a sensor which is sending current temperature readings. Our sensor device SN-001 publishes it’s temperature readings to ‘mqtt-integration-tutorial/sensors/SN-001/temperature’ and it is subscribed to ‘mqtt-integration-tutorial/sensors/SN-001/rx’ to receive RPC calls.

NexAI setup

Before setting up an MQTT integration, you need to create uplink and downlink converters.

Uplink converter is a script for parsing and transforming the data received by MQTT integration.

Downlink converter parses and transforms the data sent from NexAI to the format that is consumed by existing device(s).

The purpose of the decoder function is to parse the incoming data and metadata to a format that NexAI can consume. deviceName and deviceType are required, while attributes and telemetry are optional. Attributes and telemetry are flat key-value objects. Nested objects are not supported.

To create an uplink converter go to the Integrations center -> Data converters page and click “plus” button. Name it “MQTT Uplink Converter” and select type Uplink. Use debug mode for now.

NOTE:
Although the Debug mode is very useful for development and troubleshooting, leaving it enabled in production mode may tremendously increase the disk space, used by the database, because all the debugging data is stored there. It is highly recommended to turn the Debug mode off when done debugging.

One can use either NAEL (NexAI expression language) or JavaScript to develop user defined functions. We recommend utilizing NAEL as it’s execution in NexAI is much more efficient compared to JS.

/** Decoder **/ // decode payload to string var payloadStr = decodeToString(payload); var data = JSON.parse(payloadStr); var deviceName = metadata.topic.split("/")[3]; // decode payload to JSON var deviceType = 'sensor'; // Result object with device attributes/telemetry data var result = { deviceName: deviceName, deviceType: deviceType, attributes: { integrationName: metadata['integrationName'], }, telemetry: { temperature: data.value, } }; /** Helper functions 'decodeToString' and 'decodeToJson' are already built-in **/ return result;

The Downlink converter transforming outgoing RPC message and then the Integration sends it to external MQTT broker.

NOTE:
Even if you won’t send downlink RPC, you still need to create a dummy Downlink converter.

Create another converter with the name “MQTT Downlink Converter” and type Downlink. Leave the default script and click Add.

// Encode downlink data from incoming Rule Engine message // msg - JSON message payload downlink message json // msgType - type of message, for ex. 'ATTRIBUTES_UPDATED', 'POST_TELEMETRY_REQUEST', etc. // metadata - list of key-value pairs with additional data about the message // integrationMetadata - list of key-value pairs with additional data defined in Integration executing this converter /** Encoder **/ var data = {}; // Process data from incoming message and metadata data.tempFreq = msg.temperatureUploadFrequency; data.humFreq = msg.humidityUploadFrequency; data.devSerialNumber = metadata['ss_serialNumber']; // Result object with encoded downlink payload var result = { // downlink data content type: JSON, TEXT or BINARY (base64 format) contentType: "JSON", // downlink data data: JSON.stringify(data), // Optional metadata object presented in key/value format metadata: { topic: metadata['deviceType']+'/'+metadata['deviceName']+'/upload' } }; return result;

MQTT Integration Setup

  • Go to the Integrations center -> Integrations page and click “plus” icon to add a new integration. Name it “MQTT Integration”, select type MQTT;
  • The next steps is to add the recently created uplink and downlink converters;
  • Specify hostbroker.hivemq.com and port1883 at the connection step;
  • Add a topic filter:
mqtt-integration-tutorial/sensors/+/temperature
  • You can also select an MQTT QoS level. We use MQTT QoS level 0 (At most once) by default;
  • Go to the advanced settings. It is better to uncheck the Clean session parameter. Many brokers do not support sticky sessions, so will silently close the connection if you try to connect with this option enabled;
  • Let’s leave the Downlink topic pattern by default, meaning that the Integration will take the metadata.topic and use it as the downlink topic;
  • [Optional] Click on Check connection button to check connection to your Service Bus topic. Click Add button to create the integration.

Now let’s simulate the device sending a temperature reading to the integration.

Use terminal for will send a message with temperature readings in a simple format: {"value":25.1}:

mosquitto_pub -h broker.hivemq.com -p 1883 -t "mqtt-integration-tutorial/sensors/SN-001/temperature" -m '{"value":25.1}'

After you sent uplink message a new device was created. You should receive a notification about it. To view notification click on the bell icon in the upper right corner of the screen. The notification will contain link to created device. Go to this device.

Here you will see information about the new device. As well as the telemetry which we sent to the device.

Go back to your Integrations page and navigate to the Events tab. There you’ll see the message consumed by the MQTT Integration.

On Events tab of your MQTT Uplink Converter there will be “In”, “Out”, and “Metadata” columns. The “In” and “Metadata” are the input for the data converter, and “Out” is the result.


Summary: the Uplink Data Converter defines provisioning of device and interpreting the input data. In our example we capture the device name from the topic (SN-001), set the default device type (sensor) and populate a telemetry value to it. In more complex cases you can write a script that will take this data from any part of data or metadata.

Send One-Way RPC to Device

This section describes how to send a one-way RPC request to the device using Control Widgets.

  • Go to the Dashboards page and create a new dashboard named MQTT RPC. Open the dashboard add an alias by clicking on Entity aliases icon;
  • Name the alias (Sensor, for example), select filter type Single Entity, type Device and choose our SN-001 sensor. Press Add and then Save;
  • Now add a new widget, select the Control Widgets bundle from drop down menu and select the Knob Control widget;
  • On the Data field select created alias (Sensor). Set Number of digits after floating point to 0;
  • Go to Advanced tab and set Minimum value to 15 and Maximum value to 45. Leave the rest by default. Click Add to create widget;
  • Save changes.

Now go to the Rule Chains page and open Root Rule Chain. Double-click on message type switch node and enable the Debug mode on it.

Now go back to your dashboard and turn knob a couple of times.

In the message type switch node on the Events tab you should then see incoming messages with the message type RPC_CALL_FROM_SERVER_TO_DEVICE and relation type RPC Request to Device. You can check out what data and metadata was sent by the Knob Control to the Rule Engine.

To make sensor performing this command you need RPC Request to Device type messages to be forwarded to the downlink data converter. In the Root Rule Chain editor find the integration downlink node, drag and drop it to the rule chain. Name it MQTT Integration Downlink, select our MQTT Integration and click Add. Drag a connection from Message Type Switch node to MQTT Integration Downlink node with label “RPC Request to Device” and click add.

Go to the Data converters page, open your “MQTT Downlink Converter” and replace the default script with this one:

/** Encoder **/ var value = parseInt(msg.params.replaceAll("[\"]","")); var data = {value: value}; // Result object with encoded downlink payload var result = { // downlink data content type: JSON, TEXT or BINARY (base64 format) contentType: "JSON", // downlink data data: JSON.stringify(data), // Optional metadata object presented in key/value format metadata: { topic: 'mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx' } }; return result;

The script above removes quotes from msg.params value, which comes as quoted string, and parses it to integer. Then it builds a result object which is passed to the Integration. The result object structure should be followed: the data (the message payload sent to the external MQTT broker as-is) and the metadata (is used by Integrationin). As mentioned: the Integration downlink topic is configured to ${topic}, which means that the integration will take the metadata.topic and use it as the downlink topic.

Open the terminal window and execute the following command:

mosquitto_sub -h broker.hivemq.com -p 1883 -t "mqtt-integration-tutorial/sensors/+/rx"

Go to the dashboard and turn the wheel again. In your terminal window you should receive series of the incoming messages sent by thew knob control widget:


{"value":33}
{"value":42}

Simulating of Two-Way RPC

Now try to simulate sending an RPC request to the device with recieving a response.

First you should modify converters to send downlink messages to ‘mqtt-integration-tutorial/sensors/+/rx/twoway’ topic and receive device responses on mqtt-integration-tutorial/sensors/+/rx/response’ topic.

Change downlink converter code to send messages to ‘mqtt-integration-tutorial/sensors/+/rx/twoway’ topic. Go to the “MQTT Downlink Converter” and change the code on line 16 to

mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx/twoway'

Or just paste the following code in the encoder window:

/** Encoder **/ var value = parseInt(msg.params.replaceAll("[\"]","")); var data = {value: value}; // Result object with encoded downlink payload var result = { // downlink data content type: JSON, TEXT or BINARY (base64 format) contentType: "JSON", // downlink data data: JSON.stringify(data), // Optional metadata object presented in key/value format metadata: { topic: 'mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx/twoway' } }; return result;

Then prepare the uplink converter to receive the response messages. Go to the “MQTT Uplink Converter” and paste the following code in the decoder window:

/** Decoder **/ // decode payload to string var payloadStr = decodeToString(payload); var data = JSON.parse(payloadStr); var deviceName = metadata.topic.split("/")[3]; // decode payload to JSON var deviceType = 'sensor'; // Result object with device attributes/telemetry data var telemetry; if (metadata.topic.endsWith('/temperature')) { // Transform the incoming data as before telemetry = getTemperatureTelemetry(data); } else if (metadata.topic.endsWith('/rx/response')) { // Get the input value as is telemetry = data; } var result = { deviceName: deviceName, deviceType: deviceType, attributes: { integrationName: metadata['integrationName'], }, telemetry: telemetry }; /** Helper functions 'decodeToString' and 'decodeToJson' are already built-in **/ return result;

The script above is slightly different from what we had initially. It distinguishes between Post Telemetry requests and RPC call Responses, thus publishing different kind of output to Rule Engine.

You also must add a topic filter in your integration to receive the RPC response messages: MQTT Integration -> Topic filters -> Add topic filter. Add this topic with the default QoS level:

/mqtt-integration-tutorial/sensors/+/rx/response

Apply changes.

Now run device emulator. Notice, the mosquitto_pub and mosquitto_sub is not suffice, so please launch an advanced simulator:

python mqtt-client.py

Try to turn the knob wheel on a dashboard. In the terminal window you should have an output similar to :

Incoming message
Topic: mqtt-integration-tutorial/sensors/SN-001/rx/twoway
Message: {"value":40}
This is a Two-way RPC call. Going to reply now!
Sending a response message: {"rpcReceived":"OK"}

Go to the Devices page and find rpcReceived telemetry value is “OK” on the Latest telemetry tab of your SN-001 device.

Still stuck? Contact

How can we help?

Access the CapitalAI Library

Enter your details below to view and download all of the content from our library.

PDF Download Form
This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.