Processing events from multiple sources using Microsoft StreamInsight

One of the fundamental patterns of Complex Event Processing (CEP) applications is the ability to process events from various input sources and distribute the results to multiple output destinations. These operations require a high degree of coordination, which makes them particularly difficult to implement in real-world scenarios. Why is that? Well, for starters, continuously querying data from multiple sources entails implementing some degree of parallelism in the CEP application. As we all know, parallel processing techniques typically introduce challenges from the error handling and availability perspective. This complexity increases in CEP scenarios that need queries combining events from multiple sources that are being produced in parallel. The following figure helps to illustrate that scenario.

Figure: CEP concurrency scenario

Microsoft StreamInsight provides an elegant model for expressing these types of concurrency scenarios via LINQ queries. Essentially, StreamInsight hides the complexity of concurrently aggregating events from multiple sources behind the LINQ join and union primitives. The mechanisms for coordinating the data from the different event sources are handled internally by the StreamInsight runtime and adapters.

Let's look at an example that illustrates these concepts. Suppose that we have multiple temperature sensors deployed in different areas of a secure facility. The sensors constantly emit events reporting the temperature of their specific area. Certain combinations of those values trigger alert conditions that should be distributed to various systems.

The first step in implementing this scenario with StreamInsight is to model the event types produced by the sensors. We can accomplish this by annotating a type (a struct in this sample) with the EventType and EventTypeField attributes, as illustrated in the following code.

    [EventType]
    public struct SensorReading
    {
        [EventTypeField(0)]
        public string SensorId { get; set; }

        [EventTypeField(1)]
        public int Temperature { get; set; }
    }

After creating the event type, we can instantiate various event streams based on it. We will use two event streams to keep this sample simple.

    CepStream<SensorReading> sensor1Stream =
        CepStream.CreateInputStream<SensorReading>("sensor1stream");
    CepStream<SensorReading> sensor2Stream =
        CepStream.CreateInputStream<SensorReading>("sensor2stream");

We can aggregate data from our input event streams by using a LINQ join operation, as illustrated in the following code. The query creates an alert based on the values produced by the two sensor streams.

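    // "on true equals true" is a constant join key, so every pair of
    // events from the two streams is a candidate match (a cross join).
    var query = from s1 in sensor1Stream
                join s2 in sensor2Stream
                on true equals true
                where s1.Temperature > 100 && s2.Temperature > 50
                select new { Condition = "ALERT", sensor1 = s1.Temperature,
                             sensor2 = s2.Temperature };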
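
To make the join semantics easier to picture, here is a minimal sketch that mimics the query with plain LINQ to Objects instead of StreamInsight. It assumes that a temporal join only pairs events whose lifetimes overlap, and it models that rule with an explicit Overlaps check. The Reading record, the JoinSketch class and the Overlaps helper are hypothetical names made up for this illustration; they are not part of the StreamInsight API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for an interval event; not a StreamInsight type.
public record Reading(DateTime Start, DateTime End, string SensorId, int Temperature);

public static class JoinSketch
{
    // Two half-open intervals [Start, End) overlap when each one
    // starts before the other one ends.
    public static bool Overlaps(Reading a, Reading b) =>
        a.Start < b.End && b.Start < a.End;

    public static List<(int Sensor1, int Sensor2)> Alerts(
        IEnumerable<Reading> stream1, IEnumerable<Reading> stream2)
    {
        var query = from s1 in stream1
                    join s2 in stream2 on true equals true // constant key: all pairs
                    where Overlaps(s1, s2)                 // assumed temporal rule
                    where s1.Temperature > 100 && s2.Temperature > 50
                    select (Sensor1: s1.Temperature, Sensor2: s2.Temperature);
        return query.ToList();
    }
}

public static class Program
{
    public static void Main()
    {
        var t0 = new DateTime(2009, 6, 25, 0, 0, 0);
        var stream1 = new[]
        {
            new Reading(t0, t0.AddSeconds(20), "sensor1", 102),
            // Starts after the only stream2 event has ended, so it never joins.
            new Reading(t0.AddSeconds(30), t0.AddSeconds(50), "sensor1", 150),
        };
        var stream2 = new[] { new Reading(t0, t0.AddSeconds(20), "sensor2", 66) };

        foreach (var alert in JoinSketch.Alerts(stream1, stream2))
            Console.WriteLine($"ALERT\t{alert.Sensor1}\t{alert.Sensor2}");
    }
}
```

In this sketch only the 102/66 pair is reported: the 150 reading satisfies the temperature filter, but its lifetime does not overlap the second stream's event.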

For the sake of this example, we are going to use the CSV file adapters included in the StreamInsight SDK samples. The following code shows how to create both the input and output adapters.

    Server cepServer = Server.Create();
    Application cepApp = cepServer.CreateApplication("SampleCEPApplication");
    InputAdapter fileInputAdapter =
        cepApp.CreateInputAdapter<TextFileInputFactory>("sampleadapter", "");
    OutputAdapter fileOutputAdapter =
        cepApp.CreateOutputAdapter<TextFileOutputFactory>("sampleadapter", "");

At this point we have created all the design time StreamInsight artifacts needed by our application. Our next step is to bind our query and adapters to the physical locations containing the event data. We can accomplish that by using a query template associated with our sample query. We will add the template to a QueryBinder object that can also be used to associate the adapter type with specific location configurations.

    QueryTemplate template = cepApp.CreateQueryTemplate("samplequery", query);
    QueryBinder binder = new QueryBinder(template);

    // One input configuration per sensor file.
    var sensor1InputConf = new TextFileInputConfig
    {
        InputFileName = "input1.csv",
        Delimiter = '\t',
        CTIfrequency = 9
    };
    var sensor2InputConf = new TextFileInputConfig
    {
        InputFileName = "input2.csv",
        Delimiter = '\t',
        CTIfrequency = 9
    };

    var outputConfig = new TextFileOutputConfig
    {
        OutputFileName = String.Empty,
        Delimiter = '\t',
        AdapterStopSignal = "StopAdapter"
    };

    // Bind each named input stream to the file adapter and its configuration.
    binder.BindInputStream<SensorReading, TextFileInputConfig>
        ("sensor1stream", fileInputAdapter, EventShape.Interval, sensor1InputConf);
    binder.BindInputStream<SensorReading, TextFileInputConfig>
        ("sensor2stream", fileInputAdapter, EventShape.Interval, sensor2InputConf);

    // Send the query result to the output adapter as point events.
    binder.AddQueryConsumer<TextFileOutputConfig>
        ("queryresult", fileOutputAdapter, outputConfig, EventShape.Point,
         StreamEventOrder.FullyOrdered);

In order to run our application we only need to create an instance of the Query object and invoke the start operation.

    // EventWaitHandle lives in System.Threading.
    Query cepQuery = cepApp.CreateQuery("samplequery", binder, "");
    EventWaitHandle adapterStopSignal = new EventWaitHandle(false,
        EventResetMode.ManualReset, "StopAdapter");
    cepQuery.Start();
    // Block until the handle named in AdapterStopSignal is set.
    adapterStopSignal.WaitOne();
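
The wait-for-a-signal pattern used above can be sketched on its own with plain .NET types. The example below is only an illustration under stated assumptions: a background task stands in for the output adapter, the handle is unnamed (the sample uses the named handle "StopAdapter"), and the wait uses a timeout so the caller does not block forever if the signal never arrives. StopSignalSketch and RunAndWait are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StopSignalSketch
{
    // Returns true if the worker signalled completion before the timeout.
    public static bool RunAndWait(TimeSpan timeout)
    {
        using var stop = new EventWaitHandle(false, EventResetMode.ManualReset);

        // Stand-in for the output adapter: do some work, then raise the signal.
        Task worker = Task.Run(() =>
        {
            Thread.Sleep(50);
            stop.Set();
        });

        bool signalled = stop.WaitOne(timeout);

        // Make sure the worker has finished before the handle is disposed.
        worker.Wait();
        return signalled;
    }

    public static void Main()
    {
        Console.WriteLine(RunAndWait(TimeSpan.FromSeconds(5)));
    }
}
```

A bounded wait like this can make it easier to tell a query that produced no output apart from one that is still running, which the unbounded WaitOne() call cannot distinguish.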

Let's run this example with the following inputs.

Input1.csv:

6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    70
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    100
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    45
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    56
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    102
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    103
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    150
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    66

Input2.csv:

6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    70
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    105
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    50
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    58
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    111
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    101
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor1    150
6/25/2009 12:00:00 AM    6/25/2009 12:00:20 AM    sensor2    66

The following output is produced.

INSERT 6/25/2009 12:00:00 AM   ALERT   102     58
INSERT 6/25/2009 12:00:00 AM   ALERT   102     66
INSERT 6/25/2009 12:00:00 AM   ALERT   102     70
INSERT 6/25/2009 12:00:00 AM   ALERT   102     101
INSERT 6/25/2009 12:00:00 AM   ALERT   102     105
INSERT 6/25/2009 12:00:00 AM   ALERT   102     111
INSERT 6/25/2009 12:00:00 AM   ALERT   102     150
INSERT 6/25/2009 12:00:00 AM   ALERT   103     58
INSERT 6/25/2009 12:00:00 AM   ALERT   103     66
INSERT 6/25/2009 12:00:00 AM   ALERT   103     70
INSERT 6/25/2009 12:00:00 AM   ALERT   103     101
INSERT 6/25/2009 12:00:00 AM   ALERT   103     105
INSERT 6/25/2009 12:00:00 AM   ALERT   103     111
INSERT 6/25/2009 12:00:00 AM   ALERT   103     150
INSERT 6/25/2009 12:00:00 AM   ALERT   150     58
INSERT 6/25/2009 12:00:00 AM   ALERT   150     66
INSERT 6/25/2009 12:00:00 AM   ALERT   150     70
INSERT 6/25/2009 12:00:00 AM   ALERT   150     101
INSERT 6/25/2009 12:00:00 AM   ALERT   150     105
INSERT 6/25/2009 12:00:00 AM   ALERT   150     111
INSERT 6/25/2009 12:00:00 AM   ALERT   150     150
CTI    12/31/9999 11:59:59 PM
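
The size of this output can be checked by hand. Every input event has the same lifetime (12:00:00 to 12:00:20), so every pair of events overlaps and only the temperature filter matters. Three values in Input1.csv are strictly greater than 100 (102, 103, 150) and seven values in Input2.csv are strictly greater than 50, which gives 3 x 7 = 21 alerts. The sketch below reproduces that arithmetic with plain LINQ to Objects; it uses only the temperature column of the sample files, and the ExpectedOutput class is a hypothetical name for this illustration. The orderby clause is there only to match the listing above and says nothing about how StreamInsight orders its results.

```csharp
using System;
using System.Linq;

public static class ExpectedOutput
{
    // Temperature column of the two sample files, in file order.
    public static readonly int[] Input1 = { 70, 100, 45, 56, 102, 103, 150, 66 };
    public static readonly int[] Input2 = { 70, 105, 50, 58, 111, 101, 150, 66 };

    // Every qualifying value from Input1 paired with every qualifying
    // value from Input2, sorted to match the listing.
    public static (int Sensor1, int Sensor2)[] Pairs() =>
        (from a in Input1
         where a > 100
         from b in Input2
         where b > 50
         orderby a, b
         select (Sensor1: a, Sensor2: b)).ToArray();

    public static void Main()
    {
        var pairs = Pairs();
        Console.WriteLine(pairs.Length); // 3 x 7 = 21
        foreach (var (sensor1, sensor2) in pairs)
            Console.WriteLine($"ALERT\t{sensor1}\t{sensor2}");
    }
}
```

Note that the readings of exactly 100 in Input1.csv and 50 in Input2.csv are excluded because both comparisons in the query are strict.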

1 Comment

  • I don't know if it is because StreamInsight functionality has changed since you posted this or not, but when I try your linq syntax I get no output at all
