Azure Functions Isolated Worker - Sending multiple messages


The new Azure Functions SDK for the Isolated Worker (out-of-process) model was introduced around the time of .NET 5. While it's still in flux despite having gone GA, it keeps gaining popularity. There are still some sharp edges to watch for, though: validate that everything you use with the older In-Process SDK is offered by the new SDK, or at least has a replacement.

Today, I stumbled upon a StackOverflow question about IAsyncCollector and Service Bus messages. IAsyncCollector, like its synchronous counterpart ICollector, offers the comfort of an output binding that can emit multiple items. For example, with Azure Service Bus, one can send out multiple messages from the executing function. Quite handy, and with the In-Process SDK it looks like the following. The function's signature contains a collector (I call it dispatcher) that messages are "added" to. Each added message is dispatched to the queue configured on the ServiceBus attribute, which in this case is a queue called dest.

[FunctionName("Concierge")]
public async Task<IActionResult> Handle(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = "receive")] HttpRequest req,
    [ServiceBus("dest", Connection = "AzureServiceBus")] IAsyncCollector<ServiceBusMessage> dispatcher)

And sending messages:

for (var i = 0; i < 10; i++)
{
    var message = new ServiceBusMessage($"Message #{i}");
    // Each added message is sent to the "dest" queue configured on the ServiceBus attribute.
    await dispatcher.AddAsync(message);
}

Straightforward and simple. But how do you do the same with the new Isolated Worker (out-of-process) SDK?

Not the same way. The new SDK doesn't currently support native SDK types, so types such as ServiceBusMessage are not supported. The Service Bus SDK clients are not available directly either. Functions therefore need to marshal data as strings or byte arrays to send messages, and to receive them as well, but here we're focusing on sending. So how do we send those multiple messages?

The official documentation does mention multiple output bindings, but only in the context of using several different output bindings. To output multiple items to the same output binding, we need to resort to a bit of tedious work.

First, we'll need to serialize our messages. Then we'll dispatch those serialized objects using an output binding, connected to a collection property. Here's an example:

[Function("OneToMany")]
public static DispatchedMessages Run([ServiceBusTrigger("myqueue", 
    Connection = "AzureServiceBus")] string myQueueItem, FunctionContext context)
{
  // Generate 5 messages
  var messages = new List<MyMessage>();
  for (var i = 0; i < 5; i++)
  {
      var message = new MyMessage { Value = $"Message #{i}" };
      messages.Add(message);
  }

  return new DispatchedMessages
  { 
      Messages = messages.Select(x => JsonSerializer.Serialize(x)) 
  };
}

Each message of type MyMessage is serialized first.

class MyMessage
{
    public string Value { get; set; }
}
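To make the serialization step concrete, here is a minimal sketch that runs without any Functions packages. It assumes System.Text.Json, as in the function above; the MessageMarshalling helper and its method names are made up for illustration and are not part of any SDK. It shows the string form used in this post, a byte array alternative, and how a receiver could turn the string body back into a MyMessage.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

public class MyMessage
{
    public string Value { get; set; }
}

// Hypothetical helper, for illustration only (not part of the Functions SDK).
public static class MessageMarshalling
{
    // Serialize each message to a JSON string, the shape returned through the output binding.
    public static List<string> ToPayloads(IEnumerable<MyMessage> messages) =>
        messages.Select(m => JsonSerializer.Serialize(m)).ToList();

    // Alternative marshalling: the same JSON as UTF-8 bytes.
    public static byte[] ToBytes(MyMessage message) =>
        JsonSerializer.SerializeToUtf8Bytes(message);

    // What a receiving function could do with the string body it is handed.
    public static MyMessage FromPayload(string payload) =>
        JsonSerializer.Deserialize<MyMessage>(payload);
}

public static class Program
{
    public static void Main()
    {
        var messages = Enumerable.Range(0, 3)
            .Select(i => new MyMessage { Value = $"Message #{i}" });

        // Print each serialized payload next to the value recovered from it.
        foreach (var payload in MessageMarshalling.ToPayloads(messages))
        {
            var roundTripped = MessageMarshalling.FromPayload(payload);
            Console.WriteLine($"{payload} -> {roundTripped.Value}");
        }
    }
}
```

Round-tripping the payload like this is a cheap way to confirm that the receiving side will be able to read what the sender produced before wiring up any bindings.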

And then we return an object of type DispatchedMessages, which is where the binding glue lives:

public class DispatchedMessages
{
    [ServiceBusOutput(queueOrTopicName: "dest", Connection = "AzureServiceBus")]
    public IEnumerable<string> Messages { get; set; }
}

This object is returned from the function and marshalled back to the SDK code, which enumerates the Messages property, takes each string, and passes it as the body of a newly constructed ServiceBusMessage. Thanks to the ServiceBusOutput attribute, the Functions SDK knows where to send the messages and where to find the connection string. Note that without a connection string name specified, the SDK will attempt to load the connection string from the variable/key named AzureWebJobsServiceBus. This also means that we can have multiple dispatchers, similar to multiple collectors in the In-Process SDK, by having one property per destination/namespace on the returned type.
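The property-per-destination idea could look roughly like the sketch below. It assumes the Microsoft.Azure.Functions.Worker.Extensions.ServiceBus package is referenced. The FanOutResult type, the FanOut function, and the audit queue are hypothetical names introduced only for illustration; dest, myqueue, and the AzureServiceBus connection setting come from the examples above.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Microsoft.Azure.Functions.Worker;

// Hypothetical return type: one collection property per destination.
public class FanOutResult
{
    [ServiceBusOutput(queueOrTopicName: "dest", Connection = "AzureServiceBus")]
    public IEnumerable<string> Messages { get; set; }

    // "audit" is an assumed second queue, not something from the original post.
    [ServiceBusOutput(queueOrTopicName: "audit", Connection = "AzureServiceBus")]
    public IEnumerable<string> AuditMessages { get; set; }
}

public static class FanOut
{
    [Function("FanOut")]
    public static FanOutResult Run(
        [ServiceBusTrigger("myqueue", Connection = "AzureServiceBus")] string myQueueItem)
    {
        // Serialize five payloads destined for the "dest" queue.
        var payloads = Enumerable.Range(0, 5)
            .Select(i => JsonSerializer.Serialize(new { Value = $"Message #{i}" }))
            .ToList();

        return new FanOutResult
        {
            Messages = payloads,
            // A single plain-text entry destined for the assumed "audit" queue.
            AuditMessages = new[] { $"Dispatched {payloads.Count} messages for '{myQueueItem}'" }
        };
    }
}
```

To target a different namespace, the second property would presumably point its Connection at a different setting name; that part is an assumption and worth verifying against the package version in use.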

And just like this, we can kick off the function and dispatch multiple messages with the new Isolated Worker SDK.


4 Comments

  • Hi Sean

    I am working on the Service Bus output in an Azure function (C# with Microsoft.NET.Sdk.Functions 3.0.11 and Microsoft.Azure.WebJobs.Extensions.ServiceBus 4.3.0).
    I use the Microsoft.Azure.ServiceBus.Message class, not the Azure.Messaging.ServiceBus.ServiceBusMessage

    If I create a service bus message and send it to the bus using IAsyncCollector.AddAsync, it fails to send it because it cannot serialize the ExpiresAtUTC that is constructed from the Enqueue time and the TTL. If I use the hack documented at https://stackoverflow.com/questions/47755066/unable-to-serialize-azure-servicebus-message-error-getting-value-from-expires/47757023 it can send the message but if my consumer is out of process (e.g. python azure function), it fails to deserialize the ExpiresAt (https://github.com/Azure/azure-functions-host/issues/7119)

    Is there a reliable way in .NET core 3.1 to write multiple messages to a service bus from an azure function that can be read from an out of process function?
    What is the difference between Microsoft.Azure.ServiceBus.Message and Azure.Messaging.ServiceBus.ServiceBusMessage and which should I use for a .NET 3.1 c# azure function?

    Thanks

    -Francois

  • @Francois,

    I'm not entirely following what's the scenario you're handling. Within the context of an Azure function you shouldn't be reflecting anything at all. That workaround would be needed for something like testing scenarios. Messages sending/publishing and receiving should not depend on any of this. Perhaps I'm not understanding the question. In that case, it would be better to go over email (Feldman period Sean at Gmail period com).
    Cheers!

  • Hello,

    I am having a really hard time getting this to work, and I see this error about being 'unable to configure binding', do you recognize this?

    'The 'myfunction' function is in error: Unable to configure binding 'Messages' of type 'serviceBus'. This may indicate invalid function.json properties. Can't figure out which ctor to call.

    The function itself is the same as yours, but perhaps you could show what versions of packages you are using and if you are doing any special configuration in your Main().

    Cheers,
    Andreas

  • @Andreas,

    I did not keep the source code around but the post was based on Isolated Worker (out of process) SDK with .NET 5 (or 6 today).
    You'll need to double-check the packages you're bringing in. Today, those would probably need to be something like this:

    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.ServiceBus" Version="4.2.1" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.3.0" OutputItemType="Analyzer" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.6.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.0.13" />

    Without seeing the code, it's hard to say what is what. Give it a whirl and, if anything comes up, ping me at feldman.sean - gmail.com
