January 2010 - Posts

The code samples and PowerPoint deck from my presentation on Rx to the New York ALT.NET group are available (and updated) on MSDN Code samples:

http://code.msdn.microsoft.com/RxDemos

And the slide deck

Recently my building has been having issues with its boilers, and the heat has been going out for longer than is comfortable. The superintendent makes a habit of periodically checking on the status of each of the boilers. A workable approach, certainly, but I figured this would be ideal for a technology assist.

 

For $9, I purchased a USB thermometer. Word on the web is that the software it comes with is fairly miserable (and it crashed immediately on my machine), but with some Googling and Reflector, I was able to come up with a polling-based API to read the temperature:

public interface IUsbTEMPer
{
    double Temperature { get; }
}

With the RX, converting the API into a stream of data is just one line:

IObservable<double> ts = Observable.Generate(
Scheduler.Later, 
() => new Notification<double>.OnNext(usbTempReader.Temperature)
).Publish();

And getting some simple alerts is easy too:

ts.Buffer(new TimeSpan(0, 5, 0)) // hours, minutes, seconds: a five-minute window
    .Select(fiveminOfTemp => fiveminOfTemp.Average())
    .Where(avgtemp => avgtemp < 65)
    .Subscribe(cold => ToTwiter("buildingstatus account..."));

There’s a pattern used in the Reactive Extensions that I’m calling the Anonymous Implementation.

You can see it in use on IObservable’s one method

IDisposable Subscribe(IObserver<T> observer);

Given an Observable which you want to subscribe to, the brute force, naive (or pedantic) approach would be to create a new class implementing IObserver<T> and then subscribe it.

public class BasicConsoleWriteLineObserver<T>: IObserver<T>
{
    public void OnNext(T value)
    {
        Console.WriteLine(value);
    }
    
    public void OnError(Exception ex) 
    {
        throw ex;
    }
    public void OnCompleted() { }
}

IObservable<int> stream = ...;
stream.Subscribe(new BasicConsoleWriteLineObserver<int>());

But a simpler method, one that dispenses with all the ceremony of creating a new type, is to use one of the Observer factory methods, like so:

IObserver<int> basicCWLobserver = Observer.Create((int value) => Console.WriteLine(value));
stream.Subscribe(basicCWLobserver);

Observer.Create (and its 4 overloads) will create an anonymous implementation of the interface needed. Of course, you can skip a step or two and use the extension methods on IObservable to get even shorter versions of the same:

stream.Subscribe(Console.WriteLine);

This approach is great: for all but the most complex implementations, there’s no need to ever implement IObserver<T>. And as they say, less code is better code.

 

There are a number of other interfaces that I wish used this approach. IValueConverter and IComparer<T> come to mind first.
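To make the wish concrete, here is a minimal sketch of what the pattern could look like for IComparer<T>. The names AnonymousComparer<T>, ComparerFactory and AnonymousComparerDemo are hypothetical, made up for illustration; they are not part of the framework or of Rx. (Later versions of .NET, 4.5 and up, do ship Comparer<T>.Create(Comparison<T>), which covers the same ground.)

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: wraps a delegate so callers never write a comparer class.
public sealed class AnonymousComparer<T> : IComparer<T>
{
    private readonly Func<T, T, int> _compare;

    public AnonymousComparer(Func<T, T, int> compare)
    {
        if (compare == null) throw new ArgumentNullException("compare");
        _compare = compare;
    }

    public int Compare(T x, T y)
    {
        return _compare(x, y);
    }
}

public static class ComparerFactory
{
    // Mirrors Observer.Create: pass a lambda in, get an interface implementation back.
    public static IComparer<T> Create<T>(Func<T, T, int> compare)
    {
        return new AnonymousComparer<T>(compare);
    }
}

public static class AnonymousComparerDemo
{
    public static List<string> SortByLength(IEnumerable<string> words)
    {
        var sorted = new List<string>(words);
        // Sort by length without declaring a named comparer type.
        sorted.Sort(ComparerFactory.Create<string>((a, b) => a.Length.CompareTo(b.Length)));
        return sorted;
    }
}
```

The shape is the same as Observer.Create: one small generic class written once, and every call site after that is a lambda.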

The Reactive Extensions for .Net offers plenty of ways to create IObservables

 

Some primitives

IObservable<int> obs = Observable.Empty<int>();
IObservable<int> obs = Observable.Return(0);
IObservable<int> obs = Observable.Throw<int>(new Exception());

Simple streams

IObservable<long> obs = Observable.Interval(new TimeSpan(0, 0, 1));
IObservable<long> obs = Observable.Timer(DateTimeOffset.Now.AddHours(1)); // Plus 7 more overloads
IObservable<int> obs = Observable.Repeat(1); // Plus 7 more overloads
IObservable<int> obs = Observable.Range(0, 1);

 

From async data

//From an Action or Func
Observable.Start(() => 1);

//From Task
Task.Factory.StartNew(...).ToObservable();

//From AsyncPattern
// typical use case is IO or Web service calls
Func<int,int,double> sampleFunc = (a,b) => 1d;
Func<int, int, IObservable<double>> funcObs = 
                  Observable.FromAsyncPattern<int, int, double>(sampleFunc.BeginInvoke, 
                                                                      sampleFunc.EndInvoke);
IObservable<double> obs = funcObs(1, 0);

From Events

public event EventHandler<EventArgs> AnEvent;
IObservable<IEvent<EventArgs>> fromEventObs = 
    Observable.FromEvent<EventArgs>(h => this.AnEvent += h, 
                                    h => this.AnEvent -= h);

From Existing Collections

IEnumerable<int> ie = new int[] {};
observable = ie.ToObservable();

By Generate()

There are 20 overloads of Generate(). See some prior examples here

By Create()

This creates a cold stream

IObservable<int> observable = Observable.Create<int>(o =>
                                {
                                    o.OnNext(1);
                                    o.OnNext(2);
                                    o.OnCompleted();
                                    return () => { };
                                });

To make a hot stream via Create()

List<IObserver<int>> _subscribed = new List<IObserver<int>>();
private void CreateHot()
{
    observable = Observable.Create<int>(o =>
    {
        _subscribed.Add(o);
        return () => _subscribed.Remove(o);
    });

}
private void onNext(int val)
{
    foreach (var o in _subscribed)
    {
        o.OnNext(val);
    }
}

But rather than using Create(), a Subject provides a cleaner (thread-safe and tested) way of doing the above:

var subj = new Subject<int>();
observable = subj.Hide();
subj.OnNext(1);
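For a sense of what "thread safe" buys you, here is a rough sketch of a hot source built only on the IObservable<T> and IObserver<T> interfaces from the System namespace. SimpleHotSource<T> and RecordingObserver<T> are hypothetical names for illustration; this is not how Subject<T> is actually implemented, and it leaves out completion and error handling entirely.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for Subject<T>; illustration only.
public sealed class SimpleHotSource<T> : IObservable<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate) { _observers.Add(observer); }
        return new Unsubscriber(this, observer);
    }

    // Pushes a value to whoever is subscribed right now; late subscribers miss it.
    public void OnNext(T value)
    {
        IObserver<T>[] snapshot;
        lock (_gate) { snapshot = _observers.ToArray(); }
        // Iterate over a copy so observers may unsubscribe from inside OnNext.
        foreach (var o in snapshot) o.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly SimpleHotSource<T> _parent;
        private readonly IObserver<T> _observer;

        public Unsubscriber(SimpleHotSource<T> parent, IObserver<T> observer)
        {
            _parent = parent;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_parent._gate) { _parent._observers.Remove(_observer); }
        }
    }
}

// Minimal observer that records what it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Received = new List<T>();
    public void OnNext(T value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Two details are easy to miss in the hand-rolled Create() version: the subscriber list is touched from several threads, and a List<T> that changes while a foreach is walking it throws. The lock and the snapshot copy above address both, which is the sort of thing a Subject already takes care of.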

Streaming OLAP is something that comes up over and over again in the “CEP space”. Using the Reactive Extensions for .Net, this demo shows the basics: filtering, grouping, aggregates, and concurrent queries.

To set the context, the idea here is to focus on the “query” side, so the UI is aggressively simple: no nifty visualizations here, just enough to show working code. I chose a non-financial domain, filesystem changes in this case, simply because the simplified equities example is a bit overused.

Here’s a screenshot and download link http://code.msdn.microsoft.com/FSOlapRxDemo 

app screenshot

Let’s walk through the code:

Getting the data

raw data

The .Net class FileSystemWatcher will report changes to the filesystem as events, so to get a stream of filesystem changes we only need to do the following:

    • get all the fixed drives
    • create a new FileSystemWatcher for each
    • convert the Changed, Deleted, and Created events to IObservables using FromEvent()
    • Merge() those into a single IObservable
    • map the EventArgs to a more query-friendly class

public static IObservable<FileChangeFact> GetFileSystemStream()
{
    var fsEventTypes = new string[] { "Changed", "Deleted", "Created" };

    IEnumerable<IObservable<IEvent<FileSystemEventArgs>>> fsEventsAsObservables =
        DriveInfo.GetDrives()
                .Where(di => di.DriveType == DriveType.Fixed)
                .Select(drive => new FileSystemWatcher(drive.RootDirectory.FullName) { … })
                .SelectMany(fsw => fsEventTypes.Select(eventType => Observable.FromEvent…;

    return  Observable.Merge(fsEventsAsObservables)
        .Select(fsea =>
        {
            var fi = new FileInfo(fsea.EventArgs.FullPath);
            return new FileChangeFact
            {
                ChangeType = fsea.EventArgs.ChangeType,
                Path = fsea.EventArgs.FullPath,
                IsContainer = !fi.Exists,
                Length = fi.Exists ? fi.Length : 0,
                Extension = String.IsNullOrEmpty(fi.Extension) ? "(none)" : fi.Extension
            };
        });
}

Calculating Aggregates, take 1

total aggregates

The Scan() operator is ideal for computing aggregates in a streaming OLAP scenario. Unlike traditional queries, where vectors are aggregated into a single value, we want to compute running values.

So to compute a few of the most common query aggregates, Count, Sum, Mean, and StdDev we can do the following:

public static IObservable<double> Mean(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new
        {
            N = n,
            Mean = meanp,
        };
    }).Select(it => it.Mean);
}
public static IObservable<double> StdDev(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d, M2 = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new
        {
            N = n,
            Mean = meanp,
            M2 = cur.M2 + delta * (next - meanp)
        };
    }).Select(it => Math.Sqrt(it.M2 / (it.N)));
}

var fss = GetFileSystemStream();
var lenxs = fss.Select(fcf => (double)fcf.Length); // stream of file lengths, reused below
lenxs.Scan(0,(c, _) => c + 1) // Count
        .Zip(lenxs.Scan(0d,(c, n) => c + n), (cnt, sum) => new FileChangeAggregate(){Sum=sum,Count...
        .Zip(lenxs.Mean(), (fca, mean) => { fca.Mean = mean; return fca; })
        .Zip(lenxs.StdDev(), (fca, stddev) => { fca.StdDev = stddev; return fca; })
        //... subscribe()...

The first two methods are just wrappers around Scan(), and to compute multiple aggregates on a single stream, the Zip() operator comes in handy, letting us essentially stitch together multiple computations.
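The update step inside Mean() and StdDev() is the running calculation usually credited to Welford, and it is easy to sanity-check away from Rx. Below is a small sketch that applies the same step to a plain IEnumerable<double>; RunningStats and RunningStatsDemo are hypothetical names introduced only for this check.

```csharp
using System;
using System.Collections.Generic;

public struct RunningStats
{
    public int N;
    public double Mean;
    public double M2;

    // Population standard deviation of the values seen so far.
    public double StdDev { get { return N == 0 ? 0d : Math.Sqrt(M2 / N); } }
}

public static class RunningStatsDemo
{
    // Same update step as the Scan() accumulators above, applied to one value.
    public static RunningStats Step(RunningStats cur, double next)
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var mean = cur.Mean + delta / n;
        return new RunningStats { N = n, Mean = mean, M2 = cur.M2 + delta * (next - mean) };
    }

    // Pull-based analogue of Scan(): yields one running result per input value.
    public static IEnumerable<RunningStats> Scan(IEnumerable<double> source)
    {
        var cur = new RunningStats();
        foreach (var x in source)
        {
            cur = Step(cur, x);
            yield return cur;
        }
    }
}
```

Feeding it 2, 4, 4, 4, 5, 5, 7, 9 should end with a mean of 5 and a standard deviation of 2 (up to floating-point rounding), which matches the textbook population values for that data set.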

One nice aspect of this approach is that the code for each of the aggregates is cohesive, loosely coupled, and composable. Those are all nice attributes, but it’s verbose and redundant (in that the Mean is calculated twice) and, I suspect, not as performant. So that takes us to

 

Calculating Aggregates, take 2

public static IObservable<StatInfoItem<T>> ToCommonAggregates<T, TSrc>(
                   this IObservable<TSrc> source,
                   Func<TSrc, double> dataSelector,
                   Func<TSrc, T> itemSelector)
{
    return source.Scan(new StatInfoItem<T>(), (cur, next) =>
    {
        double data = dataSelector(next);
        T itemp = itemSelector(next);
        var n = cur.Count + 1;
        var delta = data - cur.Mean;
        var meanp = cur.Mean + delta / n;
        var m2 = cur.M2 + delta * (data - meanp);
        var stdDevp = Math.Sqrt(m2 / n);
        return new StatInfoItem<T>()
        {
            Item = itemp,
            Sum = data + cur.Sum,
            Count = n,
            Mean = meanp,
            M2 = m2,
            StdDev = stdDevp,
            Min = Math.Min(data, cur.Min),
            Max = Math.Max(data, cur.Max),
        };
    })
    .Skip(1); // need a seed, but don't want to include seed value in the output
}

This version also adds Min and Max. Perhaps not as elegant, but using the above is quite easy:

IObservable<StatInfoItem<string>> aggstream = 
    fss.ToCommonAggregates(fcf => fcf.Length, _ => "Label");

Filtering and Multiple Queries


For the demo, each query will be a “drill down” into the filesystem, implemented as a filter on the full path of the file that changed. This is done with the same Where() operator used in normal LINQ. Adding multiple queries isn’t as trivial. Unlike IEnumerables, which contain their data, Observables only hold a promise of future data. So if we want to, say, find out how much has changed in c:\windows, we need to query a store of past data as well as include future data. Here’s one approach using the StartWith() operator and a ConcurrentQueue:

private IObservable<FileChangeFact> _fss;
private ConcurrentQueue<FileChangeFact> _store = new …
private void NewQuery()
{
    _fss = GetFileSystemStream();
    _fss.Subscribe(fsi => _store.Enqueue(fsi));
    var newQuery = _fss.StartWith(_store.ToArray()).Where(fsi => {...} );
} 
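The body of the Where() filter is left out above, and the demo's actual predicate isn't shown here. As one possible shape for a drill-down filter, here is a sketch of a path-prefix check; PathFilter.IsUnder is a hypothetical helper, and it assumes Windows-style paths (case-insensitive, backslash separators).

```csharp
using System;

public static class PathFilter
{
    // Hypothetical helper: true when 'fullPath' is 'root' itself or lies beneath it.
    // Assumes Windows-style paths, so the comparison ignores case and uses '\'.
    public static bool IsUnder(string fullPath, string root)
    {
        var trimmedRoot = root.TrimEnd('\\');
        if (fullPath.Equals(trimmedRoot, StringComparison.OrdinalIgnoreCase)) return true;
        // Appending the separator keeps c:\windowsold from matching c:\windows.
        return fullPath.StartsWith(trimmedRoot + "\\", StringComparison.OrdinalIgnoreCase);
    }
}
```

With something like this in place, a query could read `_fss.Where(fsi => PathFilter.IsUnder(fsi.Path, @"c:\windows"))`. A bare StartsWith on the root would be shorter, but it would also count changes in sibling folders whose names merely begin with the same characters.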

A cleaner approach is to take advantage of Rx’s subjects, in particular the ReplaySubject, which stores history for us.

private ReplaySubject<FileChangeFact> _storeSubject;
_storeSubject = GetFileSystemStream().Replay();
var drillDownQry = _storeSubject.Where(fltr);

Grouping

grouping

Thanks to the GroupBy() and SelectMany() operators, grouping turns out to be quite easy. First we group by the file extension, selecting the Length property. Then, for each group, we compute the aggregates.

IObservable<StatInfoItem<string>> grouped = 
    newQuerystream.GroupBy(fsi => fsi.Extension, fsi => (double)fsi.Length)
                  .SelectMany(grp => grp.ToCommonAggregates(x => x, _ => grp.Key));
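It can help to see the shape of the output without Rx in the picture. The sketch below is a pull-based analogue over an IEnumerable: for every incoming (extension, length) pair it yields the updated running total for that extension only. GroupTotal and GroupedRunningTotals are hypothetical names for illustration, and only Count and Sum are tracked to keep it short.

```csharp
using System;
using System.Collections.Generic;

public sealed class GroupTotal
{
    public string Key;
    public int Count;
    public double Sum;
}

public static class GroupedRunningTotals
{
    // For each incoming (key, value) pair, yields the updated running total of that
    // key's group: one output per input, interleaved across groups.
    public static IEnumerable<GroupTotal> Scan(IEnumerable<KeyValuePair<string, double>> source)
    {
        var totals = new Dictionary<string, GroupTotal>();
        foreach (var pair in source)
        {
            GroupTotal cur;
            if (!totals.TryGetValue(pair.Key, out cur))
            {
                cur = new GroupTotal { Key = pair.Key };
            }
            // Emit a fresh object per update so earlier outputs are never mutated.
            var next = new GroupTotal { Key = cur.Key, Count = cur.Count + 1, Sum = cur.Sum + pair.Value };
            totals[pair.Key] = next;
            yield return next;
        }
    }
}
```

So a sequence of (".txt", 10), (".dll", 5), (".txt", 20) produces three results: .txt with count 1 and sum 10, .dll with count 1 and sum 5, then .txt again with count 2 and sum 30. That one-row-per-change shape is why the UI step needs a merge keyed on the group rather than a plain append.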

Updating the UI

Merging a stream of data into an ObservableCollection<T> is a common operation. I’ve found myself wishing that MS offered a KeyedObservableCollection<T>, and I’ve considered writing such a thing, but instead I used a smaller extension method, MergeInsert(), which does the job without all the additional ceremony of a new class:

public static IDisposable MergeInsert<T, TKey>(
                 this ObservableCollection<T> col, 
                 IObservable<T> stream, 
                 Func<T, TKey> keySelector)
{
    col.Clear();
    Dictionary<TKey, int> lookupTable = new Dictionary<TKey, int>();
    return stream.Subscribe(item =>
    {
        var key = keySelector(item);
        if (!lookupTable.ContainsKey(key))
        {
            lookupTable[key] = col.Count;
            col.Add(item);
        }
        else
        {
            col[lookupTable[key]] = item;
        }
    });
}

Use it like so:

GroupedByExtentionCollection = new ObservableCollection<StatInfoItem<string>>();
IDisposable _disp = GroupedByExtentionCollection.MergeInsert(grouped.ObserveOnDispatcher(), 
                                                             sii => sii.Item);