September 2009 - Posts

Exploring StreamInsight's adapter model

Adapters are a fundamental component of Complex Event Processing (CEP) applications. In a nutshell, adapters provide the interfaces that abstract how events are produced or consumed by the CEP infrastructure. Most CEP frameworks use adapters as the fundamental mechanism for interacting with heterogeneous systems. Following the same principle, Microsoft's StreamInsight uses adapters to model the flow of events into and out of the CEP host. Furthermore, StreamInsight offers a flexible programming model that lets developers extend the core infrastructure by implementing custom adapters. From a programming model standpoint, StreamInsight classifies adapters by the direction of event flow, by the event model they use, and by whether their event types are known at design time.

A StreamInsight adapter is classified as input or output depending on whether events flow into or out of the hosting application. An input adapter accepts events from a data source in the source's native format, translates them, and pushes them into a StreamInsight application. Similarly, an output adapter receives events from a StreamInsight application, translates them into the target's native format, and sends them to the target application.

In addition to the direction of event flow, StreamInsight adapters are also classified by the event model they support. StreamInsight supports three event models: point, interval, and edge.

  • The point model represents events that are valid for a single instant in time. Examples are the arrival of an e-mail, a meter reading, a user's Web click, a stock tick, or an entry in the Windows Event Log.
  • The interval model represents events that are valid for a given period of time, where both the start time and the end time are known and supplied in the event when it is enqueued into the server. Examples include the width of an electronic pulse or the period during which an auction bid is valid.
  • The edge model also represents events that are valid for an interval of time; however, only the start time is known when the event is enqueued into the CEP server, and the end time is supplied later. Examples are Windows processes, trace events from Event Tracing for Windows (ETW), a Web user session, or the quantization of an analog signal.
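To make the three models concrete, here is a minimal sketch that represents each one as a pair of timestamps, independent of StreamInsight. The EventLifetime type and its helpers are hypothetical illustrations, not StreamInsight API, and modeling a point event as lasting exactly one tick is an assumption made for this sketch.

```csharp
using System;

// Hypothetical type (not StreamInsight API): an event's validity interval [Start, End).
public readonly struct EventLifetime
{
    public DateTime Start { get; }
    public DateTime? End { get; }   // null while an edge event is still open

    public EventLifetime(DateTime start, DateTime? end)
    {
        Start = start;
        End = end;
    }

    // Point model: valid for a single instant, modeled here (by assumption) as one tick.
    public static EventLifetime Point(DateTime at) =>
        new EventLifetime(at, at.AddTicks(1));

    // Interval model: both ends are known when the event is enqueued.
    public static EventLifetime Interval(DateTime start, DateTime end) =>
        new EventLifetime(start, end);

    // Edge model, part 1: only the start is known when the event is enqueued.
    public static EventLifetime StartEdge(DateTime start) =>
        new EventLifetime(start, null);

    // Edge model, part 2: the end arrives later and closes the same event.
    public EventLifetime WithEndEdge(DateTime end) =>
        new EventLifetime(Start, end);

    public bool IsOpen => End is null;
}
```

A Web user session, for instance, would be opened with StartEdge(loginTime) and closed later with WithEndEdge(logoutTime).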

Another important characteristic of a StreamInsight adapter is the type of data it processes. Some adapters process events whose payloads can be modeled by strongly typed .NET classes, while others must handle heterogeneous event payloads. StreamInsight classifies an adapter as typed or untyped depending on whether its event types are well known and strongly defined at design time or generated dynamically at runtime.
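The difference can be sketched without StreamInsight: a typed payload is a .NET type known at compile time, while an untyped payload carries field names and values that are only discovered at runtime. TypedReading and the dictionary-based UntypedPayload helper below are hypothetical illustrations, not the types StreamInsight itself uses for untyped adapters.

```csharp
using System;
using System.Collections.Generic;

// Typed payload (hypothetical): the shape is fixed at design time and checked by the compiler.
public struct TypedReading
{
    public string SensorId { get; set; }
    public int Temperature { get; set; }
}

// Untyped payload (hypothetical): field names and values are only known at runtime,
// for example after reading a header row from a file.
public static class UntypedPayload
{
    public static Dictionary<string, object> FromFields(string[] names, object[] values)
    {
        if (names.Length != values.Length)
            throw new ArgumentException("names and values must have the same length");

        var payload = new Dictionary<string, object>();
        for (int i = 0; i < names.Length; i++)
            payload[names[i]] = values[i];
        return payload;
    }

    // Reading a field needs a runtime lookup and cast instead of a property access.
    public static T Get<T>(Dictionary<string, object> payload, string field) =>
        (T)payload[field];
}
```

The trade-off is the usual one: the typed form catches mistakes at compile time, while the untyped form can accept payload shapes nobody anticipated when the adapter was written.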

Based on the event model and on whether the adapter is typed, StreamInsight abstracts adapters through a set of base classes, as shown in the following table.

 

Adapter type        Input adapter base class      Output adapter base class
Typed point         TypedPointInputAdapter        TypedPointOutputAdapter
Untyped point       PointInputAdapter             PointOutputAdapter
Typed interval      TypedIntervalInputAdapter     TypedIntervalOutputAdapter
Untyped interval    IntervalInputAdapter          IntervalOutputAdapter
Typed edge          TypedEdgeInputAdapter         TypedEdgeOutputAdapter
Untyped edge        EdgeInputAdapter              EdgeOutputAdapter

 

 

Given the long-running nature of most CEP applications, input and output adapters go through different stages that reflect their ability to produce or consume events, respectively. For instance, while an adapter is actively producing events its state is "running"; it can move to "suspended" to indicate that no events are immediately available in the source system.

StreamInsight models the lifecycle of an adapter using a sophisticated but flexible state machine illustrated in the following figure.

Figure: StreamInsight adapter state machine

An interesting detail of the StreamInsight programming model is that the same state machine is used by both input and output adapters. In a future post I plan to cover the adapter programming model, including its state transitions, in more detail.

As the state machine shows, adapter state transitions are driven both by the CEP server and by the adapter itself. Specifically, a state transition occurs before the CEP server calls Start() or Resume() and after the adapter calls Enqueue(), Dequeue(), Ready(), or Stopped(). An input adapter makes the events it receives available to the CEP application through the Enqueue() operation. Similarly, an output adapter uses the Dequeue() operation to retrieve the events that need to be sent to the target system.
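The call pattern described above can be approximated with a small, self-contained state machine. This is a simplified model under stated assumptions: besides the "running" and "suspended" states mentioned earlier, the Created, Stopping, and Stopped states are assumed for illustration, and the hypothetical AdapterLifecycle class only mimics the order of Start(), Resume(), Enqueue()/Dequeue(), Ready(), and Stopped() calls; it is not a StreamInsight base class.

```csharp
using System;

// Hypothetical states, named after the lifecycle described in the text.
public enum LifecycleState { Created, Running, Suspended, Stopping, Stopped }

// Simplified, hypothetical model of the adapter lifecycle (not StreamInsight API).
public sealed class AdapterLifecycle
{
    public LifecycleState State { get; private set; } = LifecycleState.Created;
    public bool ResumeRequested { get; private set; }

    // Server side: transition applied before the server calls Start().
    public void ServerStart() => Move(LifecycleState.Created, LifecycleState.Running);

    // Server side: transition applied before the server calls Resume();
    // in this model it is only allowed after the adapter has called Ready().
    public void ServerResume()
    {
        Require(LifecycleState.Suspended);
        if (!ResumeRequested)
            throw new InvalidOperationException("Adapter has not called Ready()");
        ResumeRequested = false;
        State = LifecycleState.Running;
    }

    // Server side: ask a running or suspended adapter to shut down.
    public void ServerStop()
    {
        if (State != LifecycleState.Running && State != LifecycleState.Suspended)
            throw new InvalidOperationException($"Cannot stop while {State}");
        ResumeRequested = false;
        State = LifecycleState.Stopping;
    }

    // Adapter side: outcome of an Enqueue()/Dequeue() attempt. In this model an
    // attempt that cannot make progress (no events available) suspends the adapter.
    public void AfterTransfer(bool madeProgress)
    {
        Require(LifecycleState.Running);
        if (!madeProgress)
            State = LifecycleState.Suspended;
    }

    // Adapter side: Ready() tells the server the adapter can be resumed.
    public void AdapterReady()
    {
        Require(LifecycleState.Suspended);
        ResumeRequested = true;
    }

    // Adapter side: Stopped() acknowledges the stop request.
    public void AdapterStopped() => Move(LifecycleState.Stopping, LifecycleState.Stopped);

    private void Move(LifecycleState from, LifecycleState to)
    {
        Require(from);
        State = to;
    }

    private void Require(LifecycleState expected)
    {
        if (State != expected)
            throw new InvalidOperationException($"Expected {expected} but was {State}");
    }
}
```

Encoding the legal transitions in one place makes illegal call orders, such as restarting a stopped adapter, fail loudly instead of silently corrupting state.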

I hope this explanation helps you become more familiar with the StreamInsight adapter model. In a future post we will cover the details of implementing a custom adapter using the StreamInsight programming model.

Looking for an ASP.NET/AJAX developer

My company, Tellago, Inc., is looking to hire an experienced ASP.NET/AJAX developer for a six-month contract in Florida. Candidates should also have knowledge of WCF, the ADO.NET Entity Framework, and ADO.NET Data Services. You will be working in a highly dynamic team led by some of our top architects. If you are interested, please drop me a line at jesus dot rodriguez at tellago dot com.

Processing events from multiple sources using Microsoft StreamInsight

One of the fundamental patterns of Complex Event Processing (CEP) applications is the ability to process events from various input sources and distribute the results to multiple outputs. These operations require a high degree of coordination, which makes them particularly difficult to implement in real-world scenarios. Why is that? For starters, continuously querying data from multiple sources requires some degree of parallelism in the CEP application, and parallel processing typically introduces error-handling and availability challenges. The complexity grows further in scenarios that need queries combining events from multiple sources that are being produced in parallel. The following figure illustrates that scenario.

Figure: CEP concurrency scenario

Microsoft StreamInsight provides an elegant model for expressing these types of concurrency scenarios through LINQ queries. Essentially, StreamInsight hides the complexity of concurrently aggregating events from multiple sources behind the LINQ join and union operators. All the mechanics of coordinating data from the different event sources are handled internally by the StreamInsight runtime and the adapters.
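As a rough analogy only, the same join and union operators exist in LINQ to Objects over in-memory collections, which is the technique used in the sketch below instead of StreamInsight. The Reading type is hypothetical. Two differences matter: LINQ to Objects has no notion of event time, so unlike StreamInsight it does not limit matches to events whose lifetimes overlap, and Enumerable.Union removes duplicates, so Concat is the closer analog to merging two streams.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a sensor event.
public sealed class Reading
{
    public string SensorId { get; }
    public int Temperature { get; }

    public Reading(string sensorId, int temperature)
    {
        SensorId = sensorId;
        Temperature = temperature;
    }
}

public static class MultiSourceDemo
{
    // Merge two sources into one sequence; Concat keeps every reading,
    // whereas Enumerable.Union would drop duplicates.
    public static List<Reading> Merge(IEnumerable<Reading> a, IEnumerable<Reading> b) =>
        a.Concat(b).ToList();

    // Correlate two sources on a shared key (here, the sensor ID).
    public static List<(int First, int Second)> JoinBySensor(
        IEnumerable<Reading> a, IEnumerable<Reading> b) =>
        (from x in a
         join y in b on x.SensorId equals y.SensorId
         select (First: x.Temperature, Second: y.Temperature)).ToList();
}
```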

Let's look at an example that illustrates these concepts. Suppose that we have multiple temperature sensors deployed in different areas of a secure facility. The sensors constantly emit events about the temperature of their specific area. The combination of those values can trigger certain alert conditions that should be distributed to various systems.

The first step in implementing this scenario with StreamInsight is to model the event type produced by the sensors. We can accomplish this by annotating a type (a struct in this case) with the EventType and EventTypeField attributes, as illustrated in the following code.

    [EventType]
    public struct SensorReading
    {
        [EventTypeField(0)]
        public string SensorId { get; set; }
        [EventTypeField(1)]
        public int Temperature { get; set; }
    }

After creating the event type, we can instantiate event streams based on it. For simplicity, this sample uses two event streams.

    CepStream<SensorReading> sensor1Stream =
        CepStream.CreateInputStream<SensorReading>("sensor1stream");
    CepStream<SensorReading> sensor2Stream =
        CepStream.CreateInputStream<SensorReading>("sensor2stream");

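We can combine data from our input streams by using a LINQ join operation, as illustrated in the following code. The query creates an alert based on the values produced by the two sensor streams. Because the join condition true equals true holds for every pair of events, the join behaves like a cross join, and the where clause keeps only the pairs in which the first stream reports a temperature above 100 and the second a temperature above 50.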

    var query = from s1 in sensor1Stream
                join s2 in sensor2Stream
                on true equals true
                where s1.Temperature > 100 && s2.Temperature > 50
                select new { Condition = "ALERT", sensor1 = s1.Temperature,
                             sensor2 = s2.Temperature };
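The constant join key is easiest to understand by comparing it with an explicit cross join. The sketch below uses LINQ to Objects over plain integer arrays (an assumption for illustration, not StreamInsight streams) and writes the same filter twice: once with the on true equals true join and once with two from clauses. Both forms produce the same pairs in the same order; in StreamInsight the join is additionally restricted in time, which this in-memory analogy does not model.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CrossJoinDemo
{
    // Join form used in the post: a constant key makes every pair match.
    public static List<(int S1, int S2)> WithConstantKey(int[] s1, int[] s2) =>
        (from a in s1
         join b in s2 on true equals true
         where a > 100 && b > 50
         select (S1: a, S2: b)).ToList();

    // Equivalent cross join written with two from clauses.
    public static List<(int S1, int S2)> WithTwoFroms(int[] s1, int[] s2) =>
        (from a in s1
         from b in s2
         where a > 100 && b > 50
         select (S1: a, S2: b)).ToList();
}
```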

For the sake of this example we are going to use the text file (CSV) adapter included in the StreamInsight SDK samples. The following code shows how to create both the input and the output adapters.

    Server cepServer = Server.Create();
    Application cepApp = cepServer.CreateApplication("SampleCEPApplication");
    InputAdapter fileInputAdapter =
        cepApp.CreateInputAdapter<TextFileInputFactory>("sampleadapter", "");
    OutputAdapter fileOutputAdapter =
        cepApp.CreateOutputAdapter<TextFileOutputFactory>("sampleadapter", "");

At this point we have created all the design-time StreamInsight artifacts our application needs. The next step is to bind the query and adapters to the physical locations that contain the event data. We do that through a query template created from our sample query, which we wrap in a QueryBinder object that also associates each adapter with its location-specific configuration.

    QueryTemplate template = cepApp.CreateQueryTemplate("samplequery", query);
    QueryBinder binder = new QueryBinder(template);

    // One tab-delimited input file per sensor stream
    var sensor1InputConf = new TextFileInputConfig
    {
        InputFileName = "input1.csv",
        Delimiter = '\t',
        CTIfrequency = 9
    };
    var sensor2InputConf = new TextFileInputConfig
    {
        InputFileName = "input2.csv",
        Delimiter = '\t',
        CTIfrequency = 9
    };

    // AdapterStopSignal names the wait handle the host blocks on when running the query
    var outputConfig = new TextFileOutputConfig
    {
        OutputFileName = String.Empty,
        Delimiter = '\t',
        AdapterStopSignal = "StopAdapter"
    };

    // Both input streams carry interval events (note: EventShape.Interval)
    binder.BindInputStream<SensorReading, TextFileInputConfig>(
        "sensor1stream", fileInputAdapter, EventShape.Interval, sensor1InputConf);
    binder.BindInputStream<SensorReading, TextFileInputConfig>(
        "sensor2stream", fileInputAdapter, EventShape.Interval, sensor2InputConf);
    binder.AddQueryConsumer<TextFileOutputConfig>(
        "queryresult", fileOutputAdapter, outputConfig, EventShape.Point,
        StreamEventOrder.FullyOrdered);

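To run the application, we only need to create a Query object from the binder and call Start(). The host then blocks on the "StopAdapter" wait handle, whose name matches the AdapterStopSignal value in the output configuration, until it is signaled.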

    Query cepQuery = cepApp.CreateQuery("samplequery", binder, "");
    EventWaitHandle adapterStopSignal =
        new EventWaitHandle(false, EventResetMode.ManualReset, "StopAdapter");
    cepQuery.Start();
    adapterStopSignal.WaitOne();
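The wait-handle pattern used above is plain .NET threading and can be sketched without StreamInsight. In this sketch a background task stands in for the output adapter (an assumption for illustration), and an unnamed ManualResetEvent replaces the named EventWaitHandle because named system-wide handles are not available on every platform .NET runs on. StopSignalDemo and RunAndWait are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StopSignalDemo
{
    // Runs a stand-in "adapter" on another thread and blocks until it signals.
    public static int RunAndWait(int eventsToWrite, TimeSpan timeout)
    {
        int written = 0;
        using var stopSignal = new ManualResetEvent(false);

        _ = Task.Run(() =>
        {
            for (int i = 0; i < eventsToWrite; i++)
                Interlocked.Increment(ref written);   // pretend to write one event
            stopSignal.Set();                          // tell the host we are done
        });

        // Same idea as adapterStopSignal.WaitOne(), but with a timeout so a
        // stuck adapter cannot hang the host forever.
        if (!stopSignal.WaitOne(timeout))
            throw new TimeoutException("Adapter did not signal completion");

        return Volatile.Read(ref written);
    }
}
```

Adding a timeout is a design choice for the sketch; the original code waits indefinitely, which is fine for a demo but risky for a long-running host.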

Suppose we run this example with the following inputs, where each tab-delimited row holds an event start time, end time, sensor ID, and temperature.

Input1.csv:

6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    70
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    100
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    45
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    56
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    102
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    103
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    150
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    66

Input2.csv:

6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    70
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    105
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    50
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    58
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    111
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    101
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    150
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    66
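Each of these rows maps naturally onto an interval event. The sketch below parses one row with plain .NET code; IntervalReading and ParseRow are hypothetical names, the "M/d/yyyy h:mm:ss tt" time format is inferred from the sample rows, and this is not how the SDK's text file adapter is implemented.

```csharp
using System;
using System.Globalization;

// Hypothetical representation of one row of the sample input files.
public sealed class IntervalReading
{
    public DateTime Start { get; }
    public DateTime End { get; }
    public string SensorId { get; }
    public int Temperature { get; }

    public IntervalReading(DateTime start, DateTime end, string sensorId, int temperature)
    {
        Start = start;
        End = end;
        SensorId = sensorId;
        Temperature = temperature;
    }

    // Time format inferred from the sample rows, e.g. "6/25/2009 12:00:20 AM".
    private const string TimeFormat = "M/d/yyyy h:mm:ss tt";

    // Parses "start<TAB>end<TAB>sensorId<TAB>temperature".
    public static IntervalReading ParseRow(string line, char delimiter = '\t')
    {
        string[] fields = line.Split(delimiter);
        if (fields.Length != 4)
            throw new FormatException($"Expected 4 fields but found {fields.Length}");

        var culture = CultureInfo.InvariantCulture;
        DateTime start = DateTime.ParseExact(fields[0].Trim(), TimeFormat, culture);
        DateTime end = DateTime.ParseExact(fields[1].Trim(), TimeFormat, culture);
        if (end <= start)
            throw new FormatException("An interval event must end after it starts");

        return new IntervalReading(start, end, fields[2].Trim(),
            int.Parse(fields[3].Trim(), culture));
    }
}
```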

The application produces the following output:

INSERT 6/25/2009 12:00:00 AM   ALERT   102     58
INSERT 6/25/2009 12:00:00 AM   ALERT   102     66
INSERT 6/25/2009 12:00:00 AM   ALERT   102     70
INSERT 6/25/2009 12:00:00 AM   ALERT   102     101
INSERT 6/25/2009 12:00:00 AM   ALERT   102     105
INSERT 6/25/2009 12:00:00 AM   ALERT   102     111
INSERT 6/25/2009 12:00:00 AM   ALERT   102     150
INSERT 6/25/2009 12:00:00 AM   ALERT   103     58
INSERT 6/25/2009 12:00:00 AM   ALERT   103     66
INSERT 6/25/2009 12:00:00 AM   ALERT   103     70
INSERT 6/25/2009 12:00:00 AM   ALERT   103     101
INSERT 6/25/2009 12:00:00 AM   ALERT   103     105
INSERT 6/25/2009 12:00:00 AM   ALERT   103     111
INSERT 6/25/2009 12:00:00 AM   ALERT   103     150
INSERT 6/25/2009 12:00:00 AM   ALERT   150     58
INSERT 6/25/2009 12:00:00 AM   ALERT   150     66
INSERT 6/25/2009 12:00:00 AM   ALERT   150     70
INSERT 6/25/2009 12:00:00 AM   ALERT   150     101
INSERT 6/25/2009 12:00:00 AM   ALERT   150     105
INSERT 6/25/2009 12:00:00 AM   ALERT   150     111
INSERT 6/25/2009 12:00:00 AM   ALERT   150     150
CTI    12/31/9999 11:59:59 PM
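The 21 alerts follow directly from the data. The query ignores the SensorId column, so only temperatures matter: three readings in Input1.csv exceed 100 (102, 103, and 150), seven readings in Input2.csv exceed 50, and since every sample event has the same 20-second lifetime, every such pair overlaps, giving 3 × 7 = 21 rows. The LINQ to Objects sketch below (an in-memory check, not StreamInsight) recomputes the pairs from the temperature values copied from the two files; the ordering clause only lines the results up with the listing above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SampleOutputCheck
{
    // Temperature column of Input1.csv and Input2.csv, in file order.
    public static readonly int[] Input1 = { 70, 100, 45, 56, 102, 103, 150, 66 };
    public static readonly int[] Input2 = { 70, 105, 50, 58, 111, 101, 150, 66 };

    // Same filter as the StreamInsight query, applied to every pair
    // (all sample events share one lifetime, so every pair overlaps).
    public static List<(int Sensor1, int Sensor2)> Alerts() =>
        (from s1 in Input1
         from s2 in Input2
         where s1 > 100 && s2 > 50
         orderby s1, s2
         select (Sensor1: s1, Sensor2: s2)).ToList();
}
```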
