TPL Dataflow–An overview

Some time back I blogged about Parallel Tasks, which were introduced in .NET 4.0 as part of the Task Parallel Library (TPL), made available to the developer community for adding parallelism and concurrency to their applications.

Microsoft has gone a step further in .NET 4.5 and given us TPL Dataflow (TDF). In Microsoft’s own words:

“TPL Dataflow (TDF) is a new .NET library for building concurrent applications. It promotes actor/agent-oriented designs through primitives for in-process message passing, dataflow, and pipelining. TDF builds upon the APIs and scheduling infrastructure provided by the Task Parallel Library (TPL) in .NET 4, and integrates with the language support for asynchrony provided by C#, Visual Basic, and F#.”

At an architectural level, the two interfaces that matter in TDF are ISourceBlock<T> and ITargetBlock<T>. Sources offer data and targets are offered data. All of TDF lives in the System.Threading.Tasks.Dataflow.dll assembly.
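To make the source/target split concrete, here is a minimal sketch using BufferBlock<T>, which implements both interfaces. The class and method names (SourceAndTargetDemo, RoundTrip) are my own and purely illustrative.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Illustrative names only; not part of the TDF library.
public static class SourceAndTargetDemo
{
    // Pushes a value in through the target side and reads it back from the source side.
    public static int RoundTrip(int value)
    {
        var buffer = new BufferBlock<int>();

        // BufferBlock<T> implements both interfaces, so the same instance can be
        // viewed as a target (it is offered data) and as a source (it offers data).
        ITargetBlock<int> target = buffer;
        ISourceBlock<int> source = buffer;

        target.Post(value);      // Post is an extension method from DataflowBlock
        return source.Receive(); // Receive blocks until an item is available
    }
}
```

Calling SourceAndTargetDemo.RoundTrip(42) hands the value to the target side and gets the same value back from the source side.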

To start simple, I’ll take up the ActionBlock class. This type allows us to run a snippet of code (like an Action<T> delegate) to perform some action on each input element.

    private static void TaskParallelLibrary()
    {
        ActionBlock<int> actionBlock = new ActionBlock<int>(i =>
        {
            Console.WriteLine(i);
        });

        for (int i = 0; i < 5; i++)
        {
            actionBlock.Post(i);
        }
    }

We declare an ActionBlock that takes an integer as an input and writes it on the screen. We then ‘post’ data to this block of code that will get executed.
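One thing worth knowing is that Post() returns right away and the block does its work asynchronously. In a console application you would normally tell the block that no more data is coming and then wait for it to finish. A minimal sketch, assuming a helper class of my own naming (ActionBlockDemo):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Illustrative names only; not part of the TDF library.
public static class ActionBlockDemo
{
    public static List<int> Run()
    {
        var seen = new List<int>();

        var actionBlock = new ActionBlock<int>(i =>
        {
            Console.WriteLine(i);
            seen.Add(i); // safe without locking: the block handles one message at a time by default
        });

        for (int i = 0; i < 5; i++)
        {
            actionBlock.Post(i); // queues the item and returns immediately
        }

        actionBlock.Complete();        // signal that no more items will be posted
        actionBlock.Completion.Wait(); // block until every queued item has been processed

        return seen;
    }
}
```

With the default settings the items are processed in the order they were posted, so the returned list holds 0 through 4.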

No surprises in the output there. This way you can create ‘blocks’ of code that process your data (in parallel, if configured that way), and all you have to do is post data to this network of data-processing blocks. I said ‘network’. Yes, you can link one block to another and create a dataflow network. These links can also be added and removed at runtime, which makes for a very flexible and powerful feature. Let’s build a simple dataflow network.

    private static void TaskParallelLibrary()
    {
        ActionBlock<int> actionBlock = new ActionBlock<int>(i =>
        {
            Console.WriteLine(i);
        });

        TransformBlock<int, int> transformBlock = new TransformBlock<int, int>(i => i * 3);

        transformBlock.LinkTo(actionBlock);

        for (int i = 0; i < 20; i++)
        {
            transformBlock.Post(i);
        }
    }

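The main difference between an ActionBlock and a TransformBlock is that the latter has an output. In other words, an ActionBlock takes an Action<TInput> delegate (or, for asynchronous work, a Func<TInput, Task>), while a TransformBlock takes a Func<TInput, TOutput> delegate (or a Func<TInput, Task<TOutput>>).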
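ActionBlock<TInput> also has a constructor overload that accepts a Func<TInput, Task>, which is what an async lambda binds to. A small sketch of that form, where Task.Delay stands in for real asynchronous work and the class name AsyncActionBlockDemo is my own:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative names only; not part of the TDF library.
public static class AsyncActionBlockDemo
{
    public static int Run()
    {
        int sum = 0;

        // The async lambda binds to the Func<int, Task> constructor overload.
        var block = new ActionBlock<int>(async i =>
        {
            await Task.Delay(10); // stand-in for real asynchronous work (I/O, HTTP, ...)
            sum += i;             // safe: one message at a time by default
        });

        for (int i = 1; i <= 4; i++)
        {
            block.Post(i);
        }

        block.Complete();
        block.Completion.Wait(); // completes only after every async delegate has finished

        return sum; // 1 + 2 + 3 + 4
    }
}
```

The block treats a message as processed only when the returned Task completes, so waiting on Completion also waits for the awaited work.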

What we’re doing above is creating a TransformBlock that multiplies the input integer by 3 and returns the result. We post data to the TransformBlock and, since we’ve linked it to an ActionBlock instance, the output of the TransformBlock is passed on as the input to the ActionBlock.
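Links can also be removed at runtime. LinkTo() returns an IDisposable, and disposing it breaks the link. The sketch below uses a BufferBlock<int> as the source so the leftover item is easy to count. The names (LinkUnlinkDemo, Run) are my own, and the five-second timeout is an arbitrary choice.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Illustrative names only; not part of the TDF library.
public static class LinkUnlinkDemo
{
    // Returns how many items reached the target and how many stayed behind in the source.
    public static (int Received, int LeftInSource) Run()
    {
        var received = new ConcurrentQueue<int>();
        var source = new BufferBlock<int>();
        var target = new ActionBlock<int>(i => received.Enqueue(i));

        // LinkTo returns an IDisposable that represents the link.
        IDisposable link = source.LinkTo(target);

        source.Post(1);
        source.Post(2);

        // Wait (up to 5 seconds) until both items have flowed through the link.
        SpinWait.SpinUntil(() => received.Count == 2, TimeSpan.FromSeconds(5));

        link.Dispose();  // remove the link at runtime
        source.Post(3);  // nothing is linked now, so this item stays in the buffer

        return (received.Count, source.Count);
    }
}
```

After the link is disposed, the third item has nowhere to go and simply waits in the BufferBlock until something is linked again or it is received directly.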

Stepping through the code in the debugger shows two queues on the TransformBlock: the input queue and the output queue.
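The two queues are also visible from code through the InputCount and OutputCount properties. Here is a rough sketch: with no target linked, the transformed values pile up in the output queue. The class name QueueCountsDemo and the five-second timeout are my own choices.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Illustrative names only; not part of the TDF library.
public static class QueueCountsDemo
{
    public static int OutputAfterProcessing(int itemCount)
    {
        var transformBlock = new TransformBlock<int, int>(i => i * 3);

        for (int i = 0; i < itemCount; i++)
        {
            transformBlock.Post(i); // lands in the input queue first
        }

        // No target is linked, so results accumulate in the output queue.
        // Wait (up to 5 seconds) until every item has been transformed.
        SpinWait.SpinUntil(() => transformBlock.OutputCount == itemCount, TimeSpan.FromSeconds(5));

        Console.WriteLine("Input queue: {0}, output queue: {1}",
            transformBlock.InputCount, transformBlock.OutputCount);

        return transformBlock.OutputCount;
    }
}
```

Once a target is linked (or the results are received), the output queue drains again.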

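Now just because we’re using TDF does not mean a block processes its messages in parallel by default. Different blocks in a network do run concurrently with one another, but each individual block handles only one message at a time unless it is instructed otherwise.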

    private static void TaskParallelLibrary()
    {
        ExecutionDataflowBlockOptions executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        };

        ActionBlock<int> actionBlock = new ActionBlock<int>(i =>
        {
            Thread.Sleep(500);
            Console.WriteLine(i);
        }, executionDataflowBlockOptions
        );

        TransformBlock<int, int> transformBlock = new TransformBlock<int, int>(i => i * 3);

        transformBlock.LinkTo(actionBlock);

        for (int i = 0; i < 20; i++)
        {
            transformBlock.Post(i);
        }
    }

The DataflowBlockOptions class is the base class for configuring a dataflow block. We’ll use the derived ExecutionDataflowBlockOptions class here. By default, every dataflow block processes only one message at a time. To process multiple messages at a time, set the MaxDegreeOfParallelism property, whose default value is 1.

I recorded video of the output (using CamStudio) for the two scenarios: the first with a MaxDegreeOfParallelism of 1 and the second with a value of 4.

I have also added a Thread.Sleep() of half a second to the action block, so that the difference between the two scenarios is visible.

TPL Dataflow Without Parallelism

 

TPL Dataflow With Parallelism

There’s a Thread.Sleep() of a few seconds before I call the TaskParallelLibrary() method, to give me time to start recording. In the first video, the output begins at the 6th second and ends at the 16th second, so processing takes 10 seconds. This matches the straightforward sequential processing we’re all familiar with (20 items x 0.5 seconds of sleep time = 10 seconds).

In the second video, however, the output begins at around the 5th second and finishes a little after the 9th second. Clearly, this is not the sequential case seen above, and the MaxDegreeOfParallelism property has done its job of processing the items in parallel. Also, if you observe the second video closely, the output arrives in ‘chunks’ rather than one line at a time as in the first video, which is another indication that things are running in parallel.
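If you would rather measure than watch a video, a Stopwatch gives the same picture. This is only a sketch; the helper name (ParallelismTimingDemo.Measure) and its parameters are my own.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Illustrative names only; not part of the TDF library.
public static class ParallelismTimingDemo
{
    // Times how long an ActionBlock needs to work through itemCount items,
    // each of which sleeps for sleepMilliseconds.
    public static TimeSpan Measure(int maxDegreeOfParallelism, int itemCount, int sleepMilliseconds)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
        };

        var actionBlock = new ActionBlock<int>(i => Thread.Sleep(sleepMilliseconds), options);

        var stopwatch = Stopwatch.StartNew();

        for (int i = 0; i < itemCount; i++)
        {
            actionBlock.Post(i);
        }

        actionBlock.Complete();
        actionBlock.Completion.Wait(); // wait for all items to be processed

        return stopwatch.Elapsed;
    }
}
```

Measure(1, 20, 500) corresponds to the first video and needs roughly 20 x 0.5 = 10 seconds. Measure(4, 20, 500) corresponds to the second; its best case is (20 / 4) x 0.5 = 2.5 seconds, and the real figure depends on how quickly the thread pool hands out threads, which would explain why my recording took closer to 4 seconds.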

For a slightly more useful application, I have a WinForms app where I download a few images (randomly chosen from my blog site) and add them to a panel.

    private void ShowImages_Click(object sender, EventArgs e)
    {
        tableLayoutPanel.Controls.Clear();
        ExecutionDataflowBlockOptions executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
        {
            TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
            MaxDegreeOfParallelism = 4,
        };

        ActionBlock<string> imageDownloader = new ActionBlock<string>(url =>
        {
            Image image = DownloadImage(url);
            tableLayoutPanel.Controls.Add(new PictureBox
            {
                Image = image,
                Width = image.Width,
                Height = image.Height,
            });
        }, executionDataflowBlockOptions);

        foreach (string imageUrl in imageUrls)
        {
            imageDownloader.Post(imageUrl);
        }
    }

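The ExecutionDataflowBlockOptions instance has its TaskScheduler set to TaskScheduler.FromCurrentSynchronizationContext(), so the block’s delegate runs on the UI thread and can safely add the downloaded images to the TableLayoutPanel. The trade-off is that the WinForms SynchronizationContext executes only one delegate at a time, so MaxDegreeOfParallelism = 4 has no practical effect here, and the synchronous DownloadImage() call blocks the UI thread while each image downloads. Making the download asynchronous and awaiting it inside the lambda would avoid that.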
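One way to keep the UI thread free is to split the work into two blocks: a TransformBlock that downloads in parallel off the UI thread, linked to an ActionBlock that only touches the UI. The sketch below is a console-friendly approximation rather than the code from my app. DownloadAsync is a hypothetical stand-in for an asynchronous DownloadImage(), strings stand in for images, and the scheduler is passed in so that a WinForms handler can supply TaskScheduler.FromCurrentSynchronizationContext().

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative names only; not part of the TDF library.
public static class DownloadPipelineDemo
{
    // Hypothetical stand-in for an asynchronous DownloadImage(url).
    private static async Task<string> DownloadAsync(string url)
    {
        await Task.Delay(50); // simulates network latency
        return "image from " + url;
    }

    public static async Task<List<string>> RunAsync(IEnumerable<string> urls, TaskScheduler uiScheduler)
    {
        var shown = new List<string>();

        // Downloads run on the default scheduler, up to four at a time.
        var downloader = new TransformBlock<string, string>(
            url => DownloadAsync(url),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Only the cheap "add to the UI" step runs on the supplied scheduler.
        var presenter = new ActionBlock<string>(
            image => shown.Add(image),
            new ExecutionDataflowBlockOptions { TaskScheduler = uiScheduler });

        // PropagateCompletion forwards the downloader's completion to the presenter.
        downloader.LinkTo(presenter, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string url in urls)
        {
            downloader.Post(url);
        }

        downloader.Complete();
        await presenter.Completion; // awaiting does not block the calling thread

        return shown;
    }
}
```

In a console test, new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler can play the part of the UI scheduler, since it also runs one delegate at a time. In the WinForms handler, the presenter delegate would add a PictureBox to the panel instead of appending to a list.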

You can read more about all this at Introduction to TPL Dataflow. The code used can be downloaded here.

3 Comments

  • Good article but you have oversimplified one point - an ActionBlock can accept either an Action<TInput> or (for async) a Func<TInput,Task>

  • > Now just because we’re using TDF does not mean things are happening in-parallel by default.

    They kind of do. The processing between the blocks does happen in parallel, but not inside each block (unless you configure that, like you said).

  • Also, your last example is not very good. The WinForms SynchronizationContext can execute only one Task at a time. And you certainly shouldn't block the UI thread while downloading, like you do.

    What you should do instead is to make DownloadImage() asynchronous and use await in the lambda.
