Unlocking the Potential of IoT Applications With Real-Time Alerting Using Apache Kafka Data Streams and KSQL
IoT devices have revolutionized the way businesses collect and utilize data. IoT devices generate an enormous amount of data that can provide valuable insights for informed decision-making. However, processing this data in real time can be a significant challenge, particularly when managing large data volumes from numerous sources. This is where Apache Kafka and Kafka data streams come into play.
Apache Kafka is a distributed streaming platform that can handle large amounts of data in real time. It is a messaging system commonly used for sending and receiving data between systems and applications. It can also be used as a data store for real-time processing. Kafka data streams provide a powerful tool for processing and analyzing data in real time, enabling real-time analytics and decision-making.
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.model.dataformat.JsonLibrary;
public class RestApiToKafkaRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
// Set up Kafka component
from(“kafka:{{kafka.bootstrap.servers}}”)
.routeId(“kafka”)
.to(“log:received-message”);
// Set up REST API component
from(“timer://rest-api-timer?period={{rest.api.timer.period}}”)
.routeId(“rest-api”)
.to(“rest:get:{{rest.api.url}}”)
.unmarshal().json(JsonLibrary.Jackson, DeviceData.class)
.split(body())
.process(exchange -> {
// Extract device ID from data and set Kafka topic header
DeviceData deviceData = exchange.getIn().getBody(DeviceData.class);
String deviceId = deviceData.getDeviceId();
exchange.getMessage().setHeader(KafkaConstants.TOPIC, deviceId);
})
.marshal().json(JsonLibrary.Jackson)
.to(“kafka:{{kafka.topic}}”);
}
}
sample Kql query for dashboard for IOT data alerts:
CREATE TABLE pressure_alerts AS
SELECT device_id, pressure
FROM iot_data_stream
WHERE pressure > 100;
CREATE STREAM pressure_alerts_stream (device_id VARCHAR, pressure INT, alert_type VARCHAR)
WITH (kafka_topic=’pressure_alerts’, value_format=’JSON’);
CREATE TABLE pressure_alert_count AS
SELECT alert_type, COUNT(*)
FROM pressure_alerts_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY alert_type;
SELECT * FROM pressure_alert_count;