Exploring the Reactive Framework part II

Talk around the water cooler is that it might be possible to use the Reactive Framework (Rx) for some lightweight complex event processing (CEP).

I’ll correct some of the (big) mistakes from my last post and build up a “jumping” window extension method for IObservables.

 

In my last post I built a simple grouping method, but in it I immediately turned the push style of processing into a pull style by calling GetEnumerator(). This is a bad idea for two key reasons: a) it takes the inherent elegance of Rx and reduces it to a for loop, and b) it commits a cardinal sin of multi-threading by reserving a thread for a primarily blocking operation.
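To make the difference concrete, here is a minimal sketch of the two styles. It assumes a recent System.Reactive package, where ToEnumerable() plays the role that GetEnumerator() played in my earlier code; the class and method names are hypothetical and only there for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PushVersusPull
{
    // Pull style: ToEnumerable() turns the observable into a blocking
    // IEnumerable, so the calling thread waits inside the loop whenever
    // the source has not yet produced its next value.
    public static List<int> Pull(IObservable<int> source)
    {
        var seen = new List<int>();
        foreach (var value in source.ToEnumerable())
        {
            seen.Add(value);
        }
        return seen;
    }

    // Push style: Subscribe only registers a callback. Rx invokes it
    // whenever a value arrives, and no thread is parked waiting for one.
    public static IDisposable Push(IObservable<int> source, List<int> seen)
    {
        return source.Subscribe(value => seen.Add(value));
    }
}
```

With a slow or unbounded source, Pull holds its thread for the lifetime of the sequence, while Push hands control straight back to the caller.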

 

Here’s an improved version:

public static IObservable<IEnumerable<TSource>> ToWindow<TSource>(
    this IObservable<TSource> source, 
    Func<TSource, IEnumerable<TSource>, bool> grouper)
{
    return RXGrouping.ToWindow(source, val => val, grouper);
}


public static IObservable<IEnumerable<TResult>> ToWindow<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TResult> selector,
    Func<TSource, IEnumerable<TResult>, bool> grouper)
{
    return new AnonymousObservable<IEnumerable<TResult>>(
        observer =>
        {
            // One buffer per subscription, so subscribers never share a window.
            List<TResult> res = new List<TResult>();
            return source.Subscribe(
                nextVal =>
                {
                    try
                    {
                        // grouper returns false when nextVal starts a new window:
                        // publish the finished window and begin a fresh one.
                        if (!grouper(nextVal, res))
                        {
                            observer.OnNext(res);
                            res = new List<TResult>();
                        }
                        res.Add(selector(nextVal));
                    }
                    catch (Exception exception)
                    {
                        observer.OnError(exception);
                    }
                },
                observer.OnError,
                observer.OnCompleted);
        });
}

The mistake in the prior version stemmed in part from thinking that I needed to ask for the next value, but of course Rx will supply the next value when it’s available.
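For anyone trying this against a more recent Rx, here is a minimal sketch of the same idea, not a drop-in replacement. It assumes the System.Reactive package and uses its Observable.Create method in place of AnonymousObservable. The name ToJumpingWindow is hypothetical, the selector overload is left out for brevity, and, as an addition to the code above, the sketch emits whatever is left in the buffer when the source completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class JumpingWindowSketch
{
    // Hypothetical name; not part of Rx itself.
    public static IObservable<IList<T>> ToJumpingWindow<T>(
        this IObservable<T> source,
        Func<T, IList<T>, bool> belongsToWindow)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            // Created per subscription, so subscribers never share a buffer.
            var window = new List<T>();

            return source.Subscribe(
                next =>
                {
                    try
                    {
                        // A false result means 'next' starts a new window:
                        // publish the finished one and begin again.
                        if (!belongsToWindow(next, window))
                        {
                            observer.OnNext(window);
                            window = new List<T>();
                        }
                        window.Add(next);
                    }
                    catch (Exception ex)
                    {
                        observer.OnError(ex);
                    }
                },
                observer.OnError,
                () =>
                {
                    // Addition to the original: flush the partial window
                    // before signalling completion.
                    if (window.Count > 0)
                    {
                        observer.OnNext(window);
                    }
                    observer.OnCompleted();
                });
        });
    }
}
```

As a quick check, feeding the values 1 through 7 through ToJumpingWindow((next, window) => window.Count < 3) should give the windows 1-3 and 4-6, followed by the leftover 7 when the source completes.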

To use it:

TimeSpan windowDuration = new TimeSpan(0,0,10);
generatedNums
    // add a Timestamp to our raw data
    .Select(val => new { Timestamp = DateTime.Now, Value = val })
    // create a 10-second "jumping" window (matches windowDuration above)
    .ToWindow((lastVal, seq) => 
            (seq.Count() == 0) || 
            (lastVal.Timestamp - seq.First().Timestamp < windowDuration))
    // create item for display
    .Select(seq => new { Timestamp = seq.First().Timestamp
                        , Values = seq.Select(a => a.Value).ToArray()
                        , Average = seq.Average(a => a.Value) })
    // marshal and add to list
    .Post(sc).Subscribe(wv => WindowVals.Add(wv));

 

And the results:

[image]

Next we’ll look into creating a sliding window.

2 Comments

  • Would your Grouper not need to specify an OnCompleted() that somehow calls observer.OnNext() with the current collection of results?

    It looks to me as though the last group of results will always be silently thrown away. This might be ok for unbounded observable sequences, but would preclude generic use of your Grouper (unless I misunderstand, which is fairly probable).

  • @Paul,

    Quite possibly. Though at this point, I suspect the code won't even compile on the current release of Rx.
