Azure IoT Operations: Control a signal tower via Microsoft Fabric RTI feedback loop

Azure IoT Operations is the edge platform for Azure IoT, part of Azure IoT vNext, the latest Azure IoT stack that also brings features like MQTT support for Azure EventGrid Namespaces and Microsoft Fabric Real-Time Intelligence.

In previous posts, we have seen how to connect to the local MQTT broker running within Azure IoT Operations using MQTT clients on the local network, how to use the IoT Operations portal in the cloud to set up dataflows on the edge that send data to the cloud (including Microsoft Fabric Real-Time Intelligence), how to send commands from the cloud to the edge via the MQTT broker in EventGrid, and how to create Assets based on the OPC-UA protocol. We have also seen how to run the TIG stack on Azure IoT Operations.

In this post, I want to show a possible full feedback loop where telemetry sent to Microsoft Fabric is evaluated automatically, and decisions are turned into commands, sent back to the edge.

We will combine several techniques seen in past blog posts, written for both Azure IoT Operations and Microsoft Fabric.

Do you want to see a full feedback loop controlling a tower light via Fabric Real-Time Intelligence?

Let’s see what it takes to turn on and off a light bulb!

This is part eight of a series of blog posts about Azure IoT Operations:

  1. Azure IoT Operations: (in)secure MQTT broker authentication
  2. Azure IoT Operations: Microsoft Fabric RTI Eventstream dataflow
  3. Azure IoT Operations: Sending commands via EventGrid MQTT
  4. Azure IoT Operations: Reading OPC-UA server tags
  5. Azure IoT Operations: Monthly releases
  6. Azure IoT Operations: Event Hub telemetry and commands
  7. Azure IoT Operations: Local dashboard based on TIG stack
  8. Azure IoT Operations: control a signal tower via Microsoft Fabric RTI feedback loop
  9. Azure IoT Operations, building a custom module using the SDK
  10. Microsoft Fabric RTI: Integrating public and private LoRaWAN sensors
  11. Azure IoT Operations: Deploying custom vision
  12. Azure IoT Operations: Act on ONVIF camera events
  13. No-Code Dataflow transformations via message schema inferencing
  14. Azure Arc Connected Machine agent, automatic update
  15. Azure IoT Edge and Azure IoT Operations, the Edge of Tomorrow
  16. Consuming southbound HTTP/Rest and SSE endpoints using Azure IoT Operations

This post also references multiple Microsoft Fabric posts. Check my site for more posts about Microsoft Fabric RTI and even Azure Data Explorer.

Goal for today

In this blog post, we combine several techniques seen in previous blog posts, all related to Real-Time Intelligence and the Internet of Things.

Energy meter telemetry coming from a PLC is sent via Azure IoT Operations to a Microsoft Fabric Eventstream custom destination and forwarded to a medallion architecture in Fabric Eventhouse.

Each incoming current value (measured next to the voltage) is tested and classified into an action level, equivalent to a lamp color in a signal light tower.

These ‘classifications’ are made via KQL Table update policies and sent to a separate table with a limited retention time.

Via the new Azure Data Explorer CDC source capabilities, we can also listen to Eventhouse updates, and that is exactly what we use to forward the actions back to the Azure IoT Operations MQTT broker as commands.

Last but not least, the action commands are picked up and transformed into actions, turning the tower lights on and off.

This exercise is a nice example of combining several techniques to make a full round trip. There are limitations as described at the end of this post, but it’s a good indication of what we can do right now with Microsoft Fabric RTI on top of Azure IoT Operations.

Prerequisites

If you want to follow along, have an Azure IoT Operations device running. Just start with the first post in this Azure IoT Operations series if you are interested in setting this up on an actual device.

Next to Azure IoT Operations, you also need access to Fabric Real-Time Intelligence via a capacity. Browse through the large number of posts I wrote about this topic, starting with the first one.

Everything you will see is based on actual hardware; nothing is simulated. I use an actual Energy meter to read the energy consumption of an electromotor. I also use an actual signal light tower as an actuator. A PLC and IO module are added for protocol translations and physical connections (24 volts DC).

If you do not have similar devices, you could always simulate these devices. Just publish a message on the local MQTT broker and see the arrival of the command response a few seconds later.

Let’s start with the signal light tower first.

Controlling signal tower lights via local MQTT broker

As seen in past demonstrations, I once got a nice signal light tower from Werma and have used it in several situations as an actuator:

The individual lights (and the buzzer; please don’t activate this at home 🙂 ) can be turned on and off separately via 24V DC.

It’s time to combine it with Azure IoT Operations, so I need relays on an IO Module to manage the lights.

For this, I use an Advantech Adam 6760D IO module, which has relay support (along with many other digital input and output ports) that can be switched programmatically. It uses Node-RED as a programming environment, which makes it very flexible:

In my case, it can listen to MQTT broker topics and switch the relays.

The Node-RED environment runs on the Adam IO module itself. This Node-RED comes with several Advantech-specific nodes to manipulate the ports on the device:

In our case, we are interested in setting/switching Digital output values:

Check the top part of this flow (starting with the MQTT subscriber node).

We listen for messages arriving at the Azure IoT Operations MQTT broker MQTT topic ‘command/advantech/adam6760d’.

Note: My Node-RED connection is listening to port 31883 on the local MQTT broker, which is open for everyone, unsecured. In production, use a secure connection.

The incoming JSON Command message should look like:

{
  "active": 0
}

Note: Later on, we will see that the JSON message has more fields, like timestamp and deviceId. They are ignored in this Node-RED flow.
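For reference, the full command message generated later by the Eventhouse flow will carry these extra fields (field names match the CdcCommandTable schema defined further down; the values here are illustrative):

```json
{
  "deviceId": "energymeter-01",
  "timestamp": "2025-01-01T12:00:00Z",
  "description": "Status set to 2 due to current between 0.1 and 0.5",
  "active": 2
}
```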

The ‘active’ integer is a binary representation of the tower lights. So sending ‘3’ means the bottom two lights need to be lit up (1 + 2 = 3). With ‘4’, only the third light is lit up.

This command message is turned into a format that the Advantech node understands: an array of boolean values. This function is used:

var jsonMessage = JSON.parse(msg.payload);
var newMsg;
switch (jsonMessage.active) {
    case 1:
        newMsg = { payload: [true, false, false, false, false, false, false, false] };
        break;
    case 2:
        newMsg = { payload: [false, true, false, false, false, false, false, false] };
        break;
    case 3:
        newMsg = { payload: [true, true, false, false, false, false, false, false] };
        break;
    case 4:
        newMsg = { payload: [false, false, true, false, false, false, false, false] };
        break;
    case 8:
        newMsg = { payload: [false, false, false, true, false, false, false, false] };
        break;
    default:
        newMsg = { payload: [false, false, false, false, false, false, false, false] };
}
return newMsg;

Note: This is not a complete list of all mappings possible, but it’s a nice display of how the lights are switched on and off.

Note: Do you know a one-liner to turn a byte into a boolean array? Let us know in the comments.
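Here is one candidate answer, sketched in plain JavaScript (my own suggestion, not part of the original flow), which replaces the whole switch statement by testing each bit of the 'active' value:

```javascript
// Turn the 'active' byte into an array of eight booleans, one per relay:
// bit 0 maps to the first light, bit 1 to the second, and so on.
const toBoolArray = (active) =>
  Array.from({ length: 8 }, (_, bit) => Boolean((active >> bit) & 1));

console.log(toBoolArray(3)); // bottom two lights on
console.log(toBoolArray(4)); // only the third light on
```

Because it works on bits, it also handles combinations the switch statement does not list, like '5' (first and third light).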

A demonstration of the blinking signal tower light can be seen here, when that JSON command message is sent to the MQTT topic the Advantech ADAM IO module is listening to:

Why is it blinking?

Check the bottom half of the Node-RED flow, where we switch every few seconds between the last received light configuration and the all-lights-off configuration. This gives the blinking effect.

For this blinking to happen, we remember the last relay configuration in a parameter in the upper part of the flow:

Using a modulo-based if/then/else construction, we send either the stored light pattern (by reading that parameter) or an empty configuration to the lights.
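The blink mechanism can be sketched as follows (variable and function names are mine; the real flow keeps the pattern in Node-RED context and fires on an inject node timer):

```javascript
// Blink sketch: alternate between the last received light pattern and
// all-lights-off on every timer tick.
let tick = 0;
const allOff = new Array(8).fill(false);
let lastPattern = allOff; // updated whenever a new command arrives

// Called when a command message arrives: remember the requested pattern.
function rememberPattern(pattern) {
  lastPattern = pattern;
}

// Called on every timer tick (every few seconds): even ticks show the
// remembered pattern, odd ticks switch everything off.
function nextBlinkPayload() {
  tick += 1;
  return { payload: tick % 2 === 0 ? lastPattern : allOff };
}
```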

We now have the signal tower light control in place as the actuator.

Now, let’s build the Fabric feedback loop and later forward the incoming commands to the signal tower light logic.

Sending energy meter data from a running ABB electric motor to Fabric Eventhouse (reprise)

Sending energy meter data to Microsoft Fabric is already explained in this post.

The telemetry is produced by this Wago energy meter:

The energy meter telemetry is forwarded to the local MQTT broker by a Node-RED MQTT client running on a Wago Compact Controller 100:

The energy meter is recording the 220V power (voltage and current) consumed by an m2aad56b-4 electric motor from ABB:

This 380V 3-phase motor is controlled via this beauty, the ABB scs50-01e-02a202+b001 frequency controller:

When the motor is turned on, it will consume more current. When the current is higher, we turn on one or more lights in the signal tower to indicate the level of power usage. That is the idea.

So, current data is arriving in Fabric via an Eventstream and is passed on to an Eventhouse:

Note: The flow also supports an activator to warn me if the motor is running too long. Ignore it here 🙂

We use a JSON table mapping towards the ‘raw’ table in Fabric Eventhouse, as seen in this blog post. Select the ‘direct ingestion’ option when creating the Eventhouse destination, so you are given a wizard to choose (or create) the destination table and create the related mapping.

Telemetry is now arriving in the Eventhouse. From the raw data, the current values are split from the voltage values via table update policies, demonstrating the medallion architecture:

So, at this point, we can ingest energy meter data into Microsoft Fabric. And we are waiting in Azure IoT Operations for commands to control the tower lights.

Let’s react to the current level and start sending commands.

Use Fabric CDC to turn a Table Update policy into a business rule

To implement a ‘business rule’ for generating commands, the next step is using a Fabric Eventstream to expose commands to Azure IoT Operations via a Custom (Kafka) Endpoint.

This is because a Kafka endpoint can be exposed by a Microsoft Fabric Eventstream, usable as a source for an Azure IoT Operations Dataflow Endpoint:

Our challenge is to react to real-time telemetry, first ingested by an Eventstream and then stored in an Eventhouse.

We will use the Eventhouse medallion architecture by turning ‘bronze’ raw data into ‘gold’ commands.

In an Eventhouse, this is typically done via table update policies.

First, I created a new table named CdcCommandTable:

.create table CdcCommandTable (deviceId: string, timestamp: datetime, description: string, active: int) with (docstring = "CDC Commands for Eventhouse")
.alter-merge table CdcCommandTable policy retention softdelete = 1m recoverability = disabled

Notice that the table has a column named ‘active’ that takes an integer. This matches the command format accepted by the Advantech Adam IO module, which listens for messages on an MQTT topic on the local MQTT broker.

Note: As we will see below, we are interested in any new row being created in the table with commands. Once created, it is picked up by the CDC mechanism and has no value anymore. Storing these rows is pointless, so to keep the table small, the retention time is set to a minute. In real life, removing retired rows is a low-priority process, so the data in the table can span many minutes before being deleted by the background process.

A KQL function that turns current values into commands for every incoming message looks like this:

.create function
with (docstring = 'Turn current data into CDC command', folder = 'ingestprojection')
SendCommandCurrentTelemetry()
{
    CurrentEnergyMeter
    | extend setActive = case(
        currentValue >= 0 and currentValue < 0.1, 1,
        currentValue >= 0.1 and currentValue < 0.5, 2,
        8)
    | extend setActiveMin = case(
        currentValue >= 0 and currentValue < 0.1, '0',
        currentValue >= 0.1 and currentValue < 0.5, '0.1',
        '0.5')
    | extend setActiveMax = case(
        currentValue >= 0 and currentValue < 0.1, '0.1',
        currentValue >= 0.1 and currentValue < 0.5, '0.5',
        '999')
    | project deviceId, timestamp = now(), description = strcat('Status set to ', setActive, ' due to current between ', setActiveMin, ' and ', setActiveMax), active = toint(setActive)
}

Based on the level of the current value, a certain light will be lit (the first (1), the second (2), or the fourth (8)). Multiple lights could work too, because it uses a binary pattern.
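For a quick local sanity check, the same mapping can be mirrored in a small JavaScript function (thresholds copied from the KQL case() expression above; the function name is my own):

```javascript
// Mirror of the KQL case() expression: current value (in amperes) -> 'active'
// bitmask. Below 0.1 A lights the first lamp (1), between 0.1 and 0.5 A the
// second (2), and everything from 0.5 A up the fourth (8).
function classifyCurrent(currentValue) {
  if (currentValue >= 0 && currentValue < 0.1) return 1;
  if (currentValue >= 0.1 && currentValue < 0.5) return 2;
  return 8;
}
```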

This function is then used for a table update policy:

.alter table
CdcCommandTable
policy update @'[{"Source": "CurrentEnergyMeter", "Query": "SendCommandCurrentTelemetry", "IsEnabled" : true, "IsTransactional": true }]'

I had the motor running (value = 2) and turned it off. So the current lowered and the update policy started adding new rows with ‘active’ value 1 instead of 2:

So, the table policy is working, but I get a separate command row for each incoming message!

This is a shortcoming of table update policies. It would be great to check the command table to see if the ‘active’ value of the last row differs from the newly calculated ‘active’ value, by adding this comparison to the function:

| where toint(setActive) !in (
    CdcCommandTable
    | top 1 by ingestion_time() desc
    | project active)
This comparison would prevent sending repetitive commands with the same ‘active’ value. Duplicates would be ignored; only flanks (transitions) would be identified.

Unfortunately, this lookup is not allowed as part of the function when used in an update policy:

It’s the update policy on the source table CurrentEnergyMeter that prevents the function from checking CdcCommandTable for duplicate values:

Personally, this feels like an unnecessary limitation; I would expect the transactional check to look only at the columns actually involved.

Anyway, we are creating command rows now! Let’s send those commands to the Azure IoT Operations environment.

Configure the Command Eventstream source and destination

Create a new Eventstream and provide it with a source based on Azure Data Explorer CDC:

This same type of source connection is demonstrated in this blog post.

Because Fabric Eventhouse is an Azure Data Explorer cluster, we can listen to KQL database table row inserts!

And we just start inserting command rows…

So, provide the Database and command table for the Eventhouse connection:

The connection itself is created in a separate dialog and will be stored behind the ‘Manage Connections and Gateways’ page:

This Connection asks for the Eventhouse Query URI, found on the details page of the Eventhouse:

Notice that this connection also asks for the same database name and command table name.

Just repeat the same database and table names as entered in the Eventstream Azure Data Explorer CDC source dialog.

At this point, you should already be able to test the Eventstream command ingestion, and you should see incoming commands from the table update policy flow.

If this flow is not triggered due to a lack of incoming telemetry messages, you can insert a command table row by hand:

// 1 = blue, 2 = green, 4 = yellow, 8 = red, 0 = all off
.append CdcCommandTable <| print deviceId = "12345", timestamp = now(), description = "test light", active = toint(4)

Here, the commands are arriving in the Eventstream:

Notice that I already added a Custom destination.

This Custom destination exposes a Kafka Endpoint:

Just as seen above, add that Custom destination and publish the Eventstream to generate the secrets.

If you want to see the endpoint in action, you can test the endpoint either using the Java sample provided in the portal or this Python version I put on GitHub:

The commands are exposed outside Microsoft Fabric.

We have proven we can generate commands with the correct value.

Configure Azure IoT Operations Custom Kafka broker Dataflow

In Azure IoT Operations, create a new Dataflow Endpoint based on the Custom Kafka Broker template:

We follow the same procedure as seen in this blog post.

Give the dataflow endpoint a solid name like ‘fabric-eventstream-command-dataflow-endpoint’.

In the Host field, we fill in the Bootstrap server of the Kafka endpoint, as seen in the custom destination of the Eventstream.

We connect using the SASL authentication method and provide a synced secret name like ‘fabriccommandsecret’.

Add a new ‘Username reference of token secret’ named ‘fabriccommandsecretusername’ with secret value: ‘$ConnectionString‘.

Add a new ‘Password reference of token secret’ named ‘fabriccommandsecretpassword’ with secret value: the primary connection string of the custom destination in the Eventstream.

Finally, do not forget to enable TLS mode on the Advanced page:

Save the settings of this new Custom Kafka broker dataflow endpoint.

Enabling the endpoint on the edge takes a minute or so.

Configure the Dataflow

Finally, we need to create a dataflow that forwards command messages to the right MQTT topic on the local MQTT broker so these messages are picked up by the Advantech Adam IO module.

Create a new dataflow and give it a proper name like ‘fabric-eventstream-command-2-local-mqtt-command’:

As a source, select the Custom Kafka broker dataflow endpoint we just created.

Proceed to the next page and add a (Kafka) topic. Fill in the Topic name seen in the Eventstream custom destination.

As the dataflow destination, select the default dataflow endpoint name, the local MQTT broker.

Proceed to the next page and enter the MQTT topic, the same as the Advantech Adam IO module is listening to: ‘command/advantech/adam6760d’.

Save the dataflow. Enabling it will take a minute or so.

We finally closed the loop. Let’s test it.

Testing the solution

If you already have a steady stream of commands, these messages will start popping up on the local MQTT broker.

Here, the MQTT Explorer shows the incoming messages:

In Node-RED, the Digital Output values are switched accordingly:

And the (blue) signal tower light is blinking like a Christmas tree!

We have a full feedback loop.

Limitations and alternatives

Although we have a working solution, there are several observations to make about this approach.

This is a great piece of work and useful in several use cases. Still, there are some points of attention:

  • For each arriving telemetry message, a command is created, sent back, and picked up by every edge device. Apart from filtering on each edge device, we cannot act on flanks (transitions) only: due to the update policy limitation, we cannot detect when we are sending a duplicate row. This could probably be solved with timed compute like pipelines, dataflows, or notebooks. Still, even a one-minute interval in the cloud via these services (if doable) is relatively slow compared to the real-time events of the medallion architecture shown.
  • As an alternative to using the Eventhouse medallion architecture, a filter in the Eventstream could work too. This is the simplest version of a feedback loop. Unfortunately, adding a transformation event like a filter has a side effect: a derived stream is added. This derived stream is a separate endpoint with different credentials, under the covers. So, all devices and services listening to the regular Eventstream destination now need to set up a new subscriber if a derived stream is added. In IoT, this is a no-go.
  • Making use of an Eventstream transformation has another side effect, too. The message format returned will be a JSON array. When you ingest single-element JSON messages and add a filter, the destination outputs arrays having a single-element JSON message. This breaks the interface of the destination, unfortunately. Also, Azure IoT Operations cannot ingest arrays using the Custom Kafka broker dataflow at this moment. This will result in empty message bodies for messages arriving at the local MQTT broker.
  • The command message format could differ from the format a possible subscriber could consume because both are different systems without knowledge of each other’s capabilities. Dataflows are in the perfect position to transform MQTT messages. At this moment, Dataflows are not able to transform JSON message bodies. Luckily, the Node-RED solution can transform the alternative message format in this case.
  • The Microsoft Fabric (compute) capacity I use is now running at 95% usage (there is so much more stuff running next to this feedback demonstration). This seems to have an impact on the response times in the Fabric Eventstream. At this moment, I experience a 25-second delay between turning on the electromotor and the change of lights. I expect this will be lower if the capacity has a little bit more air. As an alternative, merging multiple Eventstreams into one with multiple inputs and outputs could mean less need for compute, too.
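As a workaround for the array-wrapping side effect mentioned above, a Node-RED function node could unwrap single-element arrays before the relay logic runs. A minimal sketch, assuming the payload arrives as a JSON string:

```javascript
// Unwrap a single-element JSON array produced by an Eventstream
// transformation, so downstream nodes keep receiving a plain object.
function unwrapPayload(rawPayload) {
  const parsed = JSON.parse(rawPayload);
  return { payload: Array.isArray(parsed) ? parsed[0] : parsed };
}
```

This only helps on transports that deliver the array intact; it does not fix the empty bodies seen when the Custom Kafka broker dataflow drops array messages.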

In the end, I’m very happy with the solution as a great starting point for further research.

Conclusion

Yes, turning light bulbs on and off can escalate quickly!

Using many techniques seen in previous blog posts, we have constructed a full feedback loop in the cloud using Microsoft Fabric RTI for hardware controlled via Azure IoT Operations.

Back in 2017, I already demonstrated a cloud feedback loop with Azure IoT Edge SDK and Azure Stream Analytics. Now, in 2025, the same setup is created in a fraction of the time needed back then.

The Microsoft Fabric Real-Time Intelligence set of resources and tools is a very welcome addition for IoT cloud and edge solutions.

There are still a few parts on both Azure IoT Operations and Microsoft Fabric that need attention, but updates are made available, month after month.

Now is a good time to learn about both Azure IoT Operations and Microsoft Fabric RTI.