Axum – Ping Pong with Dataflow Networks

In the previous post, I gave the canonical Ping-Pong example in Axum and compared it with the Erlang version.  I want to revisit that example because there are some areas we can rework, in addition to the other solutions we’ll visit.  Some parts were needlessly chatty, so we’ll work in some other language features to help clean up our solution.

Actors in F#

But before we begin: Niklas Gustafsson, the architect behind Axum, was inspired by my post and made an F# version of the canonical Ping-Pong example.  F# supports mailbox processing as a first-class citizen, much like Scala and Erlang, although it is not as central to the language as it is in Erlang.

To use it, simply use the MailboxProcessor class.  Below is the implementation as given by Niklas, which we can compare with the Erlang implementation from my previous post.  The Start function creates a new MailboxProcessor instance and immediately starts running the body function we give it, passing in the agent’s own inbox.  Much like the Erlang solution, this relies on recursion to keep the mailboxes alive.  For one mailbox to reply to the other, each message must carry a reference to the mailbox it came from.

#light

type Message = Finished | Msg of int * Message MailboxProcessor

let ping iters (outbox : Message MailboxProcessor) =
    MailboxProcessor.Start(fun inbox -> 
        let rec loop n = async { 
            match n with
            | 0 -> outbox.Post Finished
                   printfn "ping finished"
                   return ()
            | _ -> outbox.Post <| Msg(n, inbox)
                   let! msg = inbox.Receive()
                   printfn "ping received pong"
                   return! loop(n-1)}
        loop iters)
            
let pong() =
    MailboxProcessor.Start(fun inbox -> 
        let rec loop () = async { 
            let! msg = inbox.Receive()
            match msg with
            | Finished -> 
                printfn "pong finished"
                return ()
            | Msg(n, outbox) -> 
                printfn "pong received ping"
                outbox.Post <| Msg(n, inbox)
                return! loop() }
                    
        loop())

ping 100 <| pong() |> ignore
System.Console.ReadLine() |> ignore

It’s fairly straightforward and follows the mold of Erlang quite nicely.  Scala follows this mold as well in its Ping-Pong example, and using the actor package in Haskell, we can simulate it there too.

I have no real quarrel with the criticisms mentioned in the post, such as the lack of clear endpoints and the difficulty of distinguishing the actors from one another.  The type safety of Axum does give us an advantage here, since it lets us define an explicit contract between actors.

Getting back to the issue at hand, what could we do to clean up our example in Axum?

Cleaning Up with Dataflow Networks

In our previous example, where we had the two agents, Ping and Pong, communicating with each other, each agent was pretty much driven by the data it received from the other.  Instead, we can turn our attention to dataflow networks, in which execution is driven only by the availability of data entering the network and the computations performed as that data moves through it.

Axum gives us several operators that can help.  The Forward operator ( ==> ) sends each message produced by the source to the target.  The Forward Once operator ( --> ) forwards a single message from the source to the target, then disconnects.  Using these operators, we can build pretty rich pipelines for our data to flow through, and we could just as well use them to build an event-driven system in which we respond to events.
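
To make the difference between the two operators concrete, here is a rough F# analogue built on the MailboxProcessor from earlier.  This is only a sketch of the semantics, not how Axum implements them, and forward and forwardOnce are names I made up for illustration.

```fsharp
// Hypothetical helpers for illustration; neither is part of Axum or FSharp.Core.

// Rough analogue of `source ==> target`: relay every message, forever.
let forward (target : 'T -> unit) =
    MailboxProcessor<'T>.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            target msg
            return! loop () }
        loop ())

// Rough analogue of `source --> target`: relay the first message, then stop.
let forwardOnce (target : 'T -> unit) =
    MailboxProcessor<'T>.Start(fun inbox -> async {
        let! msg = inbox.Receive()
        target msg })

let every = forward (printfn "forwarded %d")
let once = forwardOnce (printfn "forwarded once %d")

for i in 1 .. 3 do
    every.Post i
    once.Post i

// Keep the process alive long enough for the agents to run.
System.Console.ReadLine() |> ignore
```

The first agent relays all three values in the order they were posted, while the second relays only the first value and leaves the rest unread in its queue.  How the output of the two agents interleaves is up to the scheduler.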

Let’s take a look at the rewrite of the main agent, in which we create the endpoint for Ping and then send the number of iterations on the HowMany port.  After that, we forward the value received on Done to the Done state.  Much cleaner than before.

using System;
using System.Concurrency;
using Microsoft.Axum;
using System.Concurrency.Messaging;

public agent Program : channel Application
{
    public Program()
    {
        // Create instance of ping and send msg
        var chan = Ping.CreateInNewDomain();
        chan::HowMany <-- 10;
        chan::Done ==> Done;
    }
}

Next up, let’s take a look at the Ping agent.  You’ll note no change in our PingPongStatus channel and its associated ports.  The interesting part is in the Ping constructor, where we forward the message from the HowMany port to the Process method, which takes the number of iterations to perform.  In the Process method, we create the Pong instance and its communication channel, then loop through our iterations, sending a Signal on the Ping port and receiving the return Signal on the Pong port each time.  After the loop, we send a message on the Done port to close the interaction and then return a Signal to end our Ping.  Below is the implementation.

public channel PingPongStatus
{
    input int HowMany : Signal;
    output Signal Done;
    Start: { HowMany -> End; }
}

public agent Ping : channel PingPongStatus
{
    public Ping()
    {
        PrimaryChannel::HowMany ==> Process;
    }
   
    private Signal Process(int iters)
    {
        // Create pong
        var chan = Pong.CreateInNewDomain();      
        
        // Send pings and receive pongs
        for (int i = 0; i < iters; i++)
        {
            chan::Ping <-- Signal.Value;
            receive(chan::Pong);
            Console.WriteLine("ping received pong");
        }
        chan::Done <-- Signal.Value;
        return Signal.Value;
    }  
}

You’ll notice that we no longer send the number of iterations to the Pong.  It isn’t needed; instead, we send a Signal on the Done port when we’re finished.  The Pong can listen in an infinite loop until it receives that message.

Lastly, let’s take a look at the Pong agent.  We’ll clean up the channel by getting rid of the HowMany port.  All we need now are Signals, exchanged inside the infinite loop I mentioned above.  Let’s take a look at the code required.

public channel PingPong
{
    input Signal Done;
    
    input Signal Ping;
    output Signal Pong;
}

public agent Pong : channel PingPong
{
    public Pong()
    {
        while (true) receive
        {
          from PrimaryChannel::Done:
            return;
          from PrimaryChannel::Ping: 
            Console.WriteLine("pong received ping");
            PrimaryChannel::Pong <-- Signal.Value;
            break;    
        }
    }
}

As you can see, inside the Pong constructor we run an infinite loop that calls receive on each pass and reacts to messages arriving on the specified ports, in this case the Done and Ping ports.  If we receive a message on the Ping port, we send a Signal back on the Pong port and then break out of the receive to loop again.  If we receive a message on the Done port, we exit the agent.  This approach is a bit cleaner than our previous attempt at the Ping-Pong example.
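
For comparison, the same simplified protocol can be sketched in F#.  This is my own variation, not Niklas’s code: PongMsg is a made-up type, and I’ve assumed an AsyncReplyChannel can stand in for the Pong port, so the pinger no longer needs a mailbox of its own.

```fsharp
// Hypothetical message type: Ping carries a reply channel, Done carries nothing.
type PongMsg =
    | Ping of AsyncReplyChannel<unit>
    | Done

let pong () =
    MailboxProcessor<PongMsg>.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Done ->
                // Like returning from the Axum constructor: the agent ends.
                return ()
            | Ping reply ->
                printfn "pong received ping"
                reply.Reply ()          // plays the role of the Pong port
                return! loop () }
        loop ())

let ping iters (p : MailboxProcessor<PongMsg>) =
    for i in 1 .. iters do
        // Blocks until pong replies, like the receive on the Pong port.
        p.PostAndReply(fun reply -> Ping reply)
        printfn "ping received pong"
    p.Post Done

ping 3 (pong ())
```

Because each PostAndReply waits for the reply, every round prints "pong received ping" followed by "ping received pong", and the pong agent stops once it sees Done.  As in the Axum version, the iteration count never leaves the pinger.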

Conclusion

As I indicated before, there are other angles I wish to tackle with this example, including asynchronous methods, ordered interaction points and so on, which I will get into in subsequent posts.  With this little cleanup, I hope I’ve shown some of the power of the language: we can indeed create concise, yet safe, concurrent apps using message passing.  Download it, use it in anger, and give the team feedback on the MSDN forum.  Once again, with your help we can shape the future of an actor-based concurrency model on .NET.
