Recently I was asked to step in on an IoT project. Sensor data was sent via Azure IoT Hub to Azure Stream Analytics. Events detected in Azure Stream Analytics resulted in a Service Bus message that had to be handled by a Logic App. However, the Logic App failed to parse the JSON message taken from the Service Bus queue. Let’s have a more detailed look at the issue!
Explanation
This diagram reflects the IoT scenario:
We encountered an issue in Logic Apps the moment we tried to parse the message into a JSON object. After some investigation, we realized that the Service Bus message wasn’t actually a plain JSON message: the JSON was wrapped in string serialization “overhead”.
@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/\u0001{"SensorId":"ABC","Timestamp":"2017-06-09T14:51:34.2052252","Temperature":16.4,"EventProcessedUtcTime":"2017-06-09T14:51:33.7313928Z","PartitionId":0,"EventEnqueuedUtcTime":"2017-06-09T14:51:34.0170000Z","IoTHub":{"MessageId":"f26213dbb-db85-4b75-af8c-82877bbf2c55","CorrelationId":null,"ConnectionDeviceId":"XY_Temp_ABC","ConnectionDeviceGenerationId":"636317217771923703","EnqueuedTime":"2017-06-09T14:51:33.9600000Z","StreamId":null}}\u0001
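This happens when the sender creates the BrokeredMessage from a string object: the payload is then serialized with the DataContractSerializer, which wraps the actual content in this envelope.
// Requires: using Microsoft.ServiceBus.Messaging;
string payload = "{ \"Key1\" : \"Value1\"}";
var msg = new BrokeredMessage(payload); // string body goes through the DataContractSerializer
queue.Send(msg);
If you control the sender-side code, you can resolve the problem by passing a stream object instead.
// Requires: using System.IO; using System.Text; using Microsoft.ServiceBus.Messaging;
string payload = "{ \"Key1\" : \"Value1\"}";
var payloadStream = new MemoryStream(Encoding.UTF8.GetBytes(payload));
var msg = new BrokeredMessage(payloadStream, true); // true: the message owns and disposes the stream
queue.Send(msg);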
Solution
Unfortunately, we cannot change the way Azure Stream Analytics behaves, so we need to deal with it on the receiver side. I’ve found several blogs and forum answers suggesting to clean up the “serialization garbage” with an Azure Function. Although this is a valid solution, I always tend to avoid additional components if they are not really needed. Introducing Azure Functions comes with additional cost, storage, deployment complexity, maintenance, etc.
As this is actually pure string manipulation, I had a look at the available string functions in the Logic Apps Workflow Definition Language. The following expression removes the unwanted “serialization overhead”:
@substring(base64ToString(triggerBody()?['ContentData']), indexof(base64ToString(triggerBody()?['ContentData']), '{'), add(sub(lastindexof(base64ToString(triggerBody()?['ContentData']), '}'), indexof(base64ToString(triggerBody()?['ContentData']), '{')), 1))
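To check the index arithmetic outside of Logic Apps, the same steps can be mirrored in a few lines of Python. This is only a sketch for illustration, not part of the Logic App: the function name extract_json and the shortened sample payload are hypothetical, and the framing bytes are simplified to the characters visible in the message above. The slice length is computed as the last index of '}' minus the first index of '{', plus one, so that the closing brace is included.

```python
import base64
import json


def extract_json(content_data: str) -> dict:
    """Decode ContentData and keep only the text from the first '{' to the last '}'."""
    # Equivalent of base64ToString(...); undecodable framing bytes are replaced.
    text = base64.b64decode(content_data).decode("utf-8", errors="replace")
    start = text.index("{")    # indexof(..., '{')
    end = text.rindex("}")     # lastindexof(..., '}')
    length = end - start + 1   # +1 keeps the closing brace
    return json.loads(text[start:start + length])


# Hypothetical, shortened sample wrapped in the same kind of serialization framing.
framed = (
    "@\x06string\x083http://schemas.microsoft.com/2003/10/Serialization/\x01"
    '{"SensorId":"ABC","Temperature":16.4}'
    "\x01"
)
content_data = base64.b64encode(framed.encode("utf-8")).decode("ascii")

event = extract_json(content_data)
print(event["SensorId"], event["Temperature"])
```

Without the plus one, the slice would stop just before the final '}' and the JSON parser would reject the result.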