Hosting StreamInsight applications using WCF

One of the fundamental differentiators of Microsoft's StreamInsight compared to other Complex Event Processing (CEP) technologies is its flexible deployment model. In that sense, a StreamInsight solution can be hosted within an application or as a server component. This duality contrasts with most of the popular CEP frameworks in the current market which are almost exclusively server based. Whether it's undoubtedly that the ability of embedding a CEP engine in your applications opens new possibilities for CEP scenarios such as Web analytics or mobile CEP it is also unquestionable that the majority of CEP scenarios still rely on a server-centric model. This blog post provides an initial overview to the server hosting capabilities of Microsoft's StreamInsight.

Instead of providing a brand new server product, the current version of Microsoft's StreamInsight relies on Windows Communication Foundation (WCF) for its hosting capabilities. This model allows to reuse familiar and proven techniques to scale and improve the performance of StreamInsight solutions. The key element of StreamInsight's server-based model is the Microsoft.ComplexEventProcessing.Server class. This class abstracts the basic functionalities of a CEP Server such as storing application's definition, binding and controlling the execution of queries, among many others. In order to expose a CEP server as a WCF service we should invoke the CreateManagementService operation which exposes an implementation of the IManagementService service contract that contains the following operations.

1: [ServiceContract(

Namespace = "http://schemas.microsoft.com/ComplexEventProcessing/2009/05/Management")]

   2:     public interface IManagementService
   3:     {
   4:        [OperationContract(...)]
   5:        ChangeQueryStateResponse ChangeQueryState(ChangeQueryStateRequest request);
   6:  
   7:        [OperationContract(...)]
   8:        void ClearDiagnosticSettings(ClearDiagnosticSettingsRequest request);
   9:  
  10:        [OperationContract(...)]
  11:        CreateResponse Create(CreateRequest request);
  12:  
  13:        [OperationContract(...)]
  14:         DeleteResponse Delete(DeleteRequest request);
  15:  
  16:        [OperationContract(...)]
  17:        EnumerateResponse Enumerate(EnumerateRequest request);
  18:         
  19:       [OperationContract(...)]
  20:        GetResponse Get(GetRequest request);
  21:  
  22:       [OperationContract(...)]

23: GetDiagnosticSettingsResponse GetDiagnosticSettings(

GetDiagnosticSettingsRequest request);

  24:        
  25:       [OperationContract(...)]

26: GetDiagnosticViewResponse GetDiagnosticView(

GetDiagnosticViewRequest request);

  27:       
  28:       [OperationContract(...)]
  29:       void SetDiagnosticSettings(SetDiagnosticSettingsRequest request);
  30:     }

In order to enable persists the definition of CEP queries, applications and events, StreamInsight uses a SQLCE-based database that uses the schema defined by the CepMetadata.sdf template included in the StreamInsight bits. The following code illustrates the techniques used to create a new StreamInsight's WCF service host.

   1: private static void StartHost()
   2: {
   3:   Console.ForegroundColor = ConsoleColor.Yellow;
   4:   ServiceHost host = null;
   5:   try
   6:   {
   7:     Server server = GetServer();
   8:     host = new ServiceHost(server.CreateManagementService());
   9:     host.Open();
  10:     if (!server.Applications.Keys.Contains("sampleapp") )
  11:       registerSampleQuery();
  12:     EventWaitHandle adapterStopSignal = new EventWaitHandle(false, 
  13:                          EventResetMode.ManualReset, "StopAdapter");
  14:     Console.ReadLine();
  15:   }
  16:   catch (Exception innerException)
  17:   {    ...  }
  18:   finally
  19:   {
  20:   if ((host != null) && (host.State != CommunicationState.Faulted))
  21:     host.Close();
  22:   if (server != null)
  23:     server.Dispose();
  24:   }
  25: }
  26:  
  27:  
  28: private static Server GetServer()
  29: {
  30:   string str = ConfigurationManager.AppSettings["SQLCEMetadataFile"];
  31:   bool flag = false;
  32:   if (new List<string>(ConfigurationManager.AppSettings.AllKeys).
  33:                        Contains("CreateSqlCeMetadataFileIfMissing"))
  34:   {
  35:     try
  36:     {
  37:       flag = bool.Parse(ConfigurationManager.
  38:             AppSettings["CreateSqlCeMetadataFileIfMissing"]);
  39:     }
  40:     catch (FormatException exception)
  41:     {...        }
  42:    }
  43:    if (str != null)
  44:    {
  45:      SqlCeMetadataProviderConfiguration config = 
  46:                     new SqlCeMetadataProviderConfiguration();
  47:      config.DataSource = str;
  48:      config.CreateDataSourceIfMissing = flag;
  49:      return Server.Create(config);
  50:    }
  51:    return Server.Create();
  52:   }

The previous code uses the following WCF configuration.

   1: <system.serviceModel>
   2:  <services>
   3:         
   4:    <service 
   5:      name="Microsoft.ComplexEventProcessing.ManagementService.ManagementService"
   6:      behaviorConfiguration="MtdBehavior">
   7:  
   8:      <host>
   9:             <baseAddresses>
  10:               <add baseAddress="http://localhost:9999/StreamInsight"/>
  11:             </baseAddresses>
  12:       </host>
  13:    
  14:      <endpoint 
  15:      address="/cephost" binding="basicHttpBinding"

16: contract=

"Microsoft.ComplexEventProcessing.ManagementService.IManagementService" />

  17:      
  18:         </service>
  19:       </services>
  20:       <behaviors>
  21:         <serviceBehaviors>
  22:           <behavior name="MtdBehavior">
  23:             <serviceMetadata httpGetEnabled="true"/>
  24:           </behavior>
  25:         </serviceBehaviors>
  26:       </behaviors>
  27:     </system.serviceModel>

Notice that our WCF service implements the Microsoft.ComplexEventProcessing.ManagementService.IManagementService contract as its main interface.

After implementing our CEP server, client applications can interact with it at the URIs specified in the service endpoint definition. It is important to notice that the definition of the CEP artifacts such as applications, queries or events remains stored in the server database. A client application can connect to the server and trigger the execution of queries as illustrated in the following code:

   1: private static void ExecuteRemoteQuery()
   2: {
   3:     Server cepServer = Server.Connect(
   4:          new EndpointAddress("http://localhost:9999/StreamInsight/cephost"), 
   5:          new BasicHttpBinding());
   6:     Application cepApplication= cepServer.Applications[my cep application...];
   7:     Query cepQuery= cepApplication.Queries[my cep query...];
   8:     cepQuery.Start();
   9:     Console.ReadLine();
  10: }

Notice that our client application simply connects to the remote CEP server and executes the query.

I will be the first person to tell you that the StreamInsight’s server model requires more work in terms of the tooling and the underlying infrastructure. However, I believe we can agree that the fact that the host model is base on a highly flexible and scalable model like WCF takes us a large way. At the end, its always easier to build tooling that infrastructure, isn’t it?

No Comments