Routing StreamInsight output streams to a UI

One compelling feature of StreamInsight is its in-process hosting model. In addition to reducing the complexity of server-side installs, it makes it possible to host a CEP engine in the client UI.

The simplest way of getting CEP streams onto the UI would be the Reactive Framework (RX) methods. Something like:

queryOutputStream
    .ToObservable(...)
    .Post(syncContext)
    .Subscribe(item => collection.Add(item));

But in the CTP that won’t work. As I discovered a few days ago, the IObservable used in StreamInsight is defined in a different namespace and assembly than the IObservable in System.Reactive. Furthermore, the StreamInsight API lacks the base classes and extension methods defined in System.Reactive.

I didn’t want to go the normal route: implementing IObserver on, say, a ViewModel and routing the data through the dispatcher onto a collection. That would be simple and it would work, but it would mean giving up all the goodness in System.Reactive.
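For comparison, the normal route looks roughly like the sketch below. This is a minimal illustration rather than code from the sample: `StreamViewModel<T>` and `Items` are made-up names, it implements the `System.IObserver<T>` interface that ships with current .NET rather than the CTP's own, and a `SynchronizationContext` stands in for the WPF dispatcher.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Threading;

// Hypothetical view model: receives stream items directly and marshals
// each one onto the UI context before touching the bound collection.
public class StreamViewModel<T> : IObserver<T>
{
    private readonly SynchronizationContext _uiContext;

    // The collection a list control would bind to.
    public ObservableCollection<T> Items { get; } = new ObservableCollection<T>();

    public StreamViewModel(SynchronizationContext uiContext)
    {
        _uiContext = uiContext ?? throw new ArgumentNullException(nameof(uiContext));
    }

    // Post queues the add on the supplied context instead of running it
    // on whatever thread the engine calls us from.
    public void OnNext(T value) => _uiContext.Post(_ => Items.Add(value), null);

    public void OnError(Exception error) { /* surface the failure to the user here */ }

    public void OnCompleted() { /* nothing to clean up in this sketch */ }
}
```

It works, but every filter, throttle or merge you want on top of it has to be written by hand, which is exactly what the RX operators already give you.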

The first method I tried in an effort to convert a CEP IObservable into an RX IObservable didn’t work, but was instructive nonetheless.

Using StreamInsight’s own I/O adapter API, I would create an “Eventing” adapter that raises a conventional .NET event on an object of my choosing, then use the Reactive Framework to convert that event to an RX IObservable.

But it’s not easy (or perhaps even possible) to do. Instances of output adapters are created by OutputAdapterFactories, which in turn are created by factory methods. You can pass in a configuration object, but it needs to be XML serializable, so there’s no way to pass in Action<> delegates.

 

But it turns out that it’s not hard to convert from a CEP IObservable to an RX IObservable.

First you need a CEP AnonymousObserver<T>:

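using S = System;

// CEP-side observer that delegates each notification to a supplied callback.
internal class AnonymousObserver<T> : S.IObserver<T>
{
    private bool isStopped; // set once OnCompleted or OnError has fired
    private readonly S.Action _onCompleted;
    private readonly S.Action<S.Exception> _onError;
    private readonly S.Action<T> _onNext;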

    public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError)
        : this(onNext, onError, () => { })
    {
    }
    public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError, S.Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }
    public void OnCompleted()
    {
        if (!isStopped)
        {
            isStopped = true;
            _onCompleted();
        }
    }
    public void OnError(S.Exception exception)
    {
        if (!isStopped)
        {
            isStopped = true;
            _onError(exception);
        }
    }
    public void OnNext(T value)
    {
        if (!isStopped)
            _onNext(value);
    }
}

Then you need an extension method that takes a CEP IObservable and returns an RX AnonymousObservable<T>. When an RX observer subscribes, the method subscribes to the CEP source with a CEP AnonymousObserver<T>, whose OnNext passes each value on to the RX observer’s OnNext.

Like so:

using RX = System.Collections.Generic;
using S = System;

public static class CEPExtMethods
{
    public static RX.IObservable<T> ToRX<T>(this S.IObservable<T> source)
    {
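        // Each RX subscriber gets its own CEP subscription. Completion is
        // forwarded too, so downstream RX operators see the stream end.
        return new AnonymousObservable<T>(
            (RX.IObserver<T> rxObserver) =>
                source.Subscribe(new AnonymousObserver<T>(
                    nextVal => rxObserver.OnNext(nextVal),
                    rxObserver.OnError,
                    rxObserver.OnCompleted)));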
    }
}
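If you want to try the bridging idea without either library installed, here is a self-contained sketch of the same pattern. Everything in it is an assumption made for illustration: the `ICep*` and `IRx*` interfaces are stand-ins for the two incompatible IObservable families, and `ReplaySource<T>` and `RecordingObserver<T>` are hypothetical helpers that exist only so the example can run.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins for the two look-alike but incompatible interface families.
public interface ICepObserver<T> { void OnNext(T value); void OnError(Exception error); void OnCompleted(); }
public interface ICepObservable<T> { IDisposable Subscribe(ICepObserver<T> observer); }
public interface IRxObserver<T> { void OnNext(T value); void OnError(Exception error); void OnCompleted(); }
public interface IRxObservable<T> { IDisposable Subscribe(IRxObserver<T> observer); }

public static class BridgeSketch
{
    // Wraps a CEP-side source so RX-side observers can subscribe to it.
    public static IRxObservable<T> ToRx<T>(this ICepObservable<T> source) => new Bridged<T>(source);

    private sealed class Bridged<T> : IRxObservable<T>
    {
        private readonly ICepObservable<T> _source;
        public Bridged(ICepObservable<T> source) { _source = source; }

        // Each RX subscription creates one CEP subscription underneath.
        public IDisposable Subscribe(IRxObserver<T> observer) =>
            _source.Subscribe(new Forwarder<T>(observer));
    }

    // CEP-side observer that passes every notification to the RX observer.
    private sealed class Forwarder<T> : ICepObserver<T>
    {
        private readonly IRxObserver<T> _target;
        public Forwarder(IRxObserver<T> target) { _target = target; }
        public void OnNext(T value) => _target.OnNext(value);
        public void OnError(Exception error) => _target.OnError(error);
        public void OnCompleted() => _target.OnCompleted();
    }
}

// Hypothetical CEP-side source: replays a fixed set of items, then completes.
public sealed class ReplaySource<T> : ICepObservable<T>
{
    private readonly T[] _items;
    public ReplaySource(params T[] items) { _items = items; }

    public IDisposable Subscribe(ICepObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
}

// Hypothetical RX-side observer: records whatever it is sent.
public sealed class RecordingObserver<T> : IRxObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => Error = error;
    public void OnCompleted() => Completed = true;
}
```

Subscribing a `RecordingObserver<int>` to `new ReplaySource<int>(1, 2, 3).ToRx()` should leave the three values in `Values` and `Completed` set, since the forwarder passes completion through as well as values and errors.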

To use it:

var queryOutputStream = CreateQueryTemplate(input);
var queryOutput = queryOutputStream.ToObservable(typeof(EventTypeCount).GetField("Time"));
queryOutput.ToRX().Send(_sc).Subscribe(v => this.CEPOS1.Add(v));

 

And here are the results from the Observable sample on screen:

image
