F# Actors Revisited

UPDATE: Removed ref cells to use two recursive loops

In the previous post, I briefly covered the actor model in F#.  This style of concurrency, which uses asynchronous message passing and a shared-nothing approach through mailboxes, is a pretty powerful mechanism for achieving scalability.  With a shared-nothing approach, we can remove the need for concurrency primitives such as locks.

Taking cues from Erlang, F# offers these capabilities as well through the MailboxProcessor class.  In this post, let’s walk through a canonical example and explain some of the design patterns around it.

Mailboxes, Etc

Before we begin, let’s make sure we have some of the basics down.  As I stated above, the approach here is that inter-agent communication is accomplished through a shared-nothing asynchronous message passing system.  Each of these agents has a mailbox, which is nothing more than a queue of messages sent by other processes.  These messages are then retrieved with either Receive, which takes the next message in arrival order, or Scan, which looks through the queue for the first message that a given function accepts.  To determine which message we received, we do basic pattern matching against our message types and decide how we want to handle it.  To continue processing, we then call the function recursively to keep sending and receiving.  We can send any number of pieces of data as part of these messages.  For our example, we’ll implement the Auction example that comes from Scala.
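Before moving on to the auction, the receive, match, recurse cycle described above can be sketched with a tiny agent.  This is only a minimal illustration, not part of the auction code: the CounterMessage type and the counter agent are hypothetical names made up for the sketch, and it uses MailboxProcessor.Start to create and start the agent in one step.

```fsharp
// Hypothetical message type for illustration; not part of the auction example.
type CounterMessage =
  | Add of int
  | Get of AsyncReplyChannel<int>

let counter =
  MailboxProcessor.Start(fun inbox ->
    // The running total is carried as a parameter of the recursive loop,
    // so no mutable state is shared with any other agent.
    let rec loop total =
      async { let! msg = inbox.Receive()
              match msg with
              | Add n -> return! loop (total + n)
              | Get reply ->
                  reply.Reply total
                  return! loop total }
    loop 0)

counter.Post (Add 5)
counter.Post (Add 37)
// Messages are handled in arrival order, so both Adds are applied before Get.
let total = counter.PostAndReply(fun reply -> Get reply)
printfn "total = %d" total   // prints "total = 42"
```

The only state lives in the loop parameter, which is the same trick the auction agent uses later to track the highest bid.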

Going Once…  Going Twice…  Sold!

Our goal in this post is to implement the Auction example given for Scala.  The intent of this demo is to show an online auction service built on shared-nothing asynchronous message passing.  We’ll have two clients bidding against each other to see who wins a particular item.  Given that Scala has library support for actors, this is an attempt to see how well F# can handle the same problem.  My first impression of the Scala model is that it’s much more focused on classes and objects than the F# version will be.

Without further ado, let’s start digging through the code.  First up, let’s get some utility functions out of the way to help us on our journey.  Instead of using the dot operator on some objects to send messages, I prefer the <-- operator, as also shown in the Expert F# book.  Also, I need a way to deconstruct an option type so that I can deal with a bidder that may not exist yet.

open System
open System.Threading
let (<--) (m:_ MailboxProcessor) x = m.Post x
let unSome (Some x) = x

Both of these functions should be rather self-explanatory.  The unSome function takes an option holding a Some value and returns that value.  Let’s move on to the messages that we need to define.  I have comments beside each to describe what each message signifies.

type AuctionMessage =
  | Offer of int * AuctionReply MailboxProcessor // Make a bid
  | Inquire of AuctionReply MailboxProcessor     // Check the status
and AuctionReply =
  | Status of int * DateTime // Asked sum and expiration
  | BestOffer                // Ours is the best offer
  | BeatenOffer of int       // Yours is beaten by another offer
  | AuctionConcluded of      // Auction concluded
      AuctionReply MailboxProcessor * AuctionReply MailboxProcessor
  | AuctionFailed            // Failed without any bids
  | AuctionOver              // Bidding is closed

I decided to use a recursive type definition (the and keyword) so that my AuctionMessage discriminated union can appear before my AuctionReply discriminated union, even though both cases of AuctionMessage reference a MailboxProcessor that handles AuctionReply messages.  In F#, declaration order matters, so this is a quick way to keep the definitions in a readable order.  Next, we need to define some constants such as time to shutdown and our allowed bid increments:

let timeToShutdown = 3000
let bidIncrement = 10

It’s now time to define our auction agent.  This will handle the overall orchestration of the auction.  This includes handling both the Offer and the Inquire messages as well as what happens when we fail to get a message within a certain time period.  Let’s step through the code now and then explain it below:

let auctionAgent seller minBid closing =
  new AuctionMessage MailboxProcessor(fun inbox ->
    let rec loop maxBid maxBidder =
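      // Wait only until closing; TotalMilliseconds is the whole span, Milliseconds just its sub-second part
      async { let! msg = inbox.TryReceive(max 0 (int (closing - DateTime.Now).TotalMilliseconds))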
              match msg with
                | Some ( Offer(bid, client) ) ->
                    if bid >= maxBid + bidIncrement then
                      if maxBid >= minBid then unSome maxBidder <-- BeatenOffer bid                  
                      client <-- BestOffer
                      return! loop bid (Some client)
                    else
                      client <-- BeatenOffer maxBid
                      return! loop maxBid maxBidder
                
                | Some ( Inquire client ) ->
                    client <-- Status(maxBid, closing)
                    return! loop maxBid maxBidder
                
                | None ->
                    if maxBid >= minBid then
                      let reply = AuctionConcluded(seller, unSome maxBidder)
                      unSome maxBidder <-- reply
                      seller <-- reply
                    else seller <-- AuctionFailed
                    
                    let! msg' = inbox.TryReceive timeToShutdown
                    match msg' with
                    | Some ( Offer (_, client) ) -> 
                        client <-- AuctionOver
                        return! loop maxBid maxBidder
                    | None -> return ()         
            }
    loop (minBid - bidIncrement) None)   

It’s a bit of code to digest, but the gist of it is rather simple.  When creating an instance of this auction agent, we need a seller, a minimum bid and a closing time for the auction.  From this, we create our MailboxProcessor to handle AuctionMessage types.  Since we are continuously in a cycle of receiving messages and sending results, we have a recursive loop which carries our state for us.  This state, which we carry from one recursive call to the next, is our maximum bid and our maximum bidder at the time.  When we begin our call to this loop function, we start with a max bid of the minimum bid minus the bid increment and no maximum bidder.

When we are receiving messages, it’s important to handle any potential timeouts.  In this case, we use this timeout as a mechanism for determining when our auction is over.  In order to do this, we must use the TryReceive function instead of the Receive.  Let’s look at the differences below:

// Throws an exception if timeout is exceeded
member MailboxProcessor.Receive : ?timeout : int -> Async<'msg>

// Returns None if timeout is exceeded
member MailboxProcessor.TryReceive : ?timeout : int -> Async<'msg option>

Since we need to know how to handle timeouts, it’s best to use the TryReceive approach.  If we receive an offer, we determine whether the bid is acceptable, meaning at least our current maximum plus the increment.  If it is, we check whether the current maximum bid has reached the minimum, and if so, send the current maximum bidder a BeatenOffer message; either way, the new bidder gets a BestOffer reply and becomes the maximum bidder.  Otherwise, the bidder is told that their offer is beaten by the current maximum.  If we receive an inquiry, we simply return a Status message to the client with our current bid and the closing time.  Finally, if we don’t receive a message in the given timeframe, it’s time to wind down our auction.  If the maximum bid is at least our minimum, our auction is a success and we let our maximum bidder and our seller know.  If, however, we don’t receive any bids, then our auction is considered a failure.  After that, any additional offers received are answered with an AuctionOver message to tell the client that bidding has closed.  If this operation times out, we simply exit.  A bit of a mouthful, but fairly straightforward.
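To see the timeout behaviour in isolation, here is a small sketch of an agent that listens until a deadline and treats the None from TryReceive as its signal to stop.  The names remainingMs and listener are hypothetical helpers invented for this illustration and are not part of the auction code.  The sketch assumes the timeout should be the whole time left until the deadline, which is why it reads TotalMilliseconds rather than the Milliseconds property.

```fsharp
open System
open System.Threading

// Hypothetical helper: whole milliseconds left until the deadline, never negative.
// TotalMilliseconds is the full length of the span; the Milliseconds property
// is only its sub-second component, so it is wrong for spans over a second.
let remainingMs (deadline: DateTime) =
  max 0 (int (deadline - DateTime.Now).TotalMilliseconds)

// Hypothetical agent: echoes messages until the deadline passes.
let listener (deadline: DateTime) =
  MailboxProcessor<string>.Start(fun inbox ->
    let rec loop count =
      async { let! msg = inbox.TryReceive(remainingMs deadline)
              match msg with
              | Some text ->
                  printfn "received %s" text
                  return! loop (count + 1)
              | None ->
                  // Timed out: the deadline passed with an empty mailbox.
                  printfn "deadline reached after %d message(s)" count }
    loop 0)

let agent = listener (DateTime.Now.AddMilliseconds 500.0)
agent.Post "hello"
Thread.Sleep 1000   // give the agent time to hit its deadline
```

Because the timeout is recomputed on every pass through the loop, each message that arrives shortens the remaining wait instead of restarting it.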

Let’s move on to the client aspect.  First, let’s set up a few things necessary for our client, encapsulated in the Auction module.  We’ll set such parameters as our minimum bid, our closing date, our seller and our auction itself.

module Auction =
  let random = new Random()
  
  let minBid = 100
  let closing = DateTime.Now.AddMilliseconds 10000.
  
  let seller = new AuctionReply MailboxProcessor(fun inbox ->
    let rec loop() =
      async { let! _ = inbox.Receive() 
              return! loop()}
    loop())
  let auction = auctionAgent seller minBid closing

In order to create a seller for our auction, we need to stub out basic features of this agent.  In this case, we simply have a processing loop which receives a message, does nothing with it, and continues with the loop again.  As I have stated above, we need both our seller agent, and our auction agent, but what about our clients?  In our same module, we define them as the following:

  let client i increment top = 
    let name = sprintf "Client %i" i
    let log msg = Console.WriteLine("{0}: {1}", name, msg)
    
    new AuctionReply MailboxProcessor(fun inbox ->
    
      let rec startAuction() =
        async { log "started"
                auction <-- Inquire inbox
                let! curMsg = inbox.Receive()
                match curMsg with
                | Status(maxBid,_) ->
                    log <| sprintf "status(%d)" maxBid
                    return! loop 0 maxBid }
      and loop current max =
        async { if max >= top then log "too high for me"
                    
                let current' =
                  if max < top && current < max then
                    let current' = max + increment
                    Thread.Sleep (1 + random.Next 1000)
                    auction <-- Offer(current', inbox)
                    current'
                  else current
                
                let! msg = inbox.TryReceive timeToShutdown
                match msg with
                | Some BestOffer -> 
                    log <| sprintf "bestOffer(%d)" current'
                    return! loop current' max
                | Some (BeatenOffer maxBid) ->
                    log <| sprintf "beatenOffer(%d)" maxBid
                    return! loop current' maxBid
                | Some ( AuctionConcluded(seller, maxBidder) ) ->
                    log "auctionConcluded"; return ()
                | Some AuctionOver ->
                    log "auctionOver"; return ()
                | None -> return () }
      startAuction())

What we’re doing here is using two mutually recursive functions.  The first, startAuction, asks the auction for the current maximum bid and then hands off to the main loop, where the real work happens.  There, we place a new offer if our current bid is below the maximum, then receive a message sent from the auction to the inbox and determine whether we hold the best offer, we’ve been beaten, or it’s time to call it a day.  Once again, fairly straightforward code.
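The same let rec ... and ... shape works well whenever an agent moves between phases.  As a rough sketch only, here is a hypothetical two-phase agent, unrelated to the auction: PhaseMessage, summer, idle and running are all names made up for this illustration.  Each phase is its own recursive function, and switching phase is just a call to the other function.

```fsharp
// Hypothetical messages for a two-phase agent; not part of the auction example.
type PhaseMessage =
  | Begin
  | Number of int
  | Stop of AsyncReplyChannel<int>

let summer =
  MailboxProcessor.Start(fun inbox ->
    // Phase 1: ignore numbers until Begin arrives.
    let rec idle () =
      async { let! msg = inbox.Receive()
              match msg with
              | Begin -> return! running 0
              | Number _ -> return! idle ()
              | Stop reply ->
                  reply.Reply 0
                  return! idle () }
    // Phase 2: add up numbers until Stop arrives, then go back to idle.
    and running total =
      async { let! msg = inbox.Receive()
              match msg with
              | Begin -> return! running total
              | Number n -> return! running (total + n)
              | Stop reply ->
                  reply.Reply total
                  return! idle () }
    idle ())

summer.Post (Number 1)   // dropped: the agent is still idle
summer.Post Begin
summer.Post (Number 2)
summer.Post (Number 3)
let sum = summer.PostAndReply(fun reply -> Stop reply)
printfn "sum = %d" sum   // prints "sum = 5"
```

As with the client above, the state each phase needs is passed as an argument, so no ref cells are required.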

Lastly, it’s time to bring it all together.  Let’s set up the calling code and see the results:

open Auction

seller.Start()
auction.Start()
(client 1 20 200).Start()
(client 2 10 300).Start()
Console.ReadLine() |> ignore

Executing this code gives us the following results:

Client 2: started
Client 1: started
Client 2: status(90)
Client 1: status(90)
Client 2: bestOffer(100)
Client 1: bestOffer(110)
Client 2: beatenOffer(110)
Client 1: beatenOffer(120)
Client 2: bestOffer(120)
Client 2: auctionConcluded
Client 1: auctionOver

What this tells us is that client 2 starts first and bids first, offering 100, only to be outdone by client 1.  The two go back and forth until client 2 comes in with the best and final offer.  The auction is then concluded and client 1 is notified that the auction is over.  We can add additional client agents to the list and have just as much fun as before.  To give you an idea, if we add another agent with an increment of 30 and a max bid of 150, we get the following results:

Client 1: started
Client 2: started
Client 3: started
Client 1: status(90)
Client 2: status(90)
Client 3: status(90)
Client 1: bestOffer(110)
Client 1: beatenOffer(120)
Client 3: bestOffer(120)
Client 2: beatenOffer(120)
Client 3: auctionConcluded
Client 1: auctionOver
Client 2: bestOffer(130)
Client 2: auctionConcluded

Now what we have here is that client 2 wins with an offer of 130 and the rest are notified of their loss.  You can find the complete code for this example here.

Conclusion

Solutions such as these using shared-nothing asynchronous message passing can create rather scalable architectures.  Utilizing the MailboxProcessor class, we’re able to implement actor model concurrency to tackle some of these problems with coordination of concurrent processes.  How might Axum handle this scenario?  Good question and I hope to have that answered shortly.
