This post was originally published here.

This blog post dives into the details of how you can achieve batching with Logic Apps. Batching is still a highly demanded feature for a middleware layer. It’s mostly introduced to reduce the performance impact on the target system or for functional reasons. Let’s have a closer look.

Important update: please be aware that Logic Apps now supports batching out of the box. It’s advised to have a look at it; it’s described over here.

Scenario

For this blog post, I decided to try to batch the following XML message. As Logic Apps supports JSON natively, we can assume that a similar setup will work quite easily for JSON messages. Note that the XML snippet below contains an XML declaration, so pure string appending won’t work. Namespaces are included as well.

<?xml version="1.0" encoding="utf-8"?>
<ns0:Order xmlns:ns0="http://namespace">
   <OrderId>1</OrderId> 
   <Product>Consultancy</Product> 
   <Quantity>40</Quantity> 
   <Price>100</Price>
</ns0:Order>
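
To make the remark about string appending concrete, here is a minimal sketch that relies only on System.Xml. The class and method names are made up for this illustration. It shows that a single message parses fine, while two messages simply appended to each other do not, because an XML declaration may only appear at the very start of a document and a document can only have one root element.

```csharp
using System;
using System.Xml;

public static class AppendCheck
{
    // Returns true when the given text parses as one well-formed XML document.
    public static bool IsWellFormed(string xml)
    {
        try
        {
            new XmlDocument().LoadXml(xml);
            return true;
        }
        catch (XmlException)
        {
            return false;
        }
    }

    public static void Main()
    {
        // Shortened version of the order message used in this post.
        const string order =
            "<?xml version=\"1.0\" encoding=\"utf-8\"?>" +
            "<ns0:Order xmlns:ns0=\"http://namespace\"><OrderId>1</OrderId></ns0:Order>";

        Console.WriteLine(IsWellFormed(order));         // a single message parses
        Console.WriteLine(IsWellFormed(order + order)); // two appended messages do not
    }
}
```

This is why the batch needs to be assembled as a real XML document, with one declaration and one root node wrapping the child messages.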

Requirements

I came up with the following requirements for my batching solution:

  • External message store: in integration I like to avoid long-running workflow instances at all times. Therefore I prefer messages to be stored somewhere out of process, waiting to be batched, instead of keeping them active in a singleton workflow instance (e.g. BizTalk sequential convoy).
  • Message and metadata together: I want to avoid storing the message in one place and the metadata in another. Keep them together, to simplify development and maintenance.
  • Native Logic Apps integration: preferably I can leverage an Azure service, that has native and smooth integration with Azure Logic Apps. It must ensure we can reliably assign messages to a specific batch and we must be able to remove them easily from the message store.
  • Multiple batch release triggers: I want to support multiple ways to decide when a batch can be released.
    > # Messages: send out batches that each contain X messages
    > Time: send out a batch at a specific time of the day
    > External Trigger: release the batch when an external trigger is received

Solution

After some analysis, I was convinced that Azure Service Bus queues are a good fit:

  • External message store: the messages can be queued for a long time in an Azure Service Bus queue.
  • Message and metadata together: the message is placed together with its properties on the queue. Each batch configuration can have its own queue assigned.
  • Native Logic Apps integration: there is a Service Bus connector to receive multiple messages inside one Logic App instance. With the peek-lock pattern, you can reliably assign messages to a batch and remove them from the queue.
  • Multiple batch release triggers:
    > # Messages: in the Service Bus connector, you can choose how many messages you want to receive in one Logic App instance.
    > Time: Service Bus has a great property, ScheduledEnqueueTimeUtc, which ensures that a message only becomes visible on the queue from a specific moment in time. This is a great way to schedule messages to be released at a specific time, without the need for an external scheduler.
    > External Trigger: the Logic App can easily be instantiated via the native HTTP Request trigger.

Implementation

Batching store

The goal of this workflow is to put the message on a specific queue for batching purposes. This Logic App is very straightforward to implement. Add a Request trigger to receive the messages that need to be batched and use the Send Message Service Bus connector to send the message to a specific queue.

In case you want to release the batch only at a specific moment in time, you must provide a value for the ScheduledEnqueueTimeUtc property in the advanced settings.
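
As an illustration of how such a value could be calculated, the sketch below computes the next occurrence of a daily release time in UTC. The helper name NextRelease and the 18:00 release time are assumptions for this example only. The resulting timestamp is what you would pass as ScheduledEnqueueTimeUtc when sending the message.

```csharp
using System;

public static class BatchSchedule
{
    // Returns the next UTC moment at which the given time of day occurs,
    // strictly after 'nowUtc'.
    public static DateTime NextRelease(DateTime nowUtc, TimeSpan releaseTimeOfDayUtc)
    {
        DateTime candidate = nowUtc.Date + releaseTimeOfDayUtc;
        return candidate > nowUtc ? candidate : candidate.AddDays(1);
    }

    public static void Main()
    {
        // Hypothetical release moment: every day at 18:00 UTC.
        DateTime release = NextRelease(DateTime.UtcNow, new TimeSpan(18, 0, 0));

        // Round-trip (ISO 8601) format of the calculated timestamp.
        Console.WriteLine(release.ToString("o"));
    }
}
```

All messages that are sent with the same scheduled time become visible on the queue together, so the release Logic App can pick them up as one batch.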

Batching release

This is the more complex part of the solution. The first challenge is to receive, for example, 3 messages in one Logic App instance. My first attempt failed, because the Service Bus receive trigger and action apparently behave differently:

  • When one or more messages arrive in a queue: this trigger receives messages in a batch from a Service Bus queue, but it creates a separate Logic App instance for every message. This is not desired for our scenario, but can be very useful in high throughput scenarios.
  • Get messages from a queue: this action can receive multiple messages in batch from a Service Bus queue. This results in an array of Service Bus messages, inside one Logic App instance. This is the result that we want for this batching exercise!

Let’s use the peek-lock pattern to ensure reliability and receive 3 messages in one batch:

As a result, we get this JSON array back from the Service Bus connector:

[
   {
      "ContentData": "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjxuczA6T3JkZXIgeG1sbnM6bnMwPSJodHRwOi8vbmFtZXNwYWNlIj4NCgk8T3JkZXJJZD4xPC9PcmRlcklkPg0KCTxQcm9kdWN0PkNvbnN1bHRhbmN5PC9Qcm9kdWN0Pg0KCTxRdWFudGl0eT40MDwvUXVhbnRpdHk+DQoJPFByaWNlPjEwMDwvUHJpY2U+DQo8L25zMDpPcmRlcj4=",
      "ContentType": "application/xml",
      "ContentTransferEncoding": "Base64",
      "Properties": {
         "DeliveryCount": "1",
         "EnqueuedSequenceNumber": "0",
         "EnqueuedTimeUtc": "2017-02-25T06:27:51Z",
         "ExpiresAtUtc": "2017-03-11T06:27:51Z",
         "LockedUntilUtc": "2017-02-25T06:29:42Z",
         "LockToken": "9474a5bc-f0c3-41ff-a756-94b19b596135",
         "MessageId": "d341771bc55344beba97e655ad762e3e",
         "ScheduledEnqueueTimeUtc": "0001-01-01T00:00:00Z",
         "SequenceNumber": "215",
         "Size": "343",
         "State": "Active",
         "TimeToLive": "12096000000000"
      },
      "MessageId": "d341771bc55344beba97e655ad762e3e",
      "To": null,
      "ReplyTo": null,
      "ReplyToSessionId": null,
      "Label": null,
      "ScheduledEnqueueTimeUtc": "0001-01-01T00:00:00Z",
      "SessionId": null,
      "CorrelationId": null,
      "SequenceNumber": 215,
      "LockToken": "9474a5bc-f0c3-41ff-a756-94b19b596135",
      "TimeToLive": "12096000000000"
   },
   {
      "ContentData": "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjxuczA6T3JkZXIgeG1sbnM6bnMwPSJodHRwOi8vbmFtZXNwYWNlIj4NCgk8T3JkZXJJZD4yPC9PcmRlcklkPg0KCTxQcm9kdWN0PlRyYWluaW5nPC9Qcm9kdWN0Pg0KCTxRdWFudGl0eT4zMDwvUXVhbnRpdHk+DQoJPFByaWNlPjEyMDwvUHJpY2U+DQo8L25zMDpPcmRlcj4=",
      "ContentType": "application/xml",
      "ContentTransferEncoding": "Base64",
      "Properties": {
         "DeliveryCount": "1",
         "EnqueuedSequenceNumber": "0",
         "EnqueuedTimeUtc": "2017-02-25T06:28:08Z",
         "ExpiresAtUtc": "2017-03-11T06:28:08Z",
         "LockedUntilUtc": "2017-02-25T06:29:42Z",
         "LockToken": "2ef6cb2d-ef13-41b0-93fe-d45509d33e58",
         "MessageId": "924ff0aa503543949521c5c8ea6431c3",
         "ScheduledEnqueueTimeUtc": "0001-01-01T00:00:00Z",
         "SequenceNumber": "216",
         "Size": "340",
         "State": "Active",
         "TimeToLive": "12096000000000"
      },
      "MessageId": "924ff0aa503543949521c5c8ea6431c3",
      "To": null,
      "ReplyTo": null,
      "ReplyToSessionId": null,
      "Label": null,
      "ScheduledEnqueueTimeUtc": "0001-01-01T00:00:00Z",
      "SessionId": null,
      "CorrelationId": null,
      "SequenceNumber": 216,
      "LockToken": "2ef6cb2d-ef13-41b0-93fe-d45509d33e58",
      "TimeToLive": "12096000000000"
   },
   {
      "ContentData": "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjxuczA6T3JkZXIgeG1sbnM6bnMwPSJodHRwOi8vbmFtZXNwYWNlIj4NCgk8T3JkZXJJZD4zPC9PcmRlcklkPg0KCTxQcm9kdWN0PkRldmVsb3BtZW50PC9Qcm9kdWN0Pg0KCTxRdWFudGl0eT4xNTA8L1F1YW50aXR5Pg0KCTxQcmljZT44MDwvUHJpY2U+DQo8L25zMDpPcmRlcj4=",
      "ContentType": "application/xml",
      "ContentTransferEncoding": "Base64",
      "Properties": {
         "DeliveryCount": "1",
         "EnqueuedSequenceNumber": "0",
         "EnqueuedTimeUtc": "2017-02-25T06:28:23Z",
         "ExpiresAtUtc": "2017-03-11T06:28:23Z",
         "LockedUntilUtc": "2017-02-25T06:29:42Z",
         "LockToken": "db6ff371-f58a-4b2e-8388-15de84c550fc",
         "MessageId": "9ef0fa10026c4e1bbd8997a7201413d6",
         "ScheduledEnqueueTimeUtc": "0001-01-01T00:00:00Z",
         "SequenceNumber": "217",
         "Size": "343",
         "State": "Active",
         "TimeToLive": "12096000000000"
      },
      "MessageId": "9ef0fa10026c4e1bbd8997a7201413d6",
      "To": null,
      "ReplyTo": null,
      "ReplyToSessionId": null,
      "Label": null,
      "ScheduledEnqueueTimeUtc": "0001-01-01T00:00:00Z",
      "SessionId": null,
      "CorrelationId": null,
      "SequenceNumber": 217,
      "LockToken": "db6ff371-f58a-4b2e-8388-15de84c550fc",
      "TimeToLive": "12096000000000"
   }
]
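
To see what a single entry actually carries, the following sketch decodes the ContentData value of the first message above. The class name is made up for this illustration, and UTF-8 is assumed because that is what the XML declaration of the sample message states.

```csharp
using System;
using System.Text;

public static class ContentDataDecoder
{
    // Decodes a base64 ContentData value into text.
    // Assumption: the payload is UTF-8 encoded, as declared in the sample XML.
    public static string Decode(string contentData)
    {
        byte[] bytes = Convert.FromBase64String(contentData);
        return Encoding.UTF8.GetString(bytes);
    }

    public static void Main()
    {
        // ContentData of the first message in the array above.
        const string contentData =
            "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjxuczA6T3JkZXIgeG1sbnM6bnMwPSJodHRwOi8vbmFtZXNwYWNlIj4NCgk8T3JkZXJJZD4xPC9PcmRlcklkPg0KCTxQcm9kdWN0PkNvbnN1bHRhbmN5PC9Qcm9kdWN0Pg0KCTxRdWFudGl0eT40MDwvUXVhbnRpdHk+DQoJPFByaWNlPjEwMDwvUHJpY2U+DQo8L25zMDpPcmRlcj4=";

        // Writes the original order XML, including its XML declaration.
        Console.WriteLine(Decode(contentData));
    }
}
```

Every entry in the array holds a complete XML document with its own declaration, which is exactly why the entries cannot simply be concatenated.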

The challenge is to parse this array, decode the base64 content in the ContentData and create a valid XML batch message from it. I tried several complex Logic App expressions, but soon realized that Azure Functions is better suited to take care of this complicated parsing. I created the following Azure Function, of the Generic Webhook C# type:

#r "Newtonsoft.Json"

using System;
using System.Net;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.IO;
using System.Linq;
using System.Xml;

public static async Task<object> Run(HttpRequestMessage req, TraceWriter log)
{
   try
   {
      JArray inputChildMessages = JArray.Parse(await req.Content.ReadAsStringAsync());

      //Create XmlDocument for the output batch
      var outputBatchXmlDoc = new XmlDocument();

      //Set root node and XML declaration
      var inputXmlDeclaration = GetXmlDeclaration(inputChildMessages);
      var outputXmlDeclaration = outputBatchXmlDoc.CreateXmlDeclaration(inputXmlDeclaration.Version, inputXmlDeclaration.Encoding, inputXmlDeclaration.Standalone);
      var outputBatchRootNode = outputBatchXmlDoc.CreateElement("MessageBatch");
      outputBatchXmlDoc.AppendChild(outputBatchRootNode);
      outputBatchXmlDoc.InsertBefore(outputXmlDeclaration, outputBatchRootNode);

      //Append child messages
      foreach (var inputChildMessage in inputChildMessages.Children<JObject>())
      {
         var outputChildXmlNode = outputBatchXmlDoc.CreateElement("Message");
         var outputChildXmlDoc = GetXmlDocumentFromBase64String(inputChildMessage.Properties().FirstOrDefault(p => p.Name == "ContentData").Value.ToString());

         outputChildXmlNode.InnerXml = outputChildXmlDoc.DocumentElement.OuterXml;
         outputBatchXmlDoc.DocumentElement.AppendChild(outputChildXmlNode);
      }

      return req.CreateResponse(HttpStatusCode.OK, outputBatchXmlDoc.OuterXml);
   }
   catch (Exception ex)
   {
      return req.CreateResponse(HttpStatusCode.InternalServerError, ex.Message);
   }
}

private static XmlDeclaration GetXmlDeclaration(JArray inputChildMessages)
{
   var firstChildMessage = GetXmlDocumentFromBase64String(inputChildMessages.Children<JObject>().FirstOrDefault().Properties().FirstOrDefault(p => p.Name == "ContentData").Value.ToString());
   var inputXmlDeclaration = firstChildMessage.ChildNodes.OfType<XmlDeclaration>().FirstOrDefault();

   if (inputXmlDeclaration == null)
      return new XmlDocument().CreateXmlDeclaration("1.0", "utf-8", null);

   return inputXmlDeclaration;
}

private static XmlDocument GetXmlDocumentFromBase64String(string base64String)
{
   var xmlMessage = new XmlDocument();
   byte[] xmlMessageBytes = Convert.FromBase64String(base64String);

   //Load the decoded bytes and dispose the stream afterwards
   using (var xmlMessageStream = new MemoryStream(xmlMessageBytes))
   {
      xmlMessage.Load(xmlMessageStream);
   }

   return xmlMessage;
}

Let’s consume this function now from within our Logic App.  There is seamless integration with Logic Apps, which is really great!

As an output of the GetBatchMessage Azure Function, I get the following XML 🙂

<?xml version="1.0" encoding="utf-8"?>
<MessageBatch>
   <Message>
      <ns0:Order xmlns:ns0="http://namespace">
         <OrderId>1</OrderId>
         <Product>Consultancy</Product>
         <Quantity>40</Quantity>
         <Price>100</Price>
      </ns0:Order>
   </Message>
   <Message>
      <ns0:Order xmlns:ns0="http://namespace">
         <OrderId>2</OrderId>
         <Product>Training</Product>
         <Quantity>30</Quantity>
         <Price>120</Price>
      </ns0:Order>
   </Message>
   <Message>
      <ns0:Order xmlns:ns0="http://namespace">
         <OrderId>3</OrderId>
         <Product>Development</Product>
         <Quantity>150</Quantity>
         <Price>80</Price>
      </ns0:Order>
   </Message>
</MessageBatch>
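
As a quick sanity check on such a batch, the sketch below reads the OrderId values back out of it. It is not part of the solution itself. The class name, the method name and the "o" prefix are arbitrary choices for this example. What matters is that the namespace URI matches the one in the child messages, which confirms that the namespaces survive the batching.

```csharp
using System;
using System.Xml;

public static class BatchReader
{
    // Returns the OrderId values of all orders inside a MessageBatch document.
    public static string[] ReadOrderIds(string batchXml)
    {
        var doc = new XmlDocument();
        doc.LoadXml(batchXml);

        // The prefix used in XPath is free to choose; only the URI has to match.
        var ns = new XmlNamespaceManager(doc.NameTable);
        ns.AddNamespace("o", "http://namespace");

        XmlNodeList nodes = doc.SelectNodes("/MessageBatch/Message/o:Order/OrderId", ns);
        var ids = new string[nodes.Count];
        for (int i = 0; i < nodes.Count; i++)
        {
            ids[i] = nodes[i].InnerText;
        }
        return ids;
    }

    public static void Main()
    {
        // Shortened batch with two of the orders shown above.
        const string batch =
            "<?xml version=\"1.0\" encoding=\"utf-8\"?>" +
            "<MessageBatch>" +
            "<Message><ns0:Order xmlns:ns0=\"http://namespace\"><OrderId>1</OrderId></ns0:Order></Message>" +
            "<Message><ns0:Order xmlns:ns0=\"http://namespace\"><OrderId>2</OrderId></ns0:Order></Message>" +
            "</MessageBatch>";

        Console.WriteLine(string.Join(", ", ReadOrderIds(batch))); // prints "1, 2"
    }
}
```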

Large messages

This solution is very nice, but what about large messages? Recently, I wrote a Service Bus connector that uses the claim check pattern, which exchanges large payloads via Blob Storage. In this batching scenario we can also leverage this functionality. Once I have open-sourced this project, I’ll update this blog with a working example. Stay tuned for more!

Conclusion

This is a great and flexible way to perform batching within Logic Apps. It really demonstrates the power of the Better Together story with Azure Logic Apps, Service Bus and Functions. I’m sure this is not the only way to perform batching in Logic Apps, so do not hesitate to share your solution for this common integration challenge in the comments section below!

I hope this gave you some fresh insights into the capabilities of Azure Logic Apps!
