description: "How we built an enterprise-grade data pipeline to stream high-frequency motor telemetry from a physical robot to a Databricks Lakehouse."
When you're building and testing high-performance robotics, hardware is pushed to its absolute limits. Motors stall, wheels slip, and batteries drain rapidly. To monitor our robot's health in real time—and prevent catastrophic hardware failures—we needed to track electrical current (Amps) and velocity (ticks/sec) across all primary motors under load.
However, moving raw, high-frequency, and deeply nested JSON data out of a moving robot and into a clean, queryable analytical database is easier said than done.
In this article, I’ll walk through how we solved this challenge by deploying a modern data streaming stack using Confluent Cloud, Flink SQL, and most importantly, Onibex Connectors to seamlessly bridge our edge infrastructure with the cloud.
🏗️ Pipeline Architecture Overview
The telemetry pipeline is designed to process high-velocity events across five consecutive stages:
- The Robot (Source): A Java subsystem runs on the robot's onboard controller, reading motor states at high frequencies and publishing raw JSON payloads via MQTT over a local network.
- MQTT Broker: A local Mosquitto broker running in Docker ingests the raw stream.
- Onibex MQTT Source Connector 🔌: The first critical Onibex component. It automatically subscribes to the MQTT broker and streams the raw payloads into a Confluent Cloud Kafka topic (iron_lion_telemetry) in real time.
- Confluent Flink SQL: Cleans, flattens, and type-casts the raw JSON payloads into a strictly typed schema (iron_lion_telemetry_v04).
- Onibex Databricks Sink Connector 🔌: The final bridge. It consumes the clean Avro records from Kafka and automatically handles insertion and upserts into a Delta table within the Databricks SQL Warehouse. (You can also find it on the official Confluent Hub.)
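To make the source stage concrete, here is a minimal Python sketch of the two payload shapes the robot might publish. The key names (ts, mode, fl_a, fl_v, chassis.fl, mechanisms.intake) come from the JSON paths used in the Flink SQL later in this article; the function names, the "AUTO" mode value, and the timestamp unit are assumptions for illustration, and the actual robot code is a Java subsystem that is not shown here.

```python
import json


def flat_payload(ts, mode, fl_amps, fl_ticks_per_sec):
    """Earlier, flat layout: one top-level key per reading."""
    return {"ts": ts, "mode": mode, "fl_a": fl_amps, "fl_v": fl_ticks_per_sec}


def nested_payload(ts, mode, fl_amps, fl_ticks_per_sec, intake_amps):
    """Later, nested layout: readings grouped by subsystem."""
    return {
        "ts": ts,
        "mode": mode,
        "chassis": {"fl": {"a": fl_amps, "v": fl_ticks_per_sec}},
        "mechanisms": {"intake": {"a": intake_amps}},
    }


def encode(payload):
    """Serialize to compact UTF-8 JSON bytes, the form an MQTT client would publish."""
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")


if __name__ == "__main__":
    # Sample values are made up; the timestamp unit is an assumption.
    print(encode(flat_payload(1000, "AUTO", 12.5, 840.0)))
    print(encode(nested_payload(1000, "AUTO", 12.5, 840.0, 3.2)))
```

Both shapes have to be understood downstream, which is why the normalization stage needs fallback logic rather than a single fixed path per field.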
🔌 The Game Changer: Highlighting Onibex Connectors
Building custom integration code to connect disparate IoT edge protocols and modern cloud data warehouses can take weeks of development. Utilizing managed Onibex Connectors eliminated custom glue-code entirely, handling two of the most fragile points in the pipeline:
1. Secure Edge Ingestion (MQTT Source)
The Onibex MQTT Source Connector allowed us to tap into our local edge network with sub-millisecond ingestion latency. A crucial security takeaway during this phase:
- Pro-Tip: You must use cluster-scoped API keys in Confluent Cloud for connector authentication. Attempting to use global environment API keys will cause the connector to fail with 502 Bad Gateway or 401 Unauthorized errors.
2. Automated Schema Evolution in Databricks (Lakehouse Sink)
To deploy the final data delivery stage, we issued a single JSON configuration payload to the Kafka Connect REST API (POST /api/connectors) using the Insomnia HTTP client:
{
  "name": "databricks_silver_wheel_telemetry",
  "config": {
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexDataLakeSinkConnector",
    "topics": "iron_lion_telemetry_v04",
    "connection.Auth_AccessToken": "dapi01acb5...",
    "connection.ConnCatalog": "workspace",
    "connection.ConnSchema": "iron_lion",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "INSERT"
  }
}
Why this is powerful: By enabling "auto.create": "true" and "auto.evolve": "true", the Onibex Sink Connector reads the Avro schema that Flink registers for the topic, then talks directly to Databricks to create the Delta table and evolve its columns on the fly. No manual DDL scripts are needed on the destination database.
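If you would rather script the deployment than click through an HTTP client, the same request can be built with Python's standard library. This is only a sketch: the host in CONNECT_URL and the DATABRICKS_TOKEN environment variable name are assumptions, and the configuration keys are copied from the payload above rather than from any connector reference.

```python
import json
import urllib.request

# Hypothetical endpoint: replace the host and port with your own Connect worker.
CONNECT_URL = "http://localhost:8083/api/connectors"


def build_connector_request(access_token, url=CONNECT_URL):
    """Return a POST request carrying the sink connector configuration."""
    payload = {
        "name": "databricks_silver_wheel_telemetry",
        "config": {
            "connector.class": "com.onibex.connect.datalake.jdbc.OnibexDataLakeSinkConnector",
            "topics": "iron_lion_telemetry_v04",
            # Pass the token in at runtime instead of hardcoding it.
            "connection.Auth_AccessToken": access_token,
            "connection.ConnCatalog": "workspace",
            "connection.ConnSchema": "iron_lion",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "auto.create": "true",
            "auto.evolve": "true",
            "insert.mode": "INSERT",
        },
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def deploy(access_token):
    """Send the request and return (HTTP status, response body)."""
    request = build_connector_request(access_token)
    with urllib.request.urlopen(request, timeout=30) as response:
        return response.status, response.read().decode("utf-8")
```

Calling deploy(os.environ["DATABRICKS_TOKEN"]) after importing os would submit the configuration. Keeping the token out of the file also sidesteps stale credentials when keys are rotated.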
🛠️ Debugging Tip (Insomnia Credential Caching):
If you rotate your API keys and the connector deployment keeps failing with 401 errors in Insomnia, the client is likely still holding on to your old credentials. Fix this by closing Insomnia completely, opening PowerShell, navigating to AppData\Roaming\Insomnia, manually swapping the API keys inside the local database file insomnia.Request.db, and restarting the app.
🪄 Real-Time Normalization with Flink SQL
As our physical robot's hardware layout evolved between test runs, our JSON payload structure changed frequently, shifting from flat parameters to nested structures. We utilized Confluent Flink SQL (executing one statement at a time per the UI requirements) along with JSON_VALUE and COALESCE to handle fallback logic gracefully:
-- 1. Create the target table (Registers strict Avro schema downstream)
CREATE TABLE `iron_lion_telemetry_v04` (
  `event_timestamp` BIGINT,
  `drive_mode` STRING,
  `front_left_current` DOUBLE,
  `front_left_velocity` DOUBLE,
  `intake_current` DOUBLE
) WITH (
  'connector' = 'confluent',
  'value.format' = 'avro-registry'
);

-- 2. Continuous Stream Transformation
INSERT INTO `iron_lion_telemetry_v04`
SELECT
  CAST(COALESCE(JSON_VALUE(`val`, '$.ts'), '0') AS BIGINT) AS `event_timestamp`,
  JSON_VALUE(`val`, '$.mode') AS `drive_mode`,
  CAST(COALESCE(JSON_VALUE(`val`, '$.fl_a'), JSON_VALUE(`val`, '$.chassis.fl.a'), '0.0') AS DOUBLE) AS `front_left_current`,
  CAST(COALESCE(JSON_VALUE(`val`, '$.fl_v'), JSON_VALUE(`val`, '$.chassis.fl.v'), '0.0') AS DOUBLE) AS `front_left_velocity`,
  CAST(COALESCE(JSON_VALUE(`val`, '$.mechanisms.intake.a'), '0.0') AS DOUBLE) AS `intake_current`
FROM `iron_lion_telemetry`;
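To check the fallback logic against sample payloads before touching the live stream, the same mapping can be mirrored locally. The Python sketch below approximates JSON_VALUE and COALESCE for simple dotted paths only; the helper names are hypothetical and it is a testing aid, not a substitute for Flink's actual semantics (for instance, it does not reproduce Flink's casting rules for malformed numbers).

```python
import json


def json_value(doc, path):
    """Approximate JSON_VALUE for simple '$.a.b' paths.

    Returns the scalar at the path as a string, or None when the path is
    missing or points at an object, an array, or a JSON null.
    """
    keys = path[2:] if path.startswith("$.") else path
    node = doc
    for key in keys.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    if node is None or isinstance(node, (dict, list)):
        return None
    if isinstance(node, bool):
        return "true" if node else "false"
    return str(node)


def coalesce(*values):
    """Return the first argument that is not None, like SQL COALESCE."""
    return next((value for value in values if value is not None), None)


def normalize(raw):
    """Flatten one raw payload into the iron_lion_telemetry_v04 column layout."""
    doc = json.loads(raw)
    return {
        "event_timestamp": int(coalesce(json_value(doc, "$.ts"), "0")),
        "drive_mode": json_value(doc, "$.mode"),
        # Try the flat key first, then the nested one, then the default.
        "front_left_current": float(
            coalesce(json_value(doc, "$.fl_a"), json_value(doc, "$.chassis.fl.a"), "0.0")
        ),
        "front_left_velocity": float(
            coalesce(json_value(doc, "$.fl_v"), json_value(doc, "$.chassis.fl.v"), "0.0")
        ),
        "intake_current": float(
            coalesce(json_value(doc, "$.mechanisms.intake.a"), "0.0")
        ),
    }


if __name__ == "__main__":
    print(normalize('{"ts": 1000, "mode": "AUTO", "fl_a": 12.5, "fl_v": 840}'))
    print(normalize('{"ts": 2000, "chassis": {"fl": {"a": 9.0, "v": 600}}}'))
```

One consequence worth keeping in mind: because missing readings default to 0.0, a sensor that stopped reporting looks the same downstream as a motor drawing no current.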
Handling Production Schema Quirks
Production data targets often come with legacy constraints. In our case, a pre-existing target table definition in Databricks had a hardcoded typo where the word right was spelled "rigth" (e.g., front_rigth_current).
Thanks to the overall flexibility of the pipeline, we didn't need to rebuild the destination warehouse. We simply mirrored the exact typo within our Flink SQL INSERT INTO mapping, and the Onibex connector handled the downstream alignment flawlessly, preventing any dropped events.
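A name mismatch like this is easy to catch before deployment by comparing the columns the Flink statement emits with the columns of the destination table. The Python sketch below is one hypothetical way to do that; the function name and the abbreviated column lists are assumptions, with only front_rigth_current taken from our actual table.

```python
import difflib


def unmatched_columns(emitted, target):
    """Map each emitted column missing from the target to its closest target name.

    The value is None when no target column is similar enough to suggest.
    """
    target_set = set(target)
    suggestions = {}
    for name in emitted:
        if name not in target_set:
            close = difflib.get_close_matches(name, target, n=1)
            suggestions[name] = close[0] if close else None
    return suggestions


if __name__ == "__main__":
    # Abbreviated, illustrative column lists.
    target_columns = ["event_timestamp", "drive_mode", "front_left_current", "front_rigth_current"]
    emitted_columns = ["event_timestamp", "drive_mode", "front_left_current", "front_right_current"]
    print(unmatched_columns(emitted_columns, target_columns))
```

An empty result means every emitted column already has a same-named column in the destination table.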
🏆 Conclusion
Deploying an enterprise-grade streaming architecture for custom robotics engineering is an absolute game changer. We can now open up Databricks and write standard SQL queries to analyze exactly which motors experience mechanical stress, draw dangerous current spikes, or slip during high-velocity maneuvers.
By offloading connectivity infrastructure to Kafka as the core nervous system and Onibex Connectors as the ingestion and delivery pathways, we achieved a highly resilient, scalable IoT pipeline with minimal custom code overhead.
Have you ever built a real-time data pipeline for physical devices? Let me know your stack in the comments below!














