DIY Stream Processing in C# with Events, Lambdas, DataTables, and Stopwatches

Coral8 provides a custom language, CCL, dedicated to the event stream processing / complex event processing / "continuous intelligence" class of problems. Clichéd as it may be, Coral8's strength is also its weakness: once you start working in CCL, you give up all the tools (compilers, code generators, static analysis, profilers, debuggers, etc.) of a general-purpose language. As a result, every so often I start musing about how much effort it would take to re-create some of its stream-based processing capabilities in C#.

Largely out of intellectual curiosity, here's my first take.

1) Replace streams of tuples with a typed .NET event: event Action<T> DataReceived;

2) Steal an idea from F#: make the event stream a first-class object, and add filtering:

public interface IStreamedDataSource<T>
{
    event Action<T> DataReceived;
    IStreamedDataSource<T> Where(Predicate<T> p);
}

Use it like so:

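var rssStream = new RSSDataStream(args);
// Subscribe to the stream returned by Where, so this works whether Where
// filters in place or returns a new filtered stream.
var joesItems = rssStream.Where(item =>
    item.Author == "Joe"
    && item.Comments > 500
    && item.Length < 200);
joesItems.DataReceived += item =>
{
    Console.WriteLine("Item = {0}", item.Title);
};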
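The following is a minimal, self-contained sketch that makes the Where semantics concrete. The post doesn't show RSSDataStream, so PushStreamSource<T>, its Publish method, RssItem, and WhereDemo are hypothetical names introduced only for illustration. The interface is repeated so the block compiles on its own. Unlike the ForkingStreamDataSource implementation later in the post, this Where returns a new stream instead of mutating the existing one, which is closer in spirit to F#'s Event.filter.

```csharp
using System;
using System.Collections.Generic;

// The interface from the post, repeated so this sketch is self-contained.
public interface IStreamedDataSource<T>
{
    event Action<T> DataReceived;
    IStreamedDataSource<T> Where(Predicate<T> p);
}

// Hypothetical push-based source: Publish() raises DataReceived synchronously.
public class PushStreamSource<T> : IStreamedDataSource<T>
{
    public event Action<T> DataReceived;

    public void Publish(T item)
    {
        var handlers = DataReceived;   // local copy avoids a race with unsubscription
        if (handlers != null) handlers(item);
    }

    // Returns a NEW stream that forwards only matching items; this one is unchanged.
    public IStreamedDataSource<T> Where(Predicate<T> p)
    {
        var filtered = new PushStreamSource<T>();
        DataReceived += item => { if (p(item)) filtered.Publish(item); };
        return filtered;
    }
}

// Hypothetical item shape with the fields used in the post's example.
public class RssItem
{
    public string Title;
    public string Author;
    public int Comments;
    public int Length;
}

public static class WhereDemo
{
    // Publishes three items and returns the titles that passed the filter.
    public static List<string> Run()
    {
        var rssStream = new PushStreamSource<RssItem>();
        var titles = new List<string>();

        var joesItems = rssStream.Where(item =>
            item.Author == "Joe" && item.Comments > 500 && item.Length < 200);
        joesItems.DataReceived += item => titles.Add(item.Title);

        rssStream.Publish(new RssItem { Title = "A", Author = "Joe", Comments = 900, Length = 150 });
        rssStream.Publish(new RssItem { Title = "B", Author = "Ann", Comments = 900, Length = 150 });
        rssStream.Publish(new RssItem { Title = "C", Author = "Joe", Comments = 10, Length = 150 });
        return titles;
    }

    public static void Main()
    {
        foreach (var title in Run()) Console.WriteLine("Item = {0}", title);
    }
}
```

Only item "A" satisfies all three conditions, so it is the only title that reaches the subscriber.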

3) Add the ability to split and transform streams. In use, it looks like this:

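var rssStream = twitterStream.Fork(
    // condition: count twits per author and pass a twit through
    // once its author exceeds the threshold
    twit =>
    {
        twitsTable[twit.Author]++;
        return twitsTable[twit.Author] > threshold;
    },
    // map: project each passing twit into a new type
    twit => new
    {
        OrigTwit = twit,...
    });
rssStream.DataReceived += rss => { ...};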

And the implementation:

using System;
using System.Threading;

public static class StreamedDataSourceEXMethods
{
    // Filter the source stream, map each surviving item to a new type,
    // and expose the result as a new stream.
    public static IStreamedDataSource<TResult> Fork<TSource, TResult>(
        this IStreamedDataSource<TSource> source,
        Predicate<TSource> filter,
        Func<TSource, TResult> map)
    {
        return new ForkingStreamDataSource<TResult, TSource>(source, filter, map);
    }
}

public class ForkingStreamDataSource<T, TSource> : IStreamedDataSource<T>
{
    public event Action<T> DataReceived;

    public ForkingStreamDataSource(IStreamedDataSource<TSource> source,
        Predicate<TSource> sourceFilter,
        Func<TSource, T> map)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (map == null) throw new ArgumentNullException("map");
        // Set the filter and map before subscribing, so no item sees them unset.
        _map = map;
        _sourceFilter = sourceFilter;
        source.DataReceived += OnSourceData;
    }

    private void OnSourceData(TSource srcData)
    {
        var handlers = DataReceived;   // local copy avoids a race with unsubscription
        if (handlers == null) return;
        if (_sourceFilter != null && !_sourceFilter(srcData)) return;

        T resultData = _map(srcData);
        var whereFilter = _whereFilter;
        if (whereFilter != null && !whereFilter(resultData)) return;

        // Run each subscriber on the thread pool so a slow handler can't stall the source.
        // ThreadPool is used rather than Delegate.BeginInvoke, which .NET Core / .NET 5+
        // do not support. Handlers run concurrently, so delivery order is not guaranteed.
        foreach (Action<T> handler in handlers.GetInvocationList())
        {
            Action<T> h = handler;
            ThreadPool.QueueUserWorkItem(_ => h(resultData));
        }
    }

    // Filters the mapped output. This sets the filter on (and returns) the same
    // instance; a second call replaces the previous filter.
    public IStreamedDataSource<T> Where(Predicate<T> p)
    {
        _whereFilter = p;
        return this;
    }

    private readonly Predicate<TSource> _sourceFilter;
    private readonly Func<TSource, T> _map;
    private volatile Predicate<T> _whereFilter;
}
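ForkingStreamDataSource hands each subscriber to the thread pool, so its output order is not deterministic and it is awkward to test. The following sketch restates the same filter, map, emit pipeline synchronously. SyncSource<T>, SyncForkExtensions, ForkDemo, and the use of a plain author string in place of a twit are all assumptions made for illustration. The counting filter uses TryGetValue so that a first twit from an unseen author starts at zero instead of throwing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical synchronous source: handlers run on the publishing thread.
public class SyncSource<T>
{
    public event Action<T> DataReceived;

    public void Publish(T item)
    {
        var handlers = DataReceived;
        if (handlers != null) handlers(item);
    }
}

public static class SyncForkExtensions
{
    // Same shape as the post's Fork: filter the source, map survivors, emit on a new stream.
    public static SyncSource<TResult> Fork<TSource, TResult>(
        this SyncSource<TSource> source,
        Predicate<TSource> filter,
        Func<TSource, TResult> map)
    {
        var result = new SyncSource<TResult>();
        source.DataReceived += item =>
        {
            if (filter(item)) result.Publish(map(item));
        };
        return result;
    }
}

public static class ForkDemo
{
    // Emits "author #n" for each twit after the author's count exceeds threshold.
    public static List<string> Run(int threshold)
    {
        var twitterStream = new SyncSource<string>();   // hypothetical: author name stands in for a twit
        var twitsTable = new Dictionary<string, int>();
        var output = new List<string>();

        var busyAuthors = twitterStream.Fork(
            author =>
            {
                int count;
                twitsTable.TryGetValue(author, out count);   // 0 for an unseen author
                twitsTable[author] = ++count;
                return count > threshold;
            },
            author => author + " #" + twitsTable[author]);
        busyAuthors.DataReceived += s => output.Add(s);

        foreach (var author in new[] { "joe", "ann", "joe", "joe", "ann" })
            twitterStream.Publish(author);
        return output;
    }

    public static void Main()
    {
        foreach (var line in Run(1)) Console.WriteLine(line);
    }
}
```

With a threshold of 1, each author's first twit is swallowed and every later one is forwarded. The result is "joe #2", "joe #3", "ann #2", in publish order.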
 

4) And finally (for this post), a basic window. DataTables get you about halfway there. Under the hood, their indexes are red-black trees, so keyed lookups are efficient, and DataTable raises events for row inserts, changes, and deletes. All that needs to be added is a window policy. What I have right now isn't very elegant and clearly needs a lot more work and features, but it shows that this approach is viable.
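The window code relies on DataTable's row events, and a small sketch using only System.Data shows them in isolation. The table, its Author/Count columns, and DataTableEventsDemo are hypothetical. Setting PrimaryKey is what allows Rows.Find to perform an indexed lookup by key.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

public static class DataTableEventsDemo
{
    // Records which events fire, and with which DataRowAction, for an insert, an edit, and a delete.
    public static List<string> Run()
    {
        var table = new DataTable("Twits");
        DataColumn author = table.Columns.Add("Author", typeof(string));
        table.Columns.Add("Count", typeof(int));
        table.PrimaryKey = new[] { author };   // enables indexed lookups via Rows.Find

        var seen = new List<string>();
        table.RowChanged += (s, e) => seen.Add("Changed:" + e.Action);
        table.RowDeleted += (s, e) => seen.Add("Deleted:" + e.Action);

        table.Rows.Add("joe", 1);               // insert: RowChanged with DataRowAction.Add
        DataRow row = table.Rows.Find("joe");   // lookup by primary key
        row["Count"] = 2;                       // edit: RowChanged with DataRowAction.Change
        row.Delete();                           // delete: RowDeleted with DataRowAction.Delete
        return seen;
    }

    public static void Main()
    {
        foreach (var entry in Run()) Console.WriteLine(entry);
    }
}
```

The Add and Change actions are the ones the window's RowChanged handler reacts to when it (re)starts a row's timer.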

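using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading;

// Keep each row (keyed by its PK) for timeToExpire after its last insert or change.
public class BasicRecentTwitsWindow : TwitsDataTable
{
    public BasicRecentTwitsWindow(TimeSpan timeToExpire)
    {
        // Post deletes back to the creating thread's context (e.g. a UI thread) if there is one;
        // a plain SynchronizationContext just queues them to the thread pool.
        _sc = SynchronizationContext.Current ?? new SynchronizationContext();

        // The watcher loops forever, so give it a dedicated background thread
        // rather than tying up a thread-pool thread.
        var watcher = new Thread(() => WindowWatcher(timeToExpire)) { IsBackground = true };
        watcher.Start();

        this.RowChanged += (sender, e) =>
        {
            if (e.Action == DataRowAction.Change || e.Action == DataRowAction.Add)
            {
                lock (_lck)
                {
                    _watchers[e.Row] = Stopwatch.StartNew();   // (re)start this row's timer
                    _wResetEvent.Set();                        // wake the watcher
                }
            }
        };
    }

    private void WindowWatcher(TimeSpan timeToExpire)
    {
        while (true)
        {
            _wResetEvent.WaitOne();   // block while no rows are being tracked
            TimeSpan oldestAge;
            lock (_lck)
            {
                if (_watchers.Count == 0)
                {
                    _wResetEvent.Reset();   // nothing to expire: go back to waiting
                    continue;
                }
                oldestAge = _watchers.Values.Max(sw => sw.Elapsed);
            }

            // Sleep until the oldest row is due to expire.
            var diffTime = timeToExpire - oldestAge;
            if (diffTime > TimeSpan.Zero)
                Thread.Sleep(diffTime);

            DataRow[] expiredRows;
            lock (_lck)
            {
                expiredRows = _watchers
                    .Where(kvp => kvp.Value.Elapsed >= timeToExpire)
                    .Select(kvp => kvp.Key)
                    .ToArray();
                foreach (DataRow exRow in expiredRows)
                    _watchers.Remove(exRow);
            }
            if (expiredRows.Length == 0)
                continue;   // rows were refreshed while we slept

            _sc.Post(state =>
            {
                foreach (DataRow r in (DataRow[])state)
                    if (r.RowState != DataRowState.Deleted && r.RowState != DataRowState.Detached)
                        r.Delete();
                AcceptChanges();
            }, expiredRows);
        }
    }

    private readonly object _lck = new object();
    private readonly SynchronizationContext _sc;
    private readonly ManualResetEvent _wResetEvent = new ManualResetEvent(false);
    private readonly Dictionary<DataRow, Stopwatch> _watchers = new Dictionary<DataRow, Stopwatch>();
}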
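The expiry policy can also be separated from the DataTable and the watcher thread. ExpiryWindow<TKey> is a hypothetical class, not part of the original code. It keeps each key until timeToExpire has elapsed since the key was last touched. Callers pass the current time explicitly instead of using Stopwatch, so the policy can be tested without Thread.Sleep.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical clock-agnostic window policy: a key expires timeToExpire after its last touch.
public class ExpiryWindow<TKey>
{
    private readonly TimeSpan _timeToExpire;
    private readonly Dictionary<TKey, DateTime> _lastTouched = new Dictionary<TKey, DateTime>();

    public ExpiryWindow(TimeSpan timeToExpire) { _timeToExpire = timeToExpire; }

    public int Count { get { return _lastTouched.Count; } }

    // Insert or refresh a key (the Add/Change case of the RowChanged handler).
    public void Touch(TKey key, DateTime now) { _lastTouched[key] = now; }

    // Time until the oldest key expires, or null when the window is empty
    // (the case where a watcher should block rather than sleep).
    public TimeSpan? TimeUntilNextExpiry(DateTime now)
    {
        if (_lastTouched.Count == 0) return null;
        TimeSpan remaining = _lastTouched.Values.Min() + _timeToExpire - now;
        return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
    }

    // Remove and return every key whose age has reached timeToExpire.
    public List<TKey> Expire(DateTime now)
    {
        List<TKey> expired = _lastTouched
            .Where(kvp => now - kvp.Value >= _timeToExpire)
            .Select(kvp => kvp.Key)
            .ToList();
        foreach (var key in expired) _lastTouched.Remove(key);
        return expired;
    }
}

public static class ExpiryWindowDemo
{
    public static void Main()
    {
        var t0 = new DateTime(2000, 1, 1);
        var window = new ExpiryWindow<string>(TimeSpan.FromSeconds(10));
        window.Touch("a", t0);
        window.Touch("b", t0.AddSeconds(5));
        window.Touch("a", t0.AddSeconds(6));   // refreshing "a" restarts its 10 s
        Console.WriteLine(window.TimeUntilNextExpiry(t0.AddSeconds(6)));
        Console.WriteLine(string.Join(",", window.Expire(t0.AddSeconds(15))));
    }
}
```

In the demo, "b" (last touched at 5 s) is the oldest key, so the next expiry is 9 s away at t = 6 s. At t = 15 s only "b" has aged out, because refreshing "a" at 6 s moved its expiry to 16 s. A watcher thread would sleep for TimeUntilNextExpiry, call Expire, and delete the returned rows.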
