Introduction to the Reactive Framework Part IV

In the previous post in this series, we covered how to turn .NET events into first class values through IObservable instances.  By doing so, we were able to do much more interesting things than just subscribe and unsubscribe; we were able to create a mouse drag event with little effort through composition.  In this post, let’s look at going from pull to push by turning collections into IObservable<T> instances.

From IEnumerables To Observables

As you may remember from the first and second posts in this series, we covered going from IEnumerable<T> to IObservable<T> and the duality between the two.  But how can we switch back and forth between them?  Remember that we can create an IObservable<T> from a couple of angles.  Let’s first create one by chaining together the Empty and StartWith combinators on the Observable class.  Note that this is not the publicly available release of the Reactive Framework, so bear with any name changes.

var observable = Observable.Empty<int>()
    .StartWith(1)
    .StartWith(2)
    .StartWith(3);

This allows me to start out with an empty IObservable<int> and then prepend each value to the front of the sequence, so in this case it would yield 3, 2, 1.  Next, let’s look at how we might generate an IObservable<int> sequence from 1 to 10.

var observable = Observable.Generate(
    1,                      // Initial value
    value => value <= 10,   // Predicate
    value => value,         // Selector
    value => value + 1);    // Iterator
observable.Subscribe(Console.WriteLine);

In this instance, we supply an initial value, then a predicate to determine when to stop, a result selector, and finally an iterator.  Keep in mind that in the released System.Reactive package, Generate takes the iterator before the result selector, so the last two arguments would need to be swapped there.  There are plenty of overloads of Generate as well to suit your needs when creating IObservable<T> values.  But what about taking existing IEnumerable<T> values and converting them to IObservable<T> values?  One could use the Create function, which allows us to create arbitrary IObservable<T> instances.

var enumerable = Enumerable.Range(1, 10);
var observable = Observable.Create<int>(observer =>
{
    try
    {
        foreach (var item in enumerable)
            observer.OnNext(item);
        observer.OnCompleted();
    }
    catch (Exception exception)
    {
        observer.OnError(exception);
    }

    return () => { };
});
observable.Subscribe(
    value => Console.WriteLine(value),
    exn   => Console.WriteLine(exn.ToString()));

What we’re doing in the above code is using the Create method, which gives us an IObserver<T> that we can post values to.  In this case, we’re iterating through a collection and calling OnNext for each item.  At the end of the iteration, we call the OnCompleted method, which indicates that there are no more values.  If there were some sort of exception when iterating the values, it would be caught and the exception posted to the observer through OnError.  We can then decide how to handle it in our Subscribe method call, if at all.  Finally, we return an Action to be called once the observer unsubscribes from the IObservable<T>, which in this instance does nothing.  That’s all fine and good, but there is an easier way of course, by using the ToObservable extension method.

var enumerable = Enumerable.Range(1, 10);
var observable = enumerable.ToObservable();
observable.Subscribe(Console.WriteLine);

Going from IEnumerable<T> to IObservable<T>, as you can see, is quite easy.  Going back is just as easy through either of two methods, one being the ToEnumerable extension method and the other being the GetEnumerator method.

// To Observable
var enumerable = Enumerable.Range(1, 10);
var observable = enumerable.ToObservable();

// And back again
var enumerableAgain = observable.ToEnumerable();
var enumerator = observable.GetEnumerator();
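
To see the round trip do some actual work, here is a minimal sketch that filters in the push world and then reads the result back as an ordinary pull sequence.  It assumes the released System.Reactive package rather than the preview build used in this post, so names may differ from what you have installed; the RoundTrip class and EvenNumbers method are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;   // assumption: the released System.Reactive package

class RoundTrip
{
    // Hypothetical helper: pull -> push -> pull.
    public static List<int> EvenNumbers()
    {
        var enumerable = Enumerable.Range(1, 10);

        // Push side: Where here is the IObservable<T> combinator.
        var observable = enumerable.ToObservable().Where(n => n % 2 == 0);

        // Pull side again: ToEnumerable hands back an IEnumerable<int>.
        return observable.ToEnumerable().ToList();
    }

    static void Main()
    {
        foreach (var n in EvenNumbers())
            Console.WriteLine(n);
    }
}
```

The filter could just as easily have been applied on the enumerable side; the point is only that we get to pick whichever side suits the problem.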

This ultimately leaves us in control of when we want to switch between the pull and push model, depending on our circumstances.  It’s sort of like a buy one get one free in that respect.  What makes this interesting is that we can take a collection and spread it out to multiple subscribers, for example reading in a file and then having multiple subscriptions each process the data in turn.  This opens up plenty of possibilities if we want to do certain actions concurrently.

var text = File.ReadAllLines("DocumentToProcess.txt");
var textObservable = text.ToObservable();

// Process concurrently
var textLetterSub = textObservable.Subscribe(CountLetters);
var textWordSub = textObservable.Subscribe(CountWords);
var textVowelSub = textObservable.Subscribe(CountVowels);
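
The three handlers above are left undefined, so here is one way they might be filled in.  This is only a sketch: the counting logic, the FanOut class and the in-memory sample lines standing in for the file are all assumptions of mine, and it targets the released System.Reactive package, where the conversion method is ToObservable.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;   // assumption: the released System.Reactive package

class FanOut
{
    // Hypothetical running totals, one per subscriber.
    public static int Letters, Words, Vowels;

    // Hypothetical handlers; each one receives a single line of text.
    static void CountLetters(string line) => Letters += line.Count(char.IsLetter);

    static void CountWords(string line) =>
        Words += line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Length;

    static void CountVowels(string line) =>
        Vowels += line.Count(c => "aeiouAEIOU".IndexOf(c) >= 0);

    public static void Run()
    {
        // Stand-in for File.ReadAllLines("DocumentToProcess.txt").
        var text = new[] { "push and pull", "are duals" };
        var textObservable = text.ToObservable();

        // Each Subscribe call triggers its own pass over the lines.
        var textLetterSub = textObservable.Subscribe(CountLetters);
        var textWordSub = textObservable.Subscribe(CountWords);
        var textVowelSub = textObservable.Subscribe(CountVowels);
    }

    static void Main()
    {
        Run();
        Console.WriteLine($"{Letters} letters, {Words} words, {Vowels} vowels");
    }
}
```

With no scheduler supplied, my understanding is that the released package runs each subscription to completion on the calling thread, which is why the totals can be read right after Run.  To get the subscribers truly running side by side, we would pass a scheduler to ToObservable and guard any shared state.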

There are also more things to come when dealing with collections that we’ll get into in later posts in this series, but this is enough for now.

Conclusion

As I’ve stated before, the LiveLabs Reactive Framework gives us the ability to harness reactive programming and, using LINQ expressions and other standard LINQ combinators, treat events as the first class citizens they should have been.  Still, there is much to cover in this series with the standard LINQ combinators, the monadic heritage of this solution, as well as turning Tasks and Asynchronous Methods into Observables.
