Streaming OLAP with the Reactive Extensions (Rx) for .NET
Streaming OLAP is something that comes up over and over again in the “CEP space”. Using the Reactive Extensions for .NET, this demo shows the basics: filtering, grouping, aggregates, and concurrent queries.
To set the context: the idea here is to focus on the “query” side, so the UI is aggressively simple – no nifty visualizations, just enough to show working code. I chose a non-financial domain, filesystem changes in this case, simply because the simplified equities example is a bit overused.
Here’s a screenshot and download link: http://code.msdn.microsoft.com/FSOlapRxDemo
Let’s walk through the code:
Getting the data
The .NET class FileSystemWatcher will report changes to the filesystem as events, so to get a stream of filesystem changes we only need to do the following:
- get all the fixed drives
- create a new FileSystemWatcher for each
- convert the Changed, Deleted, and Created events to IObservables using FromEvent()
- Merge() those into a single IObservable
- map the EventArgs to a more query-friendly class
public static IObservable<FileChangeFact> GetFileSystemStream()
{
    var fsEventTypes = new string[] { "Changed", "Deleted", "Created" };

    IEnumerable<IObservable<IEvent<FileSystemEventArgs>>> fsEventsAsObservables =
        DriveInfo.GetDrives()
            .Where(di => di.DriveType == DriveType.Fixed)
            .Select(drive => new FileSystemWatcher(drive.RootDirectory.FullName) { … })
            .SelectMany(fsw => fsEventTypes.Select(eventType => Observable.FromEvent…;

    return Observable.Merge(fsEventsAsObservables)
        .Select(fsea =>
        {
            var fi = new FileInfo(fsea.EventArgs.FullPath);
            return new FileChangeFact
            {
                ChangeType = fsea.EventArgs.ChangeType,
                Path = fsea.EventArgs.FullPath,
                IsContainer = !fi.Exists,
                Length = fi.Exists ? fi.Length : 0,
                Extension = String.IsNullOrEmpty(fi.Extension) ? "(none)" : fi.Extension
            };
        });
}
Calculating Aggregates, take 1
The Scan() operator is ideal for computing aggregates in a streaming OLAP scenario. Unlike traditional queries, where vectors are aggregated into a single value, we want to compute running values.
So to compute a few of the most common query aggregates – Count, Sum, Mean, and StdDev – we can do the following:
public static IObservable<double> Mean(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new { N = n, Mean = meanp };
    }).Select(it => it.Mean);
}

public static IObservable<double> StdDev(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d, M2 = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new { N = n, Mean = meanp, M2 = cur.M2 + delta * (next - meanp) };
    }).Select(it => Math.Sqrt(it.M2 / it.N));
}

var fss = GetFileSystemStream();
var lenxs = fss.Select(fcf => (double)fcf.Length);
lenxs.Scan(0, (c, _) => c + 1) // Count
    .Zip(lenxs.Scan(0d, (c, n) => c + n), (cnt, sum) => new FileChangeAggregate() { Sum = sum, Count...
    .Zip(lenxs.Mean(), (fca, mean) => { fca.Mean = mean; return fca; })
    .Zip(lenxs.StdDev(), (fca, stddev) => { fca.StdDev = stddev; return fca; })
    //... subscribe()...
The first two methods are just wrappers around Scan(); to compute multiple aggregates on a single stream, the Zip() operator comes in handy, letting us essentially stitch together multiple computations.
One nice aspect of this approach is that the code for each of the aggregates is cohesive, loosely coupled, and composable – all nice attributes – but it’s verbose and redundant (the Mean is calculated twice), and I suspect not as performant. So that takes us to
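For reference, the Scan bodies above are an application of Welford’s online algorithm. A minimal, language-neutral sketch (Python here, just to show the recurrence outside of Rx) demonstrates that the running update reproduces the batch mean and population standard deviation:

```python
import math

def welford(xs):
    """Yield (count, mean, stddev) after each value, using Welford's recurrence."""
    n, mean, m2 = 0, 0.0, 0.0
    out = []
    for x in xs:
        n += 1
        delta = x - mean          # distance from the old mean
        mean += delta / n         # updated running mean
        m2 += delta * (x - mean)  # running sum of squared deviations
        out.append((n, mean, math.sqrt(m2 / n)))  # population std dev, as in the Rx code
    return out

# After the last value, these equal the batch statistics of the whole list.
final = welford([4.0, 7.0, 13.0, 16.0])[-1]
```

Note the Rx code divides M2 by N, i.e. the population (not sample) standard deviation.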
Calculating Aggregates, take 2
public static IObservable<StatInfoItem<T>> ToCommonAggregates<T, TSrc>(
    this IObservable<TSrc> source,
    Func<TSrc, double> dataSelector,
    Func<TSrc, T> itemSelector)
{
    return source.Scan(new StatInfoItem<T>(), (cur, next) =>
    {
        double data = dataSelector(next);
        T itemp = itemSelector(next);
        var n = cur.Count + 1;
        var delta = data - cur.Mean;
        var meanp = cur.Mean + delta / n;
        var m2 = cur.M2 + delta * (data - meanp);
        var stdDevp = Math.Sqrt(m2 / n);
        return new StatInfoItem<T>()
        {
            Item = itemp,
            Sum = data + cur.Sum,
            Count = n,
            Mean = meanp,
            M2 = m2,
            StdDev = stdDevp,
            Min = Math.Min(data, cur.Min),
            Max = Math.Max(data, cur.Max),
        };
    })
    .Skip(1); // need a seed, but don't want to include the seed value in the output
}
This version also adds Min() and Max(). Perhaps not as elegant, but it’s quite easy to use:
IObservable<StatInfoItem<string>> aggstream =
    fss.ToCommonAggregates(fcf => fcf.Length, _ => "Label");
Filtering and Multiple Queries
For the demo, each query is a “drill down” into the filesystem, implemented as a filter on the full path of the file that changed, using the same Where() operator as in ordinary LINQ. Adding multiple queries is less trivial, though. Unlike IEnumerables, which contain their data, observables hold only a promise of future data. So if we want to find out, say, how much has changed in c:\windows, we need to query a store of past data as well as include future data. Here’s one approach, using the StartWith() operator and a ConcurrentQueue:
private IObservable<FileChangeFact> _fss;
private ConcurrentQueue<FileChangeFact> _store = new …

private void NewQuery()
{
    _fss = GetFileSystemStream();
    _fss.Subscribe(fsi => _store.Enqueue(fsi));
    var newQuery = _fss.StartWith(_store.ToArray()).Where(fsi => {...});
}
A cleaner approach is to take advantage of Rx’s subjects, in particular ReplaySubject, which stores the history for us.
private ReplaySubject<FileChangeFact> _storeSubject;

_storeSubject = new ReplaySubject<FileChangeFact>();
GetFileSystemStream().Subscribe(_storeSubject);
var drillDownQry = _storeSubject.Where(fltr);
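The idea behind ReplaySubject can be sketched in a few lines (Python here, class and method names hypothetical): it records every value it observes, and a late subscriber is first fed the recorded history, then live values:

```python
class MiniReplaySubject:
    """Toy replay subject: buffers history, replays it to late subscribers."""

    def __init__(self):
        self._history = []
        self._subscribers = []

    def on_next(self, value):
        self._history.append(value)       # remember the value
        for observer in self._subscribers:
            observer(value)               # and push it to current subscribers

    def subscribe(self, observer):
        for v in self._history:           # replay past values first...
            observer(v)
        self._subscribers.append(observer)  # ...then deliver future ones

subject = MiniReplaySubject()
subject.on_next("a.txt")
subject.on_next("b.log")

seen = []
subject.subscribe(seen.append)  # late subscriber still receives "a.txt" and "b.log"
subject.on_next("c.txt")
```

This is only a sketch; the real ReplaySubject also handles completion, errors, buffer bounds, and thread safety.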
Grouping
Thanks to the GroupBy() and SelectMany() operators, grouping turns out to be quite easy. First we group by the file extension, selecting the Length property; then, for each group, we compute the aggregates.
IObservable<StatInfoItem<string>> grouped =
    newQuerystream.GroupBy(fsi => fsi.Extension, fsi => (double)fsi.Length)
        .SelectMany(grp => grp.ToCommonAggregates(x => x, _ => grp.Key));
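Conceptually, GroupBy() plus SelectMany() maintains one running aggregate per key and emits an updated value for whichever group the latest event belongs to. A hedged sketch of that bookkeeping (Python, with count/sum/mean standing in for the full aggregate set):

```python
def group_aggregate(events):
    """events: (extension, length) pairs.
    Emits (extension, count, total, mean) after each event, per group."""
    groups = {}     # extension -> (count, total)
    snapshots = []
    for ext, length in events:
        n, total = groups.get(ext, (0, 0.0))
        n, total = n + 1, total + length
        groups[ext] = (n, total)
        # emit the updated running aggregate for just this group
        snapshots.append((ext, n, total, total / n))
    return snapshots

events = [(".txt", 10.0), (".log", 4.0), (".txt", 30.0)]
result = group_aggregate(events)
```

Each emitted tuple corresponds to one StatInfoItem in the Rx pipeline: the key survives via grp.Key, and the numbers come from the per-group running state.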
Updating the UI
Merging a stream of data into an ObservableCollection<T> is a common operation. I’ve found myself wishing that Microsoft offered a KeyedObservableCollection<T>, and I’ve considered writing such a thing, but instead I used a small extension method, MergeInsert(), which does the job without all the ceremony of a new class:
public static IDisposable MergeInsert<T, TKey>(
    this ObservableCollection<T> col,
    IObservable<T> stream,
    Func<T, TKey> keySelector)
{
    col.Clear();
    Dictionary<TKey, int> lookupTable = new Dictionary<TKey, int>();
    return stream.Subscribe(item =>
    {
        var key = keySelector(item);
        if (!lookupTable.ContainsKey(key))
        {
            lookupTable[key] = col.Count;
            col.Add(item);
        }
        else
        {
            col[lookupTable[key]] = item;
        }
    });
}
Use it like so:
GroupedByExtentionCollection = new ObservableCollection<StatInfoItem<string>>();
IDisposable _disp = GroupedByExtentionCollection.MergeInsert(
    grouped.ObserveOnDispatcher(), sii => sii.Item);
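The upsert logic inside MergeInsert() is independent of Rx: keep a key-to-index dictionary alongside the list, append on a new key, overwrite in place on a repeated key. A minimal sketch of the same bookkeeping (Python, names hypothetical):

```python
def merge_insert(col, stream, key_selector):
    """Upsert each item from `stream` into list `col`, keyed by key_selector."""
    lookup = {}  # key -> index in col
    for item in stream:
        key = key_selector(item)
        if key not in lookup:
            lookup[key] = len(col)
            col.append(item)          # first time we see this key: insert
        else:
            col[lookup[key]] = item   # seen before: update in place
    return col

rows = []
merge_insert(rows, [(".txt", 1), (".log", 1), (".txt", 2)], key_selector=lambda r: r[0])
# rows now holds one row per key, with the latest value for ".txt"
```

Replacing the element (rather than mutating it) is what makes the WPF binding see a change notification from the ObservableCollection in the original code.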