From the ReactiveFramework to StreamInsight and Back
In my last post I showed how to send StreamInsight output streams to a UI via the ReactiveFramework. Here we’ll do the reverse and send an RX stream into a CEP stream. Instead of a partial example, I’ll use an end-to-end example: simulating stock ticks, computing the 5-minute rolling VWAP, and showing the results on a UI.
First we’ll generate the ticks:
System.Collections.Generic.IObservable<StockTick> stockTicks =
    System.Linq.Observable.Generate(
        new Random() // initial state
        , rnd => true // continue
        , rnd => new StockTick() // next value
        {
            Price = rnd.NextDouble() * 1000,
            // Random.Next(max) excludes max, so pass the full length
            Side = Sides[rnd.Next(Sides.Length)],
            Size = (long)(rnd.NextDouble() * 1000),
            Symbol = Symbols[rnd.Next(Symbols.Length)],
            Timestamp = DateTime.Now
        }
        , rnd => (int)(rnd.NextDouble() * 2000) // waitInterval
        , rnd => rnd // iterate
    );
And now convert to a CEP stream:
var cepInput = stockTicks.ToCEP()
    .ToCepStream(tick => tick.Timestamp);
Here ToCEP() is just the inverse of ToRX(), defined previously:
public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource)
{
    return new CEPAnonymousObservable<T>(
        (S.IObserver<T> cepObserver) =>
            RX.ObservableExtensions.Subscribe(rxSource, nextVal => cepObserver.OnNext(nextVal)));
}
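CEPAnonymousObservable isn't shown in this post. As a rough idea of what such an adapter could look like, here is a minimal sketch, assuming it does nothing more than wrap a subscribe delegate. The name DelegateObservable is hypothetical and this is not the actual class from the earlier post; it only relies on the standard System.IObservable<T> and System.IObserver<T> interfaces.

```csharp
using System;

// Hypothetical stand-in for CEPAnonymousObservable<T>: an IObservable<T>
// whose Subscribe simply invokes a delegate supplied at construction.
public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");
        _subscribe = subscribe;
    }

    // Each subscriber triggers the delegate; the IDisposable it returns
    // is handed back so the caller can unsubscribe.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}
```

With a wrapper along these lines, the lambda passed in ToCEP() becomes the body of Subscribe, and the IDisposable returned by the RX subscription is what the CEP side would dispose to stop receiving ticks.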
Computing the rolling 5-minute VWAP (grouped by symbol) takes some effort:
var alteredDurationStream = cepInput
    .ToPointEventStream()
    .AlterEventDuration(tick => TimeSpan.FromMinutes(5));
var fiveMinVWaps = from fivemin in alteredDurationStream
                   group fivemin by fivemin.Symbol into fGrp
                   from evwindow in fGrp.Snapshot()
                   select new
                   {
                       Symbol = fGrp.Key,
                       TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price),
                       TotalVolume = evwindow.Sum(fmin => fmin.Size),
                   };
var fiveMinVWaps2 = from fivemin in fiveMinVWaps
                    select new VWAPItem()
                    {
                        Symbol = fivemin.Symbol,
                        VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
                        Timestamp = DateTime.Now,
                    };
Although this looks almost like conventional .NET LINQ code, it isn’t. Think LINQ to SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint in them, nor are arbitrary .NET computations allowed. The additional fiveMinVWaps2 projection is there because it’s not possible to compute anything but the Sum or Avg in a Snapshot(), so the division has to happen in a second step.
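For comparison, here is the same two-step computation as ordinary LINQ to Objects, where breakpoints and arbitrary code do work. This is only a sketch to show the arithmetic (VWAP = sum of price × size, divided by sum of size); the VwapSketch class and its Tick type are made up for illustration and have nothing to do with the StreamInsight API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class VwapSketch
{
    // Illustrative tick shape; only the fields the VWAP needs.
    public sealed class Tick
    {
        public string Symbol { get; set; }
        public double Price { get; set; }
        public long Size { get; set; }
    }

    // Step 1 sums price*size and size per symbol; step 2 divides them,
    // mirroring the fiveMinVWaps / fiveMinVWaps2 split.
    public static Dictionary<string, double> Compute(IEnumerable<Tick> ticks)
    {
        var totals = from t in ticks
                     group t by t.Symbol into g
                     select new
                     {
                         Symbol = g.Key,
                         TotalAmount = g.Sum(x => x.Size * x.Price),
                         TotalVolume = g.Sum(x => x.Size)
                     };

        return totals.ToDictionary(x => x.Symbol, x => x.TotalAmount / x.TotalVolume);
    }
}
```

For example, two MSFT ticks of 10 shares at 100 and 30 shares at 200 give (1000 + 6000) / 40 = 175. In memory the two steps could be merged into one select; the split is kept here only to match the CEP query.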
Now that we have the data, we can convert it back to an RX stream:
PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp");
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();
And update an ObservableCollection:
vwaps.Post(_sc).Subscribe(item =>
{
    var exists = CEPOS1.FirstOrDefault(vw => vw.Symbol == item.Symbol);
    if (exists == null)
        CEPOS1.Add(item);
    else
        exists.CopyFrom(item);
});
The collection then displays on the UI.