Mark Palmer, CEO of StreamBase, has posted a series of predictions about the Complex Event Processing (CEP) market in 2010. Generally, I am not a big fan of making or commenting on predictions related to the technology market but I wanted to add a few interesting thoughts to Mark's list.

For the last few years, I have been actively working with several CEP initiatives involving most of the top CEP technologies in the market. During that time, I have seen CEP evolves all the way from being a fancy experiment to one of the key initiatives in the modern enterprise. I wanted to write a few thoughts about some of the trends that I expect CEP applications in the next couple of years.

CEP starts becoming an enabler for real time business intelligence (BI)

In the last few years, the technologies for implementing real time BI solutions have evolved drastically making it a viable alternative in the enterprise. A clear example is the upcoming Microsoft's PowerPivot stack that is able to process millions of records in a highly optimized multidimensional store that can be accessed from both Excel and SharePoint environments.

The missing component of real time BI solutions are the mechanisms for collecting and processing the data in highly efficient ways. This is where CEP could shine. Most CEP engines are optimized for processing a large number of events using continuous querying mechanisms. In that sense, CEP technologies should become an essential component of real time solutions in order to make these type of solutions a viable alternative in the big enterprise.

CEP moves beyond the financial sector

Undoubtedly, CEP technologies have found a home in the financial sector. Although this tendency is very likely to continue, I do believe CEP is going to start gaining more adoption on other verticals such as manufacturing, the government (military) or the retail industry (RFID). Particularly, the combination of CEP technologies and RFID stacks can enable a large variety of scenarios across various vertical markets.

Big vendors move up the ladder

This is a point on which Mark and I disagree. Personally, I believe that the fact that big technology vendors such as Oracle, Microsoft and TIBCO are embracing CEP is one of the best thing that can happen to the CEP market. Whether they might not have the best technology stacks, it is unquestionable that these vendors will drive innovation and help to make CEP technologies more mainstream.

Another factor to consider is the work vendors such as Oracle, Microsoft and TIBCO are doing integrating CEP stacks into robust middleware and data-storage technologies such as Microsoft BizTalk and SQL Server, Oracle ESB and Oracle DB or TIBCO Event Broker. This level of integration will enable a new set of alternatives for enhancing CEP scenarios.

CEP engines go mobile

Traditionally, CEP applications are based on a server-centric model that is responsible for hosting, managing and scaling the core components of the CEP engines. Even though a large number of CEP scenarios can be implemented using this model, there are native capabilities of CEP engines such as the continuous query engines or adapters that can be very effective on mobile applications. For instance, consider a traditional RIFD scenario on which a mobile reader is capturing thousands of events that need to be filtered based on different patterns. On this scenario, we could read the events using a CEP adapter for the mobile RFID reader and filter them using a CEP continuous query engine.

This post does not intend to be a formalized list predictions about the CEP market. Most of the thoughts listed there are based on tendencies that I have seen in different CEP projects throughout recent years.

What do you think? Am I way off or on the right track? Let's debate :)

The second chapter of our WCF Extensibility Guidance Pablo and I authored a few months ago is now available on MSDN. This chapter is one of my favorites given that it touches upon the most common extensibility points of WCF's client and dispatcher programming model. Specifically, this chapter covers the best practices and techniques to implement the following WCF extensibility components:

  • Message Encoders
  • Message Formatters
  • Message Filters
  • Message Formatter
  • Parameter Inspectors
  • Operation Selectors
  • Operation Invokers

Similarly to the previous chapter, we've tried to illustrate each extensibility points within the context of real world scenarios.

I hope you guys enjoy reading this chapter. Please send us feedback to jesus dot rodriguez at tellago dot com or pablo dot cibraro at tellago dot com.

A few months ago my colleague Don Demsak and I started collaborating on a paper about the principles of "Lightweight SOAs". Fundamentally, the paper intends to demystify some of the aspects around big SOA enterprise projects and propose some patterns that facilitate the implementation of these architecture using emerging architecture styles (REST, cloud computing, etc) and technologies in a very agile manner. Thanks to Diego Dagum and his team, the paper has been included in the latest issue of the Microsoft Architecture Journal.

I understand that some of the ideas proposed in the paper can seem radical to the traditional SOA practitioner but I believe they are worth exploring. All our ideas have been based on the lessons learned while working on very complex SOA solutions throughout the last few year.

I hope you enjoy the paper. Please feel free to send me feedback thru this weblog or to jesus dot rodriguez @ telllago dot com.

Sender-vouches is one of the two subject confirmation methods included in the SAML security token profile specification. Essentially, the sender-vouches scenario enables an attesting entity to vouch for the identity of a subject to a relying party. The following figure illustrates this scenario:

sender-vouches[1]

From the protocol standpoint, an attesting entity uses the sender-vouches confirmation method to assert that it is acting on behalf of the subject of SAML statements attributed with a sender-vouches SubjectConfirmation element. The SAML statements attested by the sender-vouches method must have a corresponding more sender-vouches SubjectConfirmation elements. The following code illustrates a security token that uses the sender-vouches confirmation method.

   1: <wsse:Security>
   2:    <saml:Assertion AssertionID="SAML_ID" Issuer="www.example.org" ...>
   3:       <saml:Conditions NotBefore="..." NotOnOrAfter="..."/>
   4:       <saml:AuthenticationStatement AuthenticationMethod="urn:...:password"
   5:                                     AuthenticationInstant="2005-03-19T...Z"
   6:          &lt;saml:Subject>
   7:             <saml:NameIdentifier>Sample ID</saml:NameIdentifier>
   8:             <saml:SubjectConfirmation>
   9:                <saml:ConfirmationMethod>
  10:                   urn:oasis:names:tc:SAML:1.0:cm:sender-vouches
  11:                </saml:ConfirmationMethod>
  12:             </saml:SubjectConfirmation>
  13:          </saml:Subject>
  14:       </saml:AuthenticationStatement>
  15:    </saml:Assertion>
  16:    <wsse:SecurityTokenReference wsu:Id="STR1" ...> ... </wsse:SecurityToken..>
  17:    <wsse:BinarySecurityToken ...> ... </wsse:BinarySecurityToken>
  18:    <ds:Signature>
  19:       <ds:SignedInfo>
  20:          <ds:Reference URI="#STR1"> ... </ds:Reference>
  21:          <ds:Reference URI="#body"> ... </ds:Reference>
  22:          ...
  23:       </ds:SignedInfo>
  24:    </ds:Signature>
  25: </wsse:Security>

As you can see in line 10, the subject confirmation element of the authentication statement is set to urn:oasis:names:tc:SAML:1.0:cm:sender-vouches.

The sender-vouches confirmation method is relevant to a variety of identity management scenarios and it's particularly relevant in message brokering mechanisms on which messages between a subject and a relying party are routed through an intermediary.

The current version of the Windows Identity Foundation(WIF) does not include default support for the sender-vouches confirmation method. However, this method can be easily enabled by leveraging WIF extensibility mechanisms. Specifically, we can extend the default SAML token handlers in order to generate a SAML assertion that includes the sender-vouches confirmation method. The following code illustrates this technique in order to generate a SAML 1.1 security token.

   1: public class SenderVouchesSaml11TokenHandler: Saml11SecurityTokenHandler
   2: {
   3:  
   4:   public override bool CanValidateToken
   5:   {
   6:     get
   7:       {
   8:         return true;
   9:       }
  10:     }
  11:  
  12:    public override string[] GetTokenTypeIdentifiers()
  13:    {
  14:      return new string[2]{Consts.cSAML11TokenType, Consts.cSAML11Assertion};
  15:    }
  16:  
  17:  
  18:  
  19:    public override ClaimsIdentityCollection ValidateToken(SecurityToken token)
  20:    {
  21:      SamlSecurityToken samlToken = token as SamlSecurityToken;
  22:      IClaimsIdentity claimsIdentity = this.CreateClaims(samlToken);
  23:      return new ClaimsIdentityCollection
  24:                     (new List<IClaimsIdentity> { claimsIdentity });
  25:  
  26:    }
  27:  
  28:  
  29:    protected override System.IdentityModel.Tokens.SamlSubject
  30:                     CreateSamlSubject(SecurityTokenDescriptor tokenDescriptor)
  31:    {
  32:   SamlSubject subject = new SamlSubject(Consts.cUnspecifiedNameIdentifier, 
  33:                                              Consts.cNameQualifier, "Alice");
  34:  subject.ConfirmationMethods.Clear();
  35:  subject.ConfirmationMethods.Add("urn:oasis:names:tc:SAML:1.0:cm:sender-vouches");
  36:            
  37:  return subject;
  38:    }
  39:  
  40:    protected override System.IdentityModel.Tokens.SamlAuthenticationStatement 
  41:             CreateAuthenticationStatement(System.IdentityModel.Tokens.SamlSubject  
  42:             samlSubject, IdentityModel.Claims.AuthenticationInformation authInfo, 
  43:             IdentityModel.Tokens.SecurityTokenDescriptor tokenDescriptor)
  44:     {
  45:     SamlAuthenticationStatement authStatement = new SamlAuthenticationStatement();
  46:     authStatement.AuthenticationMethod = 
  47:               Saml11Constants.AuthenticationMethods.UnspecifiedString;
  48:   authStatement.SamlSubject = samlSubject;
  49:     return authStatement;
  50:         }
  51:  
  52:         ....
  53:     }

Essentially, we need to override the CreateSamlSubject in order to create a subject that includes the sender-vouches confirmation method. After that, we just need to associated that subject with the authentication statement. After implementing the token handler we only need to add it to the STS configuration security token handlers pipeline.

config.SecurityTokenHandlers.AddOrReplace(new SenderVouchesSaml11TokenHandler());

This technique can also be applied for a SAML 2.0 token handler.

As you might have noticed, I haven't been actively blogging during the last month. The reason is that I have been hands on working on a very ambitious project to showcase Web Services interoperability between Microsoft and Oracle platforms. This experiment allowed us to explore the interoperability of WCF 4.0 and WIF RTM with the upcoming release of Oracle WebLogic within the context of a real world application. As a result, we were able to implement various complex WS interoperability scenarios encompassing diverse areas such as security, trust, federation, asynchronous reliable messaging, transactions, etc. We had the opportunity to experience firsthand the interop capabilities of both stacks as well as identify some of the areas that require improvement in order to achieve better levels of interoperability.

Our final demo scenario was highlighted this week by both Microsoft and Oracle at Gartner's Application Architecture Development and Integration Summit in Las Vegas. The details of our sample application should be available in the upcoming weeks.

During the development process, I had the opportunity of working alongside members of the WCF, WIF and WebLogic product teams and I am very thankful for the experience. Big kudos to Kent Brown for sponsoring such an ambitious project and trusting Tellago with the responsibility of delivering under such a very aggressive deadline. On our side, I had a great backup from Nicolas Salazar who implemented the user interfaces used to test the interop scenarios.

PDC is here again!

This year my friend and colleague Don Demsak and Andrew Brust will be presenting a business intelligence (BI) workshop covers some of the fundamental patterns and techniques that can help developers to build modern BI applications in the Microsoft platform. Specifically, the workshop will focus on how to merge the traditional aspects of BI solutions  such as analytics, reporting, data aggregation into widely adopted technologies like SharePoint Server and Office in general.

If you are attending to PDC you should definitely attend to this workshop. Andrew and Don and both fantastic speakers and thought leaders in the community. Given said that, feel free to give Don a hard time. At the end, it is an all day workshop ;)

Next Tuesday (November 10th) my colleague Uri Katsir will be presenting a session at the South Florida BizTalk Users Group about improving agility in large BizTalk environments using the ESB toolkit. If you are a BizTalk developer or operations architect working on complex BizTalk deployments you MUST attend Uri's session. In order to keep things in perspective, Uri has prepared a series of interesting demos that illustrate the techniques used to improve the development and management experience of BizTalk solutions by leveraging the ESB toolkit. The presentation is 100% focused on real world scenarios including some of the lessons Uri has learned while working on one of the most complex BizTalk environments in the world.

My friend and colleague Uri Katsir just joined the blogosphere.  Uri is a BizTalk wizard who is currently working on one of the biggest BizTalk implementations in the world. We are extremely proud to have him as part of our technical staff at Tellago. Uri has already posted a couple of interesting write-ups about ESB itineraries troubleshooting and deployment techniques.  

As the WCF team already announced, the first chapter of the WCF Extensibility Guidance is now available on MSDN . This paper is the result of an effort I started a few months ago with my colleague and friend Pablo Cibraro. Our goal was to provide a detailed guidance of the major extensibility points of the WCF runtime. In that sense, we decided to cover major areas of the WCF extensibility programming model such as channels, client-dispatcher, security, hosting, metadata, RESTful service and even the new extensions on WCF 4.0.

Another goal of this guidance was to focus it on real world scenarios. In that spirit, we decided to explain each extensibility using a format that start with an overview of its capabilities, followed by a set of real world scenarios on which developers should consider leveraging that extensibility point. Finally, we provide a detailed explanation of how to implement and configure that specific extensibility area in the WCF runtime.

Kent Brown (WCF Sr. Product Manager) has been the mastermind behind this paper and certainly the person that made this publication possible.

In a previous post we explained the programming model of Microsoft's StreamInsight adapter framework. The fundamental capability of this framework is to streamline the flow of events in and out of the StreamInsight hosting application. One of the main advantages of this model is that enables developers to create their own adapters that can be leveraged on StreamInsight-based solutions. On this post we will explore the details of implementing an RSS/Atom adapter using StreamInsight's adapter framework.

Syndication formats such as Atom and RSS have been on the front of the Web 2.0 movement. Nowadays, more and more systems are choosing these formats as a mechanisms to expose data to external applications. The initial use of RSS and Atom was mainly focused on read-only scenarios. However, the emergence of standards such as the Atom Publishing Protocol enable applications to publish syndication feeds to specific endpoint.

From the Complex Event Processing (CEP) perspective, syndication formats can be an interesting mechanism for both accessing and publishing events processed in a CEP application. If you live in the CEP world, the idea of using text-based data sources such as web pages or flat file should be pretty familiar to you. Recently we have seen some of the CEP market leaders to start embracing syndication formats as a mechanism for represent events. As an example, StreamBase (currently, one of the top CEP vendors in the market) recently announced it support for processing Twitter streams as part of CEP applications.

For the simplicity of this example we decided to implement a input-only adapter that reads an RSS feed and generates a set of events. The first step in our implementation would be to define the event types that describe the syndication items. We can achieve that by using the class definition illustrated in the following code.

   1: public class SyndicationEvent
   2:   {
   3:       private string id;
   4:       private string title;
   5:       private string summary;
   6:       private string text;
   7:       private string uri;
   8:       private DateTime createdTime;
   9:       private DateTime lastUpdatedTime;
  10:       private string author;
  11:  
  12:       public SyndicationEvent(SyndicationItem item)
  13:       {
  14:           id= item.Id;
  15:           title= item.Title.Text;
  16:           if (item.Summary.Text.Length > 256)
  17:               summary = item.Summary.Text.Substring(0, 256);
  18:           else
  19:               summary = item.Summary.Text;
  20:         
  21:           createdTime= item.PublishDate.DateTime;
  22:           lastUpdatedTime= item.LastUpdatedTime.DateTime;
  23:           uri= item.Links[0].ToString();
  24:           if (item.Authors.Count > 0)
  25:             author= item.Authors[0].Name;
  26:       }
  27:  
  28:       public SyndicationEvent()
  29:       { }
  30:  
  31:       public string ID
  32:       {
  33:           get { return id;  }
  34:           set { id = value; }
  35:       }
  36:  
  37:       public string Title
  38:       {
  39:           get { return title; }
  40:           set { title = value; }
  41:       }
  42:  
  43:       public string Summary
  44:       {
  45:           get { return summary; }
  46:           set { summary = value; }
  47:       }
  48:  
  49:       public string Text
  50:       {
  51:           get { return text; }
  52:           set { text = value; }
  53:       }
  54:  
  55:       public string Uri
  56:       {
  57:           get { return uri; }
  58:           set { uri = value; }
  59:       }
  60:  
  61:       public string Author
  62:       {
  63:           get { return author; }
  64:           set { author = value; }
  65:       }
  66:  
  67:       public DateTime CreatedTime
  68:       {
  69:           get { return createdTime; }
  70:           set { createdTime = value; }
  71:       }
  72:  
  73:       public DateTime LastUpdatedTime
  74:       {
  75:           get { return lastUpdatedTime; }
  76:           set { lastUpdatedTime = value; }
  77:       }
  78:  
  79:   }

As you might have noticed, our sample syndication event is initialized directly from a System.ServiceModel.Syndication.SyndicationItem object.

Given the time-based nature of syndication feeds, we decided to implement our sample adapter as an interval adapter by deriving from the TypedIntervalInputAdapter<T> class. Basically, an interval adapter process events valid during a specific time interval. You can find more details in my previous post.

For the sake of this example, our adapter polls the syndication URI when initialized. In order to implement this pattern, we leverage the programming model included in the System.ServiceModel.Syndication namespace as illustrated in the following code.

   1: public class RSS20InputAdapter : TypedIntervalInputAdapter<SyndicationEvent>
   2:         {
   3:             private IntervalEvent<SyndicationEvent> pendingevent;
   4:             private DateTime pendingctitime;
   5:  
   6:             SyndicationFeed dataFeed;
   7:             IEnumerator<SyndicationItem> dataFeedItems;
   8:  
   9:             int index = 0;
  10:  
  11:             public RSS20InputAdapter(SyndicationAdapterConfig configInfo, 
  12:                                      CepEventType cepEventType)
  13:             {
  14:                 Rss20FeedFormatter rssFormatter = new Rss20FeedFormatter
  15:                                                (typeof(SyndicationFeed));
  16:                 XmlReader rssReader = XmlReader.Create(configInfo.URL);
  17:                 rssFormatter.ReadFrom(rssReader);
  18:                 rssReader.Close();
  19:                 dataFeed = rssFormatter.Feed;
  20:                 dataFeedItems= dataFeed.Items.GetEnumerator();
  21:             }
  22:  
  23:            ....
  24: }

The StreamInsight runtime will invoke the Start operation of the adapter in order to start listening for events.

   1: public override void Start()
   2: {
   3:     ProduceEvents();
   4: }
   5:  
   6: private void ProduceEvents()
   7: {
   8:   IntervalEvent<SyndicationEvent> currevent=
   9:                      default(IntervalEvent<SyndicationEvent>);
  10:   DateTime currctitime = default(DateTime);
  11:   EnqueueOperationResult result = EnqueueOperationResult.Full;
  12:  
  13:   while (true)
  14:   {
  15:   if (AdapterState.Stopping == AdapterState)
  16:   {
  17:     this.Stopped();
  18:     return;
  19:   }
  20:  
  21:   currevent = CreateEventFromSource();
  22:   pendingevent = null;
  23:  
  24:   // Enqueue point event with payload.
  25:   if (null != currevent)
  26:     result = Enqueue(ref currevent);
  27:   else
  28:   {
  29:     result = EnqueueOperationResult.Full;
  30:     PrepareToStop(currevent);
  31:   }
  32:  
  33:   // Handle Enqueue rejection
  34:   if (EnqueueOperationResult.Full == result)
  35:   {
  36:     EnqueueCtiEvent(DateTime.MaxValue);
  37:     PrepareToStop(currevent);
  38:     Stopped();
  39:     return;
  40:                      
  41:   }
  42:   // Enqueue Cti event based on application logic
  43:   result = EnqueueCtiEvent(currctitime);
  44:   }
  45: }

Notice that the adapter will queue the events corresponding to the different syndication items until there are no more syndication entries available. After that, the adapter inserts a CTI event indicating that no subsequent INSERT events can have a start time earlier than the timestamp of the CTI event.

The ProduceEvents operation is also invoked as part of the Resume method.

   1: public override void Resume()
   2: {               
   3:   ProduceEvents();
   4: }

The rest of the implementation follows a similar pattern to most adapters and it fundamentally controls the state lifecycle of the adapter. The following code illustrates the complete implementation of our sample RSS adapter.

   1: public class RSS20InputAdapter : TypedIntervalInputAdapter<SyndicationEvent>
   2:         {
   3:             private IntervalEvent<SyndicationEvent> pendingevent;
   4:             private DateTime pendingctitime;
   5:  
   6:             SyndicationFeed dataFeed;
   7:             IEnumerator<SyndicationItem> dataFeedItems;
   8:  
   9:             int index = 0;
  10:             /// <summary>
  11:             /// Constructor - Use this to initialize local resources based on configInfo
  12:             /// structures. Examples are open files or database connections, set delimiters, and
  13:             /// other parameters that enable the adapter to access the input/output device.
  14:             /// </summary>
  15:             public RSS20InputAdapter(SyndicationAdapterConfig configInfo, CepEventType cepEventType)
  16:             {
  17:                 Rss20FeedFormatter rssFormatter = new Rss20FeedFormatter(typeof(SyndicationFeed));
  18:                 XmlReader rssReader = XmlReader.Create(configInfo.URL);
  19:                 rssFormatter.ReadFrom(rssReader);
  20:                 rssReader.Close();
  21:                 dataFeed = rssFormatter.Feed;
  22:                 dataFeedItems= dataFeed.Items.GetEnumerator();
  23:             }
  24:  
  25:             /// <summary>
  26:             /// Start is the first method to be called by the CEP server once the input
  27:             /// adapter has been instantiated. So any initializations that cannot be covered
  28:             /// in the constructor can be placed here. Start() is called in a separate worker
  29:             /// thread initiated by the server. In this example (and in most scenarios), Start
  30:             /// begins producing events once it is called.
  31:             /// </summary>
  32:             public override void Start()
  33:             {
  34:                 ProduceEvents();
  35:             }
  36:  
  37:             /// <summary>
  38:             /// Resume is called by the server once it returns from being scheduled away
  39:             /// from Start, and only after the adapter has called Ready(). Resume continues
  40:             /// to produce events.
  41:             /// </summary>
  42:             public override void Resume()
  43:             {
  44:                 
  45:                 ProduceEvents();
  46:             }
  47:  
  48:             
  49:  
  50:             /// <summary>
  51:             /// Dispose is inherited from the base adapter class and is the placeholder to
  52:             /// release adapter resources when this instance is shut down.
  53:             /// </summary>
  54:             /// <param name="disposing"></param>
  55:             protected override void Dispose(bool disposing)
  56:             {
  57:             }
  58:  
  59:             /// <summary>
  60:             /// Main driver to read events from the source and enqueue them.
  61:             /// </summary>
  62:             private void ProduceEvents()
  63:             {
  64:                 IntervalEvent<SyndicationEvent> currevent = default(IntervalEvent<SyndicationEvent>);
  65:                 DateTime currctitime = default(DateTime);
  66:                 EnqueueOperationResult result = EnqueueOperationResult.Full;
  67:  
  68:                 while (true)
  69:                 {
  70:                     if (AdapterState.Stopping == AdapterState)
  71:                     {
  72:                         this.Stopped();
  73:                         return;
  74:                     }
  75:  
  76:                     currevent = CreateEventFromSource();
  77:                     pendingevent = null;
  78:  
  79:                     // Enqueue point event with payload.
  80:                     if (null != currevent)
  81:                         result = Enqueue(ref currevent);
  82:                     else
  83:                     {
  84:                       result = EnqueueOperationResult.Full;
  85:                         PrepareToStop(currevent);
  86:                        // Stopped();
  87:                     }
  88:  
  89:                     // Handle Enqueue rejection
  90:                     if (EnqueueOperationResult.Full == result)
  91:                     {
  92:                         EnqueueCtiEvent(DateTime.MaxValue);
  93:                         PrepareToStop(currevent);
  94:                         Stopped();
  95:                         return;
  96:                      
  97:                     }
  98:  
  99:                     // Enqueue Cti event based on application logic
 100:                     result = EnqueueCtiEvent(currctitime);
 101:  
 102:                 }
 103:             }
 104:  
 105:             private void PrepareToStop(IntervalEvent<SyndicationEvent> currEvent)
 106:             {
 107:                 // The server will not accept any more events, and you
 108:                 // cannot do anything about it. Release the event.
 109:                 // If you miss this step, server memory will leak.
 110:                 if (null != currEvent)
 111:                 {
 112:                     ReleaseEvent(ref currEvent);
 113:                 }
 114:             }
 115:  
 116:             private void PrepareToResume(IntervalEvent<SyndicationEvent> currevent)
 117:             {
 118:                 pendingevent = currevent;
 119:             }
 120:  
 121:             private void PrepareToResume(DateTime currctitime)
 122:             {
 123:                 pendingctitime = currctitime;
 124:             }
 125:             private bool EndofSource()
 126:             {
 127:                 return false;
 128:             }
 129:             private IntervalEvent<SyndicationEvent> CreateEventFromSource()
 130:             {
 131:                 if (dataFeedItems.MoveNext())
 132:                 {
 133:                     IntervalEvent<SyndicationEvent> syndicationEvent = this.CreateInsertEvent();
 134:                     syndicationEvent.Payload = new SyndicationEvent(dataFeedItems.Current);
 135:                     syndicationEvent.StartTime = dataFeedItems.Current.PublishDate.DateTime;
 136:                     syndicationEvent.EndTime = dataFeedItems.Current.PublishDate.DateTime;
 137:                     return syndicationEvent;
 138:                 }
 139:                 else
 140:                     return null;
 141:             }
 142:         }
 143:  
 144:         // Configuration structure to initialize the adapter.
 145:         public struct SyndicationAdapterConfig
 146:         {
 147:             private string url;
 148:  
 149:             public string URL
 150:             {
 151:                 get { return url; }
 152:                 set { url = value; }
 153:             }
 154:         }
 155:  
 156:         // Factory class is the entry point for the query binder to initialize
 157:         // and create an adapter instance.
 158:         public class SyndicationAdapterFactory : IInputAdapterFactory<SyndicationAdapterConfig>
 159:         {
 160:             public SyndicationAdapterFactory()
 161:             {
 162:             }
 163:  
 164:             public InputAdapterBase Create(SyndicationAdapterConfig configInfo,
 165:                 EventShape eventshape, CepEventType cepeventtype)
 166:             {
 167:                 InputAdapterBase adapter = default(InputAdapterBase);
 168:  
 169:                 if (EventShape.Interval == eventshape)
 170:                     adapter = new RSS20InputAdapter(configInfo, cepeventtype);
 171:  
 172:                 return adapter;
 173:             }
 174:             public void Dispose()
 175:             {
 176:             }
 177:         }

Although the adapter is fully functional we still need to enable the factory mechanism that StreamInsight applications will use to create instances of the adapter. We can accomplish this by implementing a custom adapter factory that inherits from the Microsoft.ComplexEventProcessing.Adapters.IInputAdapterFactory<T> class.

   1: public class SyndicationAdapterFactory : 
   2:                          IInputAdapterFactory<SyndicationAdapterConfig>
   3:        {
   4:            public SyndicationAdapterFactory()
   5:            {
   6:            }
   7:  
   8:            public InputAdapterBase Create(SyndicationAdapterConfig configInfo,
   9:                EventShape eventshape, CepEventType cepeventtype)
  10:            {
  11:                InputAdapterBase adapter = default(InputAdapterBase);
  12:  
  13:                if (EventShape.Interval == eventshape)
  14:                    adapter = new RSS20InputAdapter(configInfo, cepeventtype);
  15:  
  16:                return adapter;
  17:            }
  18:            public void Dispose()
  19:            {
  20:            }
  21:        }

At this point, our sample adapter can be used from any StreamInsight application. For instance, the following code illustrates a sample application that uses our adapter to query the Microsoft's Bing News feed filtering for items containing a specific keyword. The events that match the filter criteria are output it to the output console using the TextFileOutputAdapter sample included in the StreamInsight SDK.

   1: private static void RssAdapterTest(string keyword)
   2: {
   3:  Server cepServer = Server.Create();
   4:  Application cepApp = cepServer.CreateApplication("SampleCEPApplication");
   5:  InputAdapter syndicationInputAdapter = 
   6:         cepApp.CreateInputAdapter<SyndicationAdapterFactory>("sampleadapter", "");
   7:  OutputAdapter fileOutputAdapter=
   8:         cepApp.CreateOutputAdapter<TextFileOutputFactory>("sampleadapter", "");
   9:  CepEventType sensorEventType = cepApp.CreateEventType<SyndicationEvent>();
  10:  
  11:  CepStream<SyndicationEvent> dataFeedStream=
  12:                CepStream.CreateInputStream<SyndicationEvent>("datafeedstream");
  13:  
  14: var query = from s1 in dataFeedStream 
  15:             where s1.Title.Contains(keyword)
  16:             select new { Title = s1.Title, Time = s1.LastUpdatedTime,
  17:                          Summary= s1.Summary};
  18:  
  19:  QueryTemplate template = cepApp.CreateQueryTemplate("samplequery", query);
  20:  QueryBinder binder = new QueryBinder(template);
  21:  var dataFeedInputConf = new SyndicationAdapterConfig
  22:  {
  23:    URL = "http://www.bing.com/news?FORM=Z9LH6&format=rss"
  24:  };
  25:  
  26:  
  27:  var outputConfig = new TextFileOutputConfig
  28:  {
  29:    OutputFileName = String.Empty,
  30:    Delimiter = '\t',
  31:    AdapterStopSignal = "StopAdapter"
  32:  };
  33:  
  34:  binder.BindInputStream<SensorReading, SyndicationAdapterConfig>("datafeedstream", 
  35:                  syndicationInputAdapter, EventShape.Interval, dataFeedInputConf);
  36:  binder.AddQueryConsumer<TextFileOutputConfig>("queryresult", 
  37:                              fileOutputAdapter, outputConfig, EventShape.Interval, 
  38:                              StreamEventOrder.FullyOrdered);
  39:  
  40:  Query cepQuery = cepApp.CreateQuery("samplequery", binder, "");
  41:  EventWaitHandle adapterStopSignal = new EventWaitHandle(false, 
  42:                                        EventResetMode.ManualReset, "StopAdapter");
  43:  cepQuery.Start();
  44:  
  45:  adapterStopSignal.WaitOne();
  46:  Console.ReadLine();
  47:  cepQuery.Stop();
  48:  
  49: }
More Posts Next page »