F# October 2009 CTP/Beta2 – F# + Rx Together At Last

Lately, I’ve been covering F# First Class Events and the Reactive Framework in some depth, and the two topics have now come together.  It has been announced through Channel 9 that .NET 4 will include the two major interfaces from the Reactive Framework, IObservable<T> and IObserver<T>.  Today, as Don Syme announces with the release of the F# October 2009 CTP and F# for Visual Studio 2010 Beta 2, those two interfaces have been integrated into F# First Class Events.

What does that mean and what are the implications?

Mixing in Observables

In the previous version of F#, you may remember that we dealt primarily with the IEvent interface, and if you needed the ability to subscribe and unsubscribe, you would use the IDelegateEvent interface.  Let’s look at how this was implemented in the past.

type IDelegateEvent<'Delegate when 'Delegate :> System.Delegate > =
  abstract AddHandler: handler:'Delegate -> unit
  abstract RemoveHandler: handler:'Delegate -> unit 

type IEvent<'Delegate,'Args when 'Delegate : delegate<'Args,unit> and 
                                 'Delegate :> System.Delegate > =
  abstract Add: callback:('Args -> unit) -> unit
  inherit IDelegateEvent<'Delegate>

As you’ll note, we could add and remove delegate handlers using the AddHandler and RemoveHandler methods of the IDelegateEvent interface, and we could add a plain function as a handler via the Add method of the IEvent interface.  Both of these gave us some pretty rich options when dealing with first class events.  Once the Reactive Framework interfaces were to be folded into .NET 4.0, it was only a matter of time before the F# team looked at how to integrate the idea of Observables with F# First Class Events.  Within the Visual Studio 2010 Beta 2 download, you’ll find that F# does in fact reference the System.IObservable<T> and System.IObserver<T> interfaces.  If you are still using Visual Studio 2008, these interfaces ship with F# itself so that you can take advantage of them.  To see how F# First Class Events work now, let’s go over the inheritance chain.

type IObserver<'T> =
  abstract OnNext : value : 'T -> unit
  abstract OnError : error : exn -> unit
  abstract OnCompleted : unit -> unit

type IObservable<'T> =
  abstract Subscribe : observer : IObserver<'T> -> System.IDisposable
  
type IDelegateEvent<'Delegate when 'Delegate :> System.Delegate > =
  abstract AddHandler: handler:'Delegate -> unit
  abstract RemoveHandler: handler:'Delegate -> unit 

type IEvent<'Delegate,'Args when 'Delegate : delegate<'Args,unit> and 
                                 'Delegate :> System.Delegate > =
  inherit IDelegateEvent<'Delegate>
  inherit IObservable<'Args>  

What you’ll now notice is that we have IObserver<T> and IObservable<T> defined, coming either from the .NET 4.0 base class library or from F# itself for Visual Studio 2008, and that IEvent<TDel,T> now inherits both the IDelegateEvent<TDel> and the IObservable<T> interfaces.  This means that when we’re dealing with First Class Events in F#, we can go back and forth between the IEvent<TDel,T> and IObservable<T> interfaces, based upon our needs.  So, what does that buy us?
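To make the inheritance chain a little more concrete, here is a minimal sketch, assuming a current F# compiler in which Event<'T>, its Publish property and the Handler<'T> delegate are available.  The names ticks, published and seen are made up for illustration.  The same published event is used once through its IDelegateEvent side and once through its IObservable side.

```fsharp
open System

// Hypothetical event source, used only for illustration.
let ticks = new Event<int>()
let published = ticks.Publish
let seen = ResizeArray<string>()

// IDelegateEvent side: attach an explicit delegate that we can detach later.
let handler = Handler<int>(fun _ n -> seen.Add(sprintf "handler %d" n))
published.AddHandler handler

// IObservable side: the very same object upcasts to IObservable<int>.
let asObservable : IObservable<int> = published :> IObservable<int>
asObservable |> Observable.add (fun n -> seen.Add(sprintf "observer %d" n))

ticks.Trigger 1
published.RemoveHandler handler
ticks.Trigger 2
// seen now holds "handler 1", "observer 1", "observer 2"
```

The upcast is spelled out here only to show that no conversion function is involved; in a pipeline the compiler inserts it for us.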

Exploring Observables

Much like the Event module in F# defines combinators for F# First Class Events, the Observable module defines the standard combinators for the IObservable<T> interface.  Like the Event module, it includes add, filter, map, partition, merge, choose, and scan.  Let’s briefly go over what each one does.

  • add
    Create an observer which permanently subscribes to the given observable and which calls the given function for each observation.
  • subscribe
    Create an observer which subscribes to the given observable and which calls the given function for each observation.
  • map
    Return an observable which transforms the observations of the source by the given function. The transformation function is executed once for each subscribed observer. The returned object also propagates error observations arising from the source and completes when the source completes.
  • filter
    Return an observable which filters the observations of the source by the given function. The observable will see only those observations for which the predicate returns true. The predicate is executed once for each subscribed observer. The returned object also propagates error observations arising from the source and completes when the source completes.
  • scan
    Return an observable which, for each observer, allocates an item of state and applies the given accumulating function to successive values arising from the input. The returned object will trigger observations for each computed state value, excluding the initial value. The returned object propagates all errors arising from the source and completes when the source completes.
  • choose
    Return an observable which chooses a projection of observations from the source using the given function. The returned object will trigger observations x for which the splitter returns Some x. The returned object also propagates all errors arising from the source and completes when the source completes.
  • partition
    Return two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.
  • split
    Return two observables which split the observations of the source by the given function. The first will trigger observations x for which the splitter returns Choice1Of2 x. The second will trigger observations y for which the splitter returns Choice2Of2 y.  The splitter is executed once for each subscribed observer. Both also propagate error observations arising from the source and each completes when the source completes.
  • merge
    Return an observable for the merged observations from the sources. The returned object propagates success and error values arising from either source and completes when both the sources have completed.
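A few of the combinators above can be sketched against a single source as follows.  This is only an illustration, assuming a current F# compiler; the numbers event and the log list are hypothetical names that exist purely to make the behaviour visible.

```fsharp
// Hypothetical source of integers, used only for illustration.
let numbers = new Event<int>()
let source = numbers.Publish
let log = ResizeArray<string>()

// map: transform each observation.
source
|> Observable.map (fun n -> n * 10)
|> Observable.add (fun n -> log.Add(sprintf "map %d" n))

// filter: keep only the even observations.
source
|> Observable.filter (fun n -> n % 2 = 0)
|> Observable.add (fun n -> log.Add(sprintf "filter %d" n))

// scan: keep a running total and report it after every observation.
source
|> Observable.scan (fun total n -> total + n) 0
|> Observable.add (fun total -> log.Add(sprintf "scan %d" total))

// choose: filter and project in one step by returning an option.
source
|> Observable.choose (fun n -> if n > 2 then Some (string n) else None)
|> Observable.add (fun s -> log.Add(sprintf "choose %s" s))

// split: route observations into two streams using Choice1Of2 / Choice2Of2.
let small, large =
  source
  |> Observable.split (fun n -> if n < 3 then Choice1Of2 n else Choice2Of2 (string n))
small |> Observable.add (fun n -> log.Add(sprintf "small %d" n))
large |> Observable.add (fun s -> log.Add(sprintf "large %s" s))

// Push three values through every pipeline.
for n in [1; 2; 3] do numbers.Trigger n
```

Each pipeline subscribes to the source separately, which is why the descriptions above say that the function is executed once for each subscribed observer.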

You’ll notice that one difference between the Event module and the Observable module is the inclusion of the subscribe function.  This allows us to subscribe to an observable, which in turn returns an IDisposable.  This IDisposable, as I’ve covered before, lets us tell the system that we’re no longer interested in receiving notifications simply by disposing it, instead of having to attach and detach a handler ourselves.  We’ll cover more of this in a future post in the Introduction to the Reactive Framework series.
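As a small sketch of that difference, assuming a current F# compiler, the following compares add and subscribe on the same source.  The clicks event and the received list are hypothetical names used only for this illustration.

```fsharp
// Hypothetical event source, used only for illustration.
let clicks = new Event<string>()
let received = ResizeArray<string>()

// add: returns unit, so there is nothing to dispose later on.
clicks.Publish |> Observable.add (fun s -> received.Add("add: " + s))

// subscribe: returns an IDisposable that ends the subscription when disposed.
let subscription =
  clicks.Publish |> Observable.subscribe (fun s -> received.Add("subscribe: " + s))

clicks.Trigger "first"
subscription.Dispose()
clicks.Trigger "second"
// received now holds "add: first", "subscribe: first", "add: second"
```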

What does this change buy us exactly?  Well, now we can treat observables much the same as we would F# first class events.  We can take an example from a previous post and show how to create two observables based upon whether the X and Y coordinates of the mouse are in a certain location as we drag it across our form.

open System.Windows.Forms

// Create form
let form = new Form(Visible=true, TopMost=true, Text="Event Sample")

// Create under and over for X and Y coordinates
let (overEvent, underEvent) =
  form.MouseDown
  |> Observable.merge form.MouseMove
  |> Observable.filter (fun args -> args.Button = MouseButtons.Left)
  |> Observable.map (fun args -> (args.X, args.Y))
  |> Observable.partition (fun (x, y) -> x > 100 && y > 100)

// Subscribe to each
let overSubscription =
  overEvent 
  |> Observable.subscribe (fun (x, y) -> printfn "Over (%d, %d)" x y)
let underSubscription =
  underEvent 
  |> Observable.subscribe (fun (x, y) -> printfn "Under (%d, %d)" x y)
  
// Much later, clean up
overSubscription.Dispose()
underSubscription.Dispose()

As you can see from this code, we’re treating our Observables no differently than we would our F# First Class Events.  Because IEvent inherits from IObservable, we get seamless integration between the two.  Unlike C#, we don’t need to create extension methods to turn our events into observables and instead get that for free.

Conclusion

What you will notice is that we lack many of the combinators that the Reactive Framework has built in, such as Until/WaitUntil, Take/TakeWhile, and Skip/SkipWhile, among others, which we’ll cover in the Introduction to the Reactive Framework series.  Many of these aren’t hard to implement, and we’ll get the added benefit of the exception management that Observables give us.
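To give a feel for how little is involved, here is a rough sketch of a take combinator, assuming a current F# compiler.  The take function below is hypothetical and is not part of the F# Observable module, nor is it the Reactive Framework’s implementation; it ignores thread safety and is only meant to show the shape of such a combinator.

```fsharp
open System

/// Hypothetical helper, not part of the F# Observable module:
/// forwards at most `count` observations and then completes.
let take (count: int) (source: IObservable<'T>) : IObservable<'T> =
  { new IObservable<'T> with
      member x.Subscribe(observer: IObserver<'T>) =
        let remaining = ref count
        let stopped = ref false
        let upstream = ref (None : IDisposable option)
        // Mark the sequence as finished and detach from the source if we can.
        let stop () =
          stopped.Value <- true
          upstream.Value |> Option.iter (fun d -> d.Dispose())
        let inner =
          { new IObserver<'T> with
              member o.OnNext(value) =
                if not stopped.Value then
                  observer.OnNext(value)
                  remaining.Value <- remaining.Value - 1
                  if remaining.Value <= 0 then
                    stop ()
                    observer.OnCompleted()
              member o.OnError(error) =
                if not stopped.Value then
                  stop ()
                  observer.OnError(error)
              member o.OnCompleted() =
                if not stopped.Value then
                  stop ()
                  observer.OnCompleted() }
        if count <= 0 then
          // Nothing to forward: complete straight away without subscribing.
          observer.OnCompleted()
          { new IDisposable with member d.Dispose() = () }
        else
          let subscription = source.Subscribe(inner)
          upstream.Value <- Some subscription
          // The source may already have finished during Subscribe.
          if stopped.Value then subscription.Dispose()
          { new IDisposable with
              member d.Dispose() = if not stopped.Value then stop () } }

// Usage: only the first two of three values get through.
let numbers = new Event<int>()
let taken = ResizeArray<int>()
let completed = ref false
let recorder =
  { new IObserver<int> with
      member o.OnNext(n) = taken.Add n
      member o.OnError(error) = ()
      member o.OnCompleted() = completed.Value <- true }
let limited = numbers.Publish |> take 2
let subscription = limited.Subscribe(recorder)
for n in [1; 2; 3] do numbers.Trigger n
```

Errors from the source flow straight through to the downstream observer via OnError, which is the exception management mentioned above.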

It’s pretty interesting to see how F# has evolved from release to release.  The releases for Visual Studio 2010 Beta 2 and the F# October 2009 CTP have some nice changes, as noted in the release notes.  The integration of IObservable/IObserver is not to be overlooked as one of them.  Download them today and give the team feedback!  Now back to our Observables series.
