From the ReactiveFramework to StreamInsight and Back

In my last post I showed how to send StreamInsight output streams to a UI via the ReactiveFramework. Here we’ll do the reverse and send an RX stream into a CEP stream. Instead of a partial example, I’ll use an end-to-end example that simulates stock ticks, computes the 5-minute rolling VWAP, and shows the results on a UI.

 

First we’ll generate the ticks:

System.Collections.Generic.IObservable<StockTick> stockTicks =
    System.Linq.Observable.Generate(
        new Random(),                 // initial state
        rnd => true,                  // continue
        rnd => new StockTick()        // next value
        {
            Price = rnd.NextDouble() * 1000,
            // Random.Next(n) excludes n, so pass the full length to reach the last element
            Side = Sides[rnd.Next(Sides.Length)],
            Size = (long)(rnd.NextDouble() * 1000),
            Symbol = Symbols[rnd.Next(Symbols.Length)],
            Timestamp = DateTime.Now
        },
        rnd => (int)(rnd.NextDouble() * 2000),  // waitInterval
        rnd => rnd);                            // iterate
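The generator relies on a StockTick type and on Sides and Symbols arrays that aren’t shown in this post. Here is a minimal sketch of what they might look like, with the property types inferred from the generator. The string-typed Side, the TickSketchData class, the Pick helper, and the array values are all assumptions for illustration. One detail matters when indexing with Random: Random.Next(n) returns a value from 0 up to but not including n, so passing the array length already covers every element.

```csharp
using System;

// Hypothetical shape of the tick type; property types are inferred from the generator.
public class StockTick
{
    public string Symbol { get; set; }
    public string Side { get; set; }       // assumption: the real type may be an enum
    public double Price { get; set; }
    public long Size { get; set; }
    public DateTime Timestamp { get; set; }
}

public static class TickSketchData
{
    // Placeholder values, not taken from the post.
    public static readonly string[] Sides = { "Buy", "Sell" };
    public static readonly string[] Symbols = { "MSFT", "GOOG", "IBM" };

    // Uniformly picks one element; Random.Next(n) never returns n itself.
    public static T Pick<T>(Random rnd, T[] items)
    {
        return items[rnd.Next(items.Length)];
    }
}
```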

And now convert to a CEP stream:

var cepInput = stockTicks.ToCEP()
                .ToCepStream(tick => tick.Timestamp);

Here ToCEP() is just the inverse of ToRX(), which was defined previously:

public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource)
{
    return new CEPAnonymousObservable<T>(
        (S.IObserver<T> cepObserver) => 
            RX.ObservableExtensions.Subscribe(rxSource, nextVal=> cepObserver.OnNext(nextVal)));
}
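CEPAnonymousObservable isn’t shown in this post. The general pattern behind such a class is an observable built from a subscribe delegate, and the sketch below illustrates it using the standard System.IObservable<T> and System.IObserver<T> interfaces as stand-ins for the StreamInsight ones. Every type name here (AnonymousObservableSketch, RecordingObserver, NoopDisposable, BridgeSketch) is hypothetical, and the actual class may differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for CEPAnonymousObservable<T>: an observable built from a delegate.
public sealed class AnonymousObservableSketch<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public AnonymousObservableSketch(Func<IObserver<T>, IDisposable> subscribe)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");
        _subscribe = subscribe;
    }

    // Each subscription simply runs the delegate against the new observer.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

// Helper that records everything it receives, handy for checking a bridge.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public Exception Error;

    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completed = true; }
}

// Subscription handle that does nothing when disposed.
public sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

public static class BridgeSketch
{
    // Re-exposes a source through the wrapper, forwarding OnNext, OnError and OnCompleted.
    public static IObservable<T> Wrap<T>(IObservable<T> source)
    {
        return new AnonymousObservableSketch<T>(observer => source.Subscribe(observer));
    }
}
```

The ToCEP() above passes only an OnNext handler to Subscribe. Handing the whole observer to the source, as Wrap does, also forwards errors and completion. Whether that is wanted depends on how the CEP side treats the end of a stream.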

Computing the rolling 5-minute VWAP (grouped by symbol) takes some effort:

var alteredDurationStream = cepInput
                            .ToPointEventStream()
                            .AlterEventDuration(tick => TimeSpan.FromMinutes(5));

var fiveMinVWaps =  from fivemin in
                    alteredDurationStream
                    group fivemin by fivemin.Symbol into fGrp
                    from evwindow in fGrp.Snapshot()
                    select new
                    { 
                       Symbol = fGrp.Key, 
                       TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price),
                       TotalVolume = evwindow.Sum(fmin => fmin.Size), 
                    };

var fiveMinVWaps2 = from fivemin in fiveMinVWaps
                    select new VWAPItem()
                    {
                        Symbol = fivemin.Symbol,
                        VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
                        Timestamp = DateTime.Now,
                    };

Although this looks almost like conventional .NET LINQ code, it isn’t. Think LINQ to SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint in them, nor are arbitrary .NET computations allowed. The additional fiveMinVWaps2 projection is there because it’s not possible to compute anything but a Sum or Avg in a Snapshot().
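To make the arithmetic concrete: per symbol, the VWAP is the sum of price × size divided by the sum of size, taken over the ticks still inside the window. The sketch below does the same calculation with plain LINQ to Objects over an in-memory list. It is not StreamInsight code. The Tick and VwapSketch names are hypothetical, and it assumes that a tick stamped at time T counts from T up to, but not including, T plus the window length.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the tick type; only the fields the VWAP needs.
public class Tick
{
    public string Symbol;
    public double Price;
    public long Size;
    public DateTime Timestamp;
}

public static class VwapSketch
{
    // VWAP per symbol over ticks whose timestamp falls in (asOf - window, asOf].
    public static Dictionary<string, double> Compute(
        IEnumerable<Tick> ticks, DateTime asOf, TimeSpan window)
    {
        return ticks
            .Where(t => t.Timestamp > asOf - window && t.Timestamp <= asOf)
            .GroupBy(t => t.Symbol)
            .Select(g => new
            {
                Symbol = g.Key,
                TotalAmount = g.Sum(t => t.Size * t.Price), // sum of price * size
                TotalVolume = g.Sum(t => t.Size)            // sum of size
            })
            .Where(x => x.TotalVolume > 0)                  // skip groups with no volume
            .ToDictionary(x => x.Symbol, x => x.TotalAmount / x.TotalVolume);
    }
}
```

Unlike the CEP query, this version runs as ordinary CLR code, so it can be stepped through in a debugger and used to sanity-check the numbers coming out of the stream.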

Now that we have the data, we can convert it back to an RX stream:

PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp");
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();

And update an ObservableCollection:

vwaps.Post(_sc).Subscribe(item =>
{
    var exists = CEPOS1.Where(vw => vw.Symbol == item.Symbol).FirstOrDefault();
    if (exists == null)
        CEPOS1.Add(item);
    else
        exists.CopyFrom(item);
});
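The update above depends on a CopyFrom method that isn’t shown in this post. For a bound UI to refresh when an existing row is changed in place, the item usually needs to raise change notifications. Here is a minimal sketch, assuming the item implements INotifyPropertyChanged. The class is named VwapItemSketch to make clear it is a guess at the shape, not the actual VWAPItem.

```csharp
using System;
using System.ComponentModel;

// Hypothetical item type: Symbol identifies the row, the other fields are updated in place.
public class VwapItemSketch : INotifyPropertyChanged
{
    private double _vwap;
    private DateTime _timestamp;

    public string Symbol { get; set; }

    public double VWAP
    {
        get { return _vwap; }
        set { _vwap = value; OnPropertyChanged("VWAP"); }
    }

    public DateTime Timestamp
    {
        get { return _timestamp; }
        set { _timestamp = value; OnPropertyChanged("Timestamp"); }
    }

    public event PropertyChangedEventHandler PropertyChanged;

    // Copies the mutable values; Symbol is left alone because it is the lookup key.
    public void CopyFrom(VwapItemSketch other)
    {
        if (other == null) throw new ArgumentNullException("other");
        VWAP = other.VWAP;
        Timestamp = other.Timestamp;
    }

    private void OnPropertyChanged(string name)
    {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(name));
    }
}
```

Updating the existing instance, instead of removing it and adding a new one, keeps the collection order stable and limits UI work to the cells that changed.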

 

Which displays on a UI:

[image]

Published Tuesday, August 25, 2009 1:01 AM by Scott Weinstein

Comments

Wednesday, August 26, 2009 10:38 AM by Niels Berglund

# re: From the ReactiveFramework to StreamInsight and Back

Hey Scott,

I see you got it to work - cool!

However, your Timestamp field, does that give you the right data? See my post on the forum about this.

Niels

Wednesday, August 26, 2009 11:04 AM by Scott Weinstein

# re: From the ReactiveFramework to StreamInsight and Back

Niels,

I did, and thanks for the assist! Good discovery about the timestamps. I haven't checked them in this example, but then again, I haven't verified any of the data.

I think there's a bug in the sliding window, as there are no CTI events in the stream. Further eval w/ prepped data, rather than random data, is needed.

Wednesday, August 26, 2009 11:18 AM by Niels Berglund

# re: From the ReactiveFramework to StreamInsight and Back

Sure, no probs - glad to be able to help.

If you find out anything about the sliding window - I'd be glad to hear.

Niels
