Exploring MapReduce with F#

With my exploration into mass concurrency and big data problems, I’m always finding challenges to give myself on how I might solve a given issue.  Such examples that have intrigued me along the way as PLINQ, MPI/.NET, Data Parallel Haskell, but one in particular has intrigued me more – MapReduce.  A challenge I gave myself is to fully understand this paradigm and implement a version using F#.

What Is MapReduce?

MapReduce is a Google programming model and implementation for processing and generating large data sets.  Programs written using this more functional style can be parallelized over a large cluster of machines without needing the knowledge of concurrent programming.  The actual runtime can then partition the data, schedule and handle any potential failure.  The Google implementation of it runs on a large cluster of machines and can process terabytes of data at a time. 

The programming model is based upon five simple concepts:

  1. Iteration over input
  2. Computation of key/value pairs from each input
  3. Grouping of all intermediate values by key
  4. Iteration over the resulting groups
  5. Reduction of each group

So, where does this Map/Reduce wording come in?  Well, let’s explore each:

  • Map
    Function written by the user to take an input pair and produce a result of intermediate key/value pairs.  The intermediate values are then grouped with the same intermediate key and passed to the Reduce function.
  • Reduce
    Function also written by the user to take the intermediate key and sequence of values for that key.  The sequence of values are then merged to produce a possibly smaller set of values.  Zero or one output value is produced per our Reduce function invocation.  In order to manage memory, an iterator is used for potentially large data sets.

How useful is this programming model?  Using this model, we’re able to express some interesting problems such as the following:

  • Word counts in large documents
  • Distributed Grep – Find patterns in a document
  • Count of URL Access Frequency
  • Reverse Web-Link Graph – Find all source URLs for a given destination
  • Term-Vector per Host - Summarize most important words in a document
  • Inverted Index
  • Distributed Sort

That’s only a start to what we can do.  After the paper which explained the technology was published in 2004, there have been several implementations including the open-source Hadoop project and the Microsoft Research Project Dryad.  Of course, it doesn’t come without some criticism about the resource usage such as this post called “eBay doesn’t love MapReduce”.

Dryad is more interesting in that its functionality subsumes MapReduce and can do in regards to job creation, resource management, job monitoring, visualization and more.  What’s even more interesting is the associated project DryadLINQ, which allows us to compile LINQ expressions to have sent across the cluster so that I could perform intense data analysis using LINQ operators that we’re used for our other data tasks.  What does the future hold for Dryad remains to be seen, but the possibilities seem plenty.

F# Implementation

The challenge I gave myself was to understand the basics of the programming model.  Ralf Lämmel, formerly of the Microsoft Programmability Team, published a paper entitled Google’s MapReduce Programming Model – Revisited which I used as a source for how my F# implementation works.  If you want to understand not only the deep internal details, but also how an implementation in Haskell would look, I highly recommend this paper.

In our example, we’re going to use a MapReduce implementation to look for the most frequent IP addresses from our IIS logs.  Let’s first start by defining the outer skeleton needed.  Let’s define what the map_reduce function and associated steps will look like:

#light 

namespace CodeBetter.MapReduceExample 

module MapReduceModule = 

  let map_reduce  
    // Map function take pair and create sequence of key/value pairs
    (m:'k1 -> 'v1 -> seq<'k2 * 'v2>)  
    // Reduce function takes key and sequence to produce optional value
    (r:'k2 -> seq<'v2> -> 'v3 option)
    // Takes an input of key/value pairs to produce an output key/value pairs
    : Map<'k1, 'v1> -> Map<'k2, 'v3> = 

    map_per_key >>     // 1. Apply map function to each key/value pair
      group_by_key >>  // 2. Group intermediate data per key
      reduce_per_key   // 3. Apply reduce to each group

I’m using the standard sequences instead of lists for this because the data may be large and the List<T> in F# by default is eagerly evaluated.  The case could be made to use the LazyList<T> to model the data, but as I might want to use the Parallel Extensions and the PSeq module I used before, I’ll stick with the IEnumerable<T>.

The comments here in the code say a lot about what’s going on.  Our map_reduce function takes two functions, the map and the reduce which are explained in the code comments.  It’s important to understand what happens last is our series of three functions, our map_per_key, group_by_key and reduce_per_key.  Each of those steps fall into line with the explanation from above.  At this point, we haven’t actually yet implemented the functions, but let’s go ahead and do that now to flush out our map_per_key, group_by_key and reduce_per_key.  The following code comes before the last statement which ties our three functions together:

    let map_per_key : Map<'k1, 'v1> -> seq<('k2 * 'v2)> =
      Map.to_seq >>                   // 1. Map into a sequence
        Seq.map (Tuple.uncurry m) >>  // 2. Map m over a list of pairs
        Seq.concat                    // 3. Concat per-key lists
  
    let group_by_key (l:seq<('k2 * 'v2)>) : Map<'k2,seq<'v2>> =
      let insert d (k2, v2) = Map.insert_with Seq.append k2 (seq [v2]) d
      let func (f:Map<'a, seq<'b>> -> Map<'a, seq<'b>>) (c:'a * 'b) 
        : (Map<'a, seq<'b>> -> Map<'a, seq<'b>>)
        fun x -> f(insert x c)
      (Seq.fold func (fun x -> x) l) Map.empty
 
    let reduce_per_key : Map<'k2, seq<'v2>> -> Map<'k2,'v3> =
      let un_some k (Some v) = v // Remove optional type
      let is_some k = function
        | Some _ -> true         // Keep entires
        | None   -> false        // Remove entries
        
      Map.mapi r >>           // 1. Apply reduce per key
        Map.filter is_some >> // 2. Remove None entries
        Map.mapi un_some      // 3. Transform to remove option
The first function is the map_per_key which first turns a map into a sequence.  Then we map the m function over our list of pairs, and then finally concat the per-key lists.  Next our group by key might look a little interesting, but it’s simply a right fold over our insert function and our sequence of key/value pairs.  As I had in a previous post about how you can do right-folds in the terms of a left-fold.  Finally, our reduce per key applies the reduce function, then filters the removable entries, and finally transforms to remove the optional type.

There you have it, our implementation of a MapReduce.  Now let’s step ahead with our implementation to step through our IIS logs.  Let’s first retrieve the data for our logs from a given directory.  We can use the async workflows in F# to retrieve this data in parallel given a base directory.  Next, we need a way to harvest the data from the given logs so that we can get the IP address from it.

module LogCount =
  open System.IO
          
  let processed_logs : (string * string) array = 
    Async.Run (
      Async.Parallel 
        [for file in Directory.GetFiles @"C:\Logs\" -> 
           async { return file, File.ReadAllText file }])
           
  let harvest_data (data:string) : seq<string> =
    seq { for line in String.lines data do
            if not (line.StartsWith "#") then
              yield line.Split([|' '|]).[2]
        }

Finally, we can count get the IP occurrence count by implementing our map and reduce functions and calling our map_reduce implementation. 

  let ip_occurrence_count : Map<string, string> -> Map<string, int> =
    let m = const' (harvest_data >> Seq.map(flip Tuple.curry 1))
    let r = const' (Seq.sum >> Some) 
    MapReduce.map_reduce m r
 
module MainModule =
  [<EntryPoint>]
  let main(args:string array) =
    printfn "%A"
      (LogCount.ip_occurrence_count
        (Map.of_seq LogCount.processed_logs))
    0

The m function above harvests the data from the string value and then mark each IP address with an occurrence of 1.  Then our reduce function will sum the sequence of 1s for each given key to produce our result.  There is some boilerplate code that has been omitted due to space constraints but will be available at the end of this post.  Once we run this against a known set of data, we might get the following results:

seq
  [[192.168.1.2, 46]; [192.168.1.8, 339];
   [192.168.1.10, 11]; [192.168.1.15, 43]]
Press any key to continue . . .

 

With this simple example, we can see some of the power and possibilities such a programming model can bring.  This exercise is more of an idea of how one might be written using the programming model, as I didn’t write any system to handle concurrency, scheduling and so on. 

Conclusion

In many of my adventures in big data analysis and data parallel concurrency, MapReduce is one of the more interesting solutions.  By utilizing functional programming constructs, we are able to write rich code to analyze our data that may not as structured as we wish.  But, does our adventure stop here with this implementation?  Not necessarily, but either way it was a lot of fun.

You can find the source code for this example here.

9 Comments

Comments have been disabled for this content.