Going Interactive with the Reactive Extensions

Lately in my series on the Reactive Extensions, you’ll have noticed that I have focused quite a bit on the IObservable<T> and IObserver<T> interfaces, as well as the extension methods that come with them.  One thing that might have been missed with the release of the Reactive Extensions, however, is the inclusion of System.Interactive.dll.  The idea behind this assembly is to take many of the extension methods that are available on IObservable<T> and port them to work on the IEnumerable<T> interface.

Before we get started again, a lot of good material on Rx has been posted on Channel 9.

Each of those videos is well worth watching to get an idea of how things are done under the covers.  Now, let’s get caught up to where we are today.

Exploring Interactive

Early on in this series, we covered that when talking about the Reactive Extensions, we were really talking about two things: traditional interactive programming in the form of the Iterator Pattern, and reactive programming in the form of the Observer Pattern.  Erik and his team set out to implement most of the LINQ to Objects operators for the Reactive Extensions, but along the way they found interesting operators that applied naturally to IObservable<T> instances and had not yet been implemented on IEnumerable<T>.  So they decided to work backwards, taking these new extension methods on IObservable<T> and applying them to IEnumerable<T>.  Let’s start exploring some of these below in the EnumerableEx class.

Amb

Amb is McCarthy’s Ambiguous Function, which returns the first of its sources to react.  It takes its name from John McCarthy, the creator of Lisp, and his 1963 paper “A Basis for a Mathematical Theory of Computation”.  In the case of an IEnumerable<T>, the winner is the first sequence to respond to a call to MoveNext().  Let’s look at the signatures of the Amb overloads:

public static IEnumerable<TSource> Amb<TSource>(
    IEnumerable<IEnumerable<TSource>> sources
)

public static IEnumerable<TSource> Amb<TSource>(
    params IEnumerable<TSource>[] sources
)

public static IEnumerable<TSource> Amb<TSource>(
    IEnumerable<TSource> leftSource,
    IEnumerable<TSource> rightSource
)

What this tells us is that Amb can take any number of IEnumerable<T> instances: a sequence of sequences, a params array of sequences, or just a left and a right.  Let’s try a simple example that uses a random number generator to simulate a non-deterministic return.  When I run this example, I cannot predict which sequence will return first.

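// Shared generator: two new Random() instances created in quick succession
// can get the same time-based seed on the .NET Framework, and with it the same delay.
static readonly Random random = new Random();

static IEnumerable<T> CreateRandomEnumerable<T>(T value)
{
    // Iterator blocks are lazy, so this sleep runs on the first MoveNext() call.
    int delay;
    lock (random) { delay = random.Next(1, 10); } // Random is not thread-safe
    Thread.Sleep(delay);
    yield return value;
}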

IEnumerable<string> enumerable1 = CreateRandomEnumerable("foo");
IEnumerable<string> enumerable2 = CreateRandomEnumerable("bar");
IEnumerable<string> enumerable3 = enumerable1.Amb(enumerable2);
enumerable3.Run(Console.WriteLine);

Like its original implementation in Lisp, this has quite a few implications for artificial intelligence, physics and so on, and even for such things as traversing networks, where what matters is which packet arrives first.
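To make “the first to react from a call to MoveNext()” a little more concrete, here is a minimal sketch of how such a race could be written by hand.  This is my own illustration and not how EnumerableEx implements Amb; the AmbSketch class and the Delayed helper are hypothetical names, and I lean on Task from the Task Parallel Library to run the first MoveNext() of every source concurrently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AmbSketch
{
    // Hypothetical helper, not the EnumerableEx implementation:
    // races the first MoveNext() of every source, then replays the winner.
    public static IEnumerable<T> Amb<T>(params IEnumerable<T>[] sources)
    {
        if (sources.Length == 0) yield break;

        IEnumerator<T>[] enumerators = sources.Select(s => s.GetEnumerator()).ToArray();

        // Each task completes once its source has answered the first MoveNext().
        Task<bool>[] firstMoves = enumerators
            .Select(e => Task.Run(() => e.MoveNext()))
            .ToArray();

        int winner = Task.WaitAny(firstMoves);

        // Losers are disposed once their pending MoveNext() call has finished.
        for (int i = 0; i < enumerators.Length; i++)
        {
            if (i == winner) continue;
            IEnumerator<T> loser = enumerators[i];
            firstMoves[i].ContinueWith(_ => loser.Dispose());
        }

        using (IEnumerator<T> e = enumerators[winner])
        {
            if (!firstMoves[winner].Result) yield break; // the winner was empty
            do { yield return e.Current; } while (e.MoveNext());
        }
    }

    // Assumed test source: waits for the given delay before yielding its value.
    public static IEnumerable<string> Delayed(string value, int milliseconds)
    {
        Thread.Sleep(milliseconds);
        yield return value;
    }

    public static void Main()
    {
        foreach (string s in Amb(Delayed("slow", 1000), Delayed("fast", 10)))
            Console.WriteLine(s);
    }
}
```

With delays this far apart, I would expect the “fast” sequence to win the race, but the whole point of Amb is that the outcome depends on timing rather than on the order of the arguments.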

Catch

The Catch function allows you to iterate a sequence and handle an exception that terminates it instead of letting that exception propagate.  It also has an overload which specifies the type of exception to catch as well as a compensation handler to run when that happens.

public static IEnumerable<TSource> Catch<TSource>(
    IEnumerable<IEnumerable<TSource>> sources
)

public static IEnumerable<TSource> Catch<TSource, TException>(
    IEnumerable<TSource> source,
    Func<TException, IEnumerable<TSource>> handler
)
where TException : Exception

In this instance, we have an IEnumerable<string> which yields “foo” and then throws an exception.  Below, we compensate for that exception by returning an empty array when it happens.

IEnumerable<string> ThrowingEnumerable()
{
    yield return "foo";
    throw new InvalidOperationException();
}

var caughtEnumerable = ThrowingEnumerable()
    .Catch<string, InvalidOperationException>(exn => new string[] {});
caughtEnumerable.Run(Console.WriteLine); // Yields "foo"
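If you are wondering how an operator like this can be written with iterators, keep in mind that C# does not allow a yield return inside a try block that has a catch clause.  The sketch below is my own illustration, not the EnumerableEx source: it drives the enumerator by hand, keeps only MoveNext() and Current inside the try, and yields outside of it.  The CatchSketch class name and the “recovered” fallback value are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;

public static class CatchSketch
{
    // Hypothetical stand-in for EnumerableEx.Catch, for illustration only.
    public static IEnumerable<T> Catch<T, TException>(
        this IEnumerable<T> source,
        Func<TException, IEnumerable<T>> handler)
        where TException : Exception
    {
        IEnumerable<T> fallback = null;

        using (IEnumerator<T> e = source.GetEnumerator())
        {
            while (true)
            {
                T current;
                try
                {
                    // Only the calls that can throw live inside the try block.
                    if (!e.MoveNext()) break;
                    current = e.Current;
                }
                catch (TException ex)
                {
                    // Ask the handler for the sequence to continue with.
                    fallback = handler(ex);
                    break;
                }
                yield return current; // yielding outside the try/catch is allowed
            }
        }

        if (fallback != null)
            foreach (T item in fallback)
                yield return item;
    }

    public static IEnumerable<string> ThrowingEnumerable()
    {
        yield return "foo";
        throw new InvalidOperationException();
    }

    public static void Main()
    {
        var caught = ThrowingEnumerable()
            .Catch<string, InvalidOperationException>(exn => new[] { "recovered" });
        foreach (string s in caught)
            Console.WriteLine(s); // prints "foo", then "recovered"
    }
}
```

Returning a non-empty fallback here, rather than the empty array used above, makes it visible that the handler’s sequence is appended after the elements that were produced before the exception.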

Concat

This method concatenates sequences together into a single sequence.  It can take either a sequence of sequences or any number of sequences as a params array to produce that single sequence.

public static IEnumerable<TSource> Concat<TSource>(
    IEnumerable<IEnumerable<TSource>> sources
)

public static IEnumerable<TSource> Concat<TSource>(
    params IEnumerable<TSource>[] sources
)

For example, we can have an array of arrays which hold three values each and then concatenate them together to render the values 1 through 9.

var sequence = new[] {new[] {1, 2, 3}, new[] {4, 5, 6}, new[] {7, 8, 9}};
IEnumerable<int> concatSequence = sequence.Concat();

concatSequence.Run(Console.WriteLine);
// Yields 1,2,3,4,5,6,7,8,9
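For comparison, the same flattening can be sketched with nothing but a pair of nested foreach loops, or with the standard SelectMany operator from LINQ to Objects.  The Flatten method and the ConcatSketch class are hypothetical names of my own; this is an illustration of the behaviour, not the library’s code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ConcatSketch
{
    // Hypothetical equivalent of Concat over a sequence of sequences.
    public static IEnumerable<T> Flatten<T>(IEnumerable<IEnumerable<T>> sources)
    {
        foreach (IEnumerable<T> inner in sources)   // outer sequence, in order
            foreach (T item in inner)               // each inner sequence, in order
                yield return item;
    }

    public static void Main()
    {
        var sequence = new[] { new[] { 1, 2, 3 }, new[] { 4, 5, 6 }, new[] { 7, 8, 9 } };

        // Both lines print 1,2,3,4,5,6,7,8,9
        Console.WriteLine(string.Join(",", Flatten<int>(sequence)));
        Console.WriteLine(string.Join(",", sequence.SelectMany(inner => inner)));
    }
}
```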

Defer

This method defers the creation of the sequence through a factory function which gets called when you start enumerating.  In the past, we didn’t have a way out of the box to defer potentially expensive operations such as enumerating a directory for files.  We used to write something like this.

string[] files = Directory.GetFiles("SomeDirectory");
foreach(string file in files)
    Console.WriteLine(file);

That was OK if you didn’t mind the blocking operation up front, which could be significant; we took that hit even if we never started to iterate the collection.  We now have the Defer method, which takes a function with no arguments that returns an IEnumerable<T>.

public static IEnumerable<TSource> Defer<TSource>(
    Func<IEnumerable<TSource>> enumerableFactory
)

Now, instead of the eager evaluation we had above, we can defer the creation of this string array until we actually enumerate it, as in the following:

IEnumerable<string> deferredFiles = 
    EnumerableEx.Defer(() => Directory.GetFiles("SomeDirectory"));
deferredFiles.Run(Console.WriteLine);

Lazy evaluation in this scenario can be quite helpful to defer operations until we need them at the last responsible moment.
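To convince yourself that nothing runs until enumeration starts, here is a small sketch of a Defer-like method written as an iterator block, with a counter that records when the factory is invoked.  The DeferSketch class, the counter and the hard-coded file names (standing in for Directory.GetFiles) are assumptions of mine for the sake of a self-contained example.

```csharp
using System;
using System.Collections.Generic;

public static class DeferSketch
{
    // Hypothetical stand-in for EnumerableEx.Defer: because this is an iterator
    // block, the factory is not called until the first MoveNext().
    public static IEnumerable<T> Defer<T>(Func<IEnumerable<T>> enumerableFactory)
    {
        foreach (T item in enumerableFactory())
            yield return item;
    }

    public static void Main()
    {
        int factoryCalls = 0;

        IEnumerable<string> deferred = Defer<string>(() =>
        {
            factoryCalls++;
            return new[] { "a.txt", "b.txt" }; // stand-in for Directory.GetFiles(...)
        });

        Console.WriteLine(factoryCalls); // prints 0: the factory has not run yet

        foreach (string file in deferred)
            Console.WriteLine(file);     // prints a.txt, then b.txt

        Console.WriteLine(factoryCalls); // prints 1: it ran when we enumerated
    }
}
```

One consequence of this design is that the factory runs again every time the deferred sequence is enumerated, so an expensive operation is postponed but not cached.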

Run

You’ve seen this method being used, so I might as well introduce it already.  We’ve seen this extension method by any number of names, whether it be ForEach, Each, Iter, etc.  The basic intent is to evaluate each item in the enumerable, optionally with an Action<T> to apply to each item as it iterates.

public static void Run<TSource>(
    IEnumerable<TSource> source
)

public static void Run<TSource>(
    IEnumerable<TSource> source,
    Action<TSource> action
)

In this instance, we’ll create a range from 1 to 10 and then write each number out to the Console.  Because Console.WriteLine has an overload that takes a single argument, we can replace a simple lambda expression with a method group.

IEnumerable<int> range = Enumerable.Range(1, 10);
range.Run(Console.WriteLine); // prints each number
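If you don’t have System.Interactive at hand, the behaviour described above is easy to approximate.  The sketch below is my own hypothetical version of the two overloads, not the shipped implementation, and it also shows the lambda and method group forms side by side.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RunSketch
{
    // Hypothetical version of Run without an action: enumerating the
    // source is enough to trigger any side effects it has.
    public static void Run<T>(this IEnumerable<T> source)
    {
        foreach (T item in source) { }
    }

    // Hypothetical version of Run with an action applied to each item.
    public static void Run<T>(this IEnumerable<T> source, Action<T> action)
    {
        foreach (T item in source)
            action(item);
    }

    public static void Main()
    {
        IEnumerable<int> range = Enumerable.Range(1, 3);

        range.Run(x => Console.WriteLine(x)); // lambda form: prints 1, 2, 3
        range.Run(Console.WriteLine);          // method group form: prints 1, 2, 3
    }
}
```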

Let’s stop here for the time being as we have a lot of others to cover before we turn our attention once again from the interactive to the reactive side of things.

Conclusion

As I’ve stated before, the Reactive Extensions for .NET give us the ability to harness reactive programming and to treat events as the first-class citizens they should have been, using LINQ expressions and other standard LINQ combinators.  Next, we’ll continue looking at the interactive extensions that ship with Rx and go into more interesting scenarios.
