WS-Polling implementation for WSE

I used some of free time last week to write a prototype of WS-Polling for WSE 3.0.
Unfortunately, I couldn't finish the complete specification but it is enough to execute web services asynchronously.
If you haven't read anything about WS-Polling before, it is basically a specification to execute web services asynchronously and poll to the server later in order to get the response.
If you take a look to this specification, you probably will able to see three main parts:

1. A mechanism to execute web services and store the response for later retrieval
2. Some headers to query information about the status of an execution
3. A mailbox implementation

For the moment, I only implemented the first part and I'm trying to finish the second one.

Implementation structure

The diagram below illustrates the classes used by this implementation.



I defined a abstract class PollingService that declares a method to get messages. This class also uses an IMessageStore provider to store and retrieve the request and response messages for the concrete service.
The concrete service is a normal service that implements different business methods and inherits from the base class PollingService. e.g. MyHelloWorldService
The IMessageStore is an interface that declares methods to store and retrieve messages from an specific store such as Database, MSMQ or a file.
PollingClient and ConcreteProxy are both proxies to invoke methods in the Concrete service, but the last one was created by the tool WSEWsdl3.exe and it doesn't know how to invoke the service asynchronously.
I had to develop a custom PollingClient since the proxy created with WSEWsdl3.exe does not offer the following features:

1. Access to the WS-Addressing headers. For this implementation, the client must change the wsa:ReplyTo header and get access to others headers
such as wsa:MessageID and wsa:To.
2. Make a one-way call. The proxy created by the tool waits for an answer from the service in most of the cases.

How this solution works

1. The client application wants to invoke a asynchronous method on the webservice so it creates a PollingClient instance and calls to the method "InvokeService".
2. The PollingClient instance sets the value "http://www.w3.org/2005/08/ws-polling/HoldResponse" for the header wsa:ReplyTo and uses the WSE infrastructure to send the request message to the server.
3. The concrete service receives the request message and checks the value for the wsa:ReplyTo header.
If this value is "http://www.w3.org/2005/08/ws-polling/HoldResponse", it executes the concrete method asynchronously, otherwise it executes the concrete method in the usual way.
4. If the concrete method was executed asynchronously, the service stores the request and response messages by means of the configured IStoreProvider. Otherwise, it returns the response message to the client.
5. The client application can ask later for the response message using the GetMessages method on the PollingClient instance.
(The GetMessages method will receive the wsa:MessageID for the original request message as parameter)

Some code

1. Creating the Concrete service

[WebService(Namespace = "http://tempuri.org/")]
[WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
[Policy("ServicePolicy")]
public class Service : WSEPolling.PollingService
{

  public Service()
  {

  }


  [SoapMethod("HelloWorld")]
  public string HelloWorld(string name)
  {
    return "Hello World " + name;
  }

}

This class looks like a normal web service but it inherits from the base class PollingService.

2. Creating the client application

class Program
{
    static void Main(string[] args)
    {
      PollingClient client = new PollingClient(new Uri("http://localhost/WSEPollingService/Service.ashx"));

      client.SetPolicy("ClientPolicy");

      string s = "Test";

      Uri id = client.InvokeService("HelloWorld", s, "http://tempuri.org/");

      System.Threading.Thread.Sleep(1000);

      WSEPolling.GetMessageRequest request = new WSEPolling.GetMessageRequest();
      request.MessageID = id.ToString();

      //Gets the entire soap envelope
      SoapEnvelope response = client.GetMessage(request);

      //Gets only the body of the soap envelope
      //string message = (string)client.GetMessageBody(request, typeof(string), "http://tempuri.org/");

      string message = response.OuterXml;

       Console.WriteLine(message);
    }
}

In the code below, I created a PollingClient instance to invoke the "HelloWorld" service. The InvokeService method returns the wsa:MessageId value for the request message, which I keep in the variable "id".
At the end, I retrieve the response message from the server passing the value of the variable "id" to the GetMessage method.

3. The PollingService class

public class PollingService : SoapService
{
  static IMessageStore _store;

  static PollingService()
  {
    _store = new DatabaseMessageStore();
    _store.Init(null);
  }

  [SoapMethod(WSPolling.Actions.GetMessage)]
  public virtual SoapEnvelope GetMessage(GetMessageRequest request)
  {
    SoapEnvelope envelope = _store.GetResponse(request);

    RelatesTo relatesTo = new RelatesTo(RequestSoapContext.Current.Addressing.MessageID.Value);
    envelope.Context.Addressing.RelatesTo = relatesTo;

    return envelope;
  }

  protected override SoapMethodInvoker RouteRequest(SoapEnvelope request)
  {
    return new AsyncSoapMethodInvoker(this, base.RouteRequest(request));
  }

  protected virtual void StoreResponse(SoapEnvelope response)
  {
    _store.StoreResponse (response);
  }

  protected virtual void StoreRequest(SoapEnvelope request)
  {
     _store.StoreRequest(request);
  }

  class AsyncSoapMethodInvoker : SoapMethodInvoker
  {
    private SoapMethodInvoker _invoker;
    private PollingService _service;

    public AsyncSoapMethodInvoker(PollingService service, SoapMethodInvoker invoker)
    {
      this._invoker = invoker;
      this._service = service;
    }

    public override SoapEnvelope Invoke(SoapEnvelope message)
    {
      if (message.Context.Addressing.Action != WSPolling.Actions.GetMessage &&
        message.Context.Addressing.ReplyTo != null &&
        message.Context.Addressing.ReplyTo.Address.Value.ToString() == WSPolling.HoldResponseURI)
      {
        this._service.StoreRequest(message);
        WaitCallback callBack = new WaitCallback(this.Invoke);
        ThreadPool.QueueUserWorkItem(callBack, message);
        return new SoapEnvelope();
      }
      else
      {
        SoapEnvelope response = _invoker.Invoke(message);
        return response;
      }
    }

    public override bool OneWay
    {
      get { return _invoker.OneWay; }
    }

    private void Invoke(object state)
    {
      SoapEnvelope request = (SoapEnvelope)state;
      SoapEnvelope response = _invoker.Invoke(request);

      _service.StoreResponse(response);
    }
  }
}

The RouteRequest is the key method in this implementation.
This method returns a SoapMethodInvoker instance, which knows how to call a specific method in the concrete service. (WSE provides a default implementation of this class)
In this case, I developed my own class AsyncSoapMethodInvoker, which calls to the SoapMethod synchronously or asynchronously depending on the value of wsa:ReplyTo header.
As you can see in the code, I used the Thread pool provided by .NET to execute the web method asynchronously.

4. The Database message store implementation

class DatabaseMessageStore : IMessageStore
{
  private string _connectionString = null;

  public void Init(XmlElement configuration)
  {
    ConnectionStringSettings settings = ConfigurationManager.ConnectionStrings["Messages"];

    if(settings == null)
      throw new ConfigurationErrorsException("The connection string 'Messages' is not configured");

    if(settings.ConnectionString == null)
      throw new ConfigurationErrorsException("Invalid value for the connection string 'Messages'");

    this._connectionString = settings.ConnectionString;
  }

  public void StoreRequest(SoapEnvelope request)
  {
    using (SqlConnection connection = new SqlConnection(this._connectionString))
    {
      connection.Open();
      using (SqlCommand command = new SqlCommand("InsertMessage", connection))
      {
        command.CommandType = System.Data.CommandType.StoredProcedure;
        command.Parameters.Add(new SqlParameter("@MessageID", request.Context.Addressing.MessageID.Value.ToString()));
        command.Parameters.Add(new SqlParameter("@To", request.Context.Addressing.To.Value.ToString()));

        command.ExecuteNonQuery();
      }
      connection.Close();
    }
  }

  public void StoreResponse(SoapEnvelope response)
  {
    response.Context.Addressing.GetXml(response);
    using (SqlConnection connection = new SqlConnection(this._connectionString))
    {
      connection.Open();
      using (SqlCommand command = new SqlCommand("UpdateMessage", connection))
      {
        command.CommandType = System.Data.CommandType.StoredProcedure;
        command.Parameters.Add(new SqlParameter("@MessageID", response.Context.Addressing.RelatesTo.Value.ToString()));
        command.Parameters.Add(new SqlParameter("@Message", response.OuterXml));
        command.ExecuteNonQuery();
      }
      connection.Close();
    }
  }

  public SoapEnvelope GetResponse(GetMessageRequest request)
  {
    SoapEnvelope response = new SoapEnvelope();
    using (SqlConnection connection = new SqlConnection(this._connectionString))
    {
      connection.Open();
      using (SqlCommand command = new SqlCommand("GetMessage", connection))
      {
        command.CommandType = System.Data.CommandType.StoredProcedure;
        command.Parameters.Add(new SqlParameter("@MessageID", request.MessageID.ToString()));

        using (SqlDataReader reader = command.ExecuteReader(System.Data.CommandBehavior.CloseConnection))
        {
          if (reader.Read())
          {
            if (reader["Message"] == DBNull.Value )
            {
              NoMessageAvailable noMessage = new NoMessageAvailable(Reason.ResponseNotReady);
              response.Context.Addressing.Action = WSPolling.Actions.NoMessageAvailable;
              response.SetBodyObject(noMessage);
            }
            else
            {
              string message = (string)reader["Message"];

              response.Load(new StringReader(message));
              response.Context.Addressing.RemoveXml(response);
            }
          }
          else
          {
            NoMessageAvailable noMessage = new NoMessageAvailable(Reason.NoMessageFound);
            response.Context.Addressing.Action = WSPolling.Actions.NoMessageAvailable;
            response.SetBodyObject(noMessage);
          }

          reader.Close();

        }
      }

      connection.Close();
    }

    return response;
  }
}

This class is quite simple. It implements the interface IMessageStore and contains code to store and query messages from a SQL database.

Well, this is all I have for the moment. You can download the code from the following location.
This is a prototype and it should not be used in production environments.

No Comments