Exploring the Reactive Framework (RX)

A few days ago, intentionally or not, a version of the Reactive Framework was released into the wild. Let’s see how we can use the RX for computations on a stream of data. As an example we’ll take a stream of ints and produce the averages in groups of five.

image

 

Here’s the primary stream of numbers, using the static Generate()  method

Random rnd = new Random();
var generatedNums = Observable.Generate<int,int>(
                    0 // seed
                    , d => true // condition to continue
                    , d => d % 12 //generated value 
                    , d => (int)(rnd.NextDouble() * 300) //delay
                    , d => d + 1 // modify value for next iter
                    );

And to consume the stream by adding the values into an ObservableCollection

generatedNums
    .Post(sc) // move onto UI thread
    .Subscribe(num => Numbers.Add(num) // add numbers to observable collection
    );
Computing the average, in groups of 5 turns out to be harder, as the Reactive FX doesn’t seem to have a GroupBy() method at this time. Here’s what I came up with:
generatedNums
    .Grouper(a => a, (a,list) => list.Count() < 5) // group into lists of 5, returning an IObservable<IEnumerable<int>>
    .Select(list => list.Average()) // take the average of the list, so project IObservable<IEnumerable<int>> to IObservable<int> 
    .Post(sc).Subscribe(mean => Averages.Add(mean) // move onto UI and add to observable collection
    );
And the implementation for “Grouper() 
public static IObservable<IEnumerable<TResult>> Grouper<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TResult> selector
    , Func<TSource, IEnumerable<TResult>, bool> grouper) 
{
    return new AnonymousObservable<IEnumerable<TResult>>(
        observer =>
            source.Subscribe(x =>
        {
            try
            {
                using (var er = source.GetEnumerator())
                    while (er.MoveNext())
                    {
                        bool needsMove = false;
                        var res = new List<TResult>();
                        while (grouper(er.Current, res) && ((needsMove) ? er.MoveNext() : true))
                        {
                            needsMove = true;
                            res.Add(selector(er.Current));
                        }
                        observer.OnNext(res);
                    }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
        },
        new Action<Exception>(observer.OnError),
        new Action(observer.OnCompleted)));
}

No Comments