August 2009 - Posts

In my last post I showed how to send StreamInsight output streams to a UI via the ReactiveFramework. Here’s we’ll do the reverse, by sending an RX stream into a CEP stream. Instead of a partial example, I’ll use an end to end example showing simulated stock ticks, computing the 5 min rolling VWAP, and showing the results on a UI.

 

First we’ll generate the ticks:

System.Collections.Generic.IObservable<StockTick> stockTicks =
    System.Linq.Observable.Generate(
                new Random() // inital state
                 , rnd => true // continue
                 , rnd => new StockTick() // next value
                         {
                             Price = rnd.NextDouble() * 1000,
                             Side = Sides[rnd.Next(Sides.Length - 1)],
                             Size = (long)(rnd.NextDouble() * 1000),
                             Symbol = Symbols[rnd.Next(Symbols.Length - 1)],
                             Timestamp = DateTime.Now
                         }
                         , rnd => (int)(rnd.NextDouble() * 2000)  // waitInterval
                         , rnd => rnd  // iterate
                         );

And now convert to a CEP stream:

var cepInput = stockTicks.ToCEP()
                .ToCepStream(tick => tick.Timestamp);

Where ToCep() is just the inverse of ToRx(), defined previously.

public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource)
{
    return new CEPAnonymousObservable<T>(
        (S.IObserver<T> cepObserver) => 
            RX.ObservableExtensions.Subscribe(rxSource, nextVal=> cepObserver.OnNext(nextVal)));
}

Computing the rolling 5 min VWAP, (grouped by symbol) takes some effort

var alteredDurationStream = cepInput
                            .ToPointEventStream()
                            .AlterEventDuration(tick => TimeSpan.FromMinutes(5));

var fiveMinVWaps =  from fivemin in
                    alteredDurationStream
                    group fivemin by fivemin.Symbol into fGrp
                    from evwindow in fGrp.Snapshot()
                    select new
                    { 
                       Symbol = fGrp.Key, 
                       TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price),
                       TotalVolume = evwindow.Sum(fmin => fmin.Size), 
                    };

var fiveMinVWaps2 = from fivemin in fiveMinVWaps
                    select new VWAPItem()
                    {
                        Symbol = fivemin.Symbol,
                        VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
                        Timestamp = DateTime.Now,
                    };

Although this nearly looks like conventional .Net Linq code, it isn’t. Think Linq2SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint, nor are any arbitrary .Net computations allowed. The reason for the additional fiveMinVWaps2 projection is that it’s not possible to compute anything but the Sum or Avg in a SnapShot().

Now that we have the data, we can convert to back to a RX stream:

PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp");
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();

And update an ObservableCollection

vwaps.Post(_sc).Subscribe(item =>
{
    var exists = CEPOS1.Where(vw => vw.Symbol == item.Symbol).FirstOrDefault();
    if (exists == null)
        CEPOS1.Add(item);
    else
        exists.CopyFrom(item);
});

 

Which displays on a UI

image

One compelling feature of StreamInsight is it’s in-process hosting model. In addition to reducing the complexity of server side installs, it’s now possible to have a  CEP engine in the client UI.

The simplest way of getting CEP streams onto the UI would be the Reactive Framework methods. Something like

queryOutputStream
    .ToObservable(...)
    .Post(syncContext)
    .Subscribe(item=> collection.Add(item) );

But in the CTP that won’t work. As I discovered a few days ago The IObservable used in StreamInsight is defined in a different namespace and assembly than the IObservable in the System.Reactive. Furthermore the StreamInsight api lacks the base classes and extension methods defined in System.Reactive.

I didn’t want to go the normal route of creating an implementation of IObserver, on say a ViewModel, route the data through the dispatcher on onto a collection, as while it would have the benefits of simplicity and it would work, it would mean giving up on all the goodness in System.Reactive.

The first method I tried in an effort to convert a CEP IObservable into an RX IObservable didn’t work, but was instructive nonetheless.

Using StreamInsight’s own I/O adapter API, I would create an “Eventing” Adapter which would raise an conventional .Net event on an object of my choosing, then using the ReactiveFramework, convert that event to an RX IObservable. 

But it’s not easy (or possible) to do. Instances of output adapters are created by OutputAdapterFactories, which in turn are created by Factory methods. You can send in a configuration object, but it needs to be XML serializable, so there’s no sending in of Action<> delegates.

 

But it turns out that it’s not hard to convert from a CEP IObservable to an RX IObservable.

First you need a CEP AnonymousObserver<T>

using S = System;
internal class AnonymousObserver<T> : S.IObserver<T>
{
    private bool isStopped;
    private S.Action _onCompleted;
    private S.Action<S.Exception> _onError;
    private S.Action<T> _onNext;

    public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError)
        : this(onNext, onError, () => { })
    {
    }
    public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError, S.Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }
    public void OnCompleted()
    {
        if (!isStopped)
        {
            isStopped = true;
            _onCompleted();
        }
    }
    public void OnError(S.Exception exception)
    {
        if (!isStopped)
        {
            isStopped = true;
            _onError(exception);
        }
    }
    public void OnNext(T value)
    {
        if (!isStopped)
            _onNext(value);
    }
}

Then an extension method taking a CEP IObservable, returning a RX AnonymousObservable<T>, subscribing to it via the CEP IObserver and on the OnNext, calling the returning RX IObservable’s on OnNext.

Like so:

using RX = System.Collections.Generic;
using S = System;

public static class CEPExtMethods
{
    public static RX.IObservable<T> ToRX<T>(this S.IObservable<T> source)
    {
        return new AnonymousObservable<T>(
            (RX.IObserver<T> rxObserver) =>
                            source.Subscribe(new AnonymousObserver<T>(
                                    nextVal => rxObserver.OnNext(nextVal),
                                    rxObserver.OnError
                                )));
    }
}

To use it:

var queryOutputStream = CreateQueryTemplate(input);
var queryOutput = queryOutputStream.ToObservable(typeof(EventTypeCount).GetField("Time"));
queryOutput.ToRX().Send(_sc).Subscribe(v => this.CEPOS1.Add(v));

 

And results from the Observable sample on screen

image

This morning I was hoping to take a few minutes to modify one of the examples in the StreamInsight CTP and send an output stream to a UI, rather than the text files used in the examples. I thought this would be easy, as the readme states that there’s

“An alpha version of the StreamInsight libraries for development using the IObservable/IObserver programming paradigm.”

But it wasn’t. The IObservable used in StreamInsight is defined in a different namespace than the IObservable in the System.Reactive and the StreamInsight api lacks the base classes and extension methods defined in System.Reactive. At this time, the two APIs do not play well with each other.

 

Some thoughts on how to get around this temporary inconsistency:

  • Recompile System.Reactive to use StreamInsight’s IObservable/IObserver
  • Create a type converter between the two IObservable/IObservers
  • Create a StreamInsight output adapter which just raises a .Net event, then use the RX method of converting events to IObservables

Perhaps tonight.

Talk around the water cooler is that it might be possible to use the Reactive Framework for some lightweight CEP.

I’ll correct some of the (big) mistakes from my last post and build up a “jumping” window extension method for IObservables.

 

In my last post I build a simple grouping method, but in it I immediately turned the push style of processing into a pull style by using the GetEnumerator() method. This is a bad idea for two key reasons, a) it takes the inherit elegance of the RX reduces it to a for loop, and b) it commits a cardinal sin of multi-threading and reserves a thread for a primarily blocking operation.

 

Here’s an improved version

public static IObservable<IEnumerable<TSource>> ToWindow<TSource>(
    this IObservable<TSource> source, 
    Func<TSource, IEnumerable<TSource>, bool> grouper)
{
    return RXGrouping.ToWindow(source, val => val, grouper);
}


public static IObservable<IEnumerable<TResult>> ToWindow<TSource, TResult>(
 this IObservable<TSource> source,
 Func<TSource, TResult> selector, 
 Func<TSource, IEnumerable<TResult>, bool> grouper)
{
    List<TResult> res = new List<TResult>();
    return new AnonymousObservable<IEnumerable<TResult>>(
        observer =>
            source.Subscribe(
            nextVal =>
            {
                try
                {
                    if (!grouper(nextVal, res))
                    {
                        observer.OnNext(res);
                        res = new List<TResult>();
                    }
                    res.Add(selector(nextVal));
                }
                catch (Exception exception)
                {
                    observer.OnError(exception);
                    return;
                }
            }
            ,observer.OnError
            ,observer.OnCompleted));
}

The mistake in the prior version stemmed in part from thinking that I needed to ask for the next value, but of course the RX will supply the next value when it’s available.

To use it:

TimeSpan windowDuration = new TimeSpan(0,0,10);
generatedNums
    // add a Timestamp to our raw data
    .Select(val => new { Timestamp = DateTime.Now, Value = val })
    // create a 5 min "jumping" window
    .ToWindow((lastVal, seq) => 
            (seq.Count() == 0) || 
            (lastVal.Timestamp - seq.First().Timestamp < windowDuration))
    // create item for display
    .Select(seq => new { Timestamp = seq.First().Timestamp
                        , Values = seq.Select(a => a.Value).ToArray()
                        , Average = seq.Average(a => a.Value) })
    // marshal and add to list
    .Post(sc).Subscribe(wv => WindowVals.Add(wv));

 

And the results

 image

Next we’ll look into creating a sliding window.

More Posts