Introducing the Reactive Framework Part I

During my series about first class events in F#, I made frequent mention of the LiveLabs Reactive Framework (Rx), without going into much detail as to what it is.  Now that the series is complete and we understand both how F# first class events work, as well as their integration into F# async workflows, let’s take a look at the Reactive Framework in this multi-part series covering the history, implementation details and more. 

You may already be familiar with the Reactive Framework due to the videos that Erik Meijer, Wes Dyer and Charles Torre have put on Channel 9.  Each of the following are well worth a watch:

What and Why?

So, the question of what are the motivations of Rx?  The first question to ask yourself is “How easy is it in today’s frameworks to do asynchronous programming?”  The answer usually that it’s actually quite difficult, to not only manage exceptional cases and cancellation, but also, how do you compose asynchronous events together to create more interesting events?  The need is actually quite great especially given our multi-core revolution.  So, how can we take advantage to do some rich, reactive programming like AJAX-style applications?

After covering some of my posts on F# async workflows and F# first class events, some of those motivations start to appear, such as how do you make asynchronous programming and reactive programming truly composable?   Let’s look at the standard ways of doing asynchronous programming currently on the .NET platform.  In the past, we’ve had to write things such as this in order to asynchronously read data from a Twitter feed:

var request = WebRequest.Create("http://twitter.com/statuses/friends_timeline.xml");
request.Credentials = new NetworkCredential("foo", "bar");
request.BeginGetResponse(ar =>
{
    using(var response = request.EndGetResponse(ar))
    using(var stream = response.GetResponseStream())
    using (var reader = new StreamReader(stream))
        PublishResult(reader.ReadToEnd()); // Do something with the result
}, null);

Since the beginning of .NET, we had both events and the Begin/End with an IAsyncResult for doing asynchronous programming, the latter of which we’ve shown above.  The problem with both is that they weren’t easy to handle exceptions and cancellation.  By the time .NET 2.0 came around, there was a new approach to doing asynchronous programming that in fact handled such concerns.  Such classes as the BackgroundWorker, WebClient and WCF service clients followed this approach.  For example, we could download a user’s timeline from Twitter using the WebClient class.

var wc = new WebClient 
    {Credentials = new NetworkCredential("foo", "bar")};
wc.DownloadStringCompleted += (o, e) =>
{
    if(e.Cancelled)
    {
        PublishCancel();
        return; // Handle cancellation
    }
    if(e.Error != null)
    {
        PublishException(e.Error);
        return; // Handle error
    }

    PublishResult(e.Result); // Handle success
};

 

 

As you can see, we can somewhat elegantly handle not only the result, but the cancellation and exceptional cases as well.  The problem of course is that there is really no way to unsubscribe from this lambda expression, and just as well, the solution really isn’t composable.  How could I merge this event with another event, or say stop listening after a certain interval or another event happens?  Unlike in F#, in C#, we have no real easy way to deal with a lot of these situations.  Not only that, but we could leave dangling events out there which could be a nice memory leak.

So, how could this Rx thing help?  Let’s look at a quick example of how it’s different before we go into detail about what it is.  In this example, I’ll build upon it to succinctly handle the exceptional case, but not only that, but also watch the progress changed until the download has completed.

var wc = new WebClient 
    { Credentials = new NetworkCredential("foo", "bar") };
    
// Watch progress only until download complete
var progress = wc.DownloadProgressChangedEvent()
    .Until(wc.DownloadStringCompletedEvent());

// Subscribe which creates IDisposable handlers
// Handle success and exceptional cases

var progressHandler = progress.Subscribe(
    e => PublishProgress(e.EventArgs.ProgressPercentage));
    
var downloadHandler = wc.DownloadStringCompletedEvent().Subscribe(
    e => PublishResults(e.EventArgs.Result),
    exception => PublishException(exception));

...

// Much later in the program, we can dispose
progressHandler.Dispose();
downloadHandler.Dispose();

What we did was allow us to handle exceptional cases as well as the success case on the next invocation of the observable.  I create extension methods which I will cover later, but I want you to get the gist of how this might work.  Since we’re only interested in one firing, we can dispose of the progress subscription quite easily since any call to Subscribe returns an IDisposable which allows us to unsubscribe from our event easily.  This is a similar approach in my F# event posts which allowed me to unsubscribe from any event when I call the subscribe extension method I created. 

To give one more example about composability, how about creating a simple mouse drag event?  How might we do that given the current state of eventing in .NET?

public event MouseEventHandler MouseDrag;

private void InitializeMouseDrag()
{
    MouseDrag += (o, e) => PublishMouseDrag(o, e);

    var fired = false;
    MouseDown += delegate { fired = true; };
    MouseUp += delegate { fired = false; };
    MouseMove += (o, e) =>
    {
        if (fired && MouseDrag != null)
            MouseDrag(this, e);
    };
}

What we’re doing is determining whether our mouse down and mouse move are firing together and if they are, publish the coordinate to the MouseDrag event.  From this code, it’s rather clumsy in order to do this.  Luckily, the Reactive Framework gives us a better alternative for handling such a thing.

// Create mouse drag
var mouseDrag = from md in this.GetMouseDown()
                from mm in this.GetMouseMove().Until(this.GetMouseUp())
                select mm;

// Subscribe
var handler = mouseDrag.Subscribe(
    e => PublishMouseDrag(e.EventArgs.Location));

...

// Clean up when we're done with the event
handler.Dispose();

When creating this mouse drag observable, I am able to use LINQ expressions to compose my Observables create from extension methods in order to track mouse down and mouse move until mouse up.  Now are we starting to see the power here?

Now that we got a feel of what it might look like, what is it?

The Reactive Framework

There has been a lot of talk around the Reactive Framework on both Channel 9 and among the blogs.  Since the Reactive Framework has been shipped as part of Silverlight 3 Toolkit, there has been a bit of interest.  Just as LINQ to objects changed the way we thought about interactive programming, the Reactive Framework will change the way we think about reactive programming.

Let’s first cover some basic terms here with interactive and reactive programming.  When we talk about interactive programming, we’re talking about asking for something and getting it in return.  One common pattern in this world is the iterator pattern.  In contrast, when we think about reactive programming, we register interest and then we have items handed to us asynchronously as they become available.  To give a sushi bar analogy, you could think of interactive programming as the conveyer and as it moves we pick another piece of sushi from the line and put it on our plate.  In contrast, with reactive programming, we register interest that we want sushi, and as they become available, the chef hands us them asynchronously. 

To put this in code terms, let’s show the interactive version of our sushi model.  We have several items that we wish to eat, but we will only have them delivered if we ask for them.

static IEnumerable<string> GetSushi()
{
    yield return "Toro";
    yield return "Maki";
    yield return "Uni";
}

// Pull each sushi piece from the conveyor
foreach(var piece in GetSushi())
    ConsumeSushi(piece);

Inside the IEnumerable<T>, we have our items waiting to go, but will only move when we ask for them.  The inverse would be where we indicate we’re hungry by subscribing to the observable which contains our sushi.  At that point, the house gives us sushi when they are ready and we don’t have to ask for each piece, instead they are pushed to us.

// Build up our menu
static IObservable<string> GetSushi()
{
    return Observable.Cons("Toro", 
        Observable.Cons("Maki", 
        Observable.Return("Uni")));
}

// Eat them as soon as they come to us
var sushiSubscription = GetSushi().Subscribe(
    piece => ConsumeSushi(piece));

In the above example, I built up our observable menu of sushi using the Cons and Return static methods on the Observable class.  Then at the bottom, I subscribe that I want to eat them, so I call subscribe passing in my action which fires on each item in our observable. 

The reason I focus on this, is that there is a certain duality to push and pull, and especially given the way they were designed.  In our above example, the Iterator pattern is a common interactive programming pattern that’s used quite frequently inside .NET.  This consists of two interfaces, the IEnumerable<T> and the IEnumerator<T>.  Listed below are some of the highlights of these two interfaces.  I cut down what they are to their essential bits.

public interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
}

public interface IEnumerator<T> : IDisposable
{
    T Current { get; }
    bool MoveNext();
}

The IEnumerable<T> interface exposes a single method which returns an IEnumerator<T> to iterate some object.  The IEnumerator<T> gives us the ability to get the current item and determine whether there are more items to iterate.

To think about basic asynchronous programming which we’re doing with our observables, we must consider a basic Gang of Four Pattern, the Observer Pattern.  This pattern describes where an object called an Observable, maintains a list of its dependent Observer classes and notifies them automatically of any state changes.  In Java, this should be rather familiar territory with the java.util.Observer class and the java.util.Observable interfaces.  In the Reactive Framework, this approach is no different, as we have an IObservable<T> interface which maintains a list of dependent IObserver<T> interfaces and notifies them automatically of any state changes.  Let’s look at the signatures below:

public interface IObserver<T>
{
    void OnCompleted();
    void OnError(Exception exception);
    void OnNext(T value);
}

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

What we also get is much like the System.Linq.Enumerable class contains many extension methods for handling IEnumerable<T>, we have the System.Linq.Observable class which contains many of those same extension methods for IObservable<T> instances.  We’ll go into depth of what they are later in this series.

What you may notice, and what you may have seen on some of Erik Meijer’s Channel 9 videos is that there is a mathematical duality between the Enumerable and Observable, given the signature of these interfaces, as well as the ideas between push and pull.  I’ll go into exactly what that means in the next post.

Conclusion

SImply stated, asynchronous programming is hard.  To not only manage exceptions and cancellation, but also to make events composable is yet another challenge.  The LiveLabs Reactive Framework gives us the ability to harness reactive programming and treat events as the first class citizens they should have been using LINQ expressions and other standard LINQ combinators.  There is a lot to cover including the mathematical duality, what it is underneath the covers and so on.

3 Comments

  • I have been following the Rx framework for some time now. Thank for the nice post. One thing that I noticed on F# is that they do not provide an equivalent to SelectMany (and some others) for events. The best I came up was:

    []
    module Event =
    let flatten (source: IEvent<IEvent>): IEvent =
    let fire,evt = Event.create()
    source |> Event.listen (Event.listen(fire))
    evt

    let bind (evt: IEvent) (f: 'T -> IEvent) : IEvent =
    evt |> Event.map(fun t -> f t) |> flatten

    let until (other: IEvent) (source: IEvent) : IEvent =
    let fireSource,evt = Event.create()
    let fire = ref true
    other |> Event.listen(fun _ -> fire := false)
    source |> Event.listen(fun args -> if !fire then fireSource(args))
    evt

    This would allow me to do your Drag event like this:

    type System.Windows.UIElement with
    member this.Drag =
    Event.bind (this.MouseLeftButtonDown) (fun _ -> this.MouseMove |> Event.until this.MouseLeftButtonUp)

    It seems to work, but not being able to unsubscribe from the events on the implementation of "until" leaves a bad taste on my mouth.

    Again thanks for the post. I'm looking forward to the rest of the series. :)

  • @Paks,

    You're absolutely right, but that's not out of the ordinary for many of the combinators in the Event module anyhow. I had the same exact solution a while ago but hadn't posted it because of that issue about cleanup, which I think isn't really an option.

    I cleaned up the code a little bit and you can find it here:
    module Event =
    let flatten (source: IEvent<IEvent>): IEvent =
    let evt = new Event()
    source |> Event.listen (Event.listen evt.Trigger)
    evt.Publish

    let collect (evt: IEvent) (f: 'T -> IEvent) : IEvent =
    evt |> Event.map f |> flatten

    let until (other: IEvent) (source: IEvent) : IEvent =
    let evt = new Event()
    let fire = ref true
    other .Add(fun _ -> fire := false)
    source.Add(fun args -> if !fire then evt.Trigger args)
    evt.Publish

    []
    module EventExtensions =
    open System
    open System.Windows.Forms

    type EventBuilder() =
    member this.Bind(m, f) = Event.collect m f
    let event = new EventBuilder()

    type System.Windows.Forms.Control with
    member this.MouseDrag = event {
    let! _ = this.MouseDown
    return! this.MouseMove |> Event.until this.MouseUp }

    type IDelegateEvent Delegate > with
    member this.Subscribe(d) =
    this.AddHandler(d)
    { new IDisposable with
    member disp.Dispose() =
    this.RemoveHandler(d) }

  • Wow! I could not even guess about it)) Not bad.

Comments have been disabled for this content.