Downloads of the Reactive Framework (RX) can now be found at MS DevLabs. Versions for 3.5 SP1, 4.0 Beta, and Silverlight 3 are available. Interestingly, the API size appears to be substantially larger than the preview which was leaked as part of the Silverlight 3 Toolkit. That DLL was all of 84KB, the current release is weighs in at 283KB.
In regards to CEP, the comparisons between StreamInsight and RX are interesting
| RX | | StreamInsight |
| Low – it’s managed code all the way down | Leaking abstraction | High; Linq2SI is like Linq2SQL, except the underling SI implementation isn’t well understood |
| Limited by CLR and GC | Performance | High b/c of native code |
| None out of box | Windowing support | Explicit |
| Easy | Adaptor support | Not hard, but not trivial |
In my last post I showed how to send StreamInsight output streams to a UI via the ReactiveFramework. Here’s we’ll do the reverse, by sending an RX stream into a CEP stream. Instead of a partial example, I’ll use an end to end example showing simulated stock ticks, computing the 5 min rolling VWAP, and showing the results on a UI.
First we’ll generate the ticks:
System.Collections.Generic.IObservable<StockTick> stockTicks =
System.Linq.Observable.Generate(
new Random() // inital state
, rnd => true // continue
, rnd => new StockTick() // next value
{
Price = rnd.NextDouble() * 1000,
Side = Sides[rnd.Next(Sides.Length - 1)],
Size = (long)(rnd.NextDouble() * 1000),
Symbol = Symbols[rnd.Next(Symbols.Length - 1)],
Timestamp = DateTime.Now
}
, rnd => (int)(rnd.NextDouble() * 2000) // waitInterval
, rnd => rnd // iterate
);
And now convert to a CEP stream:
var cepInput = stockTicks.ToCEP()
.ToCepStream(tick => tick.Timestamp);
Where ToCep() is just the inverse of ToRx(), defined previously.
public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource)
{
return new CEPAnonymousObservable<T>(
(S.IObserver<T> cepObserver) =>
RX.ObservableExtensions.Subscribe(rxSource, nextVal=> cepObserver.OnNext(nextVal)));
}
Computing the rolling 5 min VWAP, (grouped by symbol) takes some effort
var alteredDurationStream = cepInput
.ToPointEventStream()
.AlterEventDuration(tick => TimeSpan.FromMinutes(5));
var fiveMinVWaps = from fivemin in
alteredDurationStream
group fivemin by fivemin.Symbol into fGrp
from evwindow in fGrp.Snapshot()
select new
{
Symbol = fGrp.Key,
TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price),
TotalVolume = evwindow.Sum(fmin => fmin.Size),
};
var fiveMinVWaps2 = from fivemin in fiveMinVWaps
select new VWAPItem()
{
Symbol = fivemin.Symbol,
VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
Timestamp = DateTime.Now,
};
Although this nearly looks like conventional .Net Linq code, it isn’t. Think Linq2SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint, nor are any arbitrary .Net computations allowed. The reason for the additional fiveMinVWaps2 projection is that it’s not possible to compute anything but the Sum or Avg in a SnapShot().
Now that we have the data, we can convert to back to a RX stream:
PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp");
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();
And update an ObservableCollection
vwaps.Post(_sc).Subscribe(item =>
{
var exists = CEPOS1.Where(vw => vw.Symbol == item.Symbol).FirstOrDefault();
if (exists == null)
CEPOS1.Add(item);
else
exists.CopyFrom(item);
});
Which displays on a UI

One compelling feature of StreamInsight is it’s in-process hosting model. In addition to reducing the complexity of server side installs, it’s now possible to have a CEP engine in the client UI.
The simplest way of getting CEP streams onto the UI would be the Reactive Framework methods. Something like
queryOutputStream
.ToObservable(...)
.Post(syncContext)
.Subscribe(item=> collection.Add(item) );
But in the CTP that won’t work. As I discovered a few days ago The IObservable used in StreamInsight is defined in a different namespace and assembly than the IObservable in the System.Reactive. Furthermore the StreamInsight api lacks the base classes and extension methods defined in System.Reactive.
I didn’t want to go the normal route of creating an implementation of IObserver, on say a ViewModel, route the data through the dispatcher on onto a collection, as while it would have the benefits of simplicity and it would work, it would mean giving up on all the goodness in System.Reactive.
The first method I tried in an effort to convert a CEP IObservable into an RX IObservable didn’t work, but was instructive nonetheless.
Using StreamInsight’s own I/O adapter API, I would create an “Eventing” Adapter which would raise an conventional .Net event on an object of my choosing, then using the ReactiveFramework, convert that event to an RX IObservable.
But it’s not easy (or possible) to do. Instances of output adapters are created by OutputAdapterFactories, which in turn are created by Factory methods. You can send in a configuration object, but it needs to be XML serializable, so there’s no sending in of Action<> delegates.
But it turns out that it’s not hard to convert from a CEP IObservable to an RX IObservable.
First you need a CEP AnonymousObserver<T>
using S = System;
internal class AnonymousObserver<T> : S.IObserver<T>
{
private bool isStopped;
private S.Action _onCompleted;
private S.Action<S.Exception> _onError;
private S.Action<T> _onNext;
public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError)
: this(onNext, onError, () => { })
{
}
public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError, S.Action onCompleted)
{
_onNext = onNext;
_onError = onError;
_onCompleted = onCompleted;
}
public void OnCompleted()
{
if (!isStopped)
{
isStopped = true;
_onCompleted();
}
}
public void OnError(S.Exception exception)
{
if (!isStopped)
{
isStopped = true;
_onError(exception);
}
}
public void OnNext(T value)
{
if (!isStopped)
_onNext(value);
}
}
Then an extension method taking a CEP IObservable, returning a RX AnonymousObservable<T>, subscribing to it via the CEP IObserver and on the OnNext, calling the returning RX IObservable’s on OnNext.
Like so:
using RX = System.Collections.Generic;
using S = System;
public static class CEPExtMethods
{
public static RX.IObservable<T> ToRX<T>(this S.IObservable<T> source)
{
return new AnonymousObservable<T>(
(RX.IObserver<T> rxObserver) =>
source.Subscribe(new AnonymousObserver<T>(
nextVal => rxObserver.OnNext(nextVal),
rxObserver.OnError
)));
}
}
To use it:
var queryOutputStream = CreateQueryTemplate(input);
var queryOutput = queryOutputStream.ToObservable(typeof(EventTypeCount).GetField("Time"));
queryOutput.ToRX().Send(_sc).Subscribe(v => this.CEPOS1.Add(v));
And results from the Observable sample on screen

This morning I was hoping to take a few minutes to modify one of the examples in the StreamInsight CTP and send an output stream to a UI, rather than the text files used in the examples. I thought this would be easy, as the readme states that there’s
“An alpha version of the StreamInsight libraries for development using the IObservable/IObserver programming paradigm.”
But it wasn’t. The IObservable used in StreamInsight is defined in a different namespace than the IObservable in the System.Reactive and the StreamInsight api lacks the base classes and extension methods defined in System.Reactive. At this time, the two APIs do not play well with each other.
Some thoughts on how to get around this temporary inconsistency:
- Recompile System.Reactive to use StreamInsight’s IObservable/IObserver
- Create a type converter between the two IObservable/IObservers
- Create a StreamInsight output adapter which just raises a .Net event, then use the RX method of converting events to IObservables
Perhaps tonight.
Talk around the water cooler is that it might be possible to use the Reactive Framework for some lightweight CEP.
I’ll correct some of the (big) mistakes from my last post and build up a “jumping” window extension method for IObservables.
In my last post I build a simple grouping method, but in it I immediately turned the push style of processing into a pull style by using the GetEnumerator() method. This is a bad idea for two key reasons, a) it takes the inherit elegance of the RX reduces it to a for loop, and b) it commits a cardinal sin of multi-threading and reserves a thread for a primarily blocking operation.
Here’s an improved version
public static IObservable<IEnumerable<TSource>> ToWindow<TSource>(
this IObservable<TSource> source,
Func<TSource, IEnumerable<TSource>, bool> grouper)
{
return RXGrouping.ToWindow(source, val => val, grouper);
}
public static IObservable<IEnumerable<TResult>> ToWindow<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, TResult> selector,
Func<TSource, IEnumerable<TResult>, bool> grouper)
{
List<TResult> res = new List<TResult>();
return new AnonymousObservable<IEnumerable<TResult>>(
observer =>
source.Subscribe(
nextVal =>
{
try
{
if (!grouper(nextVal, res))
{
observer.OnNext(res);
res = new List<TResult>();
}
res.Add(selector(nextVal));
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
}
,observer.OnError
,observer.OnCompleted));
}
The mistake in the prior version stemmed in part from thinking that I needed to ask for the next value, but of course the RX will supply the next value when it’s available.
To use it:
TimeSpan windowDuration = new TimeSpan(0,0,10);
generatedNums
// add a Timestamp to our raw data
.Select(val => new { Timestamp = DateTime.Now, Value = val })
// create a 5 min "jumping" window
.ToWindow((lastVal, seq) =>
(seq.Count() == 0) ||
(lastVal.Timestamp - seq.First().Timestamp < windowDuration))
// create item for display
.Select(seq => new { Timestamp = seq.First().Timestamp
, Values = seq.Select(a => a.Value).ToArray()
, Average = seq.Average(a => a.Value) })
// marshal and add to list
.Post(sc).Subscribe(wv => WindowVals.Add(wv));
And the results

Next we’ll look into creating a sliding window.
A few days ago, intentionally or not, a version of the Reactive Framework was released into the wild. Let’s see how we can use the RX for computations on a stream of data. As an example we’ll take a stream of ints and produce the averages in groups of five.
Here’s the primary stream of numbers, using the static Generate() method
Random rnd = new Random();
var generatedNums = Observable.Generate<int,int>(
0 // seed
, d => true // condition to continue
, d => d % 12 //generated value
, d => (int)(rnd.NextDouble() * 300) //delay
, d => d + 1 // modify value for next iter
);
And to consume the stream by adding the values into an ObservableCollection
generatedNums
.Post(sc) // move onto UI thread
.Subscribe(num => Numbers.Add(num) // add numbers to observable collection
);
Computing the average, in groups of 5 turns out to be harder, as the Reactive FX doesn’t seem to have a GroupBy() method at this time. Here’s what I came up with:
generatedNums
.Grouper(a => a, (a,list) => list.Count() < 5) // group into lists of 5, returning an IObservable<IEnumerable<int>>
.Select(list => list.Average()) // take the average of the list, so project IObservable<IEnumerable<int>> to IObservable<int>
.Post(sc).Subscribe(mean => Averages.Add(mean) // move onto UI and add to observable collection
);
And the implementation for “Grouper()” public static IObservable<IEnumerable<TResult>> Grouper<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, TResult> selector
, Func<TSource, IEnumerable<TResult>, bool> grouper)
{
return new AnonymousObservable<IEnumerable<TResult>>(
observer =>
source.Subscribe(x =>
{
try
{
using (var er = source.GetEnumerator())
while (er.MoveNext())
{
bool needsMove = false;
var res = new List<TResult>();
while (grouper(er.Current, res) && ((needsMove) ? er.MoveNext() : true))
{
needsMove = true;
res.Add(selector(er.Current));
}
observer.OnNext(res);
}
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
},
new Action<Exception>(observer.OnError),
new Action(observer.OnCompleted)));
}
Via Jafar Husain - it appears that there’s a early release of the Live Labs Reactive Framework (& with Brian Beckman and Erik Meijer) in the latest Silverlight Toolkit
In addition to some of the standard LINQ operators (Select, Where, Aggregate), some new operators look quite promising -
- ForkJoin
- Merge
- Delay
- HoldUntilChanged
- Latest
- Merge
- Throttle
Powershell V2 has some great new features, in particular Add-Type and Remoting features are likely to be quite popular and work together without much issue. That said, there are edge cases which illustrate how the types returned from remoting calls. The following script demonstrates the issue
$csCode = @"
using System;
using System.Collections.Generic;
using System.Linq;
namespace Demo {
public static class D
{
public static int Add(int a, int b)
{
return a + b;
}
public static int AddArray(int[] ints)
{
return ints.Sum();
}
public static int AddEnumerable(IEnumerable<object> ints)
{
return ints.Cast<int>().Sum();
}
public static int AddEnumerable(IEnumerable<int> ints)
{
return ints.Sum();
}
}
}
"@
Add-Type -TypeDefinition $csCode -Language CSharpVersion3
$oneRemote = Invoke-Command -ComputerName localhost -ScriptBlock { return 1 }
$listRemote = Invoke-Command -ComputerName localhost -ScriptBlock { return (1,2,3) }
$one = &{return 1}
if ($one -eq $oneRemote)
{
Write-Host "1 == 1"
}
$v = [Demo.D]::Add($one,$oneRemote)
Write-output "One + OneRemote = $v" ; $v=$null
$v = [Demo.D]::AddArray(($one,$oneRemote))
Write-output "One + OneRemote via array = $v" ; $v=$null
$v = [Demo.D]::AddArray($listRemote)
Write-output "One + OneRemote via remote array = $v" ; $v=$null
$v = [Demo.D]::AddEnumerable((1,2,3,4))
Write-output "One + OneRemote via local IEnumerable = $v" ; $v=$null
$v = [Demo.D]::AddEnumerable($listRemote)
Write-output "One + OneRemote via remote IEnumerable = $v"; $v=$null
$oneRemote | Get-Member
The output from the above is
1 == 1
One + OneRemote = 2
One + OneRemote via array = 2
One + OneRemote via remote array = 6
One + OneRemote via local IEnumerable = 10 # so far as expected
Exception calling "AddEnumerable" with "1" argument(s): "Specified cast is not valid."
At typeDemo.ps1:49 char:29
+ $v = [Demo.D]::AddEnumerable <<<< ($listRemote)
TypeName: System.Int32
Name MemberType Definition
---- ---------- ----------
CompareTo Method System.Int32 CompareTo(Object value)
Equals Method System.Boolean Equals(Object obj), System.Boolean Equals(Int32 obj)
GetHashCode Method System.Int32 GetHashCode()
GetType Method System.Type GetType()
GetTypeCode Method System.TypeCode GetTypeCode()
ToString Method System.String ToString()
PSComputerName NoteProperty System.String PSComputerName=localhost
PSShowComputerName NoteProperty System.Boolean PSShowComputerName=True
RunspaceId NoteProperty System.Guid RunspaceId=e0dc5c05-c87d-41ad-afe0-16bc1b711f08
What’s happening under the covers is that the PowerShell reporting infrastructure is returning a PSObject. By inspecting the type via Get-Member you can see that it has some extra NoteProperties. To PowerShell and .Net methods that expect integers and arrays of integers, the object looks and behaves like it should. But if your Add-Types use a more LINQ style approach, which expects an IEnumerable<T>, the PowerShell type system doesn’t properly convert the adapted type to its native underlying type and runtime exceptions are the result.
There are some interesting sessions at the 2009 EntDevCon.
Three of particular interest to me are are
1. “Coral8 Engine Integrated with Microsoft Windows HPC Server 2008” at 11 on Tuesday.
I had a chance to see some of the early bits, and for anyone interesting in streaming data with high computations needs, this promises to be well worth your time. In a future post I’ll talk about how this technology can be used.
Additionally, some co-workers at Lab49 will be presenting on Wednesday:
2. Marc Jacobs and Citi on “Agile Development using Low-Fidelity Prototypes, Expression Blend and SketchFlow:A Case Study”
3. Daniel Simon on “Patterns and practices:composite development for WPF and Silverlight using Composite Application Guidance (a.k.a. Prism 2.0)”
And if I have time, I might have a few hours at the Lab49 booth, stop by to say hi.
The PowerShell team has a short post on using V2 cmdlets to Monitor performance counters.
Building on that, and the prior work with the PoShAdapter, here’s a sample with Coral8 and PowerShell to alert you via SMS when the rolling 30 seconds average CPU hits a threshold.
create VARIABLE float CPUThreashold = 70;
create OUTPUT STREAM fakeout SCHEMA (x string);
CREATE SCHEMA CPUSchema (cpu FLOAT);
create INPUT STREAM inCPU SCHEMA CPUSchema;
create OUTPUT STREAM highCPU SCHEMA CPUSchema;
create local STREAM avgCPU SCHEMA CPUSchema;
ATTACH OUTPUT ADAPTER smsAdapter TYPE PoShAdapter TO STREAM highCPU
PROPERTIES
BEGINBLOCK = [[
Add-Type -AssemblyName System.Security
Add-Type -Path "C:\Program Files\Coral8\Server\bin\GmailHelper.dll"
$pass = ".."
]],
PROCESSBlock = [[
foreach ($cpuSpike in $input)
{
$m = "CPU spike at {0:0.00}" -f $cpuSpike['cpu']
[RC.Gmail.GmailMessage]::SendFromGmail("user", $pass,"6465555555@tmomail.net",
"Via Powershell/Coral8",$m)
}
]];
ATTACH OUTPUT ADAPTER CPUAdapter TYPE PoShAdapter TO STREAM fakeout
PROPERTIES
RESULTSSTREAM = "ccl://localhost:6789/Stream/Default/MonitorCPU/inCPU",
INPUTBLOCK = [[
Get-Counter '\Processor(*)\% Processor Time' |
select -expand CounterSamples |
? { $_.InstanceName -eq "_total" } |
% { ,,($_.CookedValue,0) } #
]];
INSERT into avgCPU
select avg(cpu)
from inCPU
keep 30 SECONDS
OUTPUT EVERY 30 SECONDS;
INSERT into highCPU
select cpu
from avgCPU
WHERE cpu > CPUThreashold;
Yields:

*I lowered the threshold for testing
** Yes, I should check my voice mail
More Posts
Next page »