An introduction to BlockingCollection

BlockingCollection is a class in the System.Collections.Concurrent namespace and, as with the other collections in this namespace, it is designed for concurrent, multi-threaded scenarios.

In my experience, many developers are familiar with ConcurrentBag, ConcurrentDictionary, ConcurrentQueue and ConcurrentStack, but fewer know the power and the usage of BlockingCollection.

Before going any further, let's have a look at a classic textbook example of working with a concurrent queue. Imagine this scenario: several threads add users' email addresses to a queue, and one thread reads the addresses from the queue and sends an email to each of them.

using System.Collections.Concurrent;
using System.Threading;

class EmailService
{
    private ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();
 
    public void AddEmail(string email)
    {
        _queue.Enqueue(email);
    }
 
    public void StartSendingEmail()
    {
        while (true)
        {
            bool isNotEmpty = _queue.TryDequeue(out string email);
 
            if (isNotEmpty)
            {
                SendEmail(email);
            }
            else
            {
                Thread.Sleep(1000);
            }
        }
    }
 
    private void SendEmail(string email)
    {
        //Send email here
    }
}

In the preceding code, the AddEmail method is called by different threads to add an item to the queue. In the StartSendingEmail method, we first try to take an email address from the queue and send an email to it. If the queue is empty, we wait for 1000 milliseconds and then try again. And this goes on and on.

We are using a simple polling technique here. The problem is how to choose this 1000-millisecond interval: we have no idea when the next email will be added to the queue. Too long an interval delays delivery; too short an interval wastes CPU cycles checking an empty queue. Older synchronization primitives in the .NET Framework, such as AutoResetEvent or Monitor, could be combined to solve this problem, but here we will take advantage of BlockingCollection.

BlockingCollection is in fact a wrapper around any concurrent collection that implements the IProducerConsumerCollection<T> interface. The best-known implementations are ConcurrentBag, ConcurrentQueue and ConcurrentStack.

The following code solves the same problem, but this time with BlockingCollection.

using System.Collections.Concurrent;

class EmailService
{
    private BlockingCollection<string> _collection = new BlockingCollection<string>();
 
    public void AddEmail(string email)
    {
        _collection.Add(email);
    }
 
    public void StartSendingEmail()
    {
        while (true)
        {
            string email = _collection.Take();
 
            SendEmail(email);
        }
    }
 
    private void SendEmail(string email)
    {
        //Send email here
    }
}

Look at the StartSendingEmail method and see how it has been simplified.

BlockingCollection offers a method named Take. This method removes and returns an item from the collection if one exists; otherwise it blocks the calling thread until a new item becomes available (that is, until a new email is added to the collection). So we no longer need to pause for one second before polling again, or even care whether the collection is empty.
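
To make the blocking behaviour concrete, here is a minimal sketch. The class name TakeBlocksDemo, the half-second delay and the email address are made up for illustration; only BlockingCollection, Add and Take come from the discussion above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class TakeBlocksDemo
{
    static void Main()
    {
        var collection = new BlockingCollection<string>();

        // Producer: adds a single (made-up) address after half a second.
        Task producer = Task.Run(() =>
        {
            Thread.Sleep(500);
            collection.Add("alice@example.com");
        });

        // Consumer: the collection is almost certainly still empty here,
        // so Take blocks this thread until the producer adds the item.
        string email = collection.Take();
        Console.WriteLine($"Took {email}");

        producer.Wait();
    }
}
```

No polling interval appears anywhere: the consumer wakes up when an item arrives rather than on a timer.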

There is also a way to tell a BlockingCollection that no more emails are going to be added. That means the consumer no longer has to wait for new items once the collection is empty. This is done by calling the CompleteAdding method. After this method has been called, invoking Take on an empty collection throws an InvalidOperationException.

Let's add a FinishSendingEmail method to our code.

public void FinishSendingEmail()
{
    _collection.CompleteAdding();
}
 
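public void StartSendingEmail()
{
    while (true)
    {
        string email;
 
        try
        {
            email = _collection.Take();
        }
        catch (InvalidOperationException)
        {
            // CompleteAdding was called and the collection is empty: we are done!
            return;
        }
 
        // Kept outside the try block so that an InvalidOperationException
        // thrown while sending is not mistaken for "no more items".
        SendEmail(email);
    }
}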

After FinishSendingEmail has been called, the Take method throws an InvalidOperationException as soon as the collection is empty. We just need to handle this exception and exit the loop.
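
As an alternative to catching the exception, BlockingCollection also exposes a GetConsumingEnumerable method, which lets a foreach loop end on its own once CompleteAdding has been called and the collection is empty. The following is only a sketch of that variation: the class names EmailServiceWithEnumerable and Program, the console output and the addresses are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;

class EmailServiceWithEnumerable
{
    private readonly BlockingCollection<string> _collection = new BlockingCollection<string>();

    public void AddEmail(string email) => _collection.Add(email);

    public void FinishSendingEmail() => _collection.CompleteAdding();

    public void StartSendingEmail()
    {
        // Blocks while the collection is empty and ends the loop once
        // CompleteAdding has been called and every item has been taken.
        foreach (string email in _collection.GetConsumingEnumerable())
        {
            SendEmail(email);
        }
    }

    private void SendEmail(string email)
    {
        // Placeholder for the real sending logic
        Console.WriteLine($"Sending to {email}");
    }
}

class Program
{
    static void Main()
    {
        var service = new EmailServiceWithEnumerable();
        service.AddEmail("alice@example.com");
        service.AddEmail("bob@example.com");
        service.FinishSendingEmail();

        // Returns after both queued emails have been processed.
        service.StartSendingEmail();
    }
}
```

Whether to use the loop with try/catch or the enumerable is mostly a matter of taste; the enumerable version keeps the "we are done" condition out of the exception path.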

Now you may ask, what is the order of the items which are picked up from the collection?

Earlier in this article, I mentioned that BlockingCollection is a wrapper around IProducerConsumerCollection<T> implementations. By default, BlockingCollection uses a ConcurrentQueue as its underlying data store, so items are taken in FIFO (first in, first out) order. But we can explicitly specify which collection should be used. That means if we wish to take the items (here, emails) in LIFO (last in, first out) order, we can simply pass an instance of ConcurrentStack to the constructor:


BlockingCollection<string> _collection = new BlockingCollection<string>(new ConcurrentStack<string>());
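
A small single-threaded sketch shows the effect of that constructor argument. The class name LifoDemo and the addresses are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;

class LifoDemo
{
    static void Main()
    {
        var collection = new BlockingCollection<string>(new ConcurrentStack<string>());

        collection.Add("first@example.com");
        collection.Add("second@example.com");
        collection.Add("third@example.com");

        // With a ConcurrentStack underneath, Take returns the most recently added item first.
        Console.WriteLine(collection.Take()); // third@example.com
        Console.WriteLine(collection.Take()); // second@example.com
        Console.WriteLine(collection.Take()); // first@example.com
    }
}
```

Dropping the constructor argument falls back to the default ConcurrentQueue, and the same three calls to Take then return the addresses in the order they were added.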
 

Here we covered some fundamental features of BlockingCollection, but this class offers a lot more. For instance, a CancellationToken can be passed to the Take method as a parameter.
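
As a rough sketch of that last point, the following program cancels a blocked Take after a timeout. The class name CancellableTakeDemo and the one-second timeout are arbitrary choices for the demo; the Take(CancellationToken) overload throws an OperationCanceledException when the token is cancelled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

class CancellableTakeDemo
{
    static void Main()
    {
        var collection = new BlockingCollection<string>();

        // Cancel automatically after one second (an arbitrary timeout for this demo).
        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)))
        {
            try
            {
                // Nothing is ever added, so Take blocks until the token is cancelled.
                string email = collection.Take(cts.Token);
                Console.WriteLine($"Took {email}");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Take was cancelled");
            }
        }
    }
}
```

This gives the consumer a way to stop waiting even when the producers never call CompleteAdding, for example during application shutdown.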

Happy Programming!
