CEP Style Sliding windows in the RX – Take 2

The bug I mentioned in my first attempt at a sliding window was the minor issue that the aggregates never went down to 0, even if the window had emptied out.

The problem line of code was cur.GroupBy(tsst => tsst.Value.Symbol) – if the window is empty, there is nothing to group – and as a result the aggregates don’t get computed.
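
To see the failure mode in isolation, here is a minimal LINQ to Objects sketch. The tuple ticks and the names EmptyWindowDemo and Aggregate are made up for illustration and are not part of the original sample. Grouping an empty window yields no groups at all, so the projection that computes the aggregates never runs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EmptyWindowDemo
{
    // Hypothetical stand-in for the window contents: (Symbol, Size, Price) tuples.
    public static List<string> Aggregate(
        IEnumerable<(string Symbol, int Size, decimal Price)> window)
    {
        return window
            .GroupBy(t => t.Symbol)
            // This projection runs once per group, so never for an empty window.
            .Select(g => g.Key + ":" + g.Sum(t => t.Size))
            .ToList();
    }

    public static void Main()
    {
        var full = new[] { ("MSFT", 100, 25.0m), ("MSFT", 50, 25.5m) };
        var empty = Enumerable.Empty<(string Symbol, int Size, decimal Price)>();

        Console.WriteLine(Aggregate(full).Count);   // prints 1 (one group for MSFT)
        Console.WriteLine(Aggregate(empty).Count);  // prints 0 (no zeroed MSFT entry appears)
    }
}
```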

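Here’s the fix: remember every symbol that has ever appeared, then outer-join that set against the current groups, so that a symbol with no ticks left in the window still emits a zeroed aggregate.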

public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
    // Every symbol that has ever produced a group; the int value is unused.
    var existingWindows = new ConcurrentDictionary<string, int>();
    return oticks
        // 30 seconds and 500 milliseconds; ToSlidingWindow is defined in the previous post.
        .ToSlidingWindow(new TimeSpan(0, 0, 0, 30), new TimeSpan(0, 0, 0, 0, 500))
        .Select(sl => sl.Current)
        .SelectMany(cur =>
        {
            // Aggregates for the symbols that have ticks in the current window.
            IEnumerable<VWAPItem> grouped = cur.GroupBy(tsst => tsst.Value.Symbol)
                .Select(grp =>
                {
                    IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
                    var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
                    var totalVolume = ticks.Sum(tk => tk.Size);
                    return new VWAPItem(grp.Key, totalAmount,
                                        totalVolume,
                                        totalAmount / totalVolume);
                });
            // Remember each symbol seen in this window.
            foreach (var grpd in grouped)
            {
                existingWindows[grpd.Symbol] = 1;
            }
            // Left outer join: known symbols without a current group get a zeroed VWAPItem.
            IEnumerable<IEnumerable<VWAPItem>> outerJoin = existingWindows
                .GroupJoin(grouped,
                           key => key.Key,
                           grped => grped.Symbol,
                           (key, item) =>
                               item.DefaultIfEmpty(new VWAPItem(key.Key, 0, 0, 0)));
            return outerJoin.SelectMany(x => x);
        });
}
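
The outer-join step can also be tried on its own. What follows is a minimal sketch in plain LINQ to Objects, assuming hypothetical names (OuterJoinDemo, FillMissing) and a KeyValuePair standing in for VWAPItem; it is not the code from the demo project. It shows how GroupJoin combined with DefaultIfEmpty keeps every known symbol and substitutes a zero when the current window has nothing for it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OuterJoinDemo
{
    // known:   every symbol seen so far (hypothetical stand-in for existingWindows)
    // current: aggregates for symbols present in the current window
    public static List<KeyValuePair<string, decimal>> FillMissing(
        IEnumerable<string> known,
        IEnumerable<KeyValuePair<string, decimal>> current)
    {
        return known
            .GroupJoin(current,
                       symbol => symbol,
                       agg => agg.Key,
                       // No match for this symbol: fall back to a zeroed entry.
                       (symbol, matches) => matches.DefaultIfEmpty(
                           new KeyValuePair<string, decimal>(symbol, 0m)))
            .SelectMany(x => x)
            .ToList();
    }

    public static void Main()
    {
        var known = new[] { "MSFT", "GOOG" };
        var current = new[] { new KeyValuePair<string, decimal>("MSFT", 25.2m) };

        // MSFT keeps its aggregate; GOOG has no ticks and comes back as zero.
        foreach (var item in FillMissing(known, current))
        {
            Console.WriteLine(item.Key + " " + item.Value);
        }
    }
}
```

GroupJoin preserves the order of the outer sequence, so the result has exactly one entry per known symbol when the current aggregates contain at most one entry per symbol, as they do after a GroupBy.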

5 Comments

  • Hi,

    Little bit late but nice work.

    What is ToSlidingWindow? Is it an SI CEP extension on observables?

    thanks

  • Silly me, the definition is in the previous post to this one. But I still can't find the definition of SlidingWindow.

    thanks

  • @Steve -
    A sliding window has a fixed time size, but moves forward (slides) incrementally.

    Say from T0-99, then from T1-100, then T2-101. At all times, the time size of the window is 100 units.

  • Thanks for the reply Scott.
    So it is not some Rx or StreamInsight special class. Just a window on an observable.

    I came up with the following definition of SlidingWindow based on its usage in your code:

    public class SlidingWindow<T>
    {
        public IObservable<T> Current { get; set; }
        public IEnumerable<T> Added { get; set; }
        public IEnumerable<T> Removed { get; set; }

        public SlidingWindow(IObservable<T> current, IEnumerable<T> added, IEnumerable<T> removed)
        {
            Current = current;
            Added = added;
            Removed = removed;
        }

        public SlidingWindow()
        {
            Current = Observable.Empty<T>();
            Added = Enumerable.Empty<T>();
            Removed = Enumerable.Empty<T>();
        }
    }

    I am trying to get the code running as an advanced demo of Rx capabilities. I am learning Rx, and your examples seem to be real-life cases of Rx usage.

    Thanks for sharing these cool posts.

  • @Steve - just about. I've updated the source code samples to include the sliding window.

    http://code.msdn.microsoft.com/RxDemos
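
The window semantics described in the comments above (a fixed size of 100 units that slides forward one unit at a time) can be illustrated with a small sketch. This is an assumption-laden toy over integer timestamps, with hypothetical names (SlidingWindowDemo, WindowAt); it is not the ToSlidingWindow operator from the samples.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SlidingWindowDemo
{
    // Returns the timestamps inside the window of the given size that ends at 'end' (inclusive).
    // With size 100: end = 99 covers T0-99, end = 100 covers T1-100, and so on.
    public static List<int> WindowAt(IEnumerable<int> timestamps, int end, int size)
    {
        int start = end - size + 1;
        return timestamps.Where(t => t >= start && t <= end).ToList();
    }

    public static void Main()
    {
        var timestamps = new[] { 0, 50, 99, 100, 101 };

        Console.WriteLine(string.Join(",", WindowAt(timestamps, 99, 100)));   // prints 0,50,99
        Console.WriteLine(string.Join(",", WindowAt(timestamps, 100, 100)));  // prints 50,99,100
        Console.WriteLine(string.Join(",", WindowAt(timestamps, 101, 100)));  // prints 50,99,100,101
    }
}
```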
