Introducing Postal.NET
Update: Postal.NET is now available on NuGet.
Introduction
Postal.NET is a library I wrote for writing in-process decoupled applications, using the Publish/Subscribe and Domain Event patterns. It is loosely based upon the Postal.js JavaScript library. When I say loosely, I mean that I didn’t look at the source code, but I like the ideas that it implements!
Postal.NET allows us to post messages to a “bus”, identified by a channel and topic pair. A channel can have any number of topics, and a topic can be in any number of channels; both are just strings.
We can subscribe to one or more channel/topic pairs and cancel each subscription when we no longer need it.
Messages are posted either synchronously or asynchronously. Each message is delivered “as-is”, wrapped in an envelope that includes a few properties, such as the timestamp and the origin channel and topic.
Nothing too complex, yet I hope you find it useful!
Usage
Here are some usage examples. First, a subscription and two messages being sent, one synchronously (Publish) and the other asynchronously (PublishAsync):
using (Postal.Box.Subscribe("channel", "topic", (env) => Console.WriteLine(env.Data)))
{
    //sync
    Postal.Box.Publish("channel", "topic", "Hello, World!");
    //async
    await Postal.Box.PublishAsync("channel", "topic", "Hello, Async World!");
}
Postal.Box.Publish("channel", "topic", "Does not appear because the subscription was disposed!");
Notice the using block that wraps the Subscribe call. Subscribe returns an IDisposable which, when disposed, terminates the subscription; hence the third message is never delivered to the subscriber.
using (Postal.Box.Subscribe("*", "*", (env) => Console.WriteLine("Catch all!")))
{
    using (Postal.Box.Subscribe("channel", "topic", (env) => Console.WriteLine(env.Data)))
    {
        Postal.Box.Publish("channel", "topic", "Hello, World!");
    }
}
Now we have two subscriptions, one for * (any) channel and topic, and the other for a concrete one. Both subscriptions will receive the message.
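To make the * wildcard more concrete, here is a minimal sketch of one way such patterns can be matched, by translating them into regular expressions. WildcardMatcher and its methods are names invented for this illustration. The sketch anchors the pattern so that it must cover the whole channel or topic name, which is an assumption made here and may differ from what Postal.NET itself does (its own matching code is shown in the Code section).

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper, not part of Postal.NET.
public static class WildcardMatcher
{
    // Translates a subscription pattern into an anchored regular expression.
    public static Regex ToRegex(string pattern)
    {
        // Regex.Escape makes every character literal, turning "*" into "\*";
        // that escaped star is then swapped for ".*" (any run of characters).
        var escaped = Regex.Escape(pattern).Replace("\\*", ".*");
        return new Regex("^" + escaped + "$");
    }

    // Returns true if the channel or topic name matches the pattern as a whole.
    public static bool Matches(string pattern, string value)
    {
        return ToRegex(pattern).IsMatch(value);
    }
}
```

With this sketch, the pattern "*" matches any name, "orders.*" matches "orders.created", and "channel" matches only the exact name "channel".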
using (Postal.Box.Subscribe("channel", "topic", (env) => Console.WriteLine(env.Data), (env) => env.Data is int))
{
    Postal.Box.Publish("channel", "topic", "Does not show!");
    Postal.Box.Publish("channel", "topic", 12345);
}
This time we are filtering the subscription: it will only fire if the message payload is an integer.
using (var evt = new ManualResetEvent(false))
using (Postal.Box.Subscribe("channel", "topic", (env) => {
    Console.WriteLine(env.Data);
    evt.Set();
}))
{
    Postal.Box.PublishAsync("channel", "topic", "Hello, World!");
    evt.WaitOne();
}
In this example, we wait for an event to be signaled, which only happens once an asynchronous message is handled.
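ManualResetEvent.WaitOne blocks the calling thread while it waits. As an alternative sketch, a TaskCompletionSource lets the caller await the first handled message instead. FirstMessage<T> is a hypothetical helper written for this illustration; it is not part of Postal.NET.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: completes a task with the first message passed to Handle.
public sealed class FirstMessage<T>
{
    private readonly TaskCompletionSource<T> source = new TaskCompletionSource<T>();

    // Awaitable by the caller; completes once Handle has been called.
    public Task<T> Completion
    {
        get { return this.source.Task; }
    }

    // Intended to be used as the subscriber callback; calls after the first are ignored.
    public void Handle(T message)
    {
        this.source.TrySetResult(message);
    }
}
```

With the API shown in this article, a FirstMessage<Envelope> named first could presumably be wired up as Postal.Box.Subscribe("channel", "topic", first.Handle), followed by await first.Completion after publishing.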
using (Postal.Box.AnyChannelAndTopic().Subscribe((env) => Console.WriteLine("Catch all!")))
{
    using (Postal.Box.Channel("channel").Topic("topic").Subscribe((env) => Console.WriteLine(env.Data)))
    {
        Postal.Box.Channel("channel").Topic("topic").Publish("Hello, World!");
    }
}
This final example shows the alternative fluent interface. It offers the exact same functionality.
Code
Let’s start with the Postal class, which merely acts as a holder for an IBox:
public static class Postal
{
    public static readonly IBox Box = new Box();
}
The IBox interface, the core of Postal.NET, is defined as:
public interface IBox
{
    IDisposable Subscribe(string channel, string topic, Action<Envelope> subscriber, Func<Envelope, bool> condition = null);
    void Publish(string channel, string topic, object data);
    Task PublishAsync(string channel, string topic, object data);
}
The included implementation, Box, is as follows:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
public sealed class Box : IBox
{
    // Identifies one subscription: a unique id plus its channel and topic patterns.
    class SubscriberId
    {
        private readonly Guid id;
        private readonly string channel;
        private readonly string topic;
        private readonly int hash;
        private readonly Func<Envelope, bool> condition;
        public SubscriberId(Guid id, string channel, string topic, Func<Envelope, bool> condition)
        {
            this.id = id;
            this.channel = channel;
            this.topic = topic;
            this.condition = condition;
            // The hash is computed once, since all fields are immutable.
            unchecked
            {
                this.hash = 13;
                this.hash = (this.hash * 17) ^ this.id.GetHashCode();
                this.hash = (this.hash * 17) ^ this.channel.GetHashCode();
                this.hash = (this.hash * 17) ^ this.topic.GetHashCode();
            }
        }
        // Turns a pattern into a regular expression: dots are escaped and * becomes .*
        // (other regex metacharacters are left untouched).
        private string Normalize(string str)
        {
            return str
                .Replace(".", "\\.")
                .Replace("*", ".*");
        }
        // Note: the patterns are not anchored, so a pattern also matches
        // when it occurs inside a longer channel or topic name.
        public bool Matches(string channel, string topic)
        {
            var channelRegex = new Regex(this.Normalize(this.channel));
            var topicRegex = new Regex(this.Normalize(this.topic));
            return channelRegex.IsMatch(channel) == true
                && topicRegex.IsMatch(topic);
        }
        public override bool Equals(object obj)
        {
            var other = obj as SubscriberId;
            if (other == null)
            {
                return false;
            }
            return (other.id == this.id) && (other.channel == this.channel) && (other.topic == this.topic);
        }
        public override int GetHashCode()
        {
            return this.hash;
        }
        // Applies the optional filter supplied at subscription time.
        public bool Passes(Envelope env)
        {
            return this.condition(env);
        }
    }
    // Returned by Subscribe; disposing it removes the subscription.
    class DisposableSubscription : IDisposable
    {
        private readonly SubscriberId id;
        private readonly IDictionary<SubscriberId, Action<Envelope>> subscribers;
        public DisposableSubscription(SubscriberId id, IDictionary<SubscriberId, Action<Envelope>> subscribers)
        {
            this.id = id;
            this.subscribers = subscribers;
        }
        public void Dispose()
        {
            this.subscribers.Remove(this.id);
        }
    }
    private readonly ConcurrentDictionary<SubscriberId, Action<Envelope>> subscribers = new ConcurrentDictionary<SubscriberId, Action<Envelope>>();
    public void Publish(string channel, string topic, object data)
    {
        this.Validate(channel, topic);
        // Blocks until all matching subscribers have handled the message.
        this.PublishAsync(channel, topic, data).GetAwaiter().GetResult();
    }
    public IDisposable Subscribe(string channel, string topic, Action<Envelope> subscriber, Func<Envelope, bool> condition = null)
    {
        this.Validate(channel, topic);
        this.Validate(subscriber);
        // No filter means every matching message is accepted.
        if (condition == null)
        {
            condition = (env) => true;
        }
        var id = new SubscriberId(Guid.NewGuid(), channel, topic, condition);
        this.subscribers[id] = subscriber;
        return new DisposableSubscription(id, this.subscribers);
    }
    public async Task PublishAsync(string channel, string topic, object data)
    {
        this.Validate(channel, topic);
        var env = new Envelope(channel, topic, data);
        foreach (var subscriber in this.GetSubscribers(channel, topic, env).AsParallel())
        {
            // ConfigureAwait(false) avoids resuming on a captured synchronization context,
            // which could deadlock when Publish blocks a UI thread on this task.
            await Task.Run(() => subscriber(env)).ConfigureAwait(false);
        }
    }
    private void Validate(string channel, string topic)
    {
        if (string.IsNullOrWhiteSpace(channel) == true)
        {
            throw new ArgumentNullException("channel");
        }
        if (string.IsNullOrWhiteSpace(topic) == true)
        {
            throw new ArgumentNullException("topic");
        }
    }
    private void Validate(Action<Envelope> subscriber)
    {
        if (subscriber == null)
        {
            throw new ArgumentNullException("subscriber");
        }
    }
    private void Validate(Func<Envelope, bool> condition)
    {
        if (condition == null)
        {
            throw new ArgumentNullException("condition");
        }
    }
    private bool Matches(SubscriberId id, string channel, string topic)
    {
        return id.Matches(channel, topic);
    }
    // Yields the callbacks whose patterns match the pair and whose filter accepts the envelope.
    private IEnumerable<Action<Envelope>> GetSubscribers(string channel, string topic, Envelope env)
    {
        foreach (var subscriber in this.subscribers)
        {
            if (this.Matches(subscriber.Key, channel, topic) == true)
            {
                if (subscriber.Key.Passes(env) == true)
                {
                    yield return subscriber.Value;
                }
            }
        }
    }
}
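PublishAsync above starts each matching subscriber with Task.Run and awaits it before moving on to the next one, so handlers run one after another. As a sketch of the trade-off, and assuming that the order in which handlers run does not matter, the variant below starts all handlers first and then awaits them together with Task.WhenAll. The Dispatch class and both method names are made up for this illustration; they are not part of Postal.NET.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper comparing two ways of invoking a set of handlers.
public static class Dispatch
{
    // Mirrors the loop in PublishAsync: one handler at a time, in enumeration order.
    public static async Task SequentialAsync<T>(IEnumerable<Action<T>> handlers, T message)
    {
        foreach (var handler in handlers)
        {
            await Task.Run(() => handler(message));
        }
    }

    // Variant: start every handler on the thread pool, then wait for all of them.
    public static Task ConcurrentAsync<T>(IEnumerable<Action<T>> handlers, T message)
    {
        // ToList forces all tasks to start before WhenAll is called.
        var tasks = handlers.Select(handler => Task.Run(() => handler(message))).ToList();
        return Task.WhenAll(tasks);
    }
}
```

One practical difference: in the sequential version an exception thrown by one handler stops delivery to the handlers after it, whereas with Task.WhenAll every handler is started and the returned task faults only after all of them have finished.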
And finally, the Envelope class:
public sealed class Envelope
{
    public Envelope(string channel, string topic, object data)
    {
        this.Timestamp = DateTime.UtcNow;
        this.Channel = channel;
        this.Topic = topic;
        this.Data = data;
    }
    public DateTime Timestamp { get; private set; }
    public string Channel { get; private set; }
    public string Topic { get; private set; }
    public object Data { get; private set; }
}
The code for the fluent extensions will not be covered here, but you can find it on GitHub.
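To give a rough idea of how a fluent layer like the one in the Usage section can sit on top of IBox, here is a minimal sketch built from extension methods. It is not the code from the repository: ChannelRef, TopicRef and FluentBoxExtensions are hypothetical names, and only the method names Channel, Topic, AnyChannelAndTopic, Subscribe and Publish are taken from the usage example. Trimmed copies of Envelope and IBox are repeated only so that the sketch compiles on its own.

```csharp
using System;
using System.Threading.Tasks;

// Trimmed copy of the article's Envelope (Timestamp omitted for brevity).
public sealed class Envelope
{
    public Envelope(string channel, string topic, object data)
    {
        this.Channel = channel;
        this.Topic = topic;
        this.Data = data;
    }
    public string Channel { get; private set; }
    public string Topic { get; private set; }
    public object Data { get; private set; }
}

// Copy of the article's IBox interface.
public interface IBox
{
    IDisposable Subscribe(string channel, string topic, Action<Envelope> subscriber, Func<Envelope, bool> condition = null);
    void Publish(string channel, string topic, object data);
    Task PublishAsync(string channel, string topic, object data);
}

// Hypothetical: remembers the box and the channel until a topic is chosen.
public sealed class ChannelRef
{
    private readonly IBox box;
    private readonly string channel;
    public ChannelRef(IBox box, string channel)
    {
        this.box = box;
        this.channel = channel;
    }
    public TopicRef Topic(string topic)
    {
        return new TopicRef(this.box, this.channel, topic);
    }
}

// Hypothetical: forwards to the plain IBox methods using the remembered pair.
public sealed class TopicRef
{
    private readonly IBox box;
    private readonly string channel;
    private readonly string topic;
    public TopicRef(IBox box, string channel, string topic)
    {
        this.box = box;
        this.channel = channel;
        this.topic = topic;
    }
    public IDisposable Subscribe(Action<Envelope> subscriber, Func<Envelope, bool> condition = null)
    {
        return this.box.Subscribe(this.channel, this.topic, subscriber, condition);
    }
    public void Publish(object data)
    {
        this.box.Publish(this.channel, this.topic, data);
    }
    public Task PublishAsync(object data)
    {
        return this.box.PublishAsync(this.channel, this.topic, data);
    }
}

// Hypothetical entry points, written as extension methods over IBox.
public static class FluentBoxExtensions
{
    public static ChannelRef Channel(this IBox box, string channel)
    {
        return new ChannelRef(box, channel);
    }
    public static TopicRef AnyChannelAndTopic(this IBox box)
    {
        return new TopicRef(box, "*", "*");
    }
}
```

Because everything is layered over the interface, a design like this would work with any IBox implementation and adds no behavior of its own.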
Limitations
Postal.NET will not:
- serialize messages or send them across process boundaries;
- apply any scheduling; messages are sent immediately;
- apply any transformation to the messages;
- keep sent messages internally.
But it will:
- keep the order in which messages were sent;
- deliver a message to subscriptions spanning multiple channels and topics, if the * wildcard is used;
- send any kind of data; the payload type really doesn’t matter.
Extensions
In the GitHub repository you will find some other projects:
- PostalRX.NET: a Reactive Extensions (Rx.NET) adapter for Postal.NET;
- PostalConventions.NET: conventions and a fluent interface for automatically inferring the channel and topic from the message itself.
Future Work
I have a couple of ideas but, for now, I want to keep this simple. I will update the repository as I make progress. I’d like to hear from you on this.
Conclusion
Postal.NET will hopefully let you build more decoupled applications more easily. I hope you got the idea and see value in it. As always, I’m happy to hear your comments!