Concurrency with MPI in .NET

In my previous post, I looked at some of the options we have for concurrent programming in .NET applications.  One of the more interesting, yet specialized, options is the Message Passing Interface (MPI).  Microsoft made its push into the high performance computing space with the Windows Compute Cluster Server 2003 SKU, which let developers run their algorithms with MPI on a massively parallel scale.  Now, with the Windows HPC Server 2008 SKU, things have improved a bit with WCF support for scheduling and such.  If you're not part of the beta and are interested, I'd urge you to go through Microsoft Connect. 

When Is It Appropriate?

When I'm talking about MPI, I'm talking in the context of High Performance Computing (HPC).  This consists of running the application through a scheduler on a compute cluster that can have tens or hundreds of nodes.  Note that I'm not talking about grid computing such as Folding@Home, which distributes work over the internet.  Instead, you'll find plenty of need for this in the financial sector; in the insurance sector for fraud detection and data analysis; in manufacturing for testing and calculating limits, thresholds, and so on; and even in rendering computer animation for film.  There are plenty of other scenarios out there, but it's not for your everyday business application.

I think the real value .NET brings is being able to read from databases and communicate with other servers over WCF or some other protocol, instead of being stuck in the C or Fortran world to which the HPC market has been relegated.  Developers can cut down on the code needed for a lot of these applications by using the built-in functionality we get with the BCL.

MPI in .NET

The problem has been that running these massively parallel algorithms left us limited to Fortran and C systems.  That was okay for most things you'd want to do, but cobbling together class libraries wasn't my ideal.  Instead, we can use a lot of the things we take for granted in .NET, such as strong typing and object oriented and functional programming constructs.

The Boost libraries for MPI in C++ were made available fairly recently by Indiana University.  You can read more about them here.  They let the MPI programmer take advantage of C++ constructs that aren't available in plain C, such as OOP; instead of dealing with functions and structs, there is a full object model for messaging.

Around the same time the Boost C++ libraries for MPI were coming out, a .NET implementation based on that C++ design was made available through MPI.NET.  It's basically a thin veneer over msmpi.dll, Microsoft's MPI implementation, which is based on MPICH2.  For a list of all the operation types supported, check the API documentation here for the raw MS-MPI implementation; it will give you a better sense of the capabilities than the .NET implementation can.

The way to think about this is that several nodes will be running an instance of your program at once.  So, if you have 16 nodes assigned through your scheduled job, it will spin up 16 instances of the same application; when you do this on a test machine, you'll notice 16 instances of it in your task manager.  Kind of cool, actually.  Unfortunately, MPI.NET is missing some of the neat features in MPI, such as "Ready Sends" and "Buffered Sends", but it does include nice things such as the Graph and Cartesian communicators, which are essential in MPI.

You'll need the Windows Server 2003/2008 HPC SDK in order to run these examples, so download it now, and then install MPI.NET to follow along.

Messaging Patterns

With this, we have a few messaging patterns available to us.  We'll look at the ones MPI.NET gives us and how best to use them.  I'll write the samples in F#, since it's pretty easy to do and I'm trying to make the case that F# is a better language than C# for expressing the messaging we're doing.  For these simple examples, though, it's not hard to switch back and forth.

To execute these, just type the following:

mpiexec -n <Number of Processes You Want> <Your program exe>
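
For example, to run one of the samples below across four processes on a single test machine (MpiBroadcast.exe is just a hypothetical name for your compiled F# program):

mpiexec -n 4 MpiBroadcast.exe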

Broadcast

A broadcast is an operation in which a single process (e.g. a head node) sends the same data to all nodes in the cluster.  We want to be as efficient as possible when sending out this data for all to use, without having to loop through individual sends and receives.  This is useful when a particular root node has a value that the rest of the cluster needs before continuing.  Below is a quick example in which the head node sets the value to 42 and the rest receive it.

#light

#R "D:\Program Files\MPI.NET\Lib\MPI.dll"

open System
open MPI

let main (args:string[]) =
  // Initialize the MPI environment for the duration of the lambda
  using(new Environment(ref args))(fun _ ->
    let commRank = Communicator.world.Rank

    // Only the root (rank 0) sets the value; everyone else starts at 0
    let intValue = ref 0
    if commRank = 0 then
      intValue := 42

    // Every process ends up with the root's value after the broadcast
    Communicator.world.Broadcast(intValue, 0)
    Console.WriteLine("Broadcasted {0} to all nodes", !intValue)
  )
main(System.Environment.GetCommandLineArgs())

Blocking Send and Receive

In this scenario, we're going to use the blocking send and receive pattern.  The program won't continue until it gets the message it's looking for.  This is good for times when you need a particular value before proceeding to your next function, whether from the head node or any other particular node.

#light

#R "D:\Program Files\MPI.NET\Lib\MPI.dll"

open System
open MPI

let main (args:string[]) =
  using(new Environment(ref args))( fun _ ->
    let commRank = Communicator.world.Rank
    let commSize = Communicator.world.Size
    let intValue = ref 0
    match commRank with
    | 0 ->
      // Head node: block until a message has arrived from each of the other processes
      [1 .. (commSize - 1)] |> List.iter (fun _ ->
        Communicator.world.Receive(Communicator.anySource, Communicator.anyTag, intValue)
        Console.WriteLine("Result: {0}", !intValue))
    | _ ->
      // Worker: compute a value and send it to the head node (rank 0, tag 0)
      intValue := 4 * commRank
      Communicator.world.Send(!intValue, 0, 0)
  )
main(System.Environment.GetCommandLineArgs())

What I'm doing here is letting the head node, rank 0, do all the receiving work.  Note that I don't particularly care where the source was, nor what the tag was, though I can receive from a certain node and with a certain data tag if I wish.  If it's a worker process, I go ahead and calculate the value and send it back to the head node at rank 0.  The head node waits until it has received that value from any node and then prints out the given value.  The send and receive methods I'm using are generic methods.  Behind the scenes, in order to send, the system serializes your object into an unmanaged memory stream and throws it on the wire.  This is one of the fun issues of dealing with marshaling to unmanaged C code.
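
To make that concrete, here is a minimal sketch of sending a user-defined type rather than a plain int, following the same setup as the samples above.  The WorkResult record and its fields are hypothetical names for illustration only; the assumption is that MPI.NET serializes any [<Serializable>] type before putting it on the wire, while primitives take a faster path.

// Hypothetical sketch: sending a custom serializable type instead of an int.
// Assumes MPI.NET serializes non-primitive [<Serializable>] types under the covers.
[<Serializable>]
type WorkResult = { Rank : int; Value : float }

let mainResults (args:string[]) =
  using(new Environment(ref args))( fun _ ->
    let comm = Communicator.world
    if comm.Rank = 0 then
      // Head node: collect one record from each worker, from any source and tag
      let result = ref { Rank = 0; Value = 0.0 }
      [1 .. (comm.Size - 1)] |> List.iter (fun _ ->
        comm.Receive(Communicator.anySource, Communicator.anyTag, result)
        Console.WriteLine("Rank {0} sent {1}", (!result).Rank, (!result).Value))
    else
      // Worker: build a record and send it to the head node (rank 0, tag 0)
      comm.Send({ Rank = comm.Rank; Value = float comm.Rank * 1.5 }, 0, 0)
  )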

Nonblocking Send and Receive

In this scenario, we are not going to block as we did before when sending or receiving.  We want the ability to continue doing other things after sending the value, while the receivers might need that value before continuing.  Eventually we can force completion by waiting on the request, and at a certain point we can set up a barrier so that nobody continues until every process has hit that point in the program.  The sample below does a quick nonblocking send of a multiplied value and lets the sender continue; the head node collects the results, and everyone waits at the barrier until the job is done.

let main (args:string[]) =
  using(new Environment(ref args))( fun _ ->
    let commRank = Communicator.world.Rank
    let commSize = Communicator.world.Size

    let intValue = ref 0
    if commRank = 0 then
      // Head node: collect one result from each of the other processes
      [1 .. (commSize - 1)] |> List.iter (fun _ ->
        Communicator.world.Receive(Communicator.anySource, Communicator.anyTag, intValue)
        Console.WriteLine("Result: {0}", !intValue))
    else
      // Worker: kick off a nonblocking send, then wait on the request to complete
      intValue := 4 * commRank
      let request = Communicator.world.ImmediateSend(!intValue, 0, 0)
      request.Wait() |> ignore

    // Nobody continues past this point until every process has reached it
    Communicator.world.Barrier()
  )

main(System.Environment.GetCommandLineArgs())

Gather and Scatter

The gather operation takes values from each process and sends them to the root process as an array for evaluation.  This is a pretty simple way to take all values from all nodes and combine them on the head node.  In this example each process calculates commRank * 3 and the values are gathered onto the head node for evaluation.

let main (args:string[]) =
  using(new Environment(ref args))( fun _ ->
    let commRank = Communicator.world.Rank
    let intValue = commRank * 3

    match commRank with
    | 0 ->
      // Root: Gather returns an array with one contribution from each process
      let ranks = Communicator.world.Gather(intValue, commRank)
      ranks |> Array.iter(fun i -> System.Console.WriteLine(" {0}", i))
    | _ ->
      // Non-root: contribute a value; the return value only matters at the root
      Communicator.world.Gather(intValue, 0) |> ignore
  )

main(System.Environment.GetCommandLineArgs())

Conversely, scatter does the opposite: it takes an array from the head process and splits it apart, spreading the pieces out among all processes.  In this exercise I create a mutable array that only the head node will populate.  From there, I scatter it across the rest of the nodes to pick up and do with whatever they please.

let main (args:string[]) =
  using(new Environment(ref args))( fun _ ->
    let commSize = Communicator.world.Size
    let commRank = Communicator.world.Rank
    let mutable table = Array.create commSize 0

    match commRank with
    | 0 ->
      // Root: build one element per process and scatter the array across the cluster
      table <- Array.init commSize (fun i -> i * 3)
      Communicator.world.Scatter(table, 0) |> ignore
    | _ ->
      // Non-root: receive this process's element of the root's array
      let scatterValue = Communicator.world.Scatter(table, 0)
      Console.WriteLine("Scattered {0}", scatterValue)
  )

main(System.Environment.GetCommandLineArgs())

There is an AllGather method as well, which performs a similar operation to Gather, but the results are available to all processes instead of just the root process. 
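
Here is a rough sketch of what that might look like, following the same shape as the Gather example.  I'm assuming the method is named Allgather and simply returns the combined array to every caller; check the MPI.NET API reference for the exact name and signature.

// Sketch only: assumes an Allgather method that hands the combined array
// back to every process, not just the root.
let mainAllgather (args:string[]) =
  using(new Environment(ref args))( fun _ ->
    let commRank = Communicator.world.Rank
    let intValue = commRank * 3

    // Every process contributes a value and every process gets the full array back
    let allValues = Communicator.world.Allgather(intValue)
    allValues |> Array.iter (fun i -> Console.WriteLine("Rank {0} sees {1}", commRank, i))
  )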

Reduce

Another collective algorithm similar to scatter and gather is the reduce function.  It lets us combine the values from every process with a given operation, whether that's addition, multiplication, finding the maximum or minimum, and so on.  The result is only available at the root process, though, so I have to ignore the return value on the rest of the processes.  The following example shows a simple sum of every process's rank at the head node.

let main (args:string[]) =
  using(new Environment(ref args))( fun _ ->
    let commRank = Communicator.world.Rank

    match commRank with
    | 0 ->
      // Root: combine every process's rank with addition; only the root gets the sum
      let sum = Communicator.world.Reduce(Communicator.world.Rank, Operation<int>.Add, 0)
      Console.WriteLine("Sum of all ranks is {0}", sum)
    | _ ->
      // Non-root: contribute a value; the result is only meaningful at the root
      Communicator.world.Reduce(Communicator.world.Rank, Operation<int>.Add, 0) |> ignore
  )

main(System.Environment.GetCommandLineArgs())

There is another variation called AllReduce, which does the same thing as the Reduce function but makes the result available to all processes instead of just the root one; a rough sketch follows below.  There are more operations and more communicators, such as Graph and Cartesian, but this is enough to give you an idea of what you can do here. 
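
Here is that AllReduce sketch, under the assumption that the method is called Allreduce and takes the same value and operation arguments as Reduce, minus the root parameter:

// Sketch only: assumes an Allreduce method shaped like Reduce without the root
// argument, returning the combined result to every process.
let mainAllreduce (args:string[]) =
  using(new Environment(ref args))( fun _ ->
    // Every process contributes its rank; every process gets the sum back
    let sum = Communicator.world.Allreduce(Communicator.world.Rank, Operation<int>.Add)
    Console.WriteLine("Rank {0} sees sum {1}", Communicator.world.Rank, sum)
  )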

LINQ for MPI.NET

During my search for MPI.NET solutions, I came across a rather interesting one called LINQ for MPI.NET.  I don't know too many of the details, as the author has been pretty quiet about the complete design, but it has entered a private beta if you wish to contact them for more information.

The basic idea is to provide some scope models, including the current scope, the world scope, root, and so on.  It also looks like they are providing some sort of multi-threading capability as well.  It looks interesting, and I'm keen to find out more.

Pure MPI.NET?

Another implementation of MPI in .NET has surfaced through PureMPI.NET.  This is an implementation of the MPICH2 specification as well, but built on WCF instead of msmpi.dll.  It does not rely on the Microsoft Compute Cluster service for scheduling; instead it uses remoting and the like for communication.  There is a CodeProject article which explains it a bit more here.

More Resources

So, you want to know more, huh?  Well, most of the interesting information out there is written for C, so if you can read it and translate it to the other APIs, you should be fine.  There are also some good books on the subject which not only provide decent samples, but also guidance on how to make the most of an MPI implementation, covering not just the APIs but the patterns behind their usage.


Wrapping It Up

I hope you found some of this useful for learning about how MPI can help with massively parallel applications.  The patterns covered here, as well as the technologies behind them, are pretty powerful for helping you think about how to make your programs a bit less linear in nature.  There is more to come in this series on thinking about concurrency in .NET, so I hope you stay tuned.


Comments

  • I have a couple of questions, since you are planning on writing a series of these articles.
    1. How do you deal with failure? With every machine you add, your mean time between failure changes. How do you recover lost work? How do you deal with a machine being unavailable?
    2. How do you know your application is worth parallelizing without writing it?
    3. How do you deal with a heterogeneous network of machines? If you have machine A which is 2x the speed of machine B, MPI forces you to give them equal-sized work. Your synchronization time is severely impacted.
    4. How do you load balance?
    5. How do you deal with other people running their programs on the same cluster?
    6. When you are pushing the work to nodes, you are making two assumptions: The machine is available and ready for work, and you know that the machine can handle the load you are giving it. In light of the previous questions, what do you do about this? When looking at the CMSD (Cache Memory Swap Die) envelope, giving each node the same amount of work can be catastrophic.
    7. What do you do for long running jobs? How do you deal with checkpointing? How much work will you lose? How much overhead is there? How hard is it to program?

  • @Ian

    Yes, this is the plan, to cover these topics in detail. Many of the things discussed are not exclusively MPI issues; they focus more on the HPC Administration console. Unfortunately, MPI does not provide such operations in code.

    1. Re: failure, what kind of failure are we talking about? Hardware failure? Failure in logic? Failure in communication?

    2. Not sure what you mean by this. Are you saying, what is the value proposition of writing it in parallel versus serial and spreading it across nodes? I break my applications down into the smallest possible units, and whatever I can spread to my other nodes, I do.

    3. MPI by itself, yes, does force this, but we're getting into administration at this point. The scheduler handles many of these tasks. If you read the Windows HPC documentation, it spells it out quite nicely. What's even nicer is the ability to connect to the system not through MPI per se, but through the scheduler and administration side to handle resubmits and such.

    4. Load balancing is another thing I'm working on. That's not something natively supported by MPI, no.

    5. Once again, that's an administration issue; it can be handled by not allowing people to reserve nodes in the cluster and letting the scheduler figure it out.

    6. Once again, I have always let the scheduler determine whether the particular nodes for my given job are ready for such things. If I were blindly using MPI without this mechanism, yes, I would definitely have this problem.

    7. That's another topic I'll get into at some point.

    Matt

  • In regards to dealing with failures - the question is about any kind of failure. Assuming you have a cluster that distributes work amongst several dozen nodes, how can we ensure that one of the following will not block the whole computation made by all the other nodes:
    1. HW failure - the machine stopped responding entirely after it received the job - due to a power outage or similar
    2. SW failure - again, the machine stopped responding after it received the job, in this case let's assume it occurred due to an OS problem (Windows, can happen you know...)
    3. SW failure - again, the same thing but this time due to a problem in your own code - let's say a process crash.

    In all the above cases, the scheduler process does not know the actual reason; what is clear, however, is that the job has already been distributed and that the distributor is awaiting some kind of response which will not be available from the failed node. Since the distributor is not familiar with the nodes at all - what can it do in order to overcome the failure without re-sending the job again to all the currently available nodes?

  • Do you really need buffered and ready sends? In my experience with MPI, these features are very rarely used, and when they are, they're typically used incorrectly.

  • @Gal C

    I hope to cover this soon as I take another look at MPI.NET.

    Matt

  • @Doug

    I've used them several times with a bit of success, so I'd be in favor of at least supporting them, but, yes, they're not widely used.

    Matt

