From the ReactiveFramework to StreamInsight and Back
In my last post I showed how to send StreamInsight output streams to a UI via the ReactiveFramework. Here’s we’ll do the reverse, by sending an RX stream into a CEP stream. Instead of a partial example, I’ll use an end to end example showing simulated stock ticks, computing the 5 min rolling VWAP, and showing the results on a UI.
First we’ll generate the ticks:
System.Collections.Generic.IObservable<StockTick> stockTicks = System.Linq.Observable.Generate( new Random() // inital state , rnd => true // continue , rnd => new StockTick() // next value { Price = rnd.NextDouble() * 1000, Side = Sides[rnd.Next(Sides.Length - 1)], Size = (long)(rnd.NextDouble() * 1000), Symbol = Symbols[rnd.Next(Symbols.Length - 1)], Timestamp = DateTime.Now } , rnd => (int)(rnd.NextDouble() * 2000) // waitInterval , rnd => rnd // iterate );
And now convert to a CEP stream:
var cepInput = stockTicks.ToCEP() .ToCepStream(tick => tick.Timestamp);
Where ToCep() is just the inverse of ToRx(), defined previously.
public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource) { return new CEPAnonymousObservable<T>( (S.IObserver<T> cepObserver) => RX.ObservableExtensions.Subscribe(rxSource, nextVal=> cepObserver.OnNext(nextVal))); }
Computing the rolling 5 min VWAP, (grouped by symbol) takes some effort
var alteredDurationStream = cepInput .ToPointEventStream() .AlterEventDuration(tick => TimeSpan.FromMinutes(5)); var fiveMinVWaps = from fivemin in alteredDurationStream group fivemin by fivemin.Symbol into fGrp from evwindow in fGrp.Snapshot() select new { Symbol = fGrp.Key, TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price), TotalVolume = evwindow.Sum(fmin => fmin.Size), }; var fiveMinVWaps2 = from fivemin in fiveMinVWaps select new VWAPItem() { Symbol = fivemin.Symbol, VWAP = fivemin.TotalAmount / fivemin.TotalVolume, Timestamp = DateTime.Now, };
Although this nearly looks like conventional .Net Linq code, it isn’t. Think Linq2SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint, nor are any arbitrary .Net computations allowed. The reason for the additional fiveMinVWaps2 projection is that it’s not possible to compute anything but the Sum or Avg in a SnapShot().
Now that we have the data, we can convert to back to a RX stream:
PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp"); var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();
And update an ObservableCollection
vwaps.Post(_sc).Subscribe(item => { var exists = CEPOS1.Where(vw => vw.Symbol == item.Symbol).FirstOrDefault(); if (exists == null) CEPOS1.Add(item); else exists.CopyFrom(item); });
Which displays on a UI