Combining durable messaging and workflow services: Building a custom context channel

On my previous MSDN WebCast I showed a demo of how to use a custom WCF protocol channel to combine Workflow Services and durable messaging transports channels like MSMQ. If you watch the recording of the WebCast you can see the entire demo, but there are some things that I would like to highlight about the structure of the channel.

Using durable transports with Workflow Services entails some challenges that should be considered carefully. The first thing is related with the OneWay nature of most durable transports. The context exchange mechanism used by Workflow Services requires, typically, that the context information is returned back to the client after the call that initiates the session. The mechanism for transporting that context could be either HTTTP cookies or SOAP headers depending of the context binding we are using. Obviously, this does not work well with OneWay transports because somehow we need to get a response on the client side. The workaround for that is very simple and requires isolating the operation that initiates the session in a separate contract that uses a transport channel that supports request response operations. This operation is needed only to initiate the session. The following code shows the two contracts I used as part of my WebCast demo.

    [ServiceContract(SessionMode=SessionMode.Allowed)]

          public interface ISampleWFService

          {

        [OperationContract()]

        bool Start(int param);

 

        [OperationContract(IsOneWay = true)]

        void End();

          }

 

    [ServiceContract(SessionMode = SessionMode.Allowed)]

    public interface IOneWaySampleService

    {

        [OperationContract(IsOneWay = true)]

        void AddValue(int param);

 

    }

 

After the Start operation is invoked and we've received the context on the client side we can start using a durable transport to send message to the same instance. An alternative to this solution will require implementing a custom mechanism to deliver the context information back to the client, the use of Duplex communication might be interesting on this case.

The first idea in order to get a the MSMQ transport channel working with workflow services is to create a custom binding that combines the two together, my buddy Mike Taulty explains some of the details here. The idea totally makes sense given that the context channel only needs to extract the context information from the message (using either HTTP cookies of SOAP headers) and promoted as a ContextMessageProperty to the dispatcher. The following figure illustrates that configuration.

<binding name="DefaultDurableMessagingSettings">

          <textMessageEncoding />

          <context />

          <msmqTransport exactlyOnce="false">

            <msmqTransportSecurity msmqProtectionLevel="None" msmqAuthenticationMode="None" />

          </msmqTransport>

        </binding>

 

Well, if you have tried this before you know it DOES NOT WORK AT ALL. The reason being is that the ContextChannel does not work with IInputChannels. If we take a look to the CanBuildChannelListener implementation in the ContextBindingElement class we can notice that it return false if the type passed is IInputChannel.

public override bool CanBuildChannelListener<TChannel>(BindingContext context)

where TChannel: class, IChannel

{

    if (context == null)

    {

        throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");

    }

    if (((typeof(TChannel) != typeof(IReplyChannel)) && (typeof(TChannel) != typeof(IReplySessionChannel))) && ((typeof(TChannel) != typeof(IDuplexSessionChannel)) || (this.ContextExchangeMechanism == ContextExchangeMechanism.HttpCookie)))

    {

        return false;

    }

    return context.CanBuildInnerChannelListener<TChannel>();

}

The ContextBindingElement class is the one that instantiates the ContextChannelListener which in turn instantiates the different context channels supported like illustrated in the following figure.

 

So that leaves us with the option of building a custom protocol channel that can support IInputChannels. Also we should extend the current functionally, provided by the workflow services context channels, and not to replace it, so that this solution can work on the existing scenarios supported by the ContextChannel. Fortunately, the task is not as tough as it appears to be at first glance. Yes, we certainly need to implement an IInputChannel and the corresponding listeners and bindings but the good news is that other than the normal things we need to implement every time we create one of those components, the rest of the work is relatively simple. In that sense, the first thing in order is to implement an IInputChannel that receives the message and serializes the context information into a ContextMessageProperty so that it can be accessed by the other elements of the workflow services infrastructure. The following code illustrates that part.

class ContextInputChannel<TChannel>

        : ContextChannelBase<TChannel>, IInputChannel

        where TChannel : class, IInputChannel

    {

        public ContextInputChannel(

            ChannelManagerBase manager, TChannel innerChannel)

            : base(manager, innerChannel)

        {

           

        }

 

        bool ProcessReceivedMessage(ref Message message)

        {

            int index= message.Headers.FindHeader(ContextMessageHeader.cContextName, ContextMessageHeader.cContextNamespace) ;

            if (index >= 0)

            {

                ContextMessageProperty contextProperty = ContextMessageHeader.ParseContextHeader(

message.Headers.GetReaderAtHeader(index));

                contextProperty.AddOrReplaceInMessage(message);

                return true;

            }

            else

                return false;

        }

    

         //rest of the implementation...

 

    }

 

The rest of the implementation of this channel are the typical things that are required for any other IInputChannel.

After that, we need to implement the ChannelListener that instantiates the channel so that it can be accessed by the other elements of the runtime. The following code shows that part.

  class ContextInputChannelListener<TChannel>

        : ChannelListenerBase<TChannel>

        where TChannel : class, IChannel

    {

 

        protected override TChannel OnAcceptChannel(TimeSpan timeout)

        {

            TChannel innerChannel = this.innerChannelListener.AcceptChannel(timeout);

            return WrapChannel(innerChannel);

        }

 

 

        TChannel WrapChannel(TChannel innerChannel)

        {

            if (innerChannel == null)

            {

                return null;

            }

 

            if (typeof(TChannel) == typeof(IInputChannel))

            {

                return (TChannel)(object)new ContextInputChannel<IInputChannel>(this, (IInputChannel)innerChannel);

            }

            return innerChannel;

        }

 

//rest of the implementation....

}

Finally, and probably the most important part, we need to create a Binding element that instantiates the ChannelListener. This binding element should extend the ContextBindingElement to add the support for IInputChannels and at the same time keep the existing functionality. The following code illustrates that component.

public class ContextInputChannelBindingElement

                 : ContextBindingElement

    {

 

        public ContextInputChannelBindingElement():base()

        {

        }

 

        public override bool CanBuildChannelListener<TChannel>(BindingContext context)

        {

            if (typeof(TChannel) == typeof(IInputChannel))

                return context.CanBuildInnerChannelListener<TChannel>();

            else

               return base.CanBuildChannelListener<TChannel>(context);

        }

 

        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)

        {

            if (typeof(TChannel) == typeof(IInputChannel))

                return new ContextInputChannelListener<TChannel>(context);

            else

                return base.BuildChannelListener<TChannel>(context);

        }

 

protected ContextInputChannelBindingElement(ContextInputChannelBindingElement other)

            : base()

        {

            //this.interceptor = other.Interceptor;

        }

 

        public override BindingElement Clone()

        {

            base.Clone();

            return new ContextInputChannelBindingElement(this);

        }

        public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)

        {

            return base.BuildChannelFactory<TChannel>(context);

        }

 

        public override bool CanBuildChannelFactory<TChannel>(BindingContext context)

        {

            return base.CanBuildChannelFactory<TChannel>(context);

        }

 

        public override T GetProperty<T>(BindingContext context)

        {

            /*if (typeof(T) == typeof(ChannelMessageInterceptor))

            {

                return (T)(object)this.Interceptor;

            }*/

 

            return base.GetProperty<T>(context);

        } 

    }

 

As you can see, the binding element will only create an instance of our Channel Listener in the case we are using an IInputChannel, on all the other cases this class will delegate the functionality onto the ContextBindingElement base class.

Having all that, we can use a workflow service to implement our contracts as illustrated in the following figure.

The configuration for this service is detailed in the following figure. Notice that the binding configuration is now using our custom channel which means that the workflow service can now receive message using MSMQ as a transport.

<?xml version="1.0" encoding="utf-8" ?>

<configuration>

 

  <system.serviceModel>

 

    <extensions>

      <bindingElementExtensions>

        <add name="ContextInputChannel" type="Tellago.ServiceModel.Samples.ContextInputChannelBindingExtensionElement, ContextInputChannel, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"/> 

      </bindingElementExtensions>

    </extensions>

   

    <services>

    <service name="SampleWFServices.MathWFService" behaviorConfiguration="ServiceBehavior" >

      <host>

        <baseAddresses>

          <add baseAddress="http://localhost:8888/WFServices/" />

          <add baseAddress="net.msmq://localhost/private/"/>

        </baseAddresses>

      </host>

 

      <endpoint address="/MathService"

                binding="wsHttpContextBinding"

                bindingConfiguration="basicContextSettings"

                contract="SampleWFServices.ISampleWFService" />

 

 

       <endpoint address="/WFQueue"

                binding="customBinding"

                bindingConfiguration="DurableMessagingSettings"

                contract="SampleWFServices.IOneWaySampleService" />

 

 

    </service>

    </services>

 

    <bindings>

      <wsHttpContextBinding>

        <binding name="basicContextSettings" allowCookies="false">

          <security mode="None" />

        </binding>

      </wsHttpContextBinding>

 

      <customBinding>

        <binding name="DurableMessagingSettings">

          <textMessageEncoding />

           <ContextInputChannel />       

         

           <msmqTransport exactlyOnce="false">

             <msmqTransportSecurity msmqProtectionLevel="None" msmqAuthenticationMode="None" />

          </msmqTransport>

        </binding>

 

      </customBinding>

     

     

    </bindings>

   

    <behaviors>

      <serviceBehaviors>

        <behavior name="ServiceBehavior"  >

          <serviceMetadata httpGetEnabled="true" />

          <serviceDebug includeExceptionDetailInFaults="true" />

 

          <workflowRuntime name="WorkflowServiceHostRuntime" validateOnCreate="true" enablePerformanceCounters="true">

            <services>

              <add type="System.Workflow.Runtime.Hosting.SqlWorkflowPersistenceService, System.Workflow.Runtime, Version=3.0.00000.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"

                   connectionString="my connection string..."

                   LoadIntervalSeconds="1" UnLoadOnIdle="true"  />

            </services>

          </workflowRuntime>

        </behavior>

      </serviceBehaviors>

    </behaviors>

   

  </system.serviceModel>

 

</configuration>

 

These are all the components needed on the server side.

The client implementation is relatively simple given that it just needs to add the necessary headers to the corresponding messages. We can do that either by implementing an IOutputChannel or by simply adding the headers manually as illustrated on the following code.

private static void MSMQMessagingWFService()

        {

            SampleWFServiceClient startEndpoint = new SampleWFServiceClient();

            bool started = startEndpoint.Start(10);

            if (started)

            {

                context = ContextManager.ExtractContextFromChannel(startEndpoint.InnerChannel);

                //Create the content headers

                ContextMessageHeader contextHeader = new ContextMessageHeader(context);

               

                //use the MSMQ endpoint

                OneWaySampleServiceClient msmqEndpoint = new OneWaySampleServiceClient();

                using (OperationContextScope scope = new OperationContextScope(msmqEndpoint.InnerChannel))

                {

                    //add the context to the output message headers

                    OperationContext.Current.OutgoingMessageHeaders.Add(contextHeader);

                    for (int index = 0; index <= 20; index++)

                    {

 

                        try

                        {

                            Console.WriteLine("Sending message value = {0}", index);

                            msmqEndpoint.AddValue(index);

                        }

 

                        catch (Exception ex)

                        {

 

                        }

                       

                    }

                }

            }

 

The ContextMessageHeader class inherits from the MessageHeader class and serializes the context information in the format expected by the channel.

public class ContextMessageHeader: MessageHeader

          {

        private IDictionary<string, string> contextDictionary;

 

        public const string cContextName = "Context";

        public const string cContextNamespace = "http://schemas.microsoft.com/ws/2006/05/context";

 

        public static ContextMessageProperty ParseContextHeader(XmlReader reader)

        {

            ContextMessageProperty property = new ContextMessageProperty();

            try

            {

                if (reader.IsEmptyElement)

                {

                    return property;

                }

                reader.ReadStartElement(cContextName, cContextNamespace);

                while (reader.MoveToContent() == XmlNodeType.Element)

                {

                    string attribute = reader.GetAttribute("name");

                    property.Context[attribute] = reader.ReadElementString();

                }

            }

            catch (XmlException exception)

            {

            }

            return property;

        }

 

        public ContextMessageHeader(IDictionary<string, string> context)

        {

            contextDictionary = context;

        }

     

 

        public override string Name

        {

            get { return cContextName; }

        }

        public override string Namespace

        {

            get { return cContextNamespace;}

        }

 

        protected override void OnWriteHeaderContents(XmlDictionaryWriter writer, MessageVersion messageVersion)

        {

            foreach (KeyValuePair<string, string> pair in this.contextDictionary)

            {

                writer.WriteStartElement("Property", cContextNamespace);

                writer.WriteAttributeString("name", null, pair.Key);

                writer.WriteValue(pair.Value);

                writer.WriteEndElement();

            }

        }

 

}

 

With these components we have, in my opinion, a good starting point to start combining durable messaging and services on different scenarios.

If you watch the recording of the WebCast you are going to notice that, on my first demo we started with a simple client-service interaction using workflow services. After a few seconds, I shot down the service host for while I let the client sending a few messages that got stored in a MSMQ queue. After I started the service host back on, all the messages were delivered to the same service instance we were using before.

As I said in my previous post you can download all the demos of the WebCast here. Give it a try and send me some feedback. On the next few days I plan to blog some of the specific scenarios you can build combining durable messaging and workflow services or durable services.

 

 

No Comments