From the ReactiveFramework to StreamInsight and Back

In my last post I showed how to send StreamInsight output streams to a UI via the ReactiveFramework. Here’s we’ll do the reverse, by sending an RX stream into a CEP stream. Instead of a partial example, I’ll use an end to end example showing simulated stock ticks, computing the 5 min rolling VWAP, and showing the results on a UI.


First we’ll generate the ticks:

System.Collections.Generic.IObservable<StockTick> stockTicks =
                new Random() // inital state
                 , rnd => true // continue
                 , rnd => new StockTick() // next value
                             Price = rnd.NextDouble() * 1000,
                             Side = Sides[rnd.Next(Sides.Length - 1)],
                             Size = (long)(rnd.NextDouble() * 1000),
                             Symbol = Symbols[rnd.Next(Symbols.Length - 1)],
                             Timestamp = DateTime.Now
                         , rnd => (int)(rnd.NextDouble() * 2000)  // waitInterval
                         , rnd => rnd  // iterate

And now convert to a CEP stream:

var cepInput = stockTicks.ToCEP()
                .ToCepStream(tick => tick.Timestamp);

Where ToCep() is just the inverse of ToRx(), defined previously.

public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource)
    return new CEPAnonymousObservable<T>(
        (S.IObserver<T> cepObserver) => 
            RX.ObservableExtensions.Subscribe(rxSource, nextVal=> cepObserver.OnNext(nextVal)));

Computing the rolling 5 min VWAP, (grouped by symbol) takes some effort

var alteredDurationStream = cepInput
                            .AlterEventDuration(tick => TimeSpan.FromMinutes(5));

var fiveMinVWaps =  from fivemin in
                    group fivemin by fivemin.Symbol into fGrp
                    from evwindow in fGrp.Snapshot()
                    select new
                       Symbol = fGrp.Key, 
                       TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price),
                       TotalVolume = evwindow.Sum(fmin => fmin.Size), 

var fiveMinVWaps2 = from fivemin in fiveMinVWaps
                    select new VWAPItem()
                        Symbol = fivemin.Symbol,
                        VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
                        Timestamp = DateTime.Now,

Although this nearly looks like conventional .Net Linq code, it isn’t. Think Linq2SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint, nor are any arbitrary .Net computations allowed. The reason for the additional fiveMinVWaps2 projection is that it’s not possible to compute anything but the Sum or Avg in a SnapShot().

Now that we have the data, we can convert to back to a RX stream:

PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp");
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();

And update an ObservableCollection

vwaps.Post(_sc).Subscribe(item =>
    var exists = CEPOS1.Where(vw => vw.Symbol == item.Symbol).FirstOrDefault();
    if (exists == null)


Which displays on a UI



Comments have been disabled for this content.