CEP Style Sliding windows in the RX – Take 2
The bug I mentioned in my first attempt at a sliding window was the minor issue that the aggegates never went down to 0, even if the window had emptied out.
The problem line of code was cur.GroupBy(tsst => tsst.Value.Symbol) – if the window is empty, there is nothing to group – and as a result the aggregates don’t get computed.
Here’s the fix:
public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
var existingWindows = new ConcurrentDictionary<string,int>();
return oticks
.ToSlidingWindow(new TimeSpan(0, 0, 0, 30), new TimeSpan(0, 0, 0, 0, 500))
.Select(sl => sl.Current)
.SelectMany(cur =>
{
IEnumerable<VWAPItem> grouped = cur.GroupBy(tsst => tsst.Value.Symbol)
.Select(grp =>
{
IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
var totalVolume = ticks.Sum(tk => tk.Size);
return new VWAPItem(grp.Key, totalAmount,
totalVolume,
totalAmount / totalVolume);
});
foreach (var grpd in grouped)
{
existingWindows[grpd.Symbol] = 1;
}
IEnumerable< IEnumerable<VWAPItem>> outerJoin = existingWindows
.GroupJoin(grouped,
key => key.Key,
grped => grped.Symbol,
(key, item) =>
item.DefaultIfEmpty(new VWAPItem(key.Key, 0, 0, 0)));
return outerJoin.SelectMany(x => x);
});
}