Concurrency with MPI in .NET
When Is It Appropriate?
When I'm talking about MPI, I'm talking in the context of High Performance Computing. This means running your application under a scheduler on a compute cluster that can have tens or hundreds of nodes. Note that I'm not talking about grid computing such as Folding@Home, which distributes work over the internet. Instead, you'll find plenty of need for this in the financial sector, in the insurance sector for fraud detection and data analysis, in manufacturing for testing and calculating limits and thresholds, and even in rendering computer animation for film. There are plenty of other scenarios out there, but it's not for your everyday business application.
I think the real value .NET brings is the ability to read from databases and communicate with other servers over WCF or some other protocol, instead of being stuck in the C or Fortran world to which the HPC market has been relegated. Developers can cut down on the code needed for a lot of these applications by using the built-in functionality we get with the BCL.
MPI in .NET
The problem has been that running these massively parallel algorithms left us limited to Fortran and C systems. That was fine for most of what you'd want to do, but cobbling together class libraries wasn't my ideal. Instead, we could use a lot of the things we take for granted in .NET, such as strong typing and object-oriented and functional programming constructs.
The Boost libraries for MPI in C++ were made available fairly recently by Indiana University. You can read more about it here. They let the MPI programmer take advantage of C++ constructs that aren't available in plain C, such as object orientation. Instead of dealing with functions and structs, there is a full object model for messaging.
Around the same time the Boost C++ libraries for MPI were coming out, a .NET implementation based on that C++ design was made available through MPI.NET. It's basically a thin veneer over msmpi.dll, the Microsoft implementation of the MPICH2 standard. For a list of all supported operation types, check the API documentation here for the raw MSMPI implementation; it will give you a better sense of the capabilities than the .NET documentation can.
The way to think of this is that several nodes will be running an instance of your program at once. So, if you have 16 nodes assigned through your scheduled job, it will spin up 16 instances of the same application. When you run this on a test machine, you'll notice 16 instances of it in your task manager. Kind of cool, actually. Unfortunately, MPI.NET is missing some of the neat features of MPI, including ready sends and buffered sends, but it does include nice things such as the Graph and Cartesian communicators, which are essential in MPI.
You'll need the Windows Server 2003/2008 HPC SDK in order to run these examples, so download that now, and then install MPI.NET to follow along.
Messaging Patterns
With this, we have a few messaging patterns available to us. MPI.NET gives us several, and we'll look at each and how best to use it. I'll write the samples in F#, since it's easy to do and I'm trying to make the case that F# is a better language than C# for expressing the messaging we're doing. For these simple examples, though, it's not hard to translate back and forth.
To execute these, just type the following:
mpiexec -n <Number of Processes You Want> <Your program exe>
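For example, assuming you've compiled the broadcast sample below into an executable called Broadcast.exe (the name is just for illustration), this runs eight copies of it:

mpiexec -n 8 Broadcast.exe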
Broadcast
A broadcast is a process in which a single process (such as a head node) sends the same data to all nodes in the cluster. We want to be as efficient as possible when sending out this data for all to use, without having to loop through individual sends and receives. This is good when a particular root node has a value that the rest of the cluster needs before continuing. Below is a quick example in which the head node sets the value to 42 and the rest receive it.
#light
#r @"D:\Program Files\MPI.NET\Lib\MPI.dll"

open System
open MPI

let main (args:string[]) =
    // Initialize the MPI environment; it is disposed when 'using' exits
    using (new Environment(ref args)) (fun _ ->
        let commRank = Communicator.world.Rank
        let intValue = ref 0
        // Only the root (rank 0) sets the value before the broadcast
        if commRank = 0 then
            intValue := 42
        // Every process calls Broadcast; afterwards every rank holds 42
        Communicator.world.Broadcast(intValue, 0)
        Console.WriteLine("Broadcasted {0} to all nodes", !intValue))

main (System.Environment.GetCommandLineArgs())
Blocking Send and Receive
In this scenario, we're going to use the blocking send and receive pattern. A blocking call won't allow the program to continue until the message it's waiting on arrives. This is good for times when you need a particular value from the head node, or any other node, before proceeding to your next function.
#light
#r @"D:\Program Files\MPI.NET\Lib\MPI.dll"

open System
open MPI

let main (args:string[]) =
    using (new Environment(ref args)) (fun _ ->
        let commRank = Communicator.world.Rank
        let commSize = Communicator.world.Size
        let intValue = ref 0
        match commRank with
        | 0 ->
            // The head node blocks on a receive once per other process
            [1 .. (commSize - 1)] |> List.iter (fun _ ->
                Communicator.world.Receive(Communicator.anySource, Communicator.anyTag, intValue)
                Console.WriteLine("Result: {0}", !intValue))
        | _ ->
            // Every other process computes a value and blocks until rank 0 receives it
            intValue := 4 * commRank
            Communicator.world.Send(!intValue, 0, 0))

main (System.Environment.GetCommandLineArgs())
What I'm doing here is letting the head node, rank 0, do all the receiving work. Note that I don't particularly care where the message came from, nor what its tag was, although I can specify a particular source and tag if I want to receive from a certain node. If it's a slave process, it calculates the value and sends it back to the head node at rank 0. The head node waits until it has received a value from any node and then prints it out. The methods I'm using to send and receive are generic methods. Behind the scenes, in order to send, the system serializes your object into an unmanaged memory stream and throws it on the wire. This is one of the fun issues of marshaling to unmanaged C code.
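As a minimal sketch of a targeted receive, here's what it would look like if rank 0 only wanted the value from rank 1 with tag 0 (the rank and tag here are just illustrative):

// Receive only from rank 1, and only messages tagged 0,
// rather than Communicator.anySource / Communicator.anyTag
let fromRankOne = ref 0
Communicator.world.Receive(1, 0, fromRankOne)
Console.WriteLine("Received {0} from rank 1", !fromRankOne)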
Nonblocking Send and Receive
In this scenario, we are not going to block on the send as we did before. We want the ability to continue doing other things after initiating the send, even though the receivers may need that value before they can continue. We can eventually force completion of the operation by waiting on its request object, and at a certain point we can set up a barrier so that nobody continues until every process has hit that point in the program. The sample below fires off a nonblocking send of a computed value and carries on; the head node receives the values, and then everyone waits at the barrier until the job is done.
// Assumes the same #light, #r and open directives as the previous examples
let main (args:string[]) =
    using (new Environment(ref args)) (fun _ ->
        let commRank = Communicator.world.Rank
        let commSize = Communicator.world.Size
        let intValue = ref 0
        if commRank = 0 then
            // The head node still receives with blocking calls
            [1 .. (commSize - 1)] |> List.iter (fun _ ->
                Communicator.world.Receive(Communicator.anySource, Communicator.anyTag, intValue)
                Console.WriteLine("Result: {0}", !intValue))
        else
            // ImmediateSend returns a request right away; we could do other
            // work here before forcing completion with Wait
            intValue := 4 * commRank
            let request = Communicator.world.ImmediateSend(!intValue, 0, 0)
            request.Wait() |> ignore
        // No process passes this point until all processes have reached it
        Communicator.world.Barrier())

main (System.Environment.GetCommandLineArgs())
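The receiving side can go nonblocking too. Here's a rough sketch; I'm assuming ImmediateReceive hands back a receive request whose GetValue yields the received object once the request completes, so check the MPI.NET API documentation for the exact shape:

// Post the receive up front, do other useful work, then force completion
let request = Communicator.world.ImmediateReceive<int>(Communicator.anySource, Communicator.anyTag)
// ... other work could go here while the message is in flight ...
request.Wait() |> ignore
let received = request.GetValue() :?> int
Console.WriteLine("Result: {0}", received)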
Gather and Scatter
The gather operation takes a value from each process and delivers them to the root process as an array for evaluation. This is a pretty simple way of collecting the values from all nodes and combining them on the head node. Below, each process computes commRank * 3 and sends it to the head node for evaluation.
// Assumes the same #light, #r and open directives as the previous examples
let main (args:string[]) =
    using (new Environment(ref args)) (fun _ ->
        let commRank = Communicator.world.Rank
        let intValue = commRank * 3
        match commRank with
        | 0 ->
            // At the root, Gather returns an array with one element per process
            let ranks = Communicator.world.Gather(intValue, 0)
            ranks |> Array.iter (fun i -> Console.WriteLine(" {0}", i))
        | _ ->
            // Non-root processes contribute their value; the result is only
            // meaningful at the root, so we ignore it here
            Communicator.world.Gather(intValue, 0) |> ignore)

main (System.Environment.GetCommandLineArgs())
Conversely, scatter does the opposite: it takes an array from the given head process and splits it apart, spreading the pieces out among all processes. In this exercise I create a mutable array that only the head node populates. From there, I scatter it across the rest of the nodes to pick up and do with as they please.
// Assumes the same #light, #r and open directives as the previous examples
let main (args:string[]) =
    using (new Environment(ref args)) (fun _ ->
        let commSize = Communicator.world.Size
        let commRank = Communicator.world.Rank
        let mutable table = Array.create commSize 0
        match commRank with
        | 0 ->
            // Only the head node fills the table before scattering it
            table <- Array.init commSize (fun i -> i * 3)
            Communicator.world.Scatter(table, 0) |> ignore
        | _ ->
            // Each process receives one element of the root's table
            let scatterValue = Communicator.world.Scatter(table, 0)
            Console.WriteLine("Scattered {0}", scatterValue))

main (System.Environment.GetCommandLineArgs())
There is an AllGather method as well, which performs a similar operation to Gather, but the results are made available to all processes instead of just the root.
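A quick sketch of what that looks like (in the MPI.NET API the method is spelled Allgather, as I recall; verify the casing against the documentation):

// Every process contributes a value, and every process gets the full array back,
// so there's no need to branch on rank
let gathered = Communicator.world.Allgather(Communicator.world.Rank * 3)
gathered |> Array.iter (fun i -> Console.WriteLine(" {0}", i))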
Reduce
Another collective algorithm similar to scatter and gather is the reduce function. This lets us combine the values from each process by performing an operation on them, whether it be to add, multiply, find the maximum or minimum, and so on. The value is only available at the root process, though, so I have to ignore the result on the rest of the processes. The following example shows a simple sum of every process's rank.
// Assumes the same #light, #r and open directives as the previous examples
let main (args:string[]) =
    using (new Environment(ref args)) (fun _ ->
        let commRank = Communicator.world.Rank
        match commRank with
        | 0 ->
            // The root receives the sum of every process's rank
            let sum = Communicator.world.Reduce(commRank, Operation<int>.Add, 0)
            Console.WriteLine("Sum of all ranks is {0}", sum)
        | _ ->
            // Non-root processes contribute to the reduction but get no result
            Communicator.world.Reduce(commRank, Operation<int>.Add, 0) |> ignore)

main (System.Environment.GetCommandLineArgs())
There is another variation called AllReduce, which performs the same operation as Reduce but makes the resulting value available to all processes instead of just the root. There are more operations and more communicators, such as Graph and Cartesian, but this should give you an idea of what you can do here.
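As a quick sketch (again, the method is spelled Allreduce in the MPI.NET API, as I recall; verify against the documentation):

// Every process ends up with the same sum, so no branching on rank is needed
let sum = Communicator.world.Allreduce(Communicator.world.Rank, Operation<int>.Add)
Console.WriteLine("Sum of all ranks is {0}", sum)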
LINQ for MPI.NET
During my search for MPI.NET solutions, I came across a rather interesting one called LINQ for MPI.NET. I don't know too many of the details, as the author has been pretty quiet about the complete design, but it has entered a private beta if you wish to contact them for more information.
The basic idea is to provide some scope models, including the current scope, the world scope, root, and so on. It also looks like they're providing some sort of multi-threading capability as well. It looks interesting, and I'm keen to find out more.
Pure MPI.NET?
Another implementation of MPI in .NET has surfaced through PureMPI.NET. This is also an implementation of the MPICH2 specification, but it's built on WCF instead of msmpi.dll. It does not rely on the Microsoft Compute Cluster service for scheduling; instead it uses remoting and the like for communication. There is a CodeProject article which explains it a bit more here.
More Resources
So, you want to know more, huh? Well, most of the interesting information out there is in C, so if you can read it and translate to the other APIs, you should be fine. However, there are some good books on the subject which provide not only decent samples, but also guidance on how to make the most of the MPI implementation. Below are some of the basic ones that will help you learn not only the APIs, but the patterns behind their usage.
- Parallel Programming With MPI - Peter Pacheco
- MPI: The Complete Reference - Marc Snir
- Patterns for Parallel Programming - Timothy Mattson
Wrapping It Up
I hope you found this useful for learning how MPI can help with massively parallel applications. The patterns covered here, as well as the technologies behind them, are pretty powerful for helping you make your programs a bit less linear in nature. There is more to come in this series on thinking about concurrency in .NET, so I hope you stay tuned.