The bug I mentioned in my first attempt at a sliding window was the minor issue that the aggregates never went down to 0, even after the window had emptied out.

The problem line of code was cur.GroupBy(tsst => tsst.Value.Symbol) – if the window is empty, there is nothing to group – and as a result the aggregates don’t get computed.

Here’s the fix:

public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
    var existingWindows = new ConcurrentDictionary<string,int>();
    return oticks
              .ToSlidingWindow(new TimeSpan(0, 0, 0, 30), new TimeSpan(0, 0, 0, 0, 500))
              .Select(sl => sl.Current)
              .SelectMany(cur =>
                  {
                      IEnumerable<VWAPItem> grouped = cur.GroupBy(tsst => tsst.Value.Symbol)
                          .Select(grp =>
                              {
                                  IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
                                  var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
                                  var totalVolume = ticks.Sum(tk => tk.Size);
                                  return new VWAPItem(grp.Key, totalAmount,
                                      totalVolume,
                                      totalAmount / totalVolume);
                              });
                      foreach (var grpd in grouped)
                      {
                          existingWindows[grpd.Symbol] = 1;
                      } 
                      IEnumerable<IEnumerable<VWAPItem>> outerJoin = existingWindows
                          .GroupJoin(grouped,
                              key => key.Key,
                              grped => grped.Symbol,
                              (key, item) =>
                                  item.DefaultIfEmpty(new VWAPItem(key.Key, 0, 0, 0)));
                      return outerJoin.SelectMany(x => x);
                  });
 
}
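
The join at the heart of the fix can be tried on plain collections, without RX. The following is a minimal sketch under my own assumptions: the Vwap class and the WithZeroRows name are hypothetical stand-ins, not types from the demo. It shows how GroupJoin plus DefaultIfEmpty yields a zero row for every symbol that has been seen before but has no ticks in the current window.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EmptyWindowSketch
{
    // Hypothetical stand-in for the post's VWAPItem; only the shape matters here.
    public sealed class Vwap
    {
        public string Symbol;
        public double TotalAmount;
        public double TotalVolume;
    }

    // Left outer join: every known symbol yields exactly one row,
    // with zeros when the current window produced nothing for it.
    public static List<Vwap> WithZeroRows(IEnumerable<string> knownSymbols,
                                          IEnumerable<Vwap> computed)
    {
        return knownSymbols
            .GroupJoin(computed,
                       sym => sym,
                       item => item.Symbol,
                       (sym, items) => items.DefaultIfEmpty(new Vwap { Symbol = sym }))
            .SelectMany(rows => rows)
            .ToList();
    }
}
```

Calling WithZeroRows with the symbols "MSFT" and "GOOG" and a single computed MSFT item returns two rows; the GOOG row carries a TotalVolume of 0, which is the value the UI was missing.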

A few months ago, playing with CTP 2 of StreamInsight, I created a small VWAP demo on a sliding window. Now that a proper CTP of the RX is available, I wanted to see how much effort the same demo would take without the CEP infrastructure of StreamInsight. I’ll admit that this was a little harder to write than I expected, and there’s still at least one bug remaining (updated), but the code for actually computing the VWAPs feels much cleaner in the RX version than it did in the StreamInsight version. The debuggability (which is really about transparency) of RX is a welcome difference from most CEP systems.

So here’s the code:

The generation of stock ticks remained nearly identical; however, instead of timestamping by hand, I used the Timestamp() extension method. And to allow multiple observers on the same IObservable, the ticks are routed to a Subject.

public IObservable<Timestamped<StockTick>> GetTicks()
{
    var subj = new Subject<Timestamped<StockTick>>();
    var gen = Observable.Generate(
                    0
                   , ii => ii < 1000  // produce 1000 ticks
                   , ii => new StockTick() // next value
...
                  )
                  .Timestamp();

    gen.Subscribe(tsst => subj.OnNext(tsst));
    return subj;
}

Compute VWAP on a 10 second sliding window

public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
    return oticks
              .ToSlidingWindow(new TimeSpan(0, 0, 0,10), new TimeSpan(0, 0, 0, 0, 500))
              .Select(sl => sl.Current)
              .SelectMany(cur =>
                      cur.GroupBy(tsst => tsst.Value.Symbol)
                          .Select(grp =>
                              {
                                  IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
                                  var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
                                  var totalVolume = ticks.Sum(tk => tk.Size);
                                  return new VWAPItem(grp.Key, totalAmount, totalVolume, 
                                     totalAmount / totalVolume);
                              }));

}
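
For reference, the aggregation inside the Select can be checked on a plain list. This is only a sketch: the Tick class and BySymbol method are hypothetical names, and the zero-volume guard is my addition, not something the demo code does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class VwapSketch
{
    // Hypothetical tick shape; the post's StockTick has more fields.
    public sealed class Tick
    {
        public string Symbol;
        public long Size;
        public double Price;
    }

    // VWAP = sum(size * price) / sum(size), computed per symbol.
    public static Dictionary<string, double> BySymbol(IEnumerable<Tick> ticks)
    {
        return ticks
            .GroupBy(t => t.Symbol)
            .ToDictionary(
                g => g.Key,
                g =>
                {
                    double amount = g.Sum(t => t.Size * t.Price);
                    long volume = g.Sum(t => t.Size);
                    // Guard added for the sketch: avoid dividing by a zero volume.
                    return volume == 0 ? 0.0 : amount / volume;
                });
    }
}
```

Two MSFT ticks of 100 @ 10 and 300 @ 20 give a total amount of 7000 over a volume of 400, so a VWAP of 17.5.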

And the code for ToSlidingWindow()

public static IObservable<SlidingWindow<Timestamped<T>>> ToSlidingWindow<T>(
                                     this IObservable<Timestamped<T>> source, 
              TimeSpan size, TimeSpan resolution)
{
    Func<SlidingWindow<Timestamped<T>>, TimeoutJoinItem<T>, SlidingWindow<Timestamped<T>>> 
     windowing = (window, item) =>
    {
        Func<Timestamped<T>, bool> checkTimestamp = 
                 cwi => cwi.Timestamp.Add(size) <= item.ComparisonTimestamp;

        var newCurrent = window.Current.SkipWhile(checkTimestamp);
        var removed = window.Current.TakeWhile(checkTimestamp);

        var added = Enumerable.Repeat(item.TSItem, (item.IsTimeout) ? 0 : 1);
        return 
             new SlidingWindow<Timestamped<T>>(newCurrent.Concat(added), added, removed);
    };

    DateTime priorleft = DateTime.MinValue;
    return source.CombineLatest(Observable.Timer(resolution, resolution).Timestamp(), 
           (left, right) =>
           {
               bool isTimeout = left.Timestamp == priorleft;
               priorleft = left.Timestamp;
               return new TimeoutJoinItem<T>(left,
                         (isTimeout)? right.Timestamp: left.Timestamp,  
                          isTimeout);
           }).Scan(new SlidingWindow<Timestamped<T>>(), windowing)
             .Where(sl => sl.Added.Count() > 0 || sl.Removed.Count() > 0);
}

The key elements in the above are

  • Observable.Timer – this is our heartbeat which allows us to detect passage of time without new events
  • CombineLatest – Join two IObservables – the data stream and the time stream
  • Scan – this is Accumulate() for Observables – the windowing function takes the current window and computes the new window based on which elements have expired and which have been added
  • And finally reduce noise by removing SlidingWindows which have not changed
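
The accumulator step is easier to reason about in isolation. Below is a minimal sketch of one step over bare timestamps, assuming the window is kept oldest-first. The Step name and the nullable newItem parameter (null standing in for a timer-only heartbeat) are my own simplifications, not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WindowStepSketch
{
    // One accumulator step: drop entries that are at least `size` older than `now`,
    // then append the new timestamp unless this step is a pure heartbeat (newItem == null).
    // Assumes `current` is ordered oldest first, which holds when items are only appended.
    public static List<DateTime> Step(IEnumerable<DateTime> current, DateTime now,
                                      TimeSpan size, DateTime? newItem)
    {
        var kept = current.SkipWhile(ts => ts.Add(size) <= now).ToList();
        if (newItem.HasValue)
            kept.Add(newItem.Value);
        return kept;
    }
}
```

The sketch materializes each window with ToList(). That is a design choice of the sketch rather than of the original, which keeps the lazily evaluated SkipWhile and Concat sequences from step to step.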

The code to wire it up to a window is standard stuff, just

var ticks = _model.GetTicks();
ticks
    .ObserveOnDispatcher()
    .Subscribe(tst => TickCollection.Add(tst));

var vwapDict= new Dictionary<string,VWAPItem>();
_model.GetVWAPWS(ticks)
    .ObserveOnDispatcher()
    .Subscribe(vwap =>
        {
            if (vwapDict.ContainsKey(vwap.Symbol))
                VWAPCollection.Remove(vwapDict[vwap.Symbol]);
            vwapDict[vwap.Symbol] = vwap;
            VWAPCollection.Add(vwap);
        });

And of course the required screenshot

image


Downloads of the Reactive Framework (RX) can now be found at MS DevLabs. Versions for 3.5 SP1, 4.0 Beta, and Silverlight 3 are available. Interestingly, the API size appears to be substantially larger than the preview which was leaked as part of the Silverlight 3 Toolkit. That DLL was all of 84KB; the current release weighs in at 283KB.

 

With regard to CEP, the comparisons between StreamInsight and RX are interesting:

Aspect                RX                                          StreamInsight
Leaking abstraction   Low – it’s managed code all the way down    High; Linq2SI is like Linq2SQL, except the underlying SI implementation isn’t well understood
Performance           Limited by CLR and GC                       High b/c of native code
Windowing support     None out of box                             Explicit
Adaptor support       Easy                                        Not hard, but not trivial

In my last post I showed how to send StreamInsight output streams to a UI via the Reactive Framework. Here we’ll do the reverse, by sending an RX stream into a CEP stream. Instead of a partial example, I’ll use an end-to-end example showing simulated stock ticks, computing the 5 min rolling VWAP, and showing the results on a UI.

 

First we’ll generate the ticks:

System.Collections.Generic.IObservable<StockTick> stockTicks =
    System.Linq.Observable.Generate(
                new Random() // initial state
                 , rnd => true // continue
                 , rnd => new StockTick() // next value
                         {
                             Price = rnd.NextDouble() * 1000,
                             Side = Sides[rnd.Next(Sides.Length)], // Next(max) excludes max
                             Size = (long)(rnd.NextDouble() * 1000),
                             Symbol = Symbols[rnd.Next(Symbols.Length)],
                             Timestamp = DateTime.Now
                         }
                         , rnd => (int)(rnd.NextDouble() * 2000)  // waitInterval
                         , rnd => rnd  // iterate
                         );

And now convert to a CEP stream:

var cepInput = stockTicks.ToCEP()
                .ToCepStream(tick => tick.Timestamp);

Where ToCEP() is just the inverse of ToRX(), defined previously.

public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource)
{
    return new CEPAnonymousObservable<T>(
        (S.IObserver<T> cepObserver) => 
            RX.ObservableExtensions.Subscribe(rxSource, nextVal=> cepObserver.OnNext(nextVal)));
}

Computing the rolling 5 min VWAP (grouped by symbol) takes some effort:

var alteredDurationStream = cepInput
                            .ToPointEventStream()
                            .AlterEventDuration(tick => TimeSpan.FromMinutes(5));

var fiveMinVWaps =  from fivemin in
                    alteredDurationStream
                    group fivemin by fivemin.Symbol into fGrp
                    from evwindow in fGrp.Snapshot()
                    select new
                    { 
                       Symbol = fGrp.Key, 
                       TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price),
                       TotalVolume = evwindow.Sum(fmin => fmin.Size), 
                    };

var fiveMinVWaps2 = from fivemin in fiveMinVWaps
                    select new VWAPItem()
                    {
                        Symbol = fivemin.Symbol,
                        VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
                        Timestamp = DateTime.Now,
                    };

Although this nearly looks like conventional .Net Linq code, it isn’t. Think Linq2SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint, nor are arbitrary .Net computations allowed. The reason for the additional fiveMinVWaps2 projection is that it’s not possible to compute anything but the Sum or Avg in a Snapshot().
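
The distinction between a lambda and an expression is a general C# feature and can be seen without StreamInsight. In the sketch below (the class and member names are mine, purely illustrative), the same lambda text is compiled once to a delegate and once to an expression tree. How StreamInsight’s provider actually translates such trees is not shown here.

```csharp
using System;
using System.Linq.Expressions;

public static class ExpressionVsDelegateSketch
{
    // A plain delegate: compiled IL, so a debugger can break inside the lambda body.
    public static readonly Func<double, double, double> AsDelegate =
        (amount, volume) => amount / volume;

    // The same lambda text assigned to Expression<...> becomes a data structure
    // that a query provider can inspect and translate instead of running directly.
    public static readonly Expression<Func<double, double, double>> AsExpression =
        (amount, volume) => amount / volume;

    // The root of the tree is a binary Divide node, not executable code.
    public static ExpressionType RootNode()
    {
        return AsExpression.Body.NodeType;
    }
}
```

AsExpression.Compile() turns the tree back into a delegate, but a provider such as Linq2SQL typically walks the tree instead, which is why only the operations it knows how to translate are accepted.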

Now that we have the data, we can convert it back to an RX stream:

PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp");
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();

And update an ObservableCollection

vwaps.Post(_sc).Subscribe(item =>
{
    var exists = CEPOS1.Where(vw => vw.Symbol == item.Symbol).FirstOrDefault();
    if (exists == null)
        CEPOS1.Add(item);
    else
        exists.CopyFrom(item);
});

 

Which displays on a UI

image

One compelling feature of StreamInsight is its in-process hosting model. In addition to reducing the complexity of server side installs, it’s now possible to have a CEP engine in the client UI.

The simplest way of getting CEP streams onto the UI would be the Reactive Framework methods. Something like

queryOutputStream
    .ToObservable(...)
    .Post(syncContext)
    .Subscribe(item=> collection.Add(item) );

But in the CTP that won’t work. As I discovered a few days ago, the IObservable used in StreamInsight is defined in a different namespace and assembly than the IObservable in System.Reactive. Furthermore, the StreamInsight API lacks the base classes and extension methods defined in System.Reactive.

I didn’t want to go the normal route of creating an implementation of IObserver on, say, a ViewModel and routing the data through the dispatcher and onto a collection. While that would be simple and would work, it would mean giving up on all the goodness in System.Reactive.

The first method I tried in an effort to convert a CEP IObservable into an RX IObservable didn’t work, but was instructive nonetheless.

Using StreamInsight’s own I/O adapter API, I would create an “Eventing” adapter which would raise a conventional .Net event on an object of my choosing, then, using the Reactive Framework, convert that event to an RX IObservable.

But it’s not easy (or possible) to do. Instances of output adapters are created by OutputAdapterFactories, which in turn are created by Factory methods. You can send in a configuration object, but it needs to be XML serializable, so there’s no sending in of Action<> delegates.

 

But it turns out that it’s not hard to convert from a CEP IObservable to an RX IObservable.

First you need a CEP AnonymousObserver<T>

using S = System;
internal class AnonymousObserver<T> : S.IObserver<T>
{
    private bool isStopped;
    private S.Action _onCompleted;
    private S.Action<S.Exception> _onError;
    private S.Action<T> _onNext;

    public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError)
        : this(onNext, onError, () => { })
    {
    }
    public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError, S.Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }
    public void OnCompleted()
    {
        if (!isStopped)
        {
            isStopped = true;
            _onCompleted();
        }
    }
    public void OnError(S.Exception exception)
    {
        if (!isStopped)
        {
            isStopped = true;
            _onError(exception);
        }
    }
    public void OnNext(T value)
    {
        if (!isStopped)
            _onNext(value);
    }
}

Then an extension method which takes a CEP IObservable and returns an RX AnonymousObservable<T>. It subscribes to the source via the CEP IObserver, and on each OnNext it calls OnNext on the RX observer.

Like so:

using RX = System.Collections.Generic;
using S = System;

public static class CEPExtMethods
{
    public static RX.IObservable<T> ToRX<T>(this S.IObservable<T> source)
    {
        return new AnonymousObservable<T>(
            (RX.IObserver<T> rxObserver) =>
                            source.Subscribe(new AnonymousObserver<T>(
                                    nextVal => rxObserver.OnNext(nextVal),
                                    rxObserver.OnError
                                )));
    }
}

To use it:

var queryOutputStream = CreateQueryTemplate(input);
var queryOutput = queryOutputStream.ToObservable(typeof(EventTypeCount).GetField("Time"));
queryOutput.ToRX().Send(_sc).Subscribe(v => this.CEPOS1.Add(v));

 

And results from the Observable sample on screen

image

This morning I was hoping to take a few minutes to modify one of the examples in the StreamInsight CTP and send an output stream to a UI, rather than the text files used in the examples. I thought this would be easy, as the readme states that there’s

“An alpha version of the StreamInsight libraries for development using the IObservable/IObserver programming paradigm.”

But it wasn’t. The IObservable used in StreamInsight is defined in a different namespace than the IObservable in the System.Reactive and the StreamInsight api lacks the base classes and extension methods defined in System.Reactive. At this time, the two APIs do not play well with each other.

 

Some thoughts on how to get around this temporary inconsistency:

  • Recompile System.Reactive to use StreamInsight’s IObservable/IObserver
  • Create a type converter between the two IObservable/IObservers
  • Create a StreamInsight output adapter which just raises a .Net event, then use the RX method of converting events to IObservables

Perhaps tonight.

Talk around the water cooler is that it might be possible to use the Reactive Framework for some lightweight CEP.

I’ll correct some of the (big) mistakes from my last post and build up a “jumping” window extension method for IObservables.

 

In my last post I built a simple grouping method, but in it I immediately turned the push style of processing into a pull style by using the GetEnumerator() method. This is a bad idea for two key reasons: a) it takes the inherent elegance of the RX and reduces it to a for loop, and b) it commits a cardinal sin of multi-threading by reserving a thread for a primarily blocking operation.

 

Here’s an improved version

public static IObservable<IEnumerable<TSource>> ToWindow<TSource>(
    this IObservable<TSource> source, 
    Func<TSource, IEnumerable<TSource>, bool> grouper)
{
    return RXGrouping.ToWindow(source, val => val, grouper);
}


public static IObservable<IEnumerable<TResult>> ToWindow<TSource, TResult>(
 this IObservable<TSource> source,
 Func<TSource, TResult> selector, 
 Func<TSource, IEnumerable<TResult>, bool> grouper)
{
    List<TResult> res = new List<TResult>();
    return new AnonymousObservable<IEnumerable<TResult>>(
        observer =>
            source.Subscribe(
            nextVal =>
            {
                try
                {
                    if (!grouper(nextVal, res))
                    {
                        observer.OnNext(res);
                        res = new List<TResult>();
                    }
                    res.Add(selector(nextVal));
                }
                catch (Exception exception)
                {
                    observer.OnError(exception);
                    return;
                }
            }
            ,observer.OnError
            ,observer.OnCompleted));
}
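
Stripped of the RX plumbing, the push-style logic amounts to a small state machine. The class below is a hypothetical helper written for illustration, not part of RX: the producer calls OnNext for each value, and a callback receives each batch once the predicate reports that the incoming value no longer belongs to it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of RX: collects pushed values into batches.
public sealed class JumpingWindowSketch<T>
{
    private readonly Func<T, IList<T>, bool> _belongs; // true: value joins the current batch
    private readonly Action<IList<T>> _onWindow;       // called with each closed batch
    private List<T> _current = new List<T>();

    public JumpingWindowSketch(Func<T, IList<T>, bool> belongs, Action<IList<T>> onWindow)
    {
        _belongs = belongs;
        _onWindow = onWindow;
    }

    // Called by the producer for every value; no thread sits waiting for input.
    public void OnNext(T value)
    {
        if (!_belongs(value, _current))
        {
            _onWindow(_current);          // hand off the finished batch
            _current = new List<T>();     // and start a new one
        }
        _current.Add(value);
    }
}
```

With the predicate (v, list) => list.Count < 3 and the values 1 through 7 pushed in order, the callback fires twice, with [1, 2, 3] and [4, 5, 6]; the trailing 7 stays in the open batch. The sketch never flushes that final partial batch, so a real implementation would need to decide what to do with it on completion.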

The mistake in the prior version stemmed in part from thinking that I needed to ask for the next value, but of course the RX will supply the next value when it’s available.

To use it:

TimeSpan windowDuration = new TimeSpan(0,0,10);
generatedNums
    // add a Timestamp to our raw data
    .Select(val => new { Timestamp = DateTime.Now, Value = val })
    // create a 10 second "jumping" window
    .ToWindow((lastVal, seq) => 
            (seq.Count() == 0) || 
            (lastVal.Timestamp - seq.First().Timestamp < windowDuration))
    // create item for display
    .Select(seq => new { Timestamp = seq.First().Timestamp
                        , Values = seq.Select(a => a.Value).ToArray()
                        , Average = seq.Average(a => a.Value) })
    // marshal and add to list
    .Post(sc).Subscribe(wv => WindowVals.Add(wv));

 

And the results

 image

Next we’ll look into creating a sliding window.

A few days ago, intentionally or not, a version of the Reactive Framework was released into the wild. Let’s see how we can use the RX for computations on a stream of data. As an example we’ll take a stream of ints and produce the averages in groups of five.

image

 

Here’s the primary stream of numbers, using the static Generate()  method

Random rnd = new Random();
var generatedNums = Observable.Generate<int,int>(
                    0 // seed
                    , d => true // condition to continue
                    , d => d % 12 //generated value 
                    , d => (int)(rnd.NextDouble() * 300) //delay
                    , d => d + 1 // modify value for next iter
                    );

And to consume the stream by adding the values into an ObservableCollection

generatedNums
    .Post(sc) // move onto UI thread
    .Subscribe(num => Numbers.Add(num) // add numbers to observable collection
    );

Computing the average in groups of 5 turns out to be harder, as the Reactive FX doesn’t seem to have a GroupBy() method at this time. Here’s what I came up with:

generatedNums
    .Grouper(a => a, (a,list) => list.Count() < 5) // group into lists of 5, returning an IObservable<IEnumerable<int>>
    .Select(list => list.Average()) // take the average of the list, so project IObservable<IEnumerable<int>> to IObservable<double>
    .Post(sc).Subscribe(mean => Averages.Add(mean) // move onto UI and add to observable collection
    );

And the implementation for Grouper():

public static IObservable<IEnumerable<TResult>> Grouper<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TResult> selector
    , Func<TSource, IEnumerable<TResult>, bool> grouper) 
{
    return new AnonymousObservable<IEnumerable<TResult>>(
        observer =>
            source.Subscribe(x =>
        {
            try
            {
                using (var er = source.GetEnumerator())
                    while (er.MoveNext())
                    {
                        bool needsMove = false;
                        var res = new List<TResult>();
                        while (grouper(er.Current, res) && ((needsMove) ? er.MoveNext() : true))
                        {
                            needsMove = true;
                            res.Add(selector(er.Current));
                        }
                        observer.OnNext(res);
                    }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
        },
        new Action<Exception>(observer.OnError),
        new Action(observer.OnCompleted)));
}

Via Jafar Husain - it appears that there’s an early release of the Live Labs Reactive Framework (& with Brian Beckman and Erik Meijer) in the latest Silverlight Toolkit

 

In addition to some of the standard LINQ operators (Select, Where, Aggregate), some new operators look quite promising  -

  • ForkJoin
  • Merge
  • Delay
  • HoldUntilChanged
  • Latest
  • Throttle

 

PowerShell V2 has some great new features; in particular, Add-Type and Remoting are likely to be quite popular, and they work together without much issue. That said, there are edge cases which illustrate how the types returned from remoting calls differ from their local counterparts. The following script demonstrates the issue:

$csCode = @"
using System;
using System.Collections.Generic;
using System.Linq;
namespace Demo {
    public static class D
    {
        public static int Add(int a, int b)
        {
            return a + b;
        }
        public static int AddArray(int[] ints)
        {
            return ints.Sum();
        }
        public static int AddEnumerable(IEnumerable<object> ints)
        {
            return ints.Cast<int>().Sum();
        }
        public static int AddEnumerable(IEnumerable<int> ints)
        {
            return ints.Sum();
        }
    }
}
"@

Add-Type -TypeDefinition $csCode -Language CSharpVersion3
$oneRemote = Invoke-Command -ComputerName localhost  -ScriptBlock { return 1 }
$listRemote = Invoke-Command -ComputerName localhost  -ScriptBlock { return (1,2,3) }
$one = &{return 1}
if ($one -eq $oneRemote)
{
    Write-Host "1 == 1"
}

$v =  [Demo.D]::Add($one,$oneRemote)
Write-output "One + OneRemote = $v"  ; $v=$null
$v = [Demo.D]::AddArray(($one,$oneRemote))
Write-output "One + OneRemote via array =  $v" ; $v=$null

$v = [Demo.D]::AddArray($listRemote)
Write-output "One + OneRemote via remote array =  $v" ; $v=$null

$v = [Demo.D]::AddEnumerable((1,2,3,4))
Write-output "One + OneRemote via local IEnumerable =  $v" ; $v=$null

$v = [Demo.D]::AddEnumerable($listRemote)
Write-output "One + OneRemote via remote IEnumerable =  $v"; $v=$null



$oneRemote | Get-Member

The output from the above is

1 == 1
One + OneRemote = 2
One + OneRemote via array =  2
One + OneRemote via remote array =  6
One + OneRemote via local IEnumerable =  10 # so far as expected

 
Exception calling "AddEnumerable" with "1" argument(s): "Specified cast is not valid."
At typeDemo.ps1:49 char:29
+ $v = [Demo.D]::AddEnumerable <<<< ($listRemote)

   TypeName: System.Int32

Name               MemberType   Definition                                                                                                                                                                       
----               ----------   ----------                                                                                                                                                                       
CompareTo          Method       System.Int32 CompareTo(Object value)
Equals             Method       System.Boolean Equals(Object obj), System.Boolean Equals(Int32 obj)                                                                                                              
GetHashCode        Method       System.Int32 GetHashCode()                                                                                                                                                       
GetType            Method       System.Type GetType()                                                                                                                                                            
GetTypeCode        Method       System.TypeCode GetTypeCode()                                                                                                                                                    
ToString           Method       System.String ToString()
PSComputerName     NoteProperty System.String PSComputerName=localhost                                                                                                                                           
PSShowComputerName NoteProperty System.Boolean PSShowComputerName=True                                                                                                                                           
RunspaceId         NoteProperty System.Guid RunspaceId=e0dc5c05-c87d-41ad-afe0-16bc1b711f08                                                                                                                      

What’s happening under the covers is that the PowerShell remoting infrastructure is returning a PSObject. By inspecting the type via Get-Member you can see that it has some extra NoteProperties. To PowerShell and to .Net methods that expect integers and arrays of integers, the object looks and behaves like it should. But if the types you add via Add-Type use a more LINQ style approach, which expects an IEnumerable<T>, the PowerShell type system doesn’t properly convert the adapted type to its native underlying type, and runtime exceptions are the result.

Posted by Scott Weinstein | 2 comment(s)