Adding Parallel Extensions to F#

In many of my presentations lately, I’ve been using the Parallel Extensions for .NET as part of my heavy computations in F#. By doing so, I’m able to speed up some of my heavier computations several-fold and take full advantage of the cores on my machine. Over time, with the help of others, I’ve translated many of the functions from the ParallelEnumerable class into functions that can be consumed idiomatically from F#.

From Seq to PSeq

Within the F# libraries, seq<’a> is a thin alias over the IEnumerable<T> interface, with associated functions in the Seq module. The goal here is to mimic the signatures and well-defined behaviors of the Seq module while wrapping the ParallelEnumerable class and its associated IParallelEnumerable<T> interface. Just as the Seq module hides IEnumerable<T> behind seq<’a>, we’ll hide the underlying IParallelEnumerable<’a> behind the pseq<’a> type.
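In the .NET 4 release of PLINQ, the query type is the ParallelQuery<T> class rather than the CTP's IParallelEnumerable<T> interface. Assuming that newer API, the same abbreviation trick can be sketched as follows. The abbreviation is only a new name for the type, and because ParallelQuery<T> implements IEnumerable<T>, a pseq<'a> can still be handed to the ordinary Seq functions.

```fsharp
open System
open System.Linq

// Assumption: .NET 4+ PLINQ, where ParallelQuery<'T> replaced the CTP's IParallelEnumerable<'T>.
// The abbreviation only renames the type; no wrapper object is allocated.
type pseq<'a> = ParallelQuery<'a>

let squares : pseq<int> =
    ParallelEnumerable.Select(ParallelEnumerable.Range(1, 5), Func<int, int>(fun x -> x * x))

// ParallelQuery<'T> implements IEnumerable<'T>, so the plain Seq functions still accept it.
let total = squares |> Seq.sum
// PLINQ does not promise source order, so sort before looking at the elements.
let sortedSquares = squares |> Seq.sort |> Seq.toList

printfn "%d %A" total sortedSquares
```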

The main friction is translating F# functions into .NET delegates: the ParallelEnumerable methods expect Func delegates rather than F# function values, so each function must be wrapped in the matching Func constructor (Func<_,_>, Func<_,_,_>) before it is passed along. Other than that, it’s a pretty smooth integration between the language and the library.
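As a small illustration of that wrapping, here is a sketch against the .NET 4+ PLINQ API (an assumption; the module below targets the CTP). A named F# function value is not a System.Func, so it goes through the Func constructor, and a curried two-argument function maps onto Func<_,_,_> the same way PSeq.fold does. The helper names isEven and add are illustrative only.

```fsharp
open System
open System.Linq

// An ordinary F# predicate (hypothetical helper for this example).
let isEven (x: int) = x % 2 = 0

// A named F# function value is not a System.Func<_,_>, so it is wrapped explicitly.
let evens =
    ParallelEnumerable.Where(ParallelEnumerable.Range(1, 10), Func<int, bool>(isEven))
    |> Seq.sort      // PLINQ does not promise source order, so sort before inspecting
    |> Seq.toList

// A curried two-argument function converts to Func<_,_,_> the same way (compare PSeq.fold).
let add (acc: int) (x: int) = acc + x
let total = ParallelEnumerable.Aggregate(ParallelEnumerable.Range(1, 10), 0, Func<int, int, int>(add))

printfn "%A %d" evens total
```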

So, without further ado, here is the code:

#light

/// Type abbreviation for IParallelEnumerable<'a>
type pseq<'a> = System.Linq.IParallelEnumerable<'a>

module PSeq = 
  open System
  open System.Linq

  /// Append two parallel collections together
  let append (ie1: pseq<'a>) (ie2: pseq<'a>) : pseq<'a> =
    ParallelEnumerable.Concat(ie1, ie2)

  /// This is the method to opt into Parallel LINQ.
  let adapt : seq<'a> -> pseq<'a> = 
    ParallelQuery.AsParallel

  /// This is the method to opt into Parallel LINQ with a given degree of parallelism
  let adaptn (n:int) (seq:seq<'a>) : pseq<'a> = 
    if n < 1  then adapt seq 
    else ParallelQuery.AsParallel( seq, n )
    
  /// AsOrdered is a method that tells PLINQ to treat a data source as if it were ordered
  let ordered : pseq<'a> -> pseq<'a> = 
    ParallelQuery.AsOrdered
  
  /// AsUnordered tells PLINQ that it should treat a particular intermediate result as if no
  /// order was implied
  let unordered : pseq<'a> -> pseq<'a> = 
    ParallelQuery.AsUnordered
  
  /// This method is to opt out of Parallel LINQ.
  let as_seq : pseq<'a> -> seq<'a> =
    ParallelQuery.AsSequential

  /// Parallel implementation of System.Linq.Enumerable.Average().
  let average_float : pseq<float> -> float =
    ParallelEnumerable.Average
    
  /// Parallel implementation of System.Linq.Enumerable.Average().
  let average_float_by (f:'a -> float) (pe:pseq<'a>) : float =
    ParallelEnumerable.Average(pe, Func<_,_>(f))
    
  /// Parallel implementation of System.Linq.ParallelEnumerable.Cast
  let cast : IParallelEnumerable -> pseq<'a> =
    ParallelEnumerable.Cast
    
  /// Parallel implementation of System.Linq.ParallelEnumerable.Distinct
  let distinct : pseq<'a> -> pseq<'a> =
    ParallelEnumerable.Distinct
    
  /// Parallel implementation of System.Linq.ParallelEnumerable.Any
  let exists (f:'a -> bool) (pe:pseq<'a>) : bool =
    ParallelEnumerable.Any(pe, Func<_,_>(f))

  /// Parallel implementation of System.Linq.Enumerable.Where().
  let filter (f:'a -> bool) (pe:pseq<'a>) : pseq<'a> = 
    ParallelEnumerable.Where(pe, Func<_,_>(f))
    
  /// Parallel implementation of System.Linq.Enumerable.First
  let find (f:'a -> bool) (pe:pseq<'a>) : 'a =
    ParallelEnumerable.First(pe, Func<_,_>(f)) 

  /// Parallel implementation of System.Linq.Enumerable.Aggregate().
  let fold (f:'a -> 'b -> 'a) (seed:'a) (pe:pseq<'b>) : 'a = 
    ParallelEnumerable.Aggregate(pe, seed, Func<_,_,_>(f))
    
  /// Parallel implementation of System.Linq.Enumerable.All
  let for_all (f:'a -> bool) (pe:pseq<'a>) : bool =
    ParallelEnumerable.All(pe, Func<_,_>(f))
        
  /// Empty ParallelEnumerable
  let empty<'a> : pseq<'a> = ParallelEnumerable.Empty<'a>()
    
  /// Parallel implementation of System.Linq.Enumerable.First()
  let hd : pseq<'a> -> 'a =
    ParallelEnumerable.First
      
  /// Parallel implementation of System.Linq.ParallelEnumerable.Count
  let length : pseq<'a> -> int =
    ParallelEnumerable.Count  
    
  /// Parallel implementation of System.Linq.Enumerable.Select().
  let map (f:'a -> 'b) (pe:pseq<'a>) : pseq<'b> = 
    ParallelEnumerable.Select(pe, Func<_,_>(f))           
    
  /// Parallel implementation of System.Linq.Enumerable.Select().
  let mapi (f:int -> 'a -> 'b) (pe:pseq<'a>) : pseq<'b> =
    let f' x i = f i x
    ParallelEnumerable.Select(pe, Func<_,_,_>(f'))
    
  /// Parallel implementation of System.Linq.Enumerable.Zip
  let map2 (f:'a -> 'b -> 'c) (pe1:pseq<'a>) (pe2:pseq<'b>) : pseq<'c> =
    ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(f))
    
  /// Parallel implementation of System.Linq.Enumerable.Reverse
  let rev : pseq<'a> -> pseq<'a> = ParallelEnumerable.Reverse
      
  /// Parallel implementation of Seq.concat
  let concat (pe:pseq<pseq<'a>>) : pseq<'a> = 
    ParallelEnumerable.SelectMany(
        pe, Func<_,_>(fun x -> x :> seq<'a>))
    
  /// Parallel implementation of System.Linq.Enumerable.ElementAt
  let nth (n:int) (pe:pseq<'a>) : 'a =
    ParallelEnumerable.ElementAt(pe, n)
    
  /// Parallel implementation of System.Linq.Enumerable.OrderBy
  let order_by (f:'a -> 'b) (pe:pseq<'a>) =
    ParallelEnumerable.OrderBy(pe, Func<_,_>(f))
    
  /// Parallel implementation of System.Linq.Enumerable.Range
  let range (start:int) (count:int) : pseq<int> =
    ParallelEnumerable.Range(start, count)
    
  /// Parallel implementation of System.Linq.Enumerable.Skip
  let skip (n:int) (pe:pseq<'a>) : pseq<'a> =
    ParallelEnumerable.Skip(pe, n)
    
  /// Parallel implementation of System.Linq.Enumerable.SkipWhile
  let skip_while (f:'a -> bool) (pe:pseq<'a>) : pseq<'a> =
    ParallelEnumerable.SkipWhile(pe, Func<_,_>(f))
    
  /// Parallel implementation of System.Linq.Enumerable.Sum().
  let sum_by_int : pseq<int> -> int =
    ParallelEnumerable.Sum          
    
  /// Parallel implementation of System.Linq.Enumerable.Take
  let take (n:int) (pe:pseq<'a>) : pseq<'a> =
    ParallelEnumerable.Take(pe, n)
      
  /// Parallel implementation of System.Linq.Enumerable.TakeWhile
  let take_while (f:'a -> bool) (pe:pseq<'a>) : pseq<'a> =
    ParallelEnumerable.TakeWhile(pe, Func<_,_>(f))
  
  /// Parallel implementation of System.Linq.Enumerable.ToArray().
  let to_array : pseq<'a> -> 'a array = 
    ParallelEnumerable.ToArray
    
  /// Converts a parallel sequence to an F# list by way of to_array
  let to_list<'a> : pseq<'a> -> 'a list =
    to_array >> Array.to_list    
      
  /// Parallel implementation of System.Linq.Enumerable.Zip
  let zip (pe1:pseq<'a>) (pe2:pseq<'b>) : pseq<'a * 'b> =
    let f a b = (a, b)
    ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(f))             
      
[<AutoOpen>]
module Operators =
  
  /// Used for sequence expressions
  /// Example : let f = pseq [for x in [1..10] -> x * x] 
  let pseq (ie:seq<'a>) : pseq<'a> = PSeq.adapt ie
 

That’s it; that’s all there is to it. Now we can write simple examples such as the following:

> pseq [1..1000] |> PSeq.map ((*) 2) 
    |> PSeq.filter(fun x -> x % 3 = 0) 
    |> PSeq.sum_by_int;;
val it : int = 333666
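For readers on .NET 4 or later, here is the same pipeline written straight against ParallelEnumerable, one call per PSeq wrapper. This is a sketch assuming the released PLINQ API (ParallelQuery<T>), not the CTP used above; ParallelEnumerable.Range stands in for pseq [1..1000].

```fsharp
open System
open System.Linq

// Assumption: .NET 4+ PLINQ. Each step mirrors one PSeq wrapper from the module above.
let source : ParallelQuery<int> = ParallelEnumerable.Range(1, 1000)                 // pseq [1..1000]
let doubled = ParallelEnumerable.Select(source, Func<int, int>(fun x -> x * 2))     // PSeq.map ((*) 2)
let multiplesOf3 =
    ParallelEnumerable.Where(doubled, Func<int, bool>(fun x -> x % 3 = 0))          // PSeq.filter
let result = ParallelEnumerable.Sum(multiplesOf3)                                   // PSeq.sum_by_int

printfn "%d" result   // 333666
```

Summation order does not affect an integer sum, so the result matches the F# Interactive session regardless of how PLINQ partitions the work.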

Of course, we could do far more advanced things than this, but this is just a start. Given the number of operators supported, there are plenty of possibilities, and I’ll cover more of them in later posts.
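As one small step beyond that example, the adaptn and ordered wrappers in the module above control the degree of parallelism and ordering. The sketch below assumes .NET 4+ PLINQ, where WithDegreeOfParallelism plays the role of the CTP's AsParallel(seq, n) overload; adaptN and squareAll are hypothetical helper names. PLINQ makes no ordering promise by default, so only the AsOrdered version is guaranteed to come back in source order.

```fsharp
open System
open System.Linq

// Assumption: .NET 4+ PLINQ. adaptN and squareAll are hypothetical names that
// mirror PSeq.adaptn and PSeq.map from the module above.
let adaptN (n: int) (source: seq<'a>) : ParallelQuery<'a> =
    let q : ParallelQuery<'a> = ParallelEnumerable.AsParallel(source)
    // WithDegreeOfParallelism caps the number of concurrent tasks, like AsParallel(seq, n) in the CTP.
    if n < 1 then q else ParallelEnumerable.WithDegreeOfParallelism(q, n)

let squareAll (q: ParallelQuery<int>) : int[] =
    ParallelEnumerable.Select(q, Func<int, int>(fun x -> x * x)) |> Seq.toArray

let source = [| 1 .. 20 |]

// AsOrdered (PSeq.ordered) makes PLINQ yield results in source order.
let orderedSquares = squareAll (ParallelEnumerable.AsOrdered(adaptN 2 source))

// Without AsOrdered the same values come back, but in no guaranteed order.
let unorderedSquares = squareAll (adaptN 2 source)

printfn "%A" orderedSquares
```

Ordering costs some throughput, which is why PLINQ leaves it off unless asked; PSeq.unordered exists to switch it back off for later stages.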

What Does the Future Hold?

Given how easy this integration is, it is worth asking how F# might interact with the Parallel Extensions for .NET in the future. With both F# and the Parallel Extensions for .NET becoming first-class citizens, a case can be made that the Parallel Extensions should be an integral part of the F# libraries as well. These are interesting times for concurrency-oriented programming, and with the combination of F# and the Parallel Extensions, we have a powerful addition to our tool chest for running algorithms over big data.

11 Comments

  • Hi Matthew,

    I wonder what the interplay is between async computations and pseq combinators, i.e., what would be the preferred kind of binding for pseq expressions (let or let!) inside async monads. I guess async computations use the existing .NET 3.5 thread pool while pseq uses the enhanced 4.0 Task API, but that still does not answer my question.

    Dimitris

  • @Dimitris

    There really isn't any interplay between the two. PSeq uses the PLINQ operators whereas the Async Workflows are just a thin wrapper over the ThreadPool APIs. In the future, with all of these cropping up, I wouldn't be surprised if they all started merging towards a common way of doing async and parallel computing.

    As it stands, you cannot use let! to express PSeq operations, as they are not asynchronous in nature. You could, though, implement the list monad over PSeq and use it like this:

    type PSeqBuilder() =
      member x.Bind(l, f) = PSeq.concat (PSeq.map f l)
      member x.Return(l) = pseq [l]
      member x.Delay(f) = f()
    let pseqM = new PSeqBuilder()

    pseqM { let! x = pseq [1..10]
            let! y = pseq [11..20]
            return x + y }

    Maybe in the future there will be tighter integration, but this is the best you're going to get right now.

    Matt
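To see what the list monad over PSeq produces, here is a self-contained sketch assuming .NET 4+ PLINQ. PSeqListBuilder is a hypothetical name; its Bind flattens with SelectMany, as PSeq.concat does, and Return builds a one-element query with ParallelEnumerable.Repeat. Delay is omitted because F# builders do not require it. The expression yields every x + y pairing, 100 values in all.

```fsharp
open System
open System.Linq

// Assumption: .NET 4+ PLINQ (ParallelQuery<'T>). PSeqListBuilder is a hypothetical
// name for a builder implementing the same list-monad idea as PSeqBuilder above.
type PSeqListBuilder() =
    // Bind: run f on every element and flatten the resulting queries (SelectMany).
    member this.Bind(source: ParallelQuery<'a>, f: 'a -> ParallelQuery<'b>) : ParallelQuery<'b> =
        ParallelEnumerable.SelectMany(source, Func<'a, seq<'b>>(fun x -> f x :> seq<'b>))
    // Return: a one-element parallel query.
    member this.Return(x: 'a) : ParallelQuery<'a> = ParallelEnumerable.Repeat(x, 1)

let pseqM = PSeqListBuilder()

// Every pairing of x and y: 10 * 10 = 100 sums.
let sums =
    pseqM {
        let! x = ParallelEnumerable.Range(1, 10)
        let! y = ParallelEnumerable.Range(11, 10)
        return x + y
    }

printfn "%d sums, total %d" (Seq.length sums) (Seq.sum sums)
```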

  • I'm very new to F#, so please forgive me if this is really elementary stuff... That said, here goes:

    I thought I'd try this out, and tried putting the PSeq bit in a module of its own.

    From the script I try to load and open the module, but while the project as a whole compiles fine, trying to run any samples from the console or the script just fails with a rather annoying message: "PSeq.fs(4,24): error FS0039: The module or namespace 'Linq' is not defined."

    I've checked my references, and really can't see what I'm doing wrong. Even Google shows up absolutely blank on that phrase.

    Any leads please? ;)

  • @Jostein

    In order to run from F# interactive, you need the following included:

    #if INTERACTIVE
    #r "System.Core.dll"
    #r "System.Threading.dll"
    #endif

    Hope this helps.

    Matt

    Thanks, this is very useful while we wait for .NET 4.0. Here's a little one that is missing:

    let iter (f: 'a -> unit) (pe: pseq<'a>) : unit =
      Parallel.ForEach(pe, Action<_>(f)) |> ignore

  • @Mauricio,

    You are correct, that would be rather trivial to add.

    Matt
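A sketch of that iter, assuming .NET 4+, where Parallel.ForEach returns a ParallelLoopResult that has to be discarded to get unit. PLINQ's own ParallelEnumerable.ForAll is an alternative that runs the action on the worker threads without first merging the query results; iterForAll is a hypothetical name for that variant. In both cases f runs on several threads at once, so it must be thread-safe, which is why the usage below collects into a ConcurrentBag.

```fsharp
open System
open System.Collections.Concurrent
open System.Linq
open System.Threading.Tasks

// Assumption: .NET 4+, where Parallel.ForEach returns a ParallelLoopResult
// that iter discards so the function returns unit.
let iter (f: 'a -> unit) (pe: ParallelQuery<'a>) : unit =
    Parallel.ForEach(pe, Action<'a>(f)) |> ignore

// Hypothetical variant using PLINQ's ForAll, which skips merging results first.
let iterForAll (f: 'a -> unit) (pe: ParallelQuery<'a>) : unit =
    ParallelEnumerable.ForAll(pe, Action<'a>(f))

// f runs concurrently, so it must be thread-safe; ConcurrentBag.Add is.
let bag = ConcurrentBag<int>()
iter (fun x -> bag.Add(x * 10)) (ParallelEnumerable.Range(1, 5))
iterForAll (fun x -> bag.Add(x)) (ParallelEnumerable.Range(1, 5))

printfn "%A" (bag |> Seq.sort |> Seq.toList)
```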

  • Here is a generic implementation of sum:

    /// Generic Parallel implementation of sum.
    let inline sum (pe:pseq) =
    let zero = LanguagePrimitives.GenericZero
    ParallelEnumerable.Aggregate(pe, zero, fun a b -> a + b)
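A variant of that generic sum, sketched for .NET 4+ PLINQ (an assumption), with explicit statically resolved type parameters so that one inline definition works for int and float alike. psum is a hypothetical name chosen to avoid clashing with Seq.sum. Because PLINQ may combine partial results, the accumulator should be associative, which addition is.

```fsharp
open System
open System.Linq

// Assumption: .NET 4+ PLINQ. The statically resolved ^a lets one definition
// serve any type that has a GenericZero and a (+) operator.
let inline psum (pe: ParallelQuery< ^a >) : ^a =
    ParallelEnumerable.Aggregate(
        pe,
        LanguagePrimitives.GenericZero< ^a >,
        Func< ^a, ^a, ^a >(fun acc x -> acc + x))

let intTotal = psum (ParallelEnumerable.Range(1, 100))
let floatTotal =
    psum (ParallelEnumerable.Select(ParallelEnumerable.Range(1, 4), Func<int, float>(fun x -> float x / 2.0)))

printfn "%d %g" intTotal floatTotal
```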

  • Has anyone been able to get this to work with VS2010 Beta 2?

  • @Talbott,

    It shouldn't be hard to port at all, and adding more combinators than what I have here is not hard.

    Matt

  • Adding parallel extensions to f.. Awful :)

  • I appreciate you taking the time to create this publish. It has been quite helpful to me in fact. Enjoy it.
