Exploring the Reactive Framework part II
Talk around the water cooler is that it might be possible to use the Reactive Framework for some lightweight CEP.
I’ll correct some of the (big) mistakes from my last post and build up a “jumping” window extension method for IObservables.
In my last post I build a simple grouping method, but in it I immediately turned the push style of processing into a pull style by using the GetEnumerator() method. This is a bad idea for two key reasons, a) it takes the inherit elegance of the RX reduces it to a for loop, and b) it commits a cardinal sin of multi-threading and reserves a thread for a primarily blocking operation.
Here’s an improved version
public static IObservable<IEnumerable<TSource>> ToWindow<TSource>( this IObservable<TSource> source, Func<TSource, IEnumerable<TSource>, bool> grouper) { return RXGrouping.ToWindow(source, val => val, grouper); } public static IObservable<IEnumerable<TResult>> ToWindow<TSource, TResult>( this IObservable<TSource> source, Func<TSource, TResult> selector, Func<TSource, IEnumerable<TResult>, bool> grouper) { List<TResult> res = new List<TResult>(); return new AnonymousObservable<IEnumerable<TResult>>( observer => source.Subscribe( nextVal => { try { if (!grouper(nextVal, res)) { observer.OnNext(res); res = new List<TResult>(); } res.Add(selector(nextVal)); } catch (Exception exception) { observer.OnError(exception); return; } } ,observer.OnError ,observer.OnCompleted)); }
The mistake in the prior version stemmed in part from thinking that I needed to ask for the next value, but of course the RX will supply the next value when it’s available.
To use it:
TimeSpan windowDuration = new TimeSpan(0,0,10); generatedNums // add a Timestamp to our raw data .Select(val => new { Timestamp = DateTime.Now, Value = val }) // create a 5 min "jumping" window .ToWindow((lastVal, seq) => (seq.Count() == 0) || (lastVal.Timestamp - seq.First().Timestamp < windowDuration)) // create item for display .Select(seq => new { Timestamp = seq.First().Timestamp , Values = seq.Select(a => a.Value).ToArray() , Average = seq.Average(a => a.Value) }) // marshal and add to list .Post(sc).Subscribe(wv => WindowVals.Add(wv));
And the results
Next we’ll look into creating a sliding window.