Exploring the Reactive Framework (RX)
A few days ago, intentionally or not, a version of the Reactive Framework was released into the wild. Let’s see how we can use RX for computations on a stream of data. As an example, we’ll take a stream of ints and produce the average of each group of five.
Here’s the primary stream of numbers, created with the static Generate() method:
Random rnd = new Random();
var generatedNums = Observable.Generate<int,int>(
    0 // seed
    , d => true // condition to continue
    , d => d % 12 // generated value
    , d => (int)(rnd.NextDouble() * 300) // delay
    , d => d + 1 // modify value for next iter
    );
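To see which values that stream carries without waiting on the random delays, here is a rough pull-based equivalent built on a plain iterator. GenerateValues is a made-up helper for illustration, not part of RX; it mirrors the seed, condition, result and iterate arguments above and simply ignores the delay selector.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GenerateSketch
{
    // Hypothetical helper (not part of RX): same seed / condition / result /
    // iterate arguments as the Generate() call above, minus the delay selector.
    public static IEnumerable<TResult> GenerateValues<TState, TResult>(
        TState seed,
        Func<TState, bool> condition,
        Func<TState, TResult> resultSelector,
        Func<TState, TState> iterate)
    {
        for (var state = seed; condition(state); state = iterate(state))
            yield return resultSelector(state);
    }

    public static void Main()
    {
        // The sequence is infinite (condition is always true), so take a prefix.
        var first15 = GenerateValues(0, d => true, d => d % 12, d => d + 1).Take(15);
        Console.WriteLine(string.Join(", ", first15));
        // prints: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 1, 2
    }
}
```

So the numbers cycle through 0 to 11 forever; only the timing between them is random.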
And here’s how to consume the stream, adding each value to an ObservableCollection:
generatedNums
    .Post(sc) // move onto UI thread
    .Subscribe(num => Numbers.Add(num)); // add numbers to observable collection
Computing the average in groups of 5 turns out to be harder, as RX doesn’t seem to have a GroupBy() method at this time. Here’s what I came up with:
generatedNums
    .Grouper(a => a, (a, list) => list.Count() < 5) // group into lists of 5, returning an IObservable<IEnumerable<int>>
    .Select(list => list.Average()) // take the average of each list, projecting IObservable<IEnumerable<int>> to IObservable<double>
    .Post(sc) // move onto UI thread
    .Subscribe(mean => Averages.Add(mean)); // add to observable collection
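As a sanity check on what the averages should look like, the same "groups of five" computation can be sketched over a plain IEnumerable<int>. AveragesOf is a hypothetical helper, not an RX operator, and the input is just the first 15 values of the d % 12 sequence. Note that Average() over ints returns a double.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AverageSketch
{
    // Hypothetical helper: yields the mean of each consecutive run of `size` values.
    // A trailing partial group is dropped in this sketch.
    public static IEnumerable<double> AveragesOf(IEnumerable<int> source, int size)
    {
        var group = new List<int>(size);
        foreach (var n in source)
        {
            group.Add(n);
            if (group.Count == size)
            {
                yield return group.Average(); // Average() of ints is a double
                group.Clear();
            }
        }
    }

    public static void Main()
    {
        var values = Enumerable.Range(0, 15).Select(d => d % 12);
        foreach (var mean in AveragesOf(values, 5))
            Console.WriteLine(mean);
        // The groups are [0..4], [5..9] and [10, 11, 0, 1, 2],
        // so the three means are 2, 7 and 4.8.
    }
}
```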
And the implementation for “Grouper()”:
public static IObservable<IEnumerable<TResult>> Grouper<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TResult> selector,
    Func<TSource, IEnumerable<TResult>, bool> grouper)
{
    return new AnonymousObservable<IEnumerable<TResult>>(
        observer =>
            source.Subscribe(x =>
            {
                try
                {
                    // Pull values through an enumerator over the source
                    // (the pushed value x itself is not used).
                    using (var er = source.GetEnumerator())
                        while (er.MoveNext())
                        {
                            bool needsMove = false;
                            var res = new List<TResult>();
                            // Keep adding while grouper() says the current group can take more.
                            while (grouper(er.Current, res) && ((needsMove) ? er.MoveNext() : true))
                            {
                                needsMove = true;
                                res.Add(selector(er.Current));
                            }
                            observer.OnNext(res); // emit the finished group
                        }
                }
                catch (Exception exception)
                {
                    observer.OnError(exception);
                    return;
                }
            },
            new Action<Exception>(observer.OnError),
            new Action(observer.OnCompleted)));
}
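Grouper() above pulls its values through an enumerator. For comparison, here is a minimal push-based sketch of the same grouping idea, where each value is buffered as it arrives in OnNext. It assumes the IObservable<T>/IObserver<T> interfaces that ship in the System namespace from .NET 4 onward rather than the preview RX assembly, and GroupingObserver, ListObserver and GroupingDemo are all hypothetical names made up for this example. The predicate keeps the same meaning as in the Grouper() call: return true while the current group can take more values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical observer (not from RX): buffers pushed values and forwards a
// finished group downstream whenever keepGrouping returns false.
public sealed class GroupingObserver<T> : IObserver<T>
{
    private readonly IObserver<IList<T>> _downstream;
    private readonly Func<T, IList<T>, bool> _keepGrouping;
    private List<T> _current = new List<T>();
    private bool _done;

    public GroupingObserver(IObserver<IList<T>> downstream, Func<T, IList<T>, bool> keepGrouping)
    {
        _downstream = downstream;
        _keepGrouping = keepGrouping;
    }

    public void OnNext(T value)
    {
        if (_done) return;
        bool keep;
        try { keep = _keepGrouping(value, _current); }
        catch (Exception ex) { _done = true; _downstream.OnError(ex); return; }

        if (!keep)
        {
            _downstream.OnNext(_current); // group is full: emit it and start a new one
            _current = new List<T>();
        }
        _current.Add(value);
    }

    public void OnError(Exception error)
    {
        if (_done) return;
        _done = true;
        _downstream.OnError(error);
    }

    public void OnCompleted()
    {
        if (_done) return;
        _done = true;
        if (_current.Count > 0) _downstream.OnNext(_current); // flush the partial group
        _downstream.OnCompleted();
    }
}

// Hypothetical sink that just records what it receives.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { }
}

public static class GroupingDemo
{
    public static List<double> Run()
    {
        var sink = new ListObserver<IList<int>>();
        var grouper = new GroupingObserver<int>(sink, (value, group) => group.Count < 5);
        for (int d = 0; d < 12; d++) grouper.OnNext(d % 12); // push 0..11 by hand
        grouper.OnCompleted();
        return sink.Items.Select(g => g.Average()).ToList();
    }

    public static void Main()
    {
        foreach (var mean in Run()) Console.WriteLine(mean);
        // Groups are [0..4], [5..9] and the flushed remainder [10, 11],
        // so the means are 2, 7 and 10.5.
    }
}
```

The design choice here is to keep all state in the observer, so nothing blocks inside OnNext and completion simply flushes whatever is left in the buffer.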