Introducing Postal.NET

Update: Postal.NET is now available on NuGet.

Introduction

Postal.NET is a library I wrote for writing in-process decoupled applications, using the Publish/Subscribe and Domain Event patterns. It is loosely based upon the Postal.js JavaScript library. When I say loosely, I mean that I didn’t look at the source code, but I like the ideas that it implements! :-)

Postal.NET allows us to post messages to a “bus”, identified by a channel and topic pair. Channels and topics are just strings: a channel can have any number of topics, and the same topic name can be used in any number of channels.

We can subscribe to one or more channel/topic pairs and cancel each subscription when we no longer need it.

Messages are posted either synchronously or asynchronously. Each message is delivered “as-is”, wrapped in an envelope that adds a few properties, such as the timestamp and the channel and topic the message was published to.

Nothing too complex, yet I hope you find it useful!

Usage

Here are a few usage examples. First, a subscription and two messages being sent, one synchronously (Publish) and the other asynchronously (PublishAsync):

using (Postal.Box.Subscribe("channel", "topic", (env) => Console.WriteLine(env.Data)))
{
    //sync
    Postal.Box.Publish("channel", "topic", "Hello, World!");
 
    //async
    await Postal.Box.PublishAsync("channel", "topic", "Hello, Async World!");
}
 
Postal.Box.Publish("channel", "topic", "Does not appear because the subscription was disposed!");

Notice the using block that wraps a Subscribe call. Subscribe returns an IDisposable, which, when disposed of, terminates the subscription, hence the third message published will not be caught by the subscription.
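
Because the subscription is just an IDisposable, it does not have to live inside a using block. Here is a minimal sketch, assuming the Postal class from this post is in scope (ExplicitDisposeExample is a made-up name), that keeps the subscription in a variable and cancels it explicitly:

```csharp
using System;

public static class ExplicitDisposeExample
{
    public static int Run()
    {
        var received = 0;

        // Keep the subscription handle so that it can be cancelled later.
        IDisposable subscription = Postal.Box.Subscribe("channel", "topic", env => received++);

        Postal.Box.Publish("channel", "topic", "counted");

        // Cancels the subscription; later messages are no longer delivered to it.
        subscription.Dispose();

        Postal.Box.Publish("channel", "topic", "not counted");

        return received;
    }
}
```

This is handy when the subscription has to outlive the method that created it, for example when it is stored in a field and disposed of together with its owner.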

using (Postal.Box.Subscribe("*", "*", (env) => Console.WriteLine("Catch all!")))
{
    using (Postal.Box.Subscribe("channel", "topic", (env) => Console.WriteLine(env.Data)))
    {
        Postal.Box.Publish("channel", "topic", "Hello, World!");
    }
}

Now we have two subscriptions, one for * (any) channel and topic, and the other for a concrete one. Both subscriptions will receive the message.
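
The wildcard does not have to be used for both parts. A small sketch, again assuming the Postal class from this post is in scope (the "orders" and "invoices" channel names are invented for illustration), with a concrete channel and a wildcard topic:

```csharp
using System;
using System.Collections.Generic;

public static class WildcardExample
{
    public static List<string> Run()
    {
        var topics = new List<string>();

        // "*" as the topic matches every topic published on the "orders" channel.
        using (Postal.Box.Subscribe("orders", "*", env => { lock (topics) { topics.Add(env.Topic); } }))
        {
            Postal.Box.Publish("orders", "created", 1);
            Postal.Box.Publish("orders", "cancelled", 1);

            // Different channel: not delivered to the subscription above.
            Postal.Box.Publish("invoices", "created", 1);
        }

        return topics;
    }
}
```

The handler reads env.Topic to find out which concrete topic the message was published to.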

using (Postal.Box.Subscribe("channel", "topic", (env) => Console.WriteLine(env.Data), (env) => env.Data is int))
{
    Postal.Box.Publish("channel", "topic", "Does not show!");
    Postal.Box.Publish("channel", "topic", 12345);
}

This time we are filtering the subscription: it will only fire if the message payload is an integer.
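
If you filter by payload type often, the condition and the cast can be wrapped in a small helper. SubscribeTo<T> below is a hypothetical extension method of my own making for this example, not something that ships with Postal.NET; it only assumes the IBox interface shown later in this post:

```csharp
using System;

public static class TypedSubscriptionExtensions
{
    // Hypothetical helper, not part of Postal.NET:
    // the subscriber only fires for payloads of type T and receives them already cast.
    public static IDisposable SubscribeTo<T>(this IBox box, string channel, string topic, Action<T> subscriber)
    {
        return box.Subscribe(
            channel,
            topic,
            env => subscriber((T) env.Data),
            env => env.Data is T);
    }
}
```

With it, the previous example could be written as Postal.Box.SubscribeTo<int>("channel", "topic", n => Console.WriteLine(n)).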

using (var evt = new ManualResetEvent(false))
using (Postal.Box.Subscribe("channel", "topic", (env) => {
    Console.WriteLine(env.Data);
    evt.Set();
}))
{
    Postal.Box.PublishAsync("channel", "topic", "Hello, World!");
 
    evt.WaitOne();
}

In this example, we wait for an event to be signaled, which only happens once an asynchronous message is handled.
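
WaitOne blocks the calling thread until the event is signaled. In async code, the same wait can be expressed with a TaskCompletionSource<T>. This is a minimal sketch, assuming the Postal class from this post is in scope (AwaitMessageExample is a made-up name):

```csharp
using System;
using System.Threading.Tasks;

public static class AwaitMessageExample
{
    public static async Task<object> RunAsync()
    {
        // Completed by the subscriber with the first payload it sees.
        var firstMessage = new TaskCompletionSource<object>();

        using (Postal.Box.Subscribe("channel", "topic", env => firstMessage.TrySetResult(env.Data)))
        {
            await Postal.Box.PublishAsync("channel", "topic", "Hello, World!");

            // Unlike WaitOne, awaiting the task does not block the calling thread.
            return await firstMessage.Task;
        }
    }
}
```

TrySetResult is used instead of SetResult so that a second message arriving on the same subscription does not throw.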

using (Postal.Box.AnyChannelAndTopic().Subscribe((env) => Console.WriteLine("Catch all!")))
{
    using (Postal.Box.Channel("channel").Topic("topic").Subscribe((env) => Console.WriteLine(env.Data)))
    {
        Postal.Box.Channel("channel").Topic("topic").Publish("Hello, World!");
    }
}

This final example shows the alternative fluent interface. It offers the exact same functionality.

Code

Let’s start with the Postal class, which merely acts as a holder for an IBox:

public static class Postal
{
    public static readonly IBox Box = new Box();
}

The IBox interface, the core of Postal.NET, is defined as:

public interface IBox
{
    IDisposable Subscribe(string channel, string topic, Action<Envelope> subscriber, Func<Envelope, bool> condition = null);
 
    void Publish(string channel, string topic, object data);
    Task PublishAsync(string channel, string topic, object data);
}
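
Since everything goes through IBox, application code does not have to touch the static Postal.Box at all. The sketch below shows the Domain Event idea with an invented OrderService class that gets its IBox injected; it assumes the IBox, Box and Envelope types from this post, and the channel and topic names are made up:

```csharp
using System;

// Hypothetical domain service: it only knows about IBox, not about who is listening.
public sealed class OrderService
{
    private readonly IBox box;

    public OrderService(IBox box)
    {
        if (box == null)
        {
            throw new ArgumentNullException("box");
        }

        this.box = box;
    }

    public void Place(int orderId)
    {
        // ... persist the order here, then announce the domain event.
        this.box.Publish("orders", "placed", orderId);
    }
}

public static class OrderServiceExample
{
    public static object Run()
    {
        // A private bus instead of the shared Postal.Box, handy for tests.
        IBox box = new Box();
        object announced = null;

        using (box.Subscribe("orders", "placed", env => announced = env.Data))
        {
            new OrderService(box).Place(42);
        }

        return announced;
    }
}
```

Each Box instance keeps its own subscribers, so a bus created this way is isolated from the rest of the application.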

The included implementation, Box, is as follows:

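using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
 
public sealed class Box : IBox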
{
    class SubscriberId
    {
        private readonly Guid id;
        private readonly string channel;
        private readonly string topic;
        private readonly int hash;
        private readonly Func<Envelope, bool> condition;
 
        public SubscriberId(Guid id, string channel, string topic, Func<Envelope, bool> condition)
        {
            this.id = id;
            this.channel = channel;
            this.topic = topic;
            this.condition = condition;
 
            unchecked
            {
                this.hash = 13;
                this.hash = (this.hash * 17) ^ this.id.GetHashCode();
                this.hash = (this.hash * 17) ^ this.channel.GetHashCode();
                this.hash = (this.hash * 17) ^ this.topic.GetHashCode();
            }
        }
 
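        private string Normalize(string str)
        {
            // Escape every regex metacharacter, turn the "*" wildcard into ".*",
            // and anchor the pattern so that "channel" does not also match "channel2".
            return "^" + Regex.Escape(str).Replace("\\*", ".*") + "$";
        }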
 
        public bool Matches(string channel, string topic)
        {
            var channelRegex = new Regex(this.Normalize(this.channel));
            var topicRegex = new Regex(this.Normalize(this.topic));
 
            return channelRegex.IsMatch(channel) == true
                   && topicRegex.IsMatch(topic);
        }
 
        public override bool Equals(object obj)
        {
            var other = obj as SubscriberId;
 
            if (other == null)
            {
                return false;
            }
 
            return (other.id == this.id) && (other.channel == this.channel) && (other.topic == this.topic);
        }
 
        public override int GetHashCode()
        {
            return this.hash;
        }
 
        public bool Passes(Envelope env)
        {
            return this.condition(env);
        }
    }
 
    class DisposableSubscription : IDisposable
    {
        private readonly SubscriberId id;
        private readonly IDictionary<SubscriberId, Action<Envelope>> subscribers;
 
        public DisposableSubscription(SubscriberId id, IDictionary<SubscriberId, Action<Envelope>> subscribers)
        {
            this.id = id;
            this.subscribers = subscribers;
        }
 
        public void Dispose()
        {
            this.subscribers.Remove(this.id);
        }
    }
 
    private readonly ConcurrentDictionary<SubscriberId, Action<Envelope>> subscribers = new ConcurrentDictionary<SubscriberId, Action<Envelope>>();
 
    public void Publish(string channel, string topic, object data)
    {
        this.Validate(channel, topic);
        this.PublishAsync(channel, topic, data).GetAwaiter().GetResult();
    }
 
    public IDisposable Subscribe(string channel, string topic, Action<Envelope> subscriber, Func<Envelope, bool> condition = null)
    {
        this.Validate(channel, topic);
        this.Validate(subscriber);
 
        if (condition == null)
        {
            condition = (env) => true;
        }
 
        var id = new SubscriberId(Guid.NewGuid(), channel, topic, condition);
        this.subscribers[id] = subscriber;
 
        return new DisposableSubscription(id, this.subscribers);
    }
 
    public async Task PublishAsync(string channel, string topic, object data)
    {
        this.Validate(channel, topic);
 
        var env = new Envelope(channel, topic, data);
 
        // Subscribers are invoked one after the other, each on a thread pool thread.
        foreach (var subscriber in this.GetSubscribers(channel, topic, env))
        {
            await Task.Run(() => subscriber(env));
        }
    }
 
    private void Validate(string channel, string topic)
    {
        if (string.IsNullOrWhiteSpace(channel) == true)
        {
            throw new ArgumentNullException("channel");
        }
 
        if (string.IsNullOrWhiteSpace(topic) == true)
        {
            throw new ArgumentNullException("topic");
        }
    }
 
    private void Validate(Action<Envelope> subscriber)
    {
        if (subscriber == null)
        {
            throw new ArgumentNullException("subscriber");
        }
    }
 
    private void Validate(Func<Envelope, bool> condition)
    {
        if (condition == null)
        {
            throw new ArgumentNullException("condition");
        }
    }
 
    private bool Matches(SubscriberId id, string channel, string topic)
    {
        return id.Matches(channel, topic);
    }
 
    private IEnumerable<Action<Envelope>> GetSubscribers(string channel, string topic, Envelope env)
    {
        foreach (var subscriber in this.subscribers)
        {
            if (this.Matches(subscriber.Key, channel, topic) == true)
            {
                if (subscriber.Key.Passes(env) == true)
                {
                    yield return subscriber.Value;
                }
            }
        }
    }
}
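
The wildcard handling inside SubscriberId can be tried out in isolation. The sketch below is a standalone variation and not the library's code: the WildcardMatcher name is hypothetical, and it assumes that a pattern should match the whole channel or topic name, so it escapes every regex metacharacter and anchors the expression.

```csharp
using System;
using System.Text.RegularExpressions;

public static class WildcardMatcher
{
    // Translates a subscription pattern such as "orders.*" into an anchored regex.
    public static Regex ToRegex(string pattern)
    {
        // Regex.Escape turns "*" into "\*"; turn that back into "match anything".
        var body = Regex.Escape(pattern).Replace("\\*", ".*");

        return new Regex("^" + body + "$");
    }

    public static bool IsMatch(string pattern, string value)
    {
        return ToRegex(pattern).IsMatch(value);
    }
}
```

For instance, WildcardMatcher.IsMatch("orders.*", "orders.placed") is true, while WildcardMatcher.IsMatch("orders", "orders2") and WildcardMatcher.IsMatch("a.b", "axb") are both false.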

And finally, the Envelope class:

public sealed class Envelope
{
    public Envelope(string channel, string topic, object data)
    {
        this.Timestamp = DateTime.UtcNow;
        this.Channel = channel;
        this.Topic = topic;
        this.Data = data;
    }
 
    public DateTime Timestamp { get; private set; }
    public string Channel { get; private set; }
    public string Topic { get; private set; }
    public object Data { get; private set; }
}
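
The envelope is what lets a wildcard subscriber know where a message came from. A small sketch, assuming the Postal class from this post is in scope (AuditExample and the channel and topic names are invented), of a catch-all subscription that logs the envelope properties:

```csharp
using System;
using System.Collections.Generic;

public static class AuditExample
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Catch-all subscription that records where each message was published.
        using (Postal.Box.Subscribe("*", "*", env =>
        {
            lock (log)
            {
                log.Add(string.Format("{0:o} {1}/{2}: {3}", env.Timestamp, env.Channel, env.Topic, env.Data));
            }
        }))
        {
            Postal.Box.Publish("orders", "placed", 42);
        }

        return log;
    }
}
```

Channel and Topic hold the values passed to Publish ("orders" and "placed" here), not the "*" patterns of the subscription, and Timestamp is in UTC.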

The code for the fluent extensions will not be covered here, but you can find it on GitHub.

Limitations

Postal.NET will not:

  • serialize messages or send them across process boundaries;
  • apply any scheduling (messages are sent immediately);
  • apply any transformation to the messages;
  • keep sent messages internally.

But it will:

  • keep the order in which messages were sent;
  • deliver messages from multiple channels and topics to a single subscription, if the * wildcard is used;
  • send any kind of data, it really doesn’t matter.

Extensions

In the GitHub repository you will find some other projects:

  • PostalRX.NET: a Reactive Extensions (Rx.NET) adapter for Postal.NET;
  • PostalConventions.NET: using conventions and a fluent interface for automatically inferring the channel and topic from the message itself.

Future Work

I have a couple of ideas, but, for now, I want to keep this simple. I will update the repository as I make progress. I’d like to hear from you on this.

Conclusion

Postal.NET will hopefully let you build decoupled applications more easily. I hope you got the idea and see value in it. As always, I’m happy to hear your comments!

                             

3 Comments

  • Have you had a chance to take a look at https://github.com/exceptionless/Foundatio ? Nice little lib you have, have you thought about making your subscribe methods async so you can eventually take it out of process? Azure ServiceBus and redis both have async subscriber methods.

  • Hi, Blake!
    I didn't know Foundatio; it seems to be significantly more complex. Postal.NET is quite a simple thing.
    Yes, I did consider it and will probably add it to the next version.
    Thank you for your feedback! :-)

  • http://aboutcode.net/postal/
