Kafka Integration

Apache Kafka is an open-source distributed message broker developed under the Apache Software Foundation. It is written in Java and Scala.

Kafka is designed as a distributed, horizontally scalable system whose capacity grows both with the number and load of data sources and with the number of subscriber systems. Subscribers can be combined into groups, and data can be stored temporarily for subsequent batch processing.

In some scenarios, Kafka can be used instead of a message queue, for example when there is no stable connection between the device and the NexAI instance.

Required environment

Before you start setting up the integration, you should already have a Kafka broker available. This can be either a local installation or a cloud solution. If you prefer a cloud solution, consider Confluent Cloud, which is used for the examples in this guide.
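
If you do not have a broker yet, one quick way to run a local one is the official Apache Kafka Docker image. This is only a minimal sketch; the container name, port mapping, and topic name are assumptions for illustration:

# Start a single-node Kafka broker (assumes Docker is installed)
docker run -d --name kafka -p 9092:9092 apache/kafka:latest

# Create the topic that the integration will subscribe to (topic name is an example)
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create --topic my-topic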

Before creating the integration, you need to create an Uplink converter in Data converters. The Uplink converter is necessary to transform incoming device data into the format required for display in NexAI. Click the “plus” button and select “Create new converter”. To view converter events, enable Debug. In the decoder function field, specify a script that parses and transforms the data.

Note: While debug mode is very useful for development and troubleshooting, leaving it enabled in production can significantly increase the disk space used by the database, since all debug data is stored there. After debugging is complete, it is highly recommended to turn debug mode off.

Let’s review a sample uplink message from Kafka:

{
  "EUI": "43T1YH-REE",
  "ts": 1638876127000,
  "data": "3d1f0059",
  "port": 10,
  "freq": 24300,
  "rssi": -130,
  "serial": "230165HRT"
}

The EUI field is used as the device name. The “data” field is a concatenation of telemetry values, two hex characters each: the first pair “3d” is temperature, “1f” is humidity, “00” is fan speed, and “59” is pressure.
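
For example, splitting the sample “data” value into hex pairs and converting each pair to decimal gives the following (assuming the raw values map directly to the reported units):

// Sample "data" value "3d1f0059" split into hex pairs and converted to integers
"3d1f0059".match(/../g);   // ["3d", "1f", "00", "59"]
parseInt("3d", 16);        // 61 -> Temperature
parseInt("1f", 16);        // 31 -> Humidity
parseInt("00", 16);        // 0  -> Fan speed
parseInt("59", 16);        // 89 -> Pressure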

You can use the following code; copy it into the decoder function section:

// Decode an uplink message from a buffer
// payload - array of bytes
// metadata - key/value object

/** Decoder **/

// Decode payload to JSON
var payloadJson = decodeToJson(payload);

// Use EUI as the unique device name.
var deviceName = payloadJson.EUI;
// Specify the device type. Use one data converter per device type or application.
var deviceType = 'Monitoring-sensor';
// Optionally, add the site name and device group to automatically create them in NexAI and assign the new device to them.
// var siteName = 'site';
// var groupName = 'thermostat devices';

// Result object with device/asset attributes/telemetry data
var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    // siteName: siteName,
    // groupName: groupName,
    attributes: {},
    telemetry: {
        ts: payloadJson.ts,
        values: {
            Temperature: hexToInt(payloadJson.data.substring(0, 2)),
            Humidity: hexToInt(payloadJson.data.substring(2, 4)),
            Fan: hexToInt(payloadJson.data.substring(4, 6)),
            Port: payloadJson.port,
            Freq: payloadJson.freq,
            Pressure: hexToInt(payloadJson.data.substring(6, 8)),
            rssi: payloadJson.rssi,
            serial: payloadJson.serial
        }
    }
};

/** Helper functions **/

// Convert the byte array payload to a string
function decodeToString(payload) {
    return String.fromCharCode.apply(String, payload);
}

// Convert the payload to a string and parse it as JSON
function decodeToJson(payload) {
    var str = decodeToString(payload);
    var data = JSON.parse(str);
    return data;
}

// Convert a hex string to an integer (byte pairs are read in reversed order)
function hexToInt(value) {
    return parseInt('0x' + value.match(/../g).reverse().join(''));
}

return result;
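
For the sample message shown above, this decoder would produce a result similar to the following (the telemetry values are the decimal equivalents of the hex pairs):

{
  "deviceName": "43T1YH-REE",
  "deviceType": "Monitoring-sensor",
  "attributes": {},
  "telemetry": {
    "ts": 1638876127000,
    "values": {
      "Temperature": 61,
      "Humidity": 31,
      "Fan": 0,
      "Port": 10,
      "Freq": 24300,
      "Pressure": 89,
      "rssi": -130,
      "serial": "230165HRT"
    }
  }
}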

You can change the parameters and decoder code when creating or editing a converter. If the converter has already been created, click the pencil icon to edit it. Copy the sample decoder above (or use your own) and paste it into the decoder function field. Then save the changes by clicking the checkmark icon.

Create Integration

After creating the Uplink converter, you can create the integration. At this stage, you set the parameters that establish the connection between NexAI and the Kafka broker. Once the connection is established, the integration transmits all received data to the Uplink converter for processing and subsequent transfer to the Rule Chain, according to the Device profile specified for the Device.

Field: Description

Name: The name of your integration.
Type: Choose the Kafka type.
‘Enable’ checkbox: Enable / disable the integration.
‘Debug Mode’ checkbox: Enable while debugging the integration.
Allow create devices or assets: If the device does not yet exist in NexAI, it will be created.
Uplink data converter: Select the previously created converter.
Downlink data converter: This option is not supported by this integration; more details about Downlink are given below in the guide.
‘Execute remotely’ checkbox: Activate if you want to execute the integration remotely from the main NexAI instance.
Group ID: The name of the consumer group to which the Kafka consumer belongs.
Client ID: A Kafka consumer identifier within the consumer group.
Topics: The topics that NexAI will subscribe to after connecting to the Kafka broker.
Bootstrap servers: The host and port pair that is the address of the Kafka broker to which the Kafka client first connects for bootstrapping.
Poll interval: The duration in milliseconds between message polls when no new messages arrive.
Auto create topics: Enable if you need topics to be created automatically.
Other properties: Any additional properties for the Kafka broker connection.
Metadata: A key-value map with integration-specific fields. For example, you can put the device type here.
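
For example, when connecting to a broker that requires SASL/SSL authentication (as Confluent Cloud does), the ‘Other properties’ field might contain entries like the following standard Kafka client properties; the key and secret are placeholders:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="YOUR_API_KEY" password="YOUR_API_SECRET";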
To test the integration, send a sample message to the Kafka topic from the terminal:

echo "{\"EUI\":\"43T1YH-REE\",\"ts\":1638876127000,\"data\":\"3d1f0059\",\"port\":10,\"freq\":24300,\"rssi\":-130,\"serial\":\"230165HRT\"}" | /usr/local/kafka/bin/kafka-console-producer.sh --broker-list URL_OF_YOUR_BOOTSTRAP_SERVER:9092 --topic my-topic > /dev/null

You can also check through the terminal which data arrived in Kafka:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server URL_OF_YOUR_BOOTSTRAP_SERVER:9092 --topic my-topic --from-beginning

If everything is configured correctly, the consumer prints the JSON message that was sent.