CEP Style Sliding windows in the RX – Take 2

The bug I mentioned in my first attempt at a sliding window was the minor issue that the aggegates never went down to 0, even if the window had emptied out.

The problem line of code was cur.GroupBy(tsst => tsst.Value.Symbol) – if the window is empty, there is nothing to group – and as a result the aggregates don’t get computed.

Here’s the fix:

public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
    var existingWindows = new ConcurrentDictionary<string,int>();
    return oticks
              .ToSlidingWindow(new TimeSpan(0, 0, 0, 30), new TimeSpan(0, 0, 0, 0, 500))
              .Select(sl => sl.Current)
              .SelectMany(cur =>
                  {
                      IEnumerable<VWAPItem> grouped = cur.GroupBy(tsst => tsst.Value.Symbol)
                          .Select(grp =>
                              {
                                  IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
                                  var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
                                  var totalVolume = ticks.Sum(tk => tk.Size);
                                  return new VWAPItem(grp.Key, totalAmount, 
								totalVolume, 
								totalAmount / totalVolume);
                              });
                      foreach (var grpd in grouped)
                      {
                          existingWindows[grpd.Symbol] = 1;
                      } 
                     IEnumerable< IEnumerable<VWAPItem>> outerJoin = existingWindows
                                                        .GroupJoin(grouped, 
                                                            key => key.Key, 
                                                            grped => grped.Symbol,
                                                            (key, item) => 
								item.DefaultIfEmpty(new VWAPItem(key.Key, 0, 0, 0)));
                     return outerJoin.SelectMany(x => x);
                  });
 
}
Published Sunday, November 29, 2009 11:01 PM by Scott Weinstein
Filed under: , , , ,

Comments

# Sliding Windows via the Reactive Framework - Scott Weinstein on .Net, Linq, PowerShell, WPF, and WCF

Pingback from  Sliding Windows via the Reactive Framework - Scott Weinstein on .Net, Linq, PowerShell, WPF, and WCF

# Twitter Trackbacks for CEP Style Sliding windows in the RX ??? Take 2 - Scott Weinstein on .Net, Linq, PowerShell, WPF, and WCF [asp.net] on Topsy.com

Pingback from  Twitter Trackbacks for                 CEP Style Sliding windows in the RX ??? Take 2 - Scott Weinstein on .Net, Linq, PowerShell, WPF, and WCF         [asp.net]        on Topsy.com

Tuesday, August 17, 2010 2:26 PM by steve

# re: CEP Style Sliding windows in the RX – Take 2

Hi,

Little bit late  but nice work.

What is ToSlidingWindow is it a SI CEP extension on observables?

thanks

Tuesday, August 17, 2010 8:01 PM by steve

# re: CEP Style Sliding windows in the RX – Take 2

silly me, the definition is in the previous post to this one. but still I can't find the definition of SlidingWindow.

thanks

Tuesday, August 17, 2010 10:30 PM by Scott Weinstein

# re: CEP Style Sliding windows in the RX – Take 2

@Steve -

A sliding window has a fixed time size, but moves forward (slides) incrementally.

Say from T0-99, then from T1-100, then T2-101. At all times, the time size of the window is 100 units.

Wednesday, August 18, 2010 7:49 AM by steve

# re: CEP Style Sliding windows in the RX – Take 2

Thanks for the reply Scott.

So it is not some Rx or StreamInsight special class. Just a window on an observable.

I came up with the following definition of SlidingWindow based on its usage in your code:

       public class SlidingWindow<T>

       {

           public IObservable<T> Current { get; set; }

           public IEnumerable<T> Added { get; set; }

           public IEnumerable<T> Removed { get; set; }

           public SlidingWindow(IObservable<T> current, IEnumerable<T> added, IEnumerable<T> removed)

           {

               Current = current;

               Added = added;

               Removed = removed;

           }

           public SlidingWindow()

           {

               Current = Observable.Empty<T>();

               Added = Enumerable.Empty<T>();

               Removed = Enumerable.Empty<T>();

           }

       }

I am trying to make the coding run to get an advanced demo on Rx capabilities. I am learning Rx and your examples seems to be real life situation of Rx usage.

Thanks for sharing these cool posts.

Sunday, August 22, 2010 3:20 PM by Scott Weinstein

# re: CEP Style Sliding windows in the RX – Take 2

@Steve - just about. I've updated the source code samples to include the sliding window.

code.msdn.microsoft.com/RxDemos

Leave a Comment

(required) 
(required) 
(optional)
(required)