Streaming OLAP with the Reactive Extensions (RX) for .Net

Streaming OLAP is something that comes up over and over again in the “CEP space” – using the Reactive Extensions for .Net this demo shows the basics; filtering, grouping, aggregates, and concurrent queries.

To set the context, the idea here is focus on the “query” side, so the UI is aggressively simple – no nifty visualizations here, just enough to show working code. I chose a non-financial domain, filesystem changes in this case, simply because the simplified equities example is a bit overused.

Here’s a screenshot and download link http://code.msdn.microsoft.com/FSOlapRxDemo 

app screenshot

Let’s walk through the code:

Getting the data

raw data

The .Net class FileSystemWatcher will report changes to the filesystem as events, so to get a stream of filesystem changes we only need do the following:

    • get all the fixed drives
    • create a new FileSystemWatcher for each
    • convert the Changed, Deleted, and Created events to IObservables using FromEvent()
    • Merge() those into a single IObservable
    • map the EventArgs to to a more query friendly class
public static IObservable<FileChangeFact> GetFileSystemStream()
{
    var fsEventTypes = new string[] { "Changed", "Deleted", "Created" };

    IEnumerable<IObservable<IEvent<FileSystemEventArgs>>> fsEventsAsObservables =
        DriveInfo.GetDrives()
                .Where(di => di.DriveType == DriveType.Fixed)
                .Select(drive => new FileSystemWatcher(drive.RootDirectory.FullName) { … })
                .SelectMany(fsw => fsEventTypes.Select(eventType => Observable.FromEvent…;

    return  Observable.Merge(fsEventsAsObservables)
        .Select(fsea =>
        {
            var fi = new FileInfo(fsea.EventArgs.FullPath);
            return new FileChangeFact
            {
                ChangeType = fsea.EventArgs.ChangeType,
                Path = fsea.EventArgs.FullPath,
                IsContainer = !fi.Exists,
                Length = fi.Exists ? fi.Length : 0,
                Extension = String.IsNullOrEmpty(fi.Extension) ? "(none)" : fi.Extension
            };
        });
}

Calculating Aggregates, take 1

total aggregates

The Scan() operator is ideal for computing aggregates in a streaming olap scenario. Unlike traditional queries where vectors are aggregated into a single value, we want to computing running values.

So to compute a few of the most common query aggregates, Count, Sum, Mean, and StdDev we can do the following:

public static IObservable<double> Mean(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new
        {
            N = n,
            Mean = meanp,
        };
    }).Select(it => it.Mean);
}
public static IObservable<double> StdDev(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d, M2 = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new
        {
            N = n,
            Mean = meanp,
            M2 = cur.M2 + delta * (next - meanp)
        };
    }).Select(it => Math.Sqrt(it.M2 / (it.N)));
}

var fss = GetFileSystemStream();
fss.Select(fcf => (double)fcf.Length)
        .Scan(0,(c, _) => c + 1) // Count
        .Zip(lenxs.Scan(0d,(c, n) => c + n), (cnt, sum) => new FileChangeAggregate(){Sum=sum,Count...
        .Zip(lenxs.Mean(), (fca, mean) => { fca.Mean = mean; return fca; })
        .Zip(lenxs.StdDev(), (fca, stddev) => { fca.StdDev = stddev; return fca; })
        //... subscribe()...

The first two methods are just wrappers around Scan(), and to compute multiple aggregates on a single stream, the Zip() operator comes in handy, letting us essentially stitch together multiple computations.

One nice aspect of this approach is that the code for each of the aggregates cohesive, loosely coupled, and composable – all nice attributes, but it’s verbose and redundant (in that the Mean is calculated twice) and I suspect not as performant. So that takes us to

 

Calculating Aggregates, take 2

public static IObservable<StatInfoItem<T>> ToCommonAggregates<T, TSrc>(
                   this IObservable<TSrc> source,
                   Func<TSrc, double> dataSelector,
                   Func<TSrc, T> itemSelector)
{
    return source.Scan(new StatInfoItem<T>(), (cur, next) =>
    {
        double data = dataSelector(next);
        T itemp = itemSelector(next);
        var n = cur.Count + 1;
        var delta = data - cur.Mean;
        var meanp = cur.Mean + delta / n;
        var m2 = cur.M2 + delta * (data - meanp);
        var stdDevp = Math.Sqrt(m2 / n);
        return new StatInfoItem<T>()
        {
            Item = itemp,
            Sum = data + cur.Sum,
            Count = n,
            Mean = meanp,
            M2 = m2,
            StdDev = stdDevp,
            Min = Math.Min(data, cur.Min),
            Max = Math.Max(data, cur.Max),
        };
    })
    .Skip(1); // need a seed, but don't want to include seed value in the output
}

Which also add Min() & Max(). Perhaps not as elegant, but using the above is quite easy:

IOvservable<StatInfoItem<string>> aggstream = 
fss.ToCommonAggregates(fcf => fcf.Length, _ => "Label")

Filtering and Multiple Queries

image

For the demo, each query will be a “drill down” into the filesystem, implemented as a filter on the full path of the file that changed. This is done with the same Where() operator used in normal Linq. Adding multiple queries isn’t as trivial. Unlike IEnumerables, which contain their data, Observables only have a promise of future data. So if we want to say find out how much has changed in c:\windows, we need to query a store of past data as well as include future data. Here’s one approach using the StartWith() operator and a ConcurrentQueue :

private IObservable<FileChangeFact> _fss;
private ConcurrentQueue<FileChangeFact> _store = new …
private void NewQuery()
{
    _fss = GetFileSystemStream();
    _fss.Subscribe(fsi => _store.Enqueue(fsi));
    var newQuery = _fss.StartWith(_store.ToArray()).Where(fsi => {...} );
} 

A cleaner approach is to take advantage of the RX’s subjects, in particular the ReplaySubject which store history for us.

private ReplaySubject<FileChangeFact> _storeSubject;
_storeSubject = GetFileSystemStream().Replay();
var drillDownQry = _storeSubject.Where(fltr);

Grouping

grouping

Thanks to the GroupBy() and SelectMany() operators grouping turns out to be quite easy. First we group by the file extention, selecting the Length property. And for each group, compute the aggregates.

IObservable<StatInfoItem<string>> grouped = 
                newQuerystream.GroupBy(fsi => fsi.Extension, fsi => (double)fsi.Length)
.SelectMany(grp => grp.ToCommonAggregates(x => x, _ => grp.Key));

Updating the UI

Merging a stream of data into an ObservableCollection<T> is such a common operation. I’ve found myself wishing that MS offered a KeyedObservableCollection<T> and I’ve considered writting such a thing, but instead I used a smaller extension method MergeInsert() which does the job without the all the additional ceremony of a new class

public static IDisposable MergeInsert<T, TKey>(
                 this ObservableCollection<T> col, 
                 IObservable<T> stream, 
                 Func<T, TKey> keySelector)
{
    col.Clear();
    Dictionary<TKey, int> lookupTable = new Dictionary<TKey, int>();
    return stream.Subscribe(item =>
    {
        var key = keySelector(item);
        if (!lookupTable.ContainsKey(key))
        {
            lookupTable[key] = col.Count;
            col.Add(item);
        }
        else
        {
            col[lookupTable[key]] = item;
        }
    });
}

Use it like so:

GroupedByExtentionCollection = new ObservableCollection<StatInfoItem<string>>();
IDisposable _disp = GroupedByExtentionCollection.MergeInsert(grouped.ObserveOnDispatcher(), 
                                                             sii => sii.Item);
Published Saturday, January 2, 2010 8:28 PM by Scott Weinstein
Filed under: , , , ,

Comments

Thursday, February 24, 2011 6:55 PM by scamb

# re: Streaming OLAP with the Reactive Extensions (RX) for .Net

Hi Scott !

This is very impressive to me, thanks!! Is there any chance, that you publish the code [the link at top of the article leads into nirvana!]?

I really would like to play with this sample a bit, to learn to understand RX better!

Thanks a lot!

br++scamb