Adding Parallel Extensions to F#
In many of my recent presentations, I’ve used the Parallel Extensions for .NET for the heavy computations in my F# code. Doing so speeds those computations up several-fold and takes full advantage of all the cores on my machine. Over time, with the help of others, I’ve wrapped many of the functions from the ParallelEnumerable class so that they can be consumed from F# in an idiomatic way.
From Seq to PSeq
Within the F# libraries, seq<'a> is a type abbreviation for the IEnumerable<T> interface, with its associated functions in the Seq module. The goal here is to mimic the signatures and well-defined behaviors of the Seq module while wrapping the ParallelEnumerable class and its associated IParallelEnumerable<T> interface. Just as the Seq implementation hides IEnumerable<T> behind seq<'a>, we’ll hide the underlying IParallelEnumerable<'a> behind the pseq<'a> type.
One wrinkle is translating F# functions into .NET delegates. The ParallelEnumerable methods expect Func delegates, so each F# function must be wrapped in the appropriate Func constructor before it is passed along. Other than that, the integration between the language and the library is pretty smooth.
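To make the delegate wrapping concrete, here is a minimal sketch. It assumes the released .NET 4 names (ParallelEnumerable.Range and ParallelEnumerable.Where) rather than the CTP types used in this post, and isEven is a hypothetical helper introduced only for illustration.

```fsharp
open System
open System.Linq

// Hypothetical helper: an ordinary F# function value.
let isEven (x: int) = x % 2 = 0

// PLINQ expects a Func<int, bool>, so the F# function is wrapped explicitly.
// Assumption: released .NET 4 API, not the CTP API shown in this post.
let evens =
    ParallelEnumerable.Where(ParallelEnumerable.Range(1, 10), Func<int, bool>(isEven))
    |> Seq.sort      // parallel results may arrive in any order, so sort them
    |> Seq.toList
// evens = [2; 4; 6; 8; 10]
```

Naming the delegate type explicitly also tells the compiler which overload of Where to pick, since there is a second overload that takes the element index as well.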
So, without further ado, here is the code:
/// Type wrapper over the IParallelEnumerable
type pseq<'a> = System.Linq.IParallelEnumerable<'a>

module PSeq =

    open System
    open System.Linq

    /// Append two parallel collections together
    let append (ie1: pseq<'a>) (ie2: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Concat(ie1, ie2)

    /// This is the method to opt into Parallel LINQ.
    let adapt : seq<'a> -> pseq<'a> =
        ParallelQuery.AsParallel

    /// This is the method to opt into Parallel LINQ with deg of parallelism
    let adaptn (n:int) (seq:seq<'a>) : pseq<'a> =
        if n < 1 then adapt seq
        else ParallelQuery.AsParallel( seq, n )

    /// AsOrdered is a method that tells PLINQ to treat a data source as if it was ordered
    let ordered : pseq<'a> -> pseq<'a> =
        ParallelQuery.AsOrdered

    /// AsUnordered tells PLINQ that it should treat a particular intermediate result as if no
    /// order was implied
    let unordered : pseq<'a> -> pseq<'a> =
        ParallelQuery.AsUnordered

    /// This method is to opt out of Parallel LINQ.
    let as_seq : pseq<'a> -> seq<'a> =
        ParallelQuery.AsSequential

    /// Parallel implementation of System.Linq.Enumerable.Average().
    let average_float : pseq<float> -> float =
        ParallelEnumerable.Average

    /// Parallel implementation of System.Linq.Enumerable.Average().
    let average_float_by (f:'a -> float) (pe:pseq<'a>) : float =
        ParallelEnumerable.Average(pe, Func<_,_>(f))

    /// Parallel implementation of System.Linq.ParallelEnumerable.Cast
    let cast : IParallelEnumerable -> pseq<'a> =
        ParallelEnumerable.Cast

    /// Parallel implementation of System.Linq.ParallelEnumerable.Distinct
    let distinct : pseq<'a> -> pseq<'a> =
        ParallelEnumerable.Distinct

    /// Parallel implementation of System.Linq.ParallelEnumerable.Any
    let exists (f:'a -> bool) (pe:pseq<'a>) : bool =
        ParallelEnumerable.Any(pe, Func<_,_>(f))

    /// Parallel implementation of System.Linq.Enumerable.Where().
    let filter (f:'a -> bool) (pe:pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Where(pe, Func<_,_>(f))

    /// Parallel implementation of System.Linq.Enumerable.First
    let find (f:'a -> bool) (pe:pseq<'a>) : 'a =
        ParallelEnumerable.First(pe, Func<_,_>(f))

    /// Parallel implementation of System.Linq.Enumerable.Aggregate().
    let fold (f:'a -> 'b -> 'a) (seed:'a) (pe:pseq<'b>) : 'a =
        ParallelEnumerable.Aggregate(pe, seed, Func<_,_,_>(f))

    /// Parallel implementation of System.Linq.Enumerable.All
    let for_all (f:'a -> bool) (pe:pseq<'a>) : bool =
        ParallelEnumerable.All(pe, Func<_,_>(f))

    /// Empty ParallelEnumerable
    let empty<'a> : pseq<'a> = ParallelEnumerable.Empty<'a>()

    /// Parallel implementation of System.Linq.Enumerable.First()
    let hd : pseq<'a> -> 'a =
        ParallelEnumerable.First

    /// Parallel implementation of System.Linq.ParallelEnumerable.Count
    let length : pseq<'a> -> int =
        ParallelEnumerable.Count

    /// Parallel implementation of System.Linq.Enumerable.Select().
    let map (f:'a -> 'b) (pe:pseq<'a>) : pseq<'b> =
        ParallelEnumerable.Select(pe, Func<_,_>(f))

    /// Parallel implementation of System.Linq.Enumerable.Select().
    let mapi (f:int -> 'a -> 'b) (pe:pseq<'a>) : pseq<'b> =
        let f' x i = f i x
        ParallelEnumerable.Select(pe, Func<_,_,_>(f'))

    /// Parallel implementation of System.Linq.Enumerable.Zip
    let map2 (f:'a -> 'b -> 'c) (pe1:pseq<'a>) (pe2:pseq<'b>) : pseq<'c> =
        ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(f))

    /// Parallel implementation of System.Linq.Enumerable.Reverse
    let rev : pseq<'a> -> pseq<'a> = ParallelEnumerable.Reverse

    /// Parallel implementation of Seq.concat
    let concat (pe:pseq<pseq<'a>>) : pseq<'a> =
        ParallelEnumerable.SelectMany(
            pe, Func<_,_>(fun x -> x :> seq<'a>))

    /// Parallel implementation of System.Linq.Enumerable.ElementAt
    let nth (n:int) (pe:pseq<'a>) : 'a =
        ParallelEnumerable.ElementAt(pe, n)

    /// Parallel implementation of System.Linq.Enumerable.OrderBy
    let order_by (f:'a -> 'b) (pe:pseq<'a>) =
        ParallelEnumerable.OrderBy(pe, Func<_,_>(f))

    /// Parallel implementation of System.Linq.Enumerable.Range
    let range (start:int) (count:int) : pseq<int> =
        ParallelEnumerable.Range(start, count)

    /// Parallel implementation of System.Linq.Enumerable.Skip
    let skip (n:int) (pe:pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Skip(pe, n)

    /// Parallel implementation of System.Linq.Enumerable.SkipWhile
    let skip_while (f:'a -> bool) (pe:pseq<'a>) : pseq<'a> =
        ParallelEnumerable.SkipWhile(pe, Func<_,_>(f))

    /// Parallel implementation of System.Linq.Enumerable.Sum().
    let sum_by_int : pseq<int> -> int =
        ParallelEnumerable.Sum

    /// Parallel implementation of System.Linq.Enumerable.Take
    let take (n:int) (pe:pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Take(pe, n)

    /// Parallel implementation of System.Linq.Enumerable.TakeWhile
    let take_while (f:'a -> bool) (pe:pseq<'a>) : pseq<'a> =
        ParallelEnumerable.TakeWhile(pe, Func<_,_>(f))

    /// Parallel implementation of System.Linq.Enumerable.ToArray().
    let to_array : pseq<'a> -> 'a array =
        ParallelEnumerable.ToArray

    /// Parallel implementation of to list
    let to_list<'a> : pseq<'a> -> 'a list =
        to_array >> Array.to_list

    /// Parallel implementation of System.Linq.Enumerable.Zip
    let zip (pe1:pseq<'a>) (pe2:pseq<'b>) : pseq<'a * 'b> =
        let f a b = (a, b)
        ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(f))

[<AutoOpen>]
module Operators =

    /// Used for sequence expressions
    /// Example : let f = pseq [for x in [1..10] -> x * x]
    let pseq (ie:seq<'a>) : pseq<'a> = PSeq.adapt ie
That’s all there is to it. Now we can write simple examples such as the following:
|> PSeq.filter(fun x -> x % 3 = 0)
|> PSeq.sum_by_int;;
val it : int = 333666
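For readers on a released version of .NET, the same shape can be sketched against the shipped API. This is a hedged sketch and not the code from this post. It assumes that ParallelQuery<'a> stands in for the CTP’s IParallelEnumerable<'a> and that the AsParallel extension method replaces ParallelQuery.AsParallel. The input list [1 .. 1000] is my own choice for illustration and is not the input that produced the result above.

```fsharp
open System
open System.Linq

// Assumption: in released .NET 4 and later, the parallel sequence type is ParallelQuery<'T>.
type pseq<'a> = ParallelQuery<'a>

module PSeq =
    /// Opt into Parallel LINQ.
    let adapt (s: seq<'a>) : pseq<'a> = s.AsParallel()

    /// Parallel filter; the F# predicate is wrapped in a Func delegate.
    let filter (f: 'a -> bool) (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Where(pe, Func<'a, bool>(f))

    /// Parallel map; the F# function is wrapped in a Func delegate.
    let map (f: 'a -> 'b) (pe: pseq<'a>) : pseq<'b> =
        ParallelEnumerable.Select(pe, Func<'a, 'b>(f))

    /// Parallel sum over integers.
    let sum_by_int (pe: pseq<int>) : int = ParallelEnumerable.Sum(pe)

// Sum the multiples of 3 between 1 and 1000.
let total =
    [1 .. 1000]
    |> PSeq.adapt
    |> PSeq.filter (fun x -> x % 3 = 0)
    |> PSeq.sum_by_int
// total = 166833
```

The wrappers take their arguments explicitly instead of being written point-free, which keeps each function generic without relying on how method groups convert to F# function values.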
Of course, we could do far more advanced things than this, but it’s a start. There are plenty of possibilities given the number of operators supported, and I’ll cover more of them in later posts.
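As one example of those operators, ordering and the degree of parallelism can be combined. The sketch below again assumes the released .NET 4 API (ParallelEnumerable.AsOrdered and ParallelEnumerable.WithDegreeOfParallelism) rather than the CTP calls wrapped by ordered and adaptn, and the limit of 2 is an arbitrary value chosen for illustration.

```fsharp
open System
open System.Linq

// Start from a parallel range of the integers 1 through 10.
let source : ParallelQuery<int> = ParallelEnumerable.Range(1, 10)

// Ask PLINQ to preserve the source order in the results.
let orderedSource : ParallelQuery<int> = ParallelEnumerable.AsOrdered(source)

// Cap the number of concurrent tasks (2 is an arbitrary illustrative value).
let limited = ParallelEnumerable.WithDegreeOfParallelism(orderedSource, 2)

// Square each element; because of AsOrdered the output keeps the input order.
let squares =
    ParallelEnumerable.Select(limited, Func<int, int>(fun x -> x * x))
    |> ParallelEnumerable.ToArray
// squares = [|1; 4; 9; 16; 25; 36; 49; 64; 81; 100|]
```

Preserving order has a cost, so it is worth requesting only when the consumer actually depends on it.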
What Does the Future Hold?
Given how easy this integration is, it’s natural to wonder how F# might interact with the Parallel Extensions for .NET in the future. With both F# and the Parallel Extensions becoming first-class citizens of .NET, a case can be made that this kind of wrapper should be an integral part of the F# libraries as well. These are interesting times for concurrency-oriented programming, and the combination of F# and the Parallel Extensions gives us a powerful tool for running algorithms over large data sets.