Logic Apps fails to parse ServiceBus messages from Azure Stream Analytics

Recently I was asked to intervene on an IoT project. Sensor data was sent via Azure IoT Hub to Azure Stream Analytics. Events detected in Azure Stream Analytics resulted in a Service Bus message that had to be handled by a Logic App. However, the Logic App failed to parse the JSON message taken from the Service Bus queue. Let’s have a more detailed look at the issue!

Explanation

This diagram reflects the IoT scenario:

We encountered an issue in Logic Apps the moment we tried to parse the message into a JSON object. After some investigation, we realized that the Service Bus message body wasn’t plain JSON. The JSON was wrapped in string serialization “overhead”: a prefix in front of it and a trailing control character behind it.

@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/\u0001{"SensorId":"ABC","Timestamp":"2017-06-09T14:51:34.2052252","Temperature":16.4,"EventProcessedUtcTime":"2017-06-09T14:51:33.7313928Z","PartitionId":0,"EventEnqueuedUtcTime":"2017-06-09T14:51:34.0170000Z","IoTHub":{"MessageId":"f26213dbb-db85-4b75-af8c-82877bbf2c55","CorrelationId":null,"ConnectionDeviceId":"XY_Temp_ABC","ConnectionDeviceGenerationId":"636317217771923703","EnqueuedTime":"2017-06-09T14:51:33.9600000Z","StreamId":null}}\u0001
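To make the failure concrete, here is a minimal sketch in Python (used only as a neutral way to try this outside Logic Apps). The `raw` value is a shortened, hypothetical version of the message above, and `is_valid_json` is a helper name made up for this illustration. It shows that a JSON parser rejects the body as long as the serialization prefix is in front of it.

```python
import json

# Hypothetical, shortened message body: serialization prefix, JSON, trailing \x01.
raw = (
    "@\x06string\x083http://schemas.microsoft.com/2003/10/Serialization/\x01"
    '{"SensorId":"ABC","Temperature":16.4}'
    "\x01"
)


def is_valid_json(text: str) -> bool:
    """Return True if the text parses as JSON, False otherwise."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


# The parser stops at the very first character ('@'), so the body is rejected.
print(is_valid_json(raw))  # prints: False
```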
Thanks to our favourite search engine, we came across this blog that nicely explains the cause of the issue. The problem lies with the sender of the message: it occurs when the BrokeredMessage is created with a string object:
string payload = "{ \"Key1\" : \"Value1\"}";
var msg = new BrokeredMessage(payload);
queue.Send(msg);

If you control the sender-side code, you can resolve the problem by passing a stream object instead.

string payload = "{ \"Key1\" : \"Value1\"}";
var payloadStream = new MemoryStream(Encoding.UTF8.GetBytes(payload));
var msg = new BrokeredMessage(payloadStream, true);
queue.Send(msg);

Solution

Unfortunately, we cannot change the way Azure Stream Analytics behaves, so we need to deal with it on the receiver side. I’ve found several blogs and forum answers suggesting to clean up the “serialization garbage” with an Azure Function. Although this is a valid solution, I always tend to avoid additional components if they are not really needed. Introducing Azure Functions comes with additional cost, storage, deployment complexity, maintenance, etc…

As this is actually pure string manipulation, I had a look at the available string functions in the Logic Apps Workflow Definition Language. The following expression removes the unwanted “serialization overhead”:

@substring(base64ToString(triggerBody()?['ContentData']), indexof(base64ToString(triggerBody()?['ContentData']), '{'), add(sub(lastindexof(base64ToString(triggerBody()?['ContentData']), '}'), indexof(base64ToString(triggerBody()?['ContentData']), '{')), 1))
If you use this in combination with the Parse JSON action, you have a user-friendly way to extract data from the JSON in the next steps of the Logic App. In the sample below, I just used the Terminate action for testing purposes. You can now easily use SensorId, Temperature, etc…
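If you want to check the extraction logic outside Logic Apps, the following is a minimal sketch in Python of what the expression is meant to do: decode the base64 ContentData, then keep everything from the first '{' up to and including the last '}'. The function name `extract_json` and the shortened sample body are assumptions made for this illustration, and decoding with `errors="replace"` is my own choice to tolerate non-text bytes in the prefix. It is not part of the Logic App itself.

```python
import base64
import json


def extract_json(content_data_b64: str) -> str:
    """Decode a base64 message body and return the text from the first '{'
    up to and including the last '}'."""
    # Assumption: replace undecodable bytes instead of failing on them.
    text = base64.b64decode(content_data_b64).decode("utf-8", errors="replace")
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in message body")
    # The slice end is exclusive, hence end + 1 to keep the closing brace.
    return text[start:end + 1]


# Hypothetical, shortened body with the serialization prefix and suffix,
# base64-encoded the way the Service Bus trigger exposes ContentData.
raw = (
    "@\x06string\x083http://schemas.microsoft.com/2003/10/Serialization/\x01"
    '{"SensorId":"ABC","Temperature":16.4,"IoTHub":{"StreamId":null}}'
    "\x01"
)
content_data = base64.b64encode(raw.encode("utf-8")).decode("ascii")

event = json.loads(extract_json(content_data))
print(event["SensorId"], event["Temperature"])  # prints: ABC 16.4
```

Searching for the last '}' rather than the first matters here, because the Stream Analytics payload contains a nested IoTHub object.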

Conclusion

It’s a pity that Azure Stream Analytics doesn’t behave as expected when sending messages to Azure Service Bus. Luckily, we were able to fix it easily in the Logic App. Before reaching out to Azure Functions as an extensibility option, it’s worth taking an in-depth look at the available Logic Apps Workflow Definition Language functions.

Hope this was a timesaver!
Cheers,
Toon
