PDC is here again!

This year my friend and colleague Don Demsak and Andrew Brust will be presenting a business intelligence (BI) workshop that covers some of the fundamental patterns and techniques that can help developers build modern BI applications on the Microsoft platform. Specifically, the workshop will focus on how to bring the traditional aspects of BI solutions, such as analytics, reporting, and data aggregation, into widely adopted technologies like SharePoint Server and Office in general.

If you are attending PDC, you should definitely attend this workshop. Andrew and Don are both fantastic speakers and thought leaders in the community. That said, feel free to give Don a hard time. After all, it is an all-day workshop ;)

Next Tuesday (November 10th), my colleague Uri Katsir will be presenting a session at the South Florida BizTalk Users Group about improving agility in large BizTalk environments using the ESB Toolkit. If you are a BizTalk developer or operations architect working on complex BizTalk deployments, you MUST attend Uri's session. To keep things grounded, Uri has prepared a series of interesting demos that illustrate techniques for improving the development and management experience of BizTalk solutions by leveraging the ESB Toolkit. The presentation is 100% focused on real-world scenarios, including some of the lessons Uri has learned while working on one of the most complex BizTalk environments in the world.

My friend and colleague Uri Katsir just joined the blogosphere. Uri is a BizTalk wizard who is currently working on one of the biggest BizTalk implementations in the world. We are extremely proud to have him as part of our technical staff at Tellago. Uri has already posted a couple of interesting write-ups about ESB itinerary troubleshooting and deployment techniques.

As the WCF team already announced, the first chapter of the WCF Extensibility Guidance is now available on MSDN. This paper is the result of an effort I started a few months ago with my colleague and friend Pablo Cibraro. Our goal was to provide detailed guidance on the major extensibility points of the WCF runtime. To that end, we decided to cover the major areas of the WCF extensibility programming model, such as channels, client/dispatcher, security, hosting, metadata, RESTful services and even the new extensions in WCF 4.0.

Another goal of this guidance was to focus on real-world scenarios. In that spirit, we decided to explain each extensibility point using a format that starts with an overview of its capabilities, followed by a set of real-world scenarios in which developers should consider leveraging that extensibility point. Finally, we provide a detailed explanation of how to implement and configure that specific extensibility area in the WCF runtime.

Kent Brown (WCF Sr. Product Manager) has been the mastermind behind this paper and is certainly the person who made this publication possible.

In a previous post we explained the programming model of Microsoft's StreamInsight adapter framework. The fundamental capability of this framework is to streamline the flow of events into and out of the StreamInsight hosting application. One of the main advantages of this model is that it enables developers to create their own adapters that can be leveraged in StreamInsight-based solutions. In this post we will explore the details of implementing an RSS/Atom adapter using StreamInsight's adapter framework.

Syndication formats such as Atom and RSS have been at the forefront of the Web 2.0 movement. Nowadays, more and more systems are choosing these formats as a mechanism to expose data to external applications. The initial use of RSS and Atom was mainly focused on read-only scenarios. However, the emergence of standards such as the Atom Publishing Protocol enables applications to publish content to specific syndication endpoints.

From the Complex Event Processing (CEP) perspective, syndication formats can be an interesting mechanism for both accessing and publishing events processed in a CEP application. If you live in the CEP world, the idea of using text-based data sources such as web pages or flat files should be pretty familiar to you. Recently we have seen some of the CEP market leaders start embracing syndication formats as a mechanism for representing events. As an example, StreamBase (currently one of the top CEP vendors in the market) recently announced its support for processing Twitter streams as part of CEP applications.

For the simplicity of this example, we decided to implement an input-only adapter that reads an RSS feed and generates a set of events. The first step in our implementation is to define the event type that describes the syndication items. We can achieve that by using the class definition illustrated in the following code.

    using System;
    using System.ServiceModel.Syndication;

    public class SyndicationEvent
    {
        private const int MaxSummaryLength = 256;

        private string id;
        private string title;
        private string summary;
        private string text;
        private string uri;
        private DateTime createdTime;
        private DateTime lastUpdatedTime;
        private string author;

        public SyndicationEvent(SyndicationItem item)
        {
            id = item.Id;
            // Title and Summary are optional in a feed item, so guard against null.
            title = item.Title != null ? item.Title.Text : String.Empty;
            summary = item.Summary != null ? item.Summary.Text : String.Empty;
            // Cap the summary at 256 characters.
            if (summary.Length > MaxSummaryLength)
                summary = summary.Substring(0, MaxSummaryLength);

            createdTime = item.PublishDate.DateTime;
            lastUpdatedTime = item.LastUpdatedTime.DateTime;
            // Read the address from the link's Uri; ToString() on the
            // SyndicationLink itself does not return the address.
            if (item.Links.Count > 0)
                uri = item.Links[0].Uri.ToString();
            if (item.Authors.Count > 0)
                author = item.Authors[0].Name;
        }

        public SyndicationEvent()
        { }

        public string ID
        {
            get { return id; }
            set { id = value; }
        }

        public string Title
        {
            get { return title; }
            set { title = value; }
        }

        public string Summary
        {
            get { return summary; }
            set { summary = value; }
        }

        public string Text
        {
            get { return text; }
            set { text = value; }
        }

        public string Uri
        {
            get { return uri; }
            set { uri = value; }
        }

        public string Author
        {
            get { return author; }
            set { author = value; }
        }

        public DateTime CreatedTime
        {
            get { return createdTime; }
            set { createdTime = value; }
        }

        public DateTime LastUpdatedTime
        {
            get { return lastUpdatedTime; }
            set { lastUpdatedTime = value; }
        }
    }

As you might have noticed, our sample syndication event is initialized directly from a System.ServiceModel.Syndication.SyndicationItem object.

Given the time-based nature of syndication feeds, we decided to implement our sample adapter as an interval adapter by deriving from the TypedIntervalInputAdapter<T> class. Basically, an interval adapter processes events that are valid during a specific time interval. You can find more details in my previous post.

For the sake of this example, our adapter reads the syndication feed once, when it is initialized, rather than polling it continuously. To implement this pattern, we leverage the programming model included in the System.ServiceModel.Syndication namespace, as illustrated in the following code.

    public class RSS20InputAdapter : TypedIntervalInputAdapter<SyndicationEvent>
    {
        private IntervalEvent<SyndicationEvent> pendingevent;
        private DateTime pendingctitime;

        SyndicationFeed dataFeed;
        IEnumerator<SyndicationItem> dataFeedItems;

        int index = 0;

        public RSS20InputAdapter(SyndicationAdapterConfig configInfo,
                                 CepEventType cepEventType)
        {
            Rss20FeedFormatter rssFormatter = new Rss20FeedFormatter(typeof(SyndicationFeed));
            // Read the whole feed once; the using block closes the reader even on failure.
            using (XmlReader rssReader = XmlReader.Create(configInfo.URL))
            {
                rssFormatter.ReadFrom(rssReader);
            }
            dataFeed = rssFormatter.Feed;
            dataFeedItems = dataFeed.Items.GetEnumerator();
        }

        ....
    }
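The constructor above reads the feed exactly once and then hands items out through an enumerator. As a rough, dependency-free sketch of that "read once, return one item per call, return null at the end" pattern, the following uses LINQ to XML instead of System.ServiceModel.Syndication. FeedCursor and FeedItem are hypothetical names. Only the RSS 2.0 `<item>`, `<guid>`, `<title>`, `<description>` and `<link>` elements are read. Missing elements map to empty strings, and the summary is capped at 256 characters like SyndicationEvent.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Hypothetical flat payload, similar in spirit to SyndicationEvent.
public class FeedItem
{
    public string Id;
    public string Title;
    public string Summary;
    public string Uri;
}

// Hypothetical cursor: parses the RSS 2.0 document once, then returns one item per call.
public class FeedCursor
{
    private const int MaxSummaryLength = 256;
    private readonly IEnumerator<XElement> items;

    public FeedCursor(string rssXml)
    {
        // Parse once up front, as the adapter constructor does.
        XDocument doc = XDocument.Parse(rssXml);
        items = doc.Descendants("item").ToList().GetEnumerator();
    }

    // Returns the next item, or null when the feed is exhausted
    // (the same contract as CreateEventFromSource in the adapter).
    public FeedItem Next()
    {
        if (!items.MoveNext())
            return null;

        XElement item = items.Current;
        // The explicit string conversion yields null for a missing element.
        string summary = (string)item.Element("description") ?? string.Empty;
        if (summary.Length > MaxSummaryLength)
            summary = summary.Substring(0, MaxSummaryLength);

        return new FeedItem
        {
            Id = (string)item.Element("guid") ?? string.Empty,
            Title = (string)item.Element("title") ?? string.Empty,
            Summary = summary,
            Uri = (string)item.Element("link") ?? string.Empty
        };
    }
}
```

The cursor parses the whole document eagerly. That matches the adapter's "read at initialization" behavior, but it means new feed entries are not picked up until a new instance is created.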

The StreamInsight runtime will invoke the Start operation of the adapter in order to start listening for events.

    public override void Start()
    {
        ProduceEvents();
    }

    private void ProduceEvents()
    {
        IntervalEvent<SyndicationEvent> currevent = default(IntervalEvent<SyndicationEvent>);
        EnqueueOperationResult result;

        while (true)
        {
            if (AdapterState.Stopping == AdapterState)
            {
                // Release any event still held from a rejected Enqueue.
                PrepareToStop(pendingevent);
                pendingevent = null;
                Stopped();
                return;
            }

            // Retry the event rejected on the previous pass, if any;
            // otherwise read the next syndication item.
            if (null != pendingevent)
            {
                currevent = pendingevent;
                pendingevent = null;
            }
            else
            {
                currevent = CreateEventFromSource();
            }

            // End of feed: no more inserts will follow, so issue a final CTI and stop.
            if (null == currevent)
            {
                EnqueueCtiEvent(DateTime.MaxValue);
                Stopped();
                return;
            }

            // Enqueue the interval event with its payload.
            result = Enqueue(ref currevent);

            // The server's queue is full: keep the event, signal readiness,
            // and wait for the server to call Resume().
            if (EnqueueOperationResult.Full == result)
            {
                PrepareToResume(currevent);
                Ready();
                return;
            }
        }
    }

Notice that the adapter enqueues one event per syndication item until no more syndication entries are available. After that, the adapter enqueues a CTI (Current Time Increment) event stamped with DateTime.MaxValue. A CTI indicates that no subsequent INSERT event can have a start time earlier than the CTI's timestamp.
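The CTI rule can be expressed as a small invariant check. The following is a hypothetical, StreamInsight-independent sketch (CtiTracker is not an SDK type). It remembers the last CTI timestamp and reports whether a new insert event would still be legal. It also assumes that CTI timestamps themselves must never move backwards.

```csharp
using System;

// Hypothetical helper enforcing the CTI contract: once a CTI with timestamp t
// has been issued, no later insert event may start before t.
public class CtiTracker
{
    private DateTime lastCti = DateTime.MinValue;

    public DateTime LastCti { get { return lastCti; } }

    // True if an insert event starting at 'start' is still allowed.
    public bool CanInsert(DateTime start)
    {
        return start >= lastCti;
    }

    // Records a CTI; this sketch assumes CTIs may never move backwards.
    public void IssueCti(DateTime timestamp)
    {
        if (timestamp < lastCti)
            throw new InvalidOperationException("CTI timestamps must not decrease.");
        lastCti = timestamp;
    }
}
```

After a CTI at DateTime.MaxValue, CanInsert returns false for every real timestamp. This is consistent with the adapter issuing that CTI only once the feed is exhausted.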

The ProduceEvents operation is also invoked as part of the Resume method.

    public override void Resume()
    {
        ProduceEvents();
    }
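Start and Resume share one driver because the server can refuse an event when its queue is full. In that case an adapter typically keeps the rejected event, signals that it can continue, and retries when Resume is called. The following self-contained simulation shows that handshake; it is not StreamInsight code. BoundedSink stands in for the server's input queue. SimulatedAdapter, EnqueueResult and the pending field are hypothetical names chosen to mirror the adapter.

```csharp
using System;
using System.Collections.Generic;

public enum EnqueueResult { Success, Full }

// Hypothetical stand-in for the server's bounded input queue.
public class BoundedSink
{
    private readonly int capacity;
    private int inFlight;
    public readonly List<int> Accepted = new List<int>();

    public BoundedSink(int capacity) { this.capacity = capacity; }

    public EnqueueResult Enqueue(int evt)
    {
        if (inFlight >= capacity) return EnqueueResult.Full;
        inFlight++;
        Accepted.Add(evt);
        return EnqueueResult.Success;
    }

    // Simulates the server draining its queue.
    public void Drain() { inFlight = 0; }
}

// Hypothetical adapter following the Start/Resume + pending-event pattern.
public class SimulatedAdapter
{
    private readonly Queue<int> source;
    private readonly BoundedSink sink;
    private int? pending;                 // event rejected by the sink, retried on Resume
    public bool Stopped { get; private set; }

    public SimulatedAdapter(IEnumerable<int> events, BoundedSink sink)
    {
        source = new Queue<int>(events);
        this.sink = sink;
    }

    public void Start() { ProduceEvents(); }
    public void Resume() { ProduceEvents(); }

    private void ProduceEvents()
    {
        while (true)
        {
            int current;
            if (pending.HasValue) { current = pending.Value; pending = null; }
            else if (source.Count > 0) current = source.Dequeue();
            else { Stopped = true; return; }   // real end of source

            if (sink.Enqueue(current) == EnqueueResult.Full)
            {
                pending = current;   // "full" is not "end of data": keep the event
                return;              // a real adapter would call Ready() here
            }
        }
    }
}
```

The key design point is that a full queue and an exhausted source lead to different exits. Only the latter stops the adapter, so no events are lost while the server catches up.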

The rest of the implementation follows the pattern common to most adapters and mainly manages the adapter's state lifecycle. The following code shows the complete implementation of our sample RSS adapter.

    public class RSS20InputAdapter : TypedIntervalInputAdapter<SyndicationEvent>
    {
        private IntervalEvent<SyndicationEvent> pendingevent;
        private DateTime pendingctitime;

        SyndicationFeed dataFeed;
        IEnumerator<SyndicationItem> dataFeedItems;

        int index = 0;

        /// <summary>
        /// Constructor - Use this to initialize local resources based on configInfo
        /// structures. Examples are open files or database connections, set delimiters, and
        /// other parameters that enable the adapter to access the input/output device.
        /// </summary>
        public RSS20InputAdapter(SyndicationAdapterConfig configInfo, CepEventType cepEventType)
        {
            Rss20FeedFormatter rssFormatter = new Rss20FeedFormatter(typeof(SyndicationFeed));
            // Read the whole feed once; the using block closes the reader even on failure.
            using (XmlReader rssReader = XmlReader.Create(configInfo.URL))
            {
                rssFormatter.ReadFrom(rssReader);
            }
            dataFeed = rssFormatter.Feed;
            dataFeedItems = dataFeed.Items.GetEnumerator();
        }

        /// <summary>
        /// Start is the first method to be called by the CEP server once the input
        /// adapter has been instantiated. So any initializations that cannot be covered
        /// in the constructor can be placed here. Start() is called in a separate worker
        /// thread initiated by the server. In this example (and in most scenarios), Start
        /// begins producing events once it is called.
        /// </summary>
        public override void Start()
        {
            ProduceEvents();
        }

        /// <summary>
        /// Resume is called by the server once it returns from being scheduled away
        /// from Start, and only after the adapter has called Ready(). Resume continues
        /// to produce events.
        /// </summary>
        public override void Resume()
        {
            ProduceEvents();
        }

        /// <summary>
        /// Dispose is inherited from the base adapter class and is the placeholder to
        /// release adapter resources when this instance is shut down.
        /// </summary>
        /// <param name="disposing"></param>
        protected override void Dispose(bool disposing)
        {
        }

        /// <summary>
        /// Main driver to read events from the source and enqueue them.
        /// </summary>
        private void ProduceEvents()
        {
            IntervalEvent<SyndicationEvent> currevent = default(IntervalEvent<SyndicationEvent>);
            EnqueueOperationResult result;

            while (true)
            {
                if (AdapterState.Stopping == AdapterState)
                {
                    // Release any event still held from a rejected Enqueue.
                    PrepareToStop(pendingevent);
                    pendingevent = null;
                    Stopped();
                    return;
                }

                // Retry the event rejected on the previous pass, if any;
                // otherwise read the next syndication item.
                if (null != pendingevent)
                {
                    currevent = pendingevent;
                    pendingevent = null;
                }
                else
                {
                    currevent = CreateEventFromSource();
                }

                // End of feed: issue a final CTI and stop.
                if (null == currevent)
                {
                    EnqueueCtiEvent(DateTime.MaxValue);
                    Stopped();
                    return;
                }

                // Enqueue the interval event with its payload.
                result = Enqueue(ref currevent);

                // The server's queue is full: keep the event, signal readiness,
                // and wait for the server to call Resume().
                if (EnqueueOperationResult.Full == result)
                {
                    PrepareToResume(currevent);
                    Ready();
                    return;
                }
            }
        }

        private void PrepareToStop(IntervalEvent<SyndicationEvent> currEvent)
        {
            // The server will not accept any more events, and you
            // cannot do anything about it. Release the event.
            // If you miss this step, server memory will leak.
            if (null != currEvent)
            {
                ReleaseEvent(ref currEvent);
            }
        }

        private void PrepareToResume(IntervalEvent<SyndicationEvent> currevent)
        {
            pendingevent = currevent;
        }

        private void PrepareToResume(DateTime currctitime)
        {
            pendingctitime = currctitime;
        }

        private bool EndofSource()
        {
            return false;
        }

        private IntervalEvent<SyndicationEvent> CreateEventFromSource()
        {
            if (dataFeedItems.MoveNext())
            {
                SyndicationItem item = dataFeedItems.Current;
                IntervalEvent<SyndicationEvent> syndicationEvent = this.CreateInsertEvent();
                syndicationEvent.Payload = new SyndicationEvent(item);
                syndicationEvent.StartTime = item.PublishDate.DateTime;
                // An interval should end after it starts; with no better end time
                // available, give the item a minimal one-tick lifetime.
                syndicationEvent.EndTime = item.PublishDate.DateTime.AddTicks(1);
                return syndicationEvent;
            }
            return null;
        }
    }

    // Configuration structure to initialize the adapter.
    public struct SyndicationAdapterConfig
    {
        private string url;

        public string URL
        {
            get { return url; }
            set { url = value; }
        }
    }

    // Factory class is the entry point for the query binder to initialize
    // and create an adapter instance.
    public class SyndicationAdapterFactory : IInputAdapterFactory<SyndicationAdapterConfig>
    {
        public SyndicationAdapterFactory()
        {
        }

        public InputAdapterBase Create(SyndicationAdapterConfig configInfo,
            EventShape eventshape, CepEventType cepeventtype)
        {
            InputAdapterBase adapter = default(InputAdapterBase);

            if (EventShape.Interval == eventshape)
                adapter = new RSS20InputAdapter(configInfo, cepeventtype);

            return adapter;
        }

        public void Dispose()
        {
        }
    }

Although the adapter is fully functional, we still need to enable the factory mechanism that StreamInsight applications use to create instances of the adapter. We can accomplish this by writing a custom adapter factory that implements the Microsoft.ComplexEventProcessing.Adapters.IInputAdapterFactory<T> interface.

    public class SyndicationAdapterFactory :
                     IInputAdapterFactory<SyndicationAdapterConfig>
    {
        public SyndicationAdapterFactory()
        {
        }

        public InputAdapterBase Create(SyndicationAdapterConfig configInfo,
            EventShape eventshape, CepEventType cepeventtype)
        {
            InputAdapterBase adapter = default(InputAdapterBase);

            if (EventShape.Interval == eventshape)
                adapter = new RSS20InputAdapter(configInfo, cepeventtype);

            return adapter;
        }

        public void Dispose()
        {
        }
    }

At this point, our sample adapter can be used from any StreamInsight application. For instance, the following code illustrates a sample application that uses our adapter to query Microsoft's Bing News feed, filtering for items whose title contains a specific keyword. The events that match the filter criteria are written to the console using the TextFileOutputAdapter sample included in the StreamInsight SDK.

    private static void RssAdapterTest(string keyword)
    {
        Server cepServer = Server.Create();
        Application cepApp = cepServer.CreateApplication("SampleCEPApplication");
        InputAdapter syndicationInputAdapter =
            cepApp.CreateInputAdapter<SyndicationAdapterFactory>("sampleadapter", "");
        OutputAdapter fileOutputAdapter =
            cepApp.CreateOutputAdapter<TextFileOutputFactory>("sampleadapter", "");
        CepEventType syndicationEventType = cepApp.CreateEventType<SyndicationEvent>();

        CepStream<SyndicationEvent> dataFeedStream =
            CepStream.CreateInputStream<SyndicationEvent>("datafeedstream");

        // Keep only items whose title contains the keyword.
        var query = from s1 in dataFeedStream
                    where s1.Title.Contains(keyword)
                    select new { Title = s1.Title, Time = s1.LastUpdatedTime,
                                 Summary = s1.Summary };

        QueryTemplate template = cepApp.CreateQueryTemplate("samplequery", query);
        QueryBinder binder = new QueryBinder(template);
        var dataFeedInputConf = new SyndicationAdapterConfig
        {
            URL = "http://www.bing.com/news?FORM=Z9LH6&format=rss"
        };

        var outputConfig = new TextFileOutputConfig
        {
            OutputFileName = String.Empty,
            Delimiter = '\t',
            AdapterStopSignal = "StopAdapter"
        };

        // The stream carries SyndicationEvent payloads, so bind it with that type.
        binder.BindInputStream<SyndicationEvent, SyndicationAdapterConfig>("datafeedstream",
            syndicationInputAdapter, EventShape.Interval, dataFeedInputConf);
        binder.AddQueryConsumer<TextFileOutputConfig>("queryresult",
            fileOutputAdapter, outputConfig, EventShape.Interval,
            StreamEventOrder.FullyOrdered);

        Query cepQuery = cepApp.CreateQuery("samplequery", binder, "");
        EventWaitHandle adapterStopSignal = new EventWaitHandle(false,
            EventResetMode.ManualReset, "StopAdapter");
        cepQuery.Start();

        adapterStopSignal.WaitOne();
        Console.ReadLine();
        cepQuery.Stop();
    }
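Setting the temporal aspects aside, the filter and projection in this query behave like an ordinary LINQ query. As a rough way to check what the query keeps, the same where/select can be run with LINQ to Objects over an in-memory list. The Item class, the KeywordFilter helper and the sample titles are made up for illustration. Note that string.Contains is case-sensitive, so the keyword "Windows" does not match a title that only contains "windows".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for SyndicationEvent with just the projected fields.
public class Item
{
    public string Title;
    public string Summary;
    public DateTime LastUpdatedTime;
}

public static class KeywordFilter
{
    // Same shape as the StreamInsight query: filter on Title, project three fields,
    // then return only the titles so the result is easy to inspect.
    public static List<string> MatchingTitles(IEnumerable<Item> items, string keyword)
    {
        var query = from s1 in items
                    where s1.Title.Contains(keyword)
                    select new { Title = s1.Title, Time = s1.LastUpdatedTime,
                                 Summary = s1.Summary };
        return query.Select(r => r.Title).ToList();
    }
}
```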

Adapters are a fundamental component of Complex Event Processing (CEP) applications. In a nutshell, adapters provide the interfaces that abstract how events are produced or consumed by the CEP infrastructure. Most CEP frameworks leverage the concept of an adapter as the fundamental mechanism for interacting with heterogeneous systems. Following the same principles, Microsoft's StreamInsight uses adapters to model the flow of events into or out of the CEP host. Furthermore, StreamInsight offers a flexible programming model that allows developers to extend the core infrastructure by implementing custom adapters. From a programming model standpoint, StreamInsight classifies adapters based on the direction of event flow and on the event model used.

A StreamInsight adapter can be classified as input or output depending on whether events flow into or out of the hosting application. An input adapter accepts events from a data source in their native format, translates them, and flows them into a StreamInsight application. Similarly, an output adapter receives events from a StreamInsight application, translates them into the target's native format, and sends them to the target application.

In addition to the direction of event flow, StreamInsight adapters can also be classified based on the event model they support. In that regard, StreamInsight supports three event models: point, interval and edge.

  • The point model is used when an event is valid for a single instant of time. Examples are the arrival of an e-mail, a meter reading, a user Web click, a stock tick, or an entry in the Windows Event Log.
  • The interval model is used when an event is valid for a given period of time and both the start time and the end time of the interval are known when the event is enqueued into the server. Examples include the width of an electronic pulse or the duration (validity) of an auction bid.
  • The edge model is used when an event is valid for a given interval of time but only the start time is known when it is enqueued into the CEP server; the end time is provided later. Examples are Windows processes, trace events from Event Tracing for Windows (ETW), a Web user session, or quantization of an analog signal.
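One way to see the difference between the three models is to look at what each one knows about an event's lifetime at enqueue time. The sketch below is a hypothetical illustration, not the StreamInsight API. It makes three assumptions of its own: lifetimes are half-open [Start, End) ranges, a point event lasts a single tick, and an edge event is a start whose end is filled in later.

```csharp
using System;

// Hypothetical lifetime type; assumes half-open [Start, End) semantics.
public class Lifetime
{
    public DateTime Start { get; private set; }
    public DateTime? End { get; private set; }   // null until an edge event is closed

    private Lifetime(DateTime start, DateTime? end) { Start = start; End = end; }

    // Point: valid for a single instant, modeled here as one tick.
    public static Lifetime Point(DateTime at) { return new Lifetime(at, at.AddTicks(1)); }

    // Interval: start and end are both known when the event is enqueued.
    public static Lifetime Interval(DateTime start, DateTime end)
    {
        if (end <= start) throw new ArgumentException("End must be later than Start.");
        return new Lifetime(start, end);
    }

    // Edge: only the start is known at first...
    public static Lifetime EdgeStart(DateTime start) { return new Lifetime(start, null); }

    // ...and the end arrives later.
    public void CloseEdge(DateTime end)
    {
        if (End.HasValue) throw new InvalidOperationException("Already closed.");
        if (end <= Start) throw new ArgumentException("End must be later than Start.");
        End = end;
    }

    // An open edge is treated as valid from Start onwards.
    public bool IsValidAt(DateTime t)
    {
        return t >= Start && (!End.HasValue || t < End.Value);
    }
}
```

Usage mirrors the examples above: a Web click is a Point, an auction bid an Interval, and a Web session an EdgeStart that is later closed with CloseEdge.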

Another important characteristic of a StreamInsight adapter is the type of data it can process. Some adapters process events that can be modeled by strongly typed .NET classes, while others need to process heterogeneous event payloads. StreamInsight classifies an adapter as typed or untyped based on whether its event types are well known and strongly defined at design time or generated dynamically at runtime.
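The typed/untyped distinction is essentially about when the payload shape becomes known. The sketch below is an illustration only. Its classes are hypothetical and are not the StreamInsight CepEventType API. A typed payload is a compile-time .NET type, while an untyped payload carries field values that are looked up by name against a schema discovered at runtime.

```csharp
using System;
using System.Collections.Generic;

// Typed: the shape is fixed at design time, so the compiler checks field access.
public class TypedReading
{
    public string SensorId;
    public int Temperature;
}

// Untyped: the field list is only known at runtime (for example, read from a
// file header), so values are stored and retrieved by name.
public class UntypedEvent
{
    private readonly Dictionary<string, object> fields = new Dictionary<string, object>();

    public UntypedEvent(string[] schema, object[] values)
    {
        if (schema.Length != values.Length)
            throw new ArgumentException("Schema and values must have the same length.");
        for (int i = 0; i < schema.Length; i++)
            fields[schema[i]] = values[i];
    }

    // The caller must know the field's runtime type; a wrong guess fails at runtime.
    public T Get<T>(string name) { return (T)fields[name]; }
}
```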

Based on the event model and typing, StreamInsight abstracts adapters through a series of base classes, as illustrated in the following table.

Adapter type       | Input adapter base class   | Output adapter base class
------------------ | -------------------------- | --------------------------
Typed point        | TypedPointInputAdapter     | TypedPointOutputAdapter
Untyped point      | PointInputAdapter          | PointOutputAdapter
Typed interval     | TypedIntervalInputAdapter  | TypedIntervalOutputAdapter
Untyped interval   | IntervalInputAdapter       | IntervalOutputAdapter
Typed edge         | TypedEdgeInputAdapter      | TypedEdgeOutputAdapter
Untyped edge       | EdgeInputAdapter           | EdgeOutputAdapter

Given the long-running nature of most CEP applications, input and output adapters go through different stages that represent their ability to produce or consume events, respectively. For instance, while an adapter is actively producing events, its state is "running". However, that state can change to "suspended" to indicate that there are no events immediately available in the source system.

StreamInsight models the lifecycle of an adapter using a sophisticated but flexible state machine illustrated in the following figure.

Figure: StreamInsight adapter state machine

An interesting detail of the StreamInsight programming model is that the same state machine is used by both input and output adapters. In a future post I plan to cover the adapter programming model, including its state transitions, in more detail.

As you can see, adapter state transitions are driven both by the CEP server and by the adapter itself. Specifically, a state transition occurs before the CEP server calls Start() or Resume() and after the adapter calls Enqueue(), Dequeue(), Ready(), or Stopped(). An input adapter makes the events it receives available to the CEP application through the Enqueue() operation. Similarly, an output adapter uses the Dequeue() operation to retrieve events that need to be sent to the target system.
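As a rough mental model only, the transitions above can be written down as a tiny state machine. AdapterLifecycle, LifecycleState and QueueUnavailable are hypothetical names, and the transition table is a simplified reading of the description, not StreamInsight's actual implementation. In particular, the sketch assumes two things. An adapter becomes suspended when Enqueue()/Dequeue() cannot proceed. It stays suspended after calling Ready() until the server calls Resume().

```csharp
using System;

public enum LifecycleState { Created, Running, Suspended, Stopping, Stopped }

// Hypothetical, simplified model of the adapter lifecycle described above.
public class AdapterLifecycle
{
    public LifecycleState State { get; private set; }

    public AdapterLifecycle() { State = LifecycleState.Created; }

    // Server-driven transitions.
    public void Start()  { Require(LifecycleState.Created);   State = LifecycleState.Running; }
    public void Resume() { Require(LifecycleState.Suspended); State = LifecycleState.Running; }
    public void RequestStop()
    {
        if (State == LifecycleState.Running || State == LifecycleState.Suspended)
            State = LifecycleState.Stopping;
    }

    // Adapter-driven transitions.
    // Enqueue()/Dequeue() could not proceed (queue full / nothing to dequeue).
    public void QueueUnavailable() { Require(LifecycleState.Running); State = LifecycleState.Suspended; }

    // Ready() only signals that the adapter can continue; in this model the
    // state stays Suspended until the server calls Resume().
    public void Ready() { Require(LifecycleState.Suspended); }

    public void Stopped()
    {
        if (State == LifecycleState.Created || State == LifecycleState.Stopped)
            throw new InvalidOperationException("Cannot stop from " + State);
        State = LifecycleState.Stopped;
    }

    private void Require(LifecycleState expected)
    {
        if (State != expected)
            throw new InvalidOperationException("Expected " + expected + " but was " + State);
    }
}
```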

I hope this explanation helps you become more familiar with the StreamInsight adapter model. In a follow-up post we will cover the details of implementing a custom adapter using the StreamInsight programming model.

My company, Tellago, Inc., is looking to hire an experienced ASP.NET/AJAX developer for a six-month contract gig in Florida. Candidates should also have knowledge of WCF, the ADO.NET Entity Framework and ADO.NET Data Services. You will be working on a highly dynamic team led by some of our top architects. If you are interested, please drop me a line at jesus dot rodriguez at tellago dot com.

One of the fundamental patterns of Complex Event Processing (CEP) applications is the ability to process events from various input sources and distribute them to multiple output targets. These operations require a high degree of coordination, which makes them particularly difficult to implement in real-world scenarios. Why is that? Well, for starters, continuously querying data from multiple sources requires a certain degree of parallelism in the CEP application. As we all know, parallel processing techniques typically introduce challenges from the error handling and availability perspectives. This complexity increases in CEP scenarios that need queries combining events from multiple sources that are being produced in parallel. The following figure helps to illustrate that scenario.

Figure: CEP concurrency scenario

Microsoft StreamInsight provides an elegant model for expressing these types of concurrency scenarios via LINQ queries. Essentially, StreamInsight hides the complexity of concurrently aggregating events from multiple sources behind the LINQ join and union primitives. All the mechanics of coordinating the data from the different event sources are handled internally by the StreamInsight runtime and adapters.

Let's look at an example that illustrates these concepts. Suppose that we have multiple temperature sensors deployed in different areas of a secure facility. The sensors constantly emit events reporting the temperature of their specific area. Certain combinations of those values can trigger alert conditions that should be distributed to various systems.

The first step in implementing this scenario with StreamInsight is to model the event type produced by the sensors. We can accomplish this by annotating a type (in this case, a struct) with the EventType and EventTypeField attributes, as illustrated in the following code.

    [EventType]
    public struct SensorReading
    {
        [EventTypeField(0)]
        public string SensorId { get; set; }
        [EventTypeField(1)]
        public int Temperature { get; set; }
    }

After creating the event type, we can instantiate various event streams based on it. For the simplicity of this sample, we will use two event streams.

    CepStream<SensorReading> sensor1Stream =
        CepStream.CreateInputStream<SensorReading>("sensor1stream");
    CepStream<SensorReading> sensor2Stream =
        CepStream.CreateInputStream<SensorReading>("sensor2stream");

We can aggregate data from our input event streams by using a LINQ join operation, as illustrated in the following code. The query creates an alert based on the values produced by the two sensors' streams.

    var query = from s1 in sensor1Stream
                join s2 in sensor2Stream
                on true equals true
                where s1.Temperature > 100 && s2.Temperature > 50
                select new { Condition = "ALERT", sensor1 = s1.Temperature,
                             sensor2 = s2.Temperature };
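The `on true equals true` clause pairs every reading from one stream with every reading from the other. This sketch assumes StreamInsight's usual temporal join semantics, in which only events whose lifetimes overlap are paired, so an alert requires both readings to be valid at the same time. The following LINQ to Objects code approximates that behavior. Reading and AlertJoin are hypothetical types carrying explicit [Start, End) lifetimes. The overlap test is written out by hand because LINQ to Objects has no notion of time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical reading with an explicit [Start, End) lifetime.
public class Reading
{
    public DateTime Start;
    public DateTime End;
    public int Temperature;
}

public static class AlertJoin
{
    // Cross join, keep only pairs whose lifetimes overlap, then apply the
    // same temperature thresholds as the StreamInsight query.
    public static List<string> Alerts(IEnumerable<Reading> sensor1, IEnumerable<Reading> sensor2)
    {
        var query = from s1 in sensor1
                    from s2 in sensor2
                    where s1.Start < s2.End && s2.Start < s1.End
                    where s1.Temperature > 100 && s2.Temperature > 50
                    select "ALERT " + s1.Temperature + " " + s2.Temperature;
        return query.ToList();
    }
}
```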

For the sake of this example, we are going to use the text file (CSV) adapters included in the StreamInsight SDK samples. The following code shows how to create both the input and output adapters.

    Server cepServer = Server.Create();
    Application cepApp = cepServer.CreateApplication("SampleCEPApplication");
    InputAdapter fileInputAdapter = cepApp.CreateInputAdapter<TextFileInputFactory>
                                                                ("sampleadapter", "");
    OutputAdapter fileOutputAdapter = cepApp.CreateOutputAdapter<TextFileOutputFactory>
                                                                ("sampleadapter", "");

At this point we have created all the design-time StreamInsight artifacts needed by our application. Our next step is to bind our query and adapters to the physical locations containing the event data. We can accomplish that by using a query template associated with our sample query. We will pass the template to a QueryBinder object, which is also used to associate each adapter with its specific location configuration.

    QueryTemplate template = cepApp.CreateQueryTemplate("samplequery", query);
    QueryBinder binder = new QueryBinder(template);

    var sensor1InputConf = new TextFileInputConfig
    {
        InputFileName = "input1.csv",
        Delimiter = '\t',
        CTIfrequency = 9
    };
    var sensor2InputConf = new TextFileInputConfig
    {
        InputFileName = "input2.csv",
        Delimiter = '\t',
        CTIfrequency = 9
    };

    var outputConfig = new TextFileOutputConfig
    {
        OutputFileName = String.Empty,
        Delimiter = '\t',
        AdapterStopSignal = "StopAdapter"
    };

    binder.BindInputStream<SensorReading, TextFileInputConfig>(
        "sensor1stream", fileInputAdapter, EventShape.Interval, sensor1InputConf);
    binder.BindInputStream<SensorReading, TextFileInputConfig>(
        "sensor2stream", fileInputAdapter, EventShape.Interval, sensor2InputConf);
    binder.AddQueryConsumer<TextFileOutputConfig>(
        "queryresult", fileOutputAdapter, outputConfig, EventShape.Point,
        StreamEventOrder.FullyOrdered);
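Each TextFileInputConfig above points a stream at a tab-delimited file. Judging from the sample files listed further down, every row carries a start time, an end time, a sensor id and a temperature, which is exactly what an interval event needs. The SDK adapter does this parsing for you; the sketch below only illustrates that mapping. The `IntervalReading` record, the `SampleRowParser` class and the assumed column order are hypothetical, not the SDK sample's actual code.

```csharp
using System;
using System.Globalization;

// Hypothetical payload for one row of the sample files. The column order
// (start, end, sensor id, temperature) is inferred from the sample data.
public sealed record IntervalReading(DateTime Start, DateTime End, string SensorId, int Temperature);

public static class SampleRowParser
{
    // Matches timestamps such as "6/25/2009 12:00:00 AM".
    private const string TimeFormat = "M/d/yyyy h:mm:ss tt";

    public static IntervalReading Parse(string line, char delimiter = '\t')
    {
        string[] fields = line.Split(delimiter);
        if (fields.Length != 4)
            throw new FormatException($"Expected 4 fields but found {fields.Length}: '{line}'");

        DateTime start = DateTime.ParseExact(fields[0], TimeFormat, CultureInfo.InvariantCulture);
        DateTime end = DateTime.ParseExact(fields[1], TimeFormat, CultureInfo.InvariantCulture);

        // An interval event only makes sense if it ends after it starts.
        if (end <= start)
            throw new FormatException($"End time must follow start time: '{line}'");

        int temperature = int.Parse(fields[3], CultureInfo.InvariantCulture);
        return new IntervalReading(start, end, fields[2], temperature);
    }
}
```

Parsing the first row of input1.csv this way yields sensor1 with a temperature of 70 and a 20-second lifetime.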

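To run our application, we only need to create a Query object from the binder and call its Start method. The host then blocks on the "StopAdapter" wait handle until the output adapter signals it, as configured through AdapterStopSignal above.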

    Query cepQuery = cepApp.CreateQuery("samplequery", binder, "");
    EventWaitHandle adapterStopSignal = new EventWaitHandle(false,
        EventResetMode.ManualReset, "StopAdapter");
    cepQuery.Start();
    adapterStopSignal.WaitOne();
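The last two lines block the host until the output adapter signals that it has finished writing results. The sketch below shows that handshake with a plain `EventWaitHandle` and a worker thread standing in for the adapter; `StopSignalSketch` and the simulated adapter loop are hypothetical. The sample uses a named handle ("StopAdapter"), presumably so the adapter can open the same handle by name from its configuration. The sketch shares an unnamed handle directly instead, because named events are a Windows feature that .NET does not support on every platform.

```csharp
using System;
using System.Threading;

public static class StopSignalSketch
{
    // Simulates an output adapter that writes resultCount rows on a worker
    // thread and then raises the stop signal; returns the number of rows written.
    public static int RunAndWait(int resultCount)
    {
        // Manual-reset, like the handle in the sample: once set, it stays set.
        using var stopSignal = new EventWaitHandle(false, EventResetMode.ManualReset);
        int written = 0;

        var adapter = new Thread(() =>
        {
            for (int i = 0; i < resultCount; i++)
                Interlocked.Increment(ref written); // stand-in for writing one result row

            stopSignal.Set(); // tell the host that all results have been written
        });
        adapter.Start();

        // The host blocks here, just as the sample blocks on adapterStopSignal.WaitOne().
        stopSignal.WaitOne();
        adapter.Join();
        return Volatile.Read(ref written);
    }
}
```

`StopSignalSketch.RunAndWait(21)` returns 21 once the simulated adapter has set the handle.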

Let's run this example with the following inputs.

Input1.csv:

6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    70
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    100
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    45
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    56
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    102
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    103
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    150
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    66

Input2.csv:

6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    70
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    105
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    50
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    58
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    111
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    101
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    150
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    66

The following output is produced.

INSERT 6/25/2009 12:00:00 AM   ALERT   102     58
INSERT 6/25/2009 12:00:00 AM   ALERT   102     66
INSERT 6/25/2009 12:00:00 AM   ALERT   102     70
INSERT 6/25/2009 12:00:00 AM   ALERT   102     101
INSERT 6/25/2009 12:00:00 AM   ALERT   102     105
INSERT 6/25/2009 12:00:00 AM   ALERT   102     111
INSERT 6/25/2009 12:00:00 AM   ALERT   102     150
INSERT 6/25/2009 12:00:00 AM   ALERT   103     58
INSERT 6/25/2009 12:00:00 AM   ALERT   103     66
INSERT 6/25/2009 12:00:00 AM   ALERT   103     70
INSERT 6/25/2009 12:00:00 AM   ALERT   103     101
INSERT 6/25/2009 12:00:00 AM   ALERT   103     105
INSERT 6/25/2009 12:00:00 AM   ALERT   103     111
INSERT 6/25/2009 12:00:00 AM   ALERT   103     150
INSERT 6/25/2009 12:00:00 AM   ALERT   150     58
INSERT 6/25/2009 12:00:00 AM   ALERT   150     66
INSERT 6/25/2009 12:00:00 AM   ALERT   150     70
INSERT 6/25/2009 12:00:00 AM   ALERT   150     101
INSERT 6/25/2009 12:00:00 AM   ALERT   150     105
INSERT 6/25/2009 12:00:00 AM   ALERT   150     111
INSERT 6/25/2009 12:00:00 AM   ALERT   150     150
CTI    12/31/9999 11:59:59 PM
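The 21 rows above follow directly from the query's cross-join semantics. The sketch below reproduces them with plain LINQ to Objects. This is an illustration under an assumption: it ignores StreamInsight's temporal behavior. As I understand it, StreamInsight only joins events whose lifetimes overlap, and every sample event shares the same 20-second interval, so that condition always holds here. The `CrossJoinSketch` class is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Plain LINQ to Objects; this does not use StreamInsight.
public static class CrossJoinSketch
{
    // Temperatures from input1.csv and input2.csv, in file order.
    public static readonly int[] Sensor1 = { 70, 100, 45, 56, 102, 103, 150, 66 };
    public static readonly int[] Sensor2 = { 70, 105, 50, 58, 111, 101, 150, 66 };

    public static List<(string Condition, int Sensor1, int Sensor2)> Alerts(
        IEnumerable<int> sensor1, IEnumerable<int> sensor2)
    {
        // "on true equals true" matches every pair, so the join is a cross product;
        // the where clause then keeps only the pairs above both thresholds.
        var query = from s1 in sensor1
                    join s2 in sensor2 on true equals true
                    where s1 > 100 && s2 > 50
                    orderby s1, s2 // only to list pairs in the same order as the output above
                    select (Condition: "ALERT", Sensor1: s1, Sensor2: s2);
        return query.ToList();
    }
}
```

Three readings in input1.csv exceed 100 (102, 103 and 150) and seven readings in input2.csv exceed 50 (every value except 50 itself), so the join produces 3 × 7 = 21 alerts.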

I spent this weekend toying with the Microsoft StreamInsight bits. I have a few blog posts planned for the next few days that tackle very specific CEP patterns, but I figured it would make sense to start by highlighting some of the features that I think differentiate Microsoft's StreamInsight from other CEP technologies on the market. Here is an initial summary of the features I really like about Microsoft's StreamInsight:

What I like

LINQ

A continuous query language is an essential component of any CEP platform. The use of LINQ as the mechanism for querying and coordinating the processing of event streams is one of the fundamental benefits of Microsoft's StreamInsight. The continuous growth of LINQ-enabled data sources (LINQ providers for SharePoint and Twitter are among the latest examples) should make StreamInsight applicable across highly heterogeneous environments. Furthermore, because LINQ is a core part of .NET, CEP solutions built on it inherit a high degree of extensibility and flexibility.

This approach contrasts sharply with other major CEP stacks on the market, which rely on proprietary languages that are not part of the underlying platform. For instance, Oracle CEP uses the Continuous Query Language (CQL) as its stream query mechanism. However, CQL can't really be used from Java applications that are not hosted within the CEP server.

Adapter model

Enabling an extensible programming model for creating input and output adapters should be a key factor in the viability of StreamInsight in highly heterogeneous environments. One of the common characteristics of CEP scenarios is interaction with non-standard APIs, such as those exposed by RFID readers, temperature sensors, etc. Abstracting those endpoints behind a consistent programming model improves the manageability and extensibility of StreamInsight-based solutions.
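To make the idea concrete, here is a sketch of what a consistent adapter-style abstraction buys you. The `IReadingSource` interface, `FakeTemperatureSensor` and `ReadingPipeline` are hypothetical and are not StreamInsight's adapter API; the point is only that downstream code depends on one interface, whatever device sits behind it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction: every device-specific source exposes its
// readings through the same interface.
public interface IReadingSource
{
    string Name { get; }
    IEnumerable<(DateTime Timestamp, double Value)> Read();
}

// Hypothetical sensor wrapping some vendor-specific API (simulated with an array).
public sealed class FakeTemperatureSensor : IReadingSource
{
    private readonly DateTime _start;
    private readonly double[] _samples;

    public FakeTemperatureSensor(string name, DateTime start, params double[] samples)
    {
        Name = name;
        _start = start;
        _samples = samples;
    }

    public string Name { get; }

    // Emits one reading per second starting at _start.
    public IEnumerable<(DateTime Timestamp, double Value)> Read()
    {
        for (int i = 0; i < _samples.Length; i++)
            yield return (_start.AddSeconds(i), _samples[i]);
    }
}

public static class ReadingPipeline
{
    // Depends only on IReadingSource, never on a concrete device.
    public static int CountAbove(IEnumerable<IReadingSource> sources, double threshold)
    {
        int count = 0;
        foreach (var source in sources)
            foreach (var reading in source.Read())
                if (reading.Value > threshold)
                    count++;
        return count;
    }
}
```

Supporting an RFID reader would then mean writing one more `IReadingSource` implementation, with no change to `ReadingPipeline`.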

Again, some of the leading CEP technologies on the market don't offer that level of extensibility, which forces developers to interact only with data sources that are accessible through the adapters provided by the vendor.

Event Models

StreamInsight uses three fundamental event models (event shapes) to describe the temporal characteristics of an event:

· Interval: The interval event model represents an event whose payload is valid for a given period of time.

· Point: A point event model represents an event occurrence at a single point in time.

· Edge: An edge event model represents an event occurrence whose payload is valid for a given interval of time; however, only the start time is known when the event arrives at the CEP server.

This feature forces the developer to declare up front the event model used by each stream (the EventShape argument passed when binding streams and consumers), which allows StreamInsight to optimize important aspects such as query processing.
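The three shapes differ mainly in what is known about an event's lifetime when it arrives. The sketch below models them as plain C# records. The type names and the `IsValidAt` helper are hypothetical (StreamInsight has its own event types), and the half-open [Start, End) interval and the open-ended edge with a null End are assumptions of the sketch.

```csharp
using System;

// Hypothetical models of the three event shapes, not StreamInsight types.
public abstract record Event<T>(T Payload);

// Start and End are both known on arrival; valid on the half-open range [Start, End).
public sealed record IntervalEvent<T>(DateTime Start, DateTime End, T Payload) : Event<T>(Payload);

// Occurs at a single instant.
public sealed record PointEvent<T>(DateTime Time, T Payload) : Event<T>(Payload);

// Only Start is known on arrival; End stays null until a later end edge supplies it.
public sealed record EdgeEvent<T>(DateTime Start, DateTime? End, T Payload) : Event<T>(Payload);

public static class EventShapes
{
    // Whether the payload is valid at time t, given what is known so far.
    public static bool IsValidAt<T>(Event<T> e, DateTime t) => e switch
    {
        IntervalEvent<T> i => i.Start <= t && t < i.End,
        PointEvent<T> p => p.Time == t,
        EdgeEvent<T> edge => edge.Start <= t && (edge.End is null || t < edge.End.Value),
        _ => throw new ArgumentException("Unknown event shape", nameof(e))
    };
}
```

For example, an interval event from the sample files (12:00:00 to 12:00:20) is valid at 12:00:10 but, under the half-open convention, not at 12:00:20.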

Query Binding Model

StreamInsight uses the notion of a QueryBinder to decouple the query definition (template) from the physical streams to which the query is applied. This mechanism allows a query template to be reused across different streams, and it allows query templates to change without affecting the event streams being processed by the application.
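The idea behind the template/binder split can be illustrated without StreamInsight at all: write the query logic once against abstract inputs, then attach it to whichever concrete sources are available at run time. `TemplateSketch` and its members are hypothetical plain-delegate stand-ins for QueryTemplate and QueryBinder, reusing the alert condition from the sample query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TemplateSketch
{
    // The "template": query logic written once against abstract inputs.
    public static readonly Func<IEnumerable<int>, IEnumerable<int>, IEnumerable<(int, int)>> AlertTemplate =
        (left, right) => from a in left
                         from b in right
                         where a > 100 && b > 50
                         select (a, b);

    // The "binding": attach the unchanged template to concrete sources chosen at run time.
    public static List<(int, int)> Bind(
        Func<IEnumerable<int>, IEnumerable<int>, IEnumerable<(int, int)>> template,
        IEnumerable<int> left,
        IEnumerable<int> right) => template(left, right).ToList();
}
```

For example, `TemplateSketch.Bind(TemplateSketch.AlertTemplate, new[] { 102, 70 }, new[] { 58, 40 })` returns the single pair (102, 58), and the same template can be bound to any other pair of sources without modification.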

Areas that still need work

Server hosting environment

At this point, StreamInsight is just a framework for implementing CEP applications. More sophisticated hosting environments are needed in order to apply StreamInsight to complex CEP scenarios.

Management

Management and tooling is an area that still requires some improvement in order to make StreamInsight a world class CEP platform.

Monitoring & Replay

StreamInsight includes some basic features for monitoring the flow of events on a specific server endpoint. However, it should not come as a surprise that real-world CEP solutions will most likely require more robust monitoring capabilities.

Caching

Taking a lesson from other CEP platforms such as StreamBase or Oracle CEP, I have the feeling that there is a lot of room for improving event processing with highly scalable caching infrastructures such as Microsoft Velocity or memcached.

Integration with other Microsoft technologies

Flexibility and extensibility are by far the two fundamental benefits of Microsoft's StreamInsight. Those capabilities greatly facilitate the integration of StreamInsight with other Microsoft technologies in real-world CEP solutions. WCF, BizTalk RFID, the Microsoft Sync Framework and the SQL Server Business Intelligence stack are just some examples of technologies that can both complement and benefit from integration with Microsoft's StreamInsight. Providing best practices and samples around those integration models is going to be key for the adoption of Microsoft StreamInsight by the .NET community.

I will be speaking tomorrow at the South Florida Architecture User Group (ArcSig). The session, titled From SOA to WOA: Introducing Web Oriented Architectures, covers the fundamental principles and architectural techniques that can help developers implement distributed systems based on the principles of REST.

If you are in the South FL area and you are interested in SOA, REST, WCF, etc., please attend my session. I promise to keep things interesting ;)
