I have an idea that it may be possible to predict build success/failure based on commit data. Why Scala? It’s a JVM language, has lots of powerful type features, and it has a linear algebra library which I’ll need later.

Project definition and build

Neither Maven nor the Scala build tool (sbt) is completely satisfactory.

This Maven **archetype** (what .Net folks would call a VS project template)

mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scala-tools.org/repo-releases -DgroupId=org.SW -DartifactId=BuildBreakPredictor

gets you started right away with “hello world” code, unit tests demonstrating a number of different testing approaches, and even a ready-made `.gitignore` file - nice! But the Scala version is behind at v2.8, and more seriously, compiling and testing was painfully slow. So much so that a rapid edit – test – edit cycle was not practical. Lab49 colleague Steve Levine tells me that I can either adjust my pom to use fsc – the fast Scala compiler – or use sbt.

sbt has some nice features

  • It’s fast – it uses fsc by default
  • It has a continuous mode, so `> ~test` will compile and run your unit tests each time you save a file
  • It can consume (and produce) Maven 2 dependencies
  • The build definition file can be much shorter than the equivalent pom (about 1/5 the size, as repos and dependencies can be declared on a single line)

And some real limitations

  • Limited support for 3rd-party integration – for instance, out of the box TeamCity doesn’t speak sbt, nor does IntelliJ IDEA
  • Steeper learning curve for build steps outside the default

Side note: If a language has a fast compiler, why keep the slow compiler around? Even worse, why make it the default?

I chose sbt, for the faster development speed it offers.

Syntax

Scala APIs really like to use punctuation – sometimes this works well, as in the following:

 map1 |+| map2 

`|+|` is a merge operator which does addition on the values of the maps.
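For readers who don’t speak Scalaz, here’s a minimal Python sketch of what that merge does (the function name `merge_add` is mine): keys are unioned, and values for keys present in both maps are added.

```python
def merge_add(m1, m2):
    """Merge two dicts, adding the values of keys present in both."""
    out = dict(m1)
    for k, v in m2.items():
        out[k] = out.get(k, 0) + v
    return out

map1 = {"a": 1, "b": 2}
map2 = {"b": 10, "c": 3}
merged = merge_add(map1, map2)  # {'a': 1, 'b': 12, 'c': 3}
```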

It’s less useful here:

http(baseUrl / url >- parseJson[BuildStatus])

Sure, you can probably guess what `>-` does from the context, but how about `>~` or `>+`?

Language features

I’m still learning, so not much to say just yet. However, case classes are quite useful, implicits scare me, and type constructors have lots of power.

Community

A number of projects, such as https://github.com/scalala and https://github.com/scalaz/scalaz, are split between GitHub and Google Code – GitHub for the source, and Google Code for the docs. Not sure I understand the motivation here.

Lab49 colleague Lee Campbell has a nice seven-part write-up on the Reactive Extensions.

He says:

it is big in all sorts of ways:

  1. The way it tackles the Observer pattern is bold.
  2. The way it tackles concurrency is quite a shift from how I have done it before.
  3. The number of (extension) methods is huge.
  4. The way in which it integrates with LINQ to leverage LINQ's composability & declarative style.
  5. The fact that any .NET dev should care – UI dev, backend algorithm coder, or integrator – it helps all of us.
  6. The future plans are even more grand, but that is a different series all together :-)


The latest releases of the Reactive Extensions for .Net include an abstract VirtualScheduler and a concrete implementation called TestScheduler.

So now it’s possible to test time-dependent code without relying on the passage of time (or tide).

Here’s a sample of code that would take 3 days to complete in real time:

[Fact(Timeout = 1000)]
public void TestScheduler()
{
    List<long> actual = new List<long>();
    Observable.Interval(TimeSpan.FromDays(1), _testSched)
        .Take(3)
        .Subscribe(actual.Add);
    _testSched.Run();
    Assert.Equal(new[] { 0L, 1, 2 }, actual.ToArray());
}

Notice that I didn’t use a blocking call, such as

.Take(3).ToEnumerable().ToArray()

to obtain the values from the interval. The TestScheduler runs on the current thread, and as a result blocking calls never complete.
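The idea generalizes beyond Rx. Here’s a minimal Python sketch of a virtual-time scheduler (all names are mine, not the Rx API): scheduled actions run in due-time order while the clock simply jumps forward, so three “days” elapse instantly.

```python
import heapq

class VirtualScheduler:
    """Runs scheduled actions in virtual time -- no real waiting."""
    def __init__(self):
        self.clock = 0.0
        self._queue = []
        self._seq = 0  # tie-breaker for actions due at the same time

    def schedule(self, delay, action):
        heapq.heappush(self._queue, (self.clock + delay, self._seq, action))
        self._seq += 1

    def run(self):
        while self._queue:
            due, _, action = heapq.heappop(self._queue)
            self.clock = due  # jump the clock forward to the due time
            action(self)

def interval(sched, period, count, on_next):
    """Emit 0..count-1 spaced `period` apart, like Observable.Interval."""
    def tick(n):
        def action(s):
            on_next(n)
            if n + 1 < count:
                s.schedule(period, tick(n + 1))
        return action
    sched.schedule(period, tick(0))

actual = []
sched = VirtualScheduler()
interval(sched, 86_400, 3, actual.append)  # one "day" per tick
sched.run()
# actual == [0, 1, 2]; sched.clock advanced 3 days, wall time ~0
```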

 

Here’s another example where we run for a specific duration – useful when testing Observables that never end:

[Fact]
public void TestOneElementSlidingWindow()
{
    List<SlidingWindow<Timestamped<int>>> actual = new List<SlidingWindow<Timestamped<int>>>();
    var oneBeat = Observable.Return<int>(1, _testSched).Timestamp(_testSched);
    var sWindow = oneBeat.ToSlidingWindow(_oneSecond, _oneSecond, _testSched);
    sWindow.Subscribe(slw => actual.Add(slw));

    _testSched.RunTo(_testSched.FromTimeSpan(TimeSpan.FromSeconds(3)));

    Assert.Equal(2, actual.Count);

    Assert.Equal(1, actual[0].Added.Count());
    Assert.Equal(1, actual[0].Current.Count());
    Assert.Equal(0, actual[0].Removed.Count());

    Assert.Equal(0, actual[1].Added.Count());
    Assert.Equal(0, actual[1].Current.Count());
    Assert.Equal(1, actual[1].Removed.Count());
}

 

Code samples updated at http://code.msdn.microsoft.com/RxDemos

Also - Jeffrey van Gogh promises more to come on #c9

I gave a presentation at today’s SQL Saturday in NY on replacing SSIS with PowerShell.

You can view the presentation, or see below for the two-second summary:

  • SSIS is a terrible development tool
  • Many SSIS features can be built without much effort with PowerShell, C#, and the TPL
  • See the demos

The code is hosted at http://psis.codeplex.com/. Currently it has the following capabilities:

  • Concurrent bulk data transfer
  • Single pass star-schema populator

Contact me if you’d like to contribute or collaborate on this.

Of late, our broadband internet has been feeling sluggish. A call to the company took way more hold-time than I wanted to spend, and it only fixed the problem for a short while. Thus a perfect opportunity to play with some new tech to solve a problem, in this case, documenting a systemic issue from a service provider.

The goal – a log of internet speeds, taken say every 15 minutes, recording ping time, upload speed, download speed, and local LAN usage.

 

The solution

  • A WCF service to measure speeds
    • Internet speed was measured via speedtest.net
    • LAN usage was measured by querying my router for packets received and sent
  • A SQL express instance to persist the data
  • A PowerShell script to invoke the WCF service – launched by Windows’ Task Scheduler
  • An OData WCF Data Service to allow me to read the data
  • MS PowerPivot to show a nice viz (scratch that, the beta expired)
  • LinqPad to get the data, export it to excel
  • Tableau Public to show the viz

 

 

The code samples and PowerPoint deck from my presentation on Rx to the New York ALT.NET group are available (and updated) on MSDN Code Samples:

http://code.msdn.microsoft.com/RxDemos

And the slide deck

Recently my building has been having issues with its boilers, and the heat has been going out for longer than is comfortable. The superintendent makes a habit of periodically checking on the status of each of the boilers. A workable approach certainly, but I figured this would be ideal for a technology assist.

 

For $9, I purchased a USB thermometer. Word on the web is that the software it comes with is fairly miserable (it crashed immediately on my machine), but with some Google and Reflector I was able to come up with a polling-based API to read the temperature:

public interface IUsbTEMPer
{
    double Temperature { get; }
}

With Rx, converting the API into a stream of data takes just one expression:

IObservable<double> ts = Observable.Generate(
    Scheduler.Later,
    () => new Notification<double>.OnNext(usbTempReader.Temperature)
).Publish();

And getting some simple alerts is easy too:

ts.Buffer(new TimeSpan(0, 5, 0))
    .Select(fiveminOfTemp => fiveminOfTemp.Average())
    .Where(avgtemp => avgtemp < 65)
    .Subscribe(cold => ToTwitter("buildingstatus account..."));
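The same pipeline can be sketched without Rx at all. A hedged Python version (function and parameter names are mine), using count-based windows in place of Buffer’s time-based ones: buffer readings into fixed-size windows, average each window, and flag windows whose average is below 65°F.

```python
def cold_windows(readings, window_size=5, threshold=65.0):
    """Return the average of each full window that falls below threshold."""
    alerts = []
    for i in range(0, len(readings) - window_size + 1, window_size):
        window = readings[i:i + window_size]
        avg = sum(window) / len(window)
        if avg < threshold:
            alerts.append(avg)
    return alerts

alerts = cold_windows([70, 71, 69, 68, 70, 60, 61, 59, 62, 60])
# first window averages 69.6 (fine); second averages 60.4 -> one alert
```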

There’s a pattern used in the Reactive Extensions that I’m calling the Anonymous Implementation.

You can see it in use on IObservable’s single method:

IDisposable Subscribe(IObserver<T> observer);

Given an Observable which you want to subscribe to, the brute force, naive (or pedantic) approach would be to create a new class implementing IObserver<T> and then subscribe it.

public class BasicConsoleWriteLineObserver<T>: IObserver<T>
{
    public void OnNext(T value)
    {
        Console.WriteLine(value);
    }
    
    public void OnError(Exception ex) 
    {
        throw ex;
    }
    public void OnCompleted() { }
}

IObservable<int> stream = ...;
stream.Subscribe(new BasicConsoleWriteLineObserver<int>());

But a simpler method, one that dispenses with all the ceremony of creating a new type, is to use one of the Observer factory methods, like so:

IObserver<int> basicCWLobserver = Observer.Create((int value) => Console.WriteLine(value));
stream.Subscribe(basicCWLobserver);

Observer.Create (and its 4 overloads) will create an anonymous implementation of the interface needed. Of course, you can skip a step or two and use the extension methods on IObservable to get even shorter versions of the same:

stream.Subscribe(Console.WriteLine);

This approach is great; for all but the most complex implementations, there’s no need to ever implement IObserver<T>. And as they say, less code is better code.
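The pattern itself is tiny. A Python sketch of such a factory (all names are mine): wrap plain functions so no named class is needed per observer, with sensible defaults for the callbacks you omit.

```python
def _rethrow(ex):
    raise ex

class AnonymousObserver:
    """Adapts plain functions to the observer interface."""
    def __init__(self, on_next, on_error=None, on_completed=None):
        self.on_next = on_next
        self.on_error = on_error or _rethrow          # default: rethrow
        self.on_completed = on_completed or (lambda: None)  # default: no-op

def create_observer(on_next, on_error=None, on_completed=None):
    """The Observer.Create-style factory."""
    return AnonymousObserver(on_next, on_error, on_completed)

seen = []
obs = create_observer(seen.append)
obs.on_next(1)
obs.on_next(2)
obs.on_completed()
# seen == [1, 2]
```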

 

There are a number of other interfaces that I wish used this approach. IValueConverter, and IComparer<T> come to mind first.

The Reactive Extensions for .Net offer plenty of ways to create IObservables:

 

Some primitives

IObservable<int> obs = Observable.Empty<int>();
IObservable<int> obs = Observable.Return(0);
IObservable<int> obs = Observable.Throw<int>(new Exception());

Simple streams

IObservable<long> obs = Observable.Interval(new TimeSpan(0, 0, 1));
IObservable<long> obs = Observable.Timer(DateTimeOffset.Now.AddHours(1)); // Plus 7 more overloads
IObservable<int> obs = Observable.Repeat(1); // Plus 7 more overloads
IObservable<int> obs = Observable.Range(0, 1);

 

From async data

//From an Action or Func
Observable.Start(() => 1);

//From Task
Task.Factory.StartNew(...).ToObservable();

//From AsyncPattern
// typical use case is IO or Web service calls
Func<int,int,double> sampleFunc = (a,b) => 1d;
Func<int, int, IObservable<double>> funcObs = 
                  Observable.FromAsyncPattern<int, int, double>(sampleFunc.BeginInvoke, 
                                                                      sampleFunc.EndInvoke);
IObservable<double> obs = funcObs(1, 0);

From Events

public event EventHandler<EventArgs> AnEvent;
IObservable<IEvent<EventArgs>> fromEventObs =
    Observable.FromEvent<EventArgs>(h => this.AnEvent += h,
                                    h => this.AnEvent -= h);
From Existing Collections

IEnumerable<int> ie = new int[] {};
IObservable<int> observable = ie.ToObservable();

By Generate()

There are 20 overloads of Generate(). See some prior examples here

By Create()

This creates a cold stream:

IObservable<int> observable = Observable.Create<int>(o =>
                                {
                                    o.OnNext(1);
                                    o.OnNext(2);
                                    o.OnCompleted();
                                    return () => { };
                                });
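What “cold” means here: the subscribe function runs afresh for every subscriber, so each one sees the full sequence from the start. A Python sketch (names are mine):

```python
class ColdObservable:
    """Runs the producer function once per subscription."""
    def __init__(self, producer):
        self._producer = producer

    def subscribe(self, on_next, on_completed=lambda: None):
        # Every subscriber triggers a fresh run of the producer.
        self._producer(on_next, on_completed)

def producer(on_next, on_completed):
    on_next(1)
    on_next(2)
    on_completed()

cold = ColdObservable(producer)
a, b = [], []
cold.subscribe(a.append)
cold.subscribe(b.append)
# both a and b get [1, 2]: each subscription replays the producer
```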

To make a hot stream via Create()

List<IObserver<int>> _subscribed = new List<IObserver<int>>();
private void CreateHot()
{
    observable = Observable.Create<int>(o =>
    {
        _subscribed.Add(o);
        return () => _subscribed.Remove(o);
    });

}
private void onNext(int val)
{
    foreach (var o in _subscribed)
    {
        o.OnNext(val);
    }
}

But rather than using Create(), a Subject provides a cleaner (thread-safe and tested) way of doing the above:

var subj = new Subject<int>();
observable = subj.Hide();
subj.OnNext(1);
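Conceptually, a subject is just a multicast hub that is both observer and observable. A minimal Python sketch (names mine; the real Rx Subject adds the thread safety, this sketch does not):

```python
class Subject:
    """Multicast hub: push values in with on_next, fan out to subscribers."""
    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)
        return lambda: self._observers.remove(on_next)  # the "dispose"

    def on_next(self, value):
        for o in list(self._observers):  # copy: subscribers may unsubscribe
            o(value)

subj = Subject()
a, b = [], []
unsub_a = subj.subscribe(a.append)
subj.subscribe(b.append)
subj.on_next(1)
unsub_a()          # a stops receiving
subj.on_next(2)
# a == [1], b == [1, 2]
```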

Streaming OLAP is something that comes up over and over again in the “CEP space” – using the Reactive Extensions for .Net, this demo shows the basics: filtering, grouping, aggregates, and concurrent queries.

To set the context, the idea here is to focus on the “query” side, so the UI is aggressively simple – no nifty visualizations here, just enough to show working code. I chose a non-financial domain, filesystem changes in this case, simply because the simplified equities example is a bit overused.

Here’s a screenshot and download link http://code.msdn.microsoft.com/FSOlapRxDemo 

app screenshot

Let’s walk through the code:

Getting the data

raw data

The .Net class FileSystemWatcher will report changes to the filesystem as events, so to get a stream of filesystem changes we need only do the following:

    • get all the fixed drives
    • create a new FileSystemWatcher for each
    • convert the Changed, Deleted, and Created events to IObservables using FromEvent()
    • Merge() those into a single IObservable
    • map the EventArgs to a more query-friendly class

public static IObservable<FileChangeFact> GetFileSystemStream()
{
    var fsEventTypes = new string[] { "Changed", "Deleted", "Created" };

    IEnumerable<IObservable<IEvent<FileSystemEventArgs>>> fsEventsAsObservables =
        DriveInfo.GetDrives()
                .Where(di => di.DriveType == DriveType.Fixed)
                .Select(drive => new FileSystemWatcher(drive.RootDirectory.FullName) { … })
                .SelectMany(fsw => fsEventTypes.Select(eventType => Observable.FromEvent…;

    return  Observable.Merge(fsEventsAsObservables)
        .Select(fsea =>
        {
            var fi = new FileInfo(fsea.EventArgs.FullPath);
            return new FileChangeFact
            {
                ChangeType = fsea.EventArgs.ChangeType,
                Path = fsea.EventArgs.FullPath,
                IsContainer = !fi.Exists,
                Length = fi.Exists ? fi.Length : 0,
                Extension = String.IsNullOrEmpty(fi.Extension) ? "(none)" : fi.Extension
            };
        });
}

Calculating Aggregates, take 1

total aggregates

The Scan() operator is ideal for computing aggregates in a streaming OLAP scenario. Unlike traditional queries, where vectors are aggregated into a single value, we want to compute running values.

So to compute a few of the most common query aggregates – Count, Sum, Mean, and StdDev – we can do the following:

public static IObservable<double> Mean(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new
        {
            N = n,
            Mean = meanp,
        };
    }).Select(it => it.Mean);
}
public static IObservable<double> StdDev(this IObservable<double> source)
{
    var temp = new { N = 0, Mean = 0d, M2 = 0d };
    return source.Scan(temp, (cur, next) =>
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new
        {
            N = n,
            Mean = meanp,
            M2 = cur.M2 + delta * (next - meanp)
        };
    }).Select(it => Math.Sqrt(it.M2 / (it.N)));
}

var fss = GetFileSystemStream();
var lenxs = fss.Select(fcf => (double)fcf.Length);
lenxs.Scan(0, (c, _) => c + 1) // Count
    .Zip(lenxs.Scan(0d, (c, n) => c + n), (cnt, sum) => new FileChangeAggregate(){Sum=sum,Count...
    .Zip(lenxs.Mean(), (fca, mean) => { fca.Mean = mean; return fca; })
    .Zip(lenxs.StdDev(), (fca, stddev) => { fca.StdDev = stddev; return fca; })
    //... subscribe()...

The first two methods are just wrappers around Scan(), and to compute multiple aggregates on a single stream, the Zip() operator comes in handy, letting us essentially stitch together multiple computations.

One nice aspect of this approach is that the code for each of the aggregates is cohesive, loosely coupled, and composable – all nice attributes. But it’s verbose and redundant (the Mean is calculated twice), and I suspect not as performant. So that takes us to

 

Calculating Aggregates, take 2

public static IObservable<StatInfoItem<T>> ToCommonAggregates<T, TSrc>(
                   this IObservable<TSrc> source,
                   Func<TSrc, double> dataSelector,
                   Func<TSrc, T> itemSelector)
{
    return source.Scan(new StatInfoItem<T>(), (cur, next) =>
    {
        double data = dataSelector(next);
        T itemp = itemSelector(next);
        var n = cur.Count + 1;
        var delta = data - cur.Mean;
        var meanp = cur.Mean + delta / n;
        var m2 = cur.M2 + delta * (data - meanp);
        var stdDevp = Math.Sqrt(m2 / n);
        return new StatInfoItem<T>()
        {
            Item = itemp,
            Sum = data + cur.Sum,
            Count = n,
            Mean = meanp,
            M2 = m2,
            StdDev = stdDevp,
            Min = Math.Min(data, cur.Min),
            Max = Math.Max(data, cur.Max),
        };
    })
    .Skip(1); // need a seed, but don't want to include seed value in the output
}

This also adds Min() and Max(). Perhaps not as elegant, but using the above is quite easy:

IObservable<StatInfoItem<string>> aggstream =
    fss.ToCommonAggregates(fcf => fcf.Length, _ => "Label");
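The recurrence inside ToCommonAggregates() (and in Mean()/StdDev() earlier) is Welford’s online algorithm. A quick standalone Python check of that recurrence against a direct population computation:

```python
import math

def welford(xs):
    """Online count/mean/stddev, mirroring the Scan() recurrence above."""
    n, mean, m2 = 0, 0.0, 0.0
    for x in xs:
        n += 1
        delta = x - mean
        mean += delta / n            # meanp = cur.Mean + delta / n
        m2 += delta * (x - mean)     # M2 += delta * (next - meanp)
    return n, mean, math.sqrt(m2 / n)

xs = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
n, mean, stddev = welford(xs)
# direct (population) computation gives mean 5.0, std dev 2.0
```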

Filtering and Multiple Queries

image

For the demo, each query will be a “drill down” into the filesystem, implemented as a filter on the full path of the file that changed. This is done with the same Where() operator used in normal LINQ. Adding multiple queries isn’t as trivial. Unlike IEnumerables, which contain their data, Observables hold only a promise of future data. So if we want to, say, find out how much has changed in c:\windows, we need to query a store of past data as well as include future data. Here’s one approach using the StartWith() operator and a ConcurrentQueue:

private IObservable<FileChangeFact> _fss;
private ConcurrentQueue<FileChangeFact> _store = new …
private void NewQuery()
{
    _fss = GetFileSystemStream();
    _fss.Subscribe(fsi => _store.Enqueue(fsi));
    var newQuery = _fss.StartWith(_store.ToArray()).Where(fsi => {...} );
} 

A cleaner approach is to take advantage of Rx’s subjects, in particular the ReplaySubject, which stores history for us.

private ReplaySubject<FileChangeFact> _storeSubject = new ReplaySubject<FileChangeFact>();
GetFileSystemStream().Subscribe(_storeSubject);
var drillDownQry = _storeSubject.Where(fltr);
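The mechanics of a ReplaySubject can be sketched in a few lines of Python (names are mine; the real one also handles buffer windows and thread safety): replay all past values to each new subscriber, then keep it on the live list.

```python
class ReplaySubject:
    """Replays all past values to each new subscriber, then live ones."""
    def __init__(self):
        self._history = []
        self._observers = []

    def on_next(self, value):
        self._history.append(value)
        for o in list(self._observers):
            o(value)

    def subscribe(self, on_next):
        for v in self._history:          # past data first...
            on_next(v)
        self._observers.append(on_next)  # ...then future data

subj = ReplaySubject()
subj.on_next("c:\\windows\\a.tmp")
late = []
subj.subscribe(late.append)              # subscribes after the fact
subj.on_next("c:\\windows\\b.tmp")
# late sees both the historical and the live value
```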

Grouping

grouping

Thanks to the GroupBy() and SelectMany() operators, grouping turns out to be quite easy. First we group by the file extension, selecting the Length property, and for each group we compute the aggregates:

IObservable<StatInfoItem<string>> grouped =
    newQuerystream.GroupBy(fsi => fsi.Extension, fsi => (double)fsi.Length)
        .SelectMany(grp => grp.ToCommonAggregates(x => x, _ => grp.Key));

Updating the UI

Merging a stream of data into an ObservableCollection<T> is a common operation. I’ve found myself wishing that MS offered a KeyedObservableCollection<T>, and I’ve considered writing such a thing, but instead I use a smaller extension method, MergeInsert(), which does the job without all the additional ceremony of a new class:

public static IDisposable MergeInsert<T, TKey>(
                 this ObservableCollection<T> col, 
                 IObservable<T> stream, 
                 Func<T, TKey> keySelector)
{
    col.Clear();
    Dictionary<TKey, int> lookupTable = new Dictionary<TKey, int>();
    return stream.Subscribe(item =>
    {
        var key = keySelector(item);
        if (!lookupTable.ContainsKey(key))
        {
            lookupTable[key] = col.Count;
            col.Add(item);
        }
        else
        {
            col[lookupTable[key]] = item;
        }
    });
}

Use it like so:

GroupedByExtentionCollection = new ObservableCollection<StatInfoItem<string>>();
IDisposable _disp = GroupedByExtentionCollection.MergeInsert(grouped.ObserveOnDispatcher(), 
                                                             sii => sii.Item);