Processing Azure EventGrid events with NServiceBus

enter image description here

What’s EventGrid and why it’s important?

EventGrid helps with creation of event-based systems. The service manages routing of events from various source to the subscribers. Its high throughput and availability make it very appealing. Currently supports a few Azure Services but will change over time as the service is adopted by more and more other Azure services.

So, what makes it so important? Event-based development. No longer polling. Instead, it’s a push-based model where events are pushed to the subscribers. Think of Serverless applications. No longer need to poll for a storage blob or a queue. Instead, when a blob is created, or a message is received, an event is fired. If you have developed using NServiceBus in the past, event-driven development is not a new concept to you, and you’ll see that EventGrid and NServiceBus pub/sub are playing along nicely.

EventGrid providers

![enter image description here][3]

Publishing and subscribing to events

Services that already act as event providers (topics) do not require any additional work and can be subscribed to as-is. For custom events, one or more custom topics can be created. To create a custom topic and configure a Storage Queue subscriber, follow the steps provided by Microsoft here.

Note: make sure your queue name will be NServiceBus endpoint’s input queue.

To verify your code is working, the following snippet can be used to publish to the custom topic. Inspect Storage queue to find events stored as messages.

var topicEndpoint = "https://<custom-topic>.<region>-1.eventgrid.azure.net/api/events";
var topicKey = "<topic-key>";
var topicHostname = new Uri(topicEndpoint).Host;

var topicCredentials = new TopicCredentials(topicKey);
var client = new EventGridClient(topicCredentials);

await client.PublishEventsAsync(topicHostname, GetEventsList());
Console.Write("Published events to Event Grid.");

To create one or more events (EventGrid supports batches), a collection of EventGridEvent needs to be created. A single event has a well defined schema, where custom event data is stored as Data property and event type as EventType property.

new EventGridEvent
{
  Id = Guid.NewGuid().ToString(),
  Data = new BlogPostPublished
  {
	ItemUri = post
  },
  EventType = nameof(BlogPostPublished),
  EventTime = DateTime.Now,
  Subject = "Processing Azure Event Grid events with NServiceBus",
  DataVersion = "1.0"
});

Storage queue should contain all published events

enter image description here

Receiving EventGrid events with NServiceBus endpoint

NServiceBus can consume custom Storage Queue messages. EventGrid events can be treated as native integration messages. To enable this integration, a custom envelope unwrapper has to be registered.

var jsonSerializer = new Newtonsoft.Json.JsonSerializer();

transport.UnwrapMessagesWith(cloudQueueMessage =>
{
	using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
	using (var streamReader = new StreamReader(stream))
	using (var textReader = new JsonTextReader(streamReader))
	{
		var jObject = JObject.Load(textReader);

		using (var jsonReader = jObject.CreateReader())
		{
			//try deserialize to a NServiceBus envelope first
			var wrapper = jsonSerializer.Deserialize<MessageWrapper>(jsonReader);

			if (wrapper.MessageIntent != default)
			{
				//this was a envelope message
				return wrapper;
			}
		}

		//this was an EventGrid event
		using (var jsonReader = jObject.CreateReader())
		{
  			var @event = jsonSerializer.Deserialize<EventGridEvent>(jsonReader);

			var wrapper = new MessageWrapper
			{
				Id = @event.Id,
				Headers = new Dictionary<string, string>
				{
					{ "NServiceBus.EnclosedMessageTypes", @event.EventType },
					{ "NServiceBus.MessageIntent", "Publish" },
					{ "EventGrid.topic", @event.Topic },
					{ "EventGrid.subject", @event.Subject },
					{ "EventGrid.eventTime", @event.EventTime.ToString("u") },
					{ "EventGrid.dataVersion", @event.DataVersion },
					{ "EventGrid.metadataVersion", @event.MetadataVersion },
				},
				Body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event.Data)),
				MessageIntent = MessageIntentEnum.Publish
	    	};
	    	return wrapper;
	    }
	}
});

To help NServiceBus understand what event type the message represents, a specific header NServiceBus.EnclosedMessageTypes needs to be assigned the value of the EventGrid event published.

Note: handled EventGrid events need to be declared in your NServiceBus using declarative or convention option.

public class BlogPostPublished : IEvent
{
    public string ItemUri { get; set; }
}

EventGridEvent is an EventGrid wrapper defined by the service schema

public class EventGridEvent
{
    [JsonProperty(PropertyName = "id")]
    public string Id { get; set; }

    [JsonProperty(PropertyName = "topic")]
    public string Topic { get; set; }

    [JsonProperty(PropertyName = "subject")]
    public string Subject { get; set; }

    [JsonProperty(PropertyName = "data")]
    public object Data { get; set; }

    [JsonProperty(PropertyName = "eventType")]
    public string EventType { get; set; }

    [JsonProperty(PropertyName = "eventTime")]
    public DateTime EventTime { get; set; }

    [JsonProperty(PropertyName = "metadataVersion")]
    public string MetadataVersion { get; set; }

    [JsonProperty(PropertyName = "dataVersion")]
    public string DataVersion { get; set; }
}

At this point, an event handler can be defined

public class BlogPostPublishedHandler : IHandleMessages<BlogPostPublished>
{
    static ILog log = LogManager.GetLogger<BlogPostPublishedHandler>();

    public Task Handle(BlogPostPublished message, IMessageHandlerContext context)
    {
        log.Info($"Received {nameof(BlogPostPublished)}: {message.ItemUri}");
        log.Info($"Topic: {context.MessageHeaders["EventGrid.topic"]}");
        log.Info($"Subject: {context.MessageHeaders["EventGrid.subject"]}");
        log.Info($"Event time: {context.MessageHeaders["EventGrid.eventTime"]}");

        return Task.CompletedTask;
    }
}

Resulting in

2018-07-14 10:09:14.975 INFO BlogPostPublishedHandler Received BlogPostPublished: https://weblogs.asp.net/sfeldman/eventgrid-events-with-nservicebus
2018-07-14 10:09:14.980 INFO BlogPostPublishedHandler Topic: /subscriptions//resourceGroups/EventGrid-ASQ-RG/providers/Microsoft.EventGrid/topics/Events
2018-07-14 10:09:14.984 INFO BlogPostPublishedHandler Subject: Processing Azure Event Grid events with NServiceBus
2018-07-14 10:09:14.986 INFO BlogPostPublishedHandler Event time: 2018-07-14 16:08:48Z

Handling specific events

When a new event type is published, the endpoint will receive it and attempt to process. In case NServiceBus endpoint is not aware of the event type, it will end up in the error queue. This is caused by EventGrid subscription by default subscribing to all published events. To fix this, EventGrid subscription needs to be updated to receive only specific types.

az eventgrid event-subscription update \
--resource-id "/subscriptions/<subscription-id>/resourceGroups/eventgrid-asq-rg/providers/microsoft.eventgrid/topics/events" \
--name asq-subscription \
--included-event-types BlogPostPublished

Once the subscription is updated, the filter will be on and only specified event types (BlogPostPusblished in this case) will be passed on to the Storage queue.

{
  "destination": {
    "endpointType": "StorageQueue",
    "queueName": "queue",
    "resourceId": "/subscriptions/<subscription-id>/resourceGroups/EventGrid-ASQ-RG/providers/Microsoft.Storage/storageAccounts/eventgridasq"
  },
  "eventDeliverySchema": "InputEventSchema",
  "filter": {
    "includedEventTypes": [
  "BlogPostPublished"
    ],
    "isSubjectCaseSensitive": null,
    "subjectBeginsWith": "",
    "subjectEndsWith": ""
  },
  …
}

What's next?

Next would be taking advantage of EventGrid with custom topics or Azure services that can already emit EventGrid events. In the next blog post I'll show how easy it would be to subscribe to Storage Blob events.

Happy eventing!

3 Comments

  • Very cool indeed. Seems like you could easily eliminate the top (technically the 4th one) using block of your unwrapper function if you just moved the attempt to deserialize to an envelope down into the bottom using directive. That way, you wouldn't have to create (and dispose) a second jsonReader. Am I missing something?

  • Don't remember the error, but couldn't reuse JSON reader.
    The code is on GitHub (https://github.com/SeanFeldman/NServiceBus.AzureEventGrid.StorageQueues). Maybe I've missed something obvious :)

  • Interesting. Thanks for the response!

Comments have been disabled for this content.