From the ReactiveFramework to StreamInsight and Back

In my last post I showed how to send StreamInsight output streams to a UI via the ReactiveFramework. Here we’ll do the reverse, sending an RX stream into a CEP stream. Instead of a partial example, I’ll use an end-to-end example that simulates stock ticks, computes the 5-minute rolling VWAP, and shows the results on a UI.

 

First we’ll generate the ticks:

System.Collections.Generic.IObservable<StockTick> stockTicks =
    System.Linq.Observable.Generate(
        new Random()              // initial state
        , rnd => true             // continue (never terminates)
        , rnd => new StockTick()  // next value
        {
            Price = rnd.NextDouble() * 1000,
            // Random.Next(n) excludes n, so pass the full length
            Side = Sides[rnd.Next(Sides.Length)],
            Size = (long)(rnd.NextDouble() * 1000),
            Symbol = Symbols[rnd.Next(Symbols.Length)],
            Timestamp = DateTime.Now
        }
        , rnd => (int)(rnd.NextDouble() * 2000)  // waitInterval
        , rnd => rnd              // iterate
        );

And now convert to a CEP stream:

var cepInput = stockTicks.ToCEP()
                .ToCepStream(tick => tick.Timestamp);

Here ToCEP() is just the inverse of the ToRX() defined in the previous post:

public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource)
{
    return new CEPAnonymousObservable<T>(
        (S.IObserver<T> cepObserver) => 
            RX.ObservableExtensions.Subscribe(rxSource, nextVal=> cepObserver.OnNext(nextVal)));
}

Computing the rolling 5-minute VWAP (grouped by symbol) takes some effort:

var alteredDurationStream = cepInput
                            .ToPointEventStream()
                            .AlterEventDuration(tick => TimeSpan.FromMinutes(5));

var fiveMinVWaps =  from fivemin in
                    alteredDurationStream
                    group fivemin by fivemin.Symbol into fGrp
                    from evwindow in fGrp.Snapshot()
                    select new
                    { 
                       Symbol = fGrp.Key, 
                       TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price),
                       TotalVolume = evwindow.Sum(fmin => fmin.Size), 
                    };

var fiveMinVWaps2 = from fivemin in fiveMinVWaps
                    select new VWAPItem()
                    {
                        Symbol = fivemin.Symbol,
                        VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
                        Timestamp = DateTime.Now,
                    };

Although this looks almost like conventional .NET LINQ code, it isn’t. Think LINQ to SQL: these are expression trees, not ordinary CLR lambdas, so you can’t place a breakpoint inside them, and arbitrary .NET computations aren’t allowed. The additional fiveMinVWaps2 projection is needed because it’s not possible to compute anything but Sum or Avg inside a Snapshot() window, so the division happens in a second query.
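
To make the arithmetic concrete, here is the same two-step calculation (sum price times size and sum size per symbol, then divide) written over an in-memory list with ordinary LINQ to Objects. This is only an illustration of the VWAP formula, not StreamInsight code: there are no event windows here, and the Tick type, the VwapDemo class, and the sample values are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal tick type, for illustration only.
public class Tick
{
    public string Symbol { get; set; }
    public double Price { get; set; }
    public long Size { get; set; }
}

public static class VwapDemo
{
    // Step 1: per symbol, sum price * size and sum size.
    // Step 2: divide the two sums to get the VWAP.
    public static Dictionary<string, double> ComputeVwaps(IEnumerable<Tick> window)
    {
        var totals = from tick in window
                     group tick by tick.Symbol into grp
                     select new
                     {
                         Symbol = grp.Key,
                         TotalAmount = grp.Sum(t => t.Size * t.Price),
                         TotalVolume = grp.Sum(t => t.Size)
                     };

        return totals.ToDictionary(t => t.Symbol, t => t.TotalAmount / t.TotalVolume);
    }

    public static void Main()
    {
        // Made-up ticks standing in for the contents of one window.
        var window = new List<Tick>
        {
            new Tick { Symbol = "MSFT", Price = 10.0, Size = 100 },
            new Tick { Symbol = "MSFT", Price = 20.0, Size = 300 },
            new Tick { Symbol = "IBM", Price = 100.0, Size = 50 }
        };

        foreach (var pair in ComputeVwaps(window))
            Console.WriteLine("{0}: {1}", pair.Key, pair.Value);
    }
}
```

For the sample data, MSFT has a total amount of 7000 over a volume of 400, giving a VWAP of 17.5, which sits closer to 20 than to 10 because the larger trade carries more weight.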

Now that we have the data, we can convert it back to an RX stream:

PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp");
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();

And update an ObservableCollection:

vwaps.Post(_sc).Subscribe(item =>
{
    var exists = CEPOS1.Where(vw => vw.Symbol == item.Symbol).FirstOrDefault();
    if (exists == null)
        CEPOS1.Add(item);
    else
        exists.CopyFrom(item);
});
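
The subscription above is an add-or-update keyed on Symbol. The sketch below isolates that pattern so it can be run without Rx or a UI. The VWAPItem definition, its CopyFrom body, and the UpsertDemo helper are assumptions, since the post doesn’t show them; in particular, implementing INotifyPropertyChanged is one plausible way for an in-place update to reach a bound UI, not necessarily what the original class does.

```csharp
using System;
using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Linq;

// Hypothetical VWAPItem; the post does not show its definition.
public class VWAPItem : INotifyPropertyChanged
{
    private double _vwap;
    private DateTime _timestamp;

    public string Symbol { get; set; }

    public double VWAP
    {
        get { return _vwap; }
        set { _vwap = value; Raise("VWAP"); }
    }

    public DateTime Timestamp
    {
        get { return _timestamp; }
        set { _timestamp = value; Raise("Timestamp"); }
    }

    public event PropertyChangedEventHandler PropertyChanged;

    // Copies the changing values so the existing entry updates in place.
    public void CopyFrom(VWAPItem other)
    {
        VWAP = other.VWAP;
        Timestamp = other.Timestamp;
    }

    private void Raise(string name)
    {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(name));
    }
}

public static class UpsertDemo
{
    // Adds the item if its symbol is new, otherwise updates the existing entry.
    public static void Upsert(ObservableCollection<VWAPItem> items, VWAPItem item)
    {
        var existing = items.FirstOrDefault(vw => vw.Symbol == item.Symbol);
        if (existing == null)
            items.Add(item);
        else
            existing.CopyFrom(item);
    }

    public static void Main()
    {
        var items = new ObservableCollection<VWAPItem>();
        Upsert(items, new VWAPItem { Symbol = "MSFT", VWAP = 17.5, Timestamp = DateTime.Now });
        Upsert(items, new VWAPItem { Symbol = "MSFT", VWAP = 18.0, Timestamp = DateTime.Now });
        Console.WriteLine(items.Count);
    }
}
```

Updating the existing item instead of removing and re-adding it keeps the collection at one entry per symbol, so only the changed values need to be redrawn.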

 

Which displays on a UI:

image
