Windows Workflow Runtime Service: The Transaction Service

By Jesus Rodriguez

Introduction

This is the second in a series of brief articles intended to illustrate the functionality and customizability of the Windows Workflow Foundation (WF) Runtime Services. The previous article gave a general description of the WF runtime concepts, focusing on WF's persistence service. This article describes the WF Transaction Service in more detail and shows ways to customize it. Upcoming articles will cover the other WF services, such as tracking, communication, and timer. This article assumes that you are familiar with WF fundamentals. For an introduction to the WF basics, you can read this interesting article from David Chappell.

WF Transactions

One of my favorite features of .NET Framework 2.0 is the new System.Transactions namespace, which allows developers to easily write transactional code. Using System.Transactions to handle transactions reduces overhead considerably and significantly reduces the amount of custom code needed to implement transactional behavior. System.Transactions also separates transaction handling from the application hosting environment and from instance management. WF uses this namespace extensively to add either atomic or long-running transactional capabilities to workflows.
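To make the "little custom code" point concrete, here is a minimal sketch of how a component can take part in an ambient System.Transactions transaction. The TransactionalList and ScopeDemo classes are hypothetical names invented for this illustration; they are not part of WF. The sketch uses only TransactionScope, Transaction.Current, EnlistVolatile, and IEnlistmentNotification from System.Transactions.

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

// Hypothetical in-memory resource that buffers changes until the
// ambient transaction commits (illustration only, not a WF type).
public sealed class TransactionalList : IEnlistmentNotification
{
    private readonly List<string> committed = new List<string>();
    private readonly List<string> pending = new List<string>();
    private bool enlisted;

    public IList<string> Committed { get { return committed.AsReadOnly(); } }

    public void Add(string item)
    {
        if (Transaction.Current == null)
        {
            committed.Add(item); // no ambient transaction: apply immediately
            return;
        }
        if (!enlisted)
        {
            // Ask the transaction manager to notify this object of the outcome.
            Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
            enlisted = true;
        }
        pending.Add(item); // held back until Commit is called
    }

    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.Prepared(); // vote to commit
    }

    public void Commit(Enlistment enlistment)
    {
        committed.AddRange(pending);
        Reset();
        enlistment.Done();
    }

    public void Rollback(Enlistment enlistment)
    {
        Reset(); // discard buffered items
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {
        Reset();
        enlistment.Done();
    }

    private void Reset()
    {
        pending.Clear();
        enlisted = false;
    }
}

public static class ScopeDemo
{
    // Returns the number of items visible after the scope ends.
    public static int Run(bool complete)
    {
        TransactionalList list = new TransactionalList();
        using (TransactionScope scope = new TransactionScope())
        {
            list.Add("a");
            list.Add("b");
            if (complete)
            {
                scope.Complete(); // without this call the scope rolls back
            }
        }
        return list.Committed.Count;
    }
}
```

Calling ScopeDemo.Run(true) should leave both items committed, while ScopeDemo.Run(false) should leave none, because the scope is disposed without Complete being called. The calling code never touches a transaction manager directly, which is the separation described above.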

 

Atomic Transactions

The current version of WF (still in Beta) supports either atomic or long-running transactions. By implementing atomic transactions, activities inherit the ACID properties: atomicity, consistency, isolation, and durability. WF atomic transactions behave much like DTC transactions, but they are not DTC transactions by default. However, we can explicitly use DTC transactions if the objects used in the transaction are COM+ objects derived from System.EnterpriseServices.ServicedComponent and the isolation levels agree between the transaction components.

 

 

Long Running Transactions

 

Long-running transactions, on the other hand, are not truly transactions at all. They are long-running processes that must implement some level of instance management so that the process can run over an extended lifetime. Long-running transactions possess the ACID properties of consistency and durability, but not atomicity and isolation. Managing them typically involves handling state persistence and communications. Long-running transactions are often associated with compensation blocks, which contain the logic that undoes work already committed when a later part of the process fails. In WF, long-running transactions can contain atomic transactions or other long-running transactions nested to arbitrary depths.
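The idea of compensation can be sketched in a few lines of plain C#. This is not how WF implements compensation; CompensationLog and CompensationDemo are hypothetical names, and the steps are stand-ins that only write to a journal. The sketch also uses lambdas, which need a newer compiler than the one that ships with .NET Framework 2.0.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: remembers how to undo each completed step.
public sealed class CompensationLog
{
    private readonly Stack<Action> compensations = new Stack<Action>();

    // Runs a step and, if it succeeds, records its compensation.
    public void Run(Action step, Action compensation)
    {
        step();
        compensations.Push(compensation);
    }

    // Undoes completed steps in reverse order.
    public void Compensate()
    {
        while (compensations.Count > 0)
        {
            compensations.Pop()();
        }
    }
}

public static class CompensationDemo
{
    public static List<string> Run(bool failLastStep)
    {
        List<string> journal = new List<string>();
        CompensationLog log = new CompensationLog();
        try
        {
            log.Run(() => journal.Add("reserve seat"), () => journal.Add("release seat"));
            log.Run(() => journal.Add("charge card"), () => journal.Add("refund card"));
            if (failLastStep)
            {
                throw new InvalidOperationException("ticket printer offline");
            }
            journal.Add("done");
        }
        catch (InvalidOperationException)
        {
            // Earlier steps are already committed, so they cannot be rolled
            // back; they are compensated instead.
            log.Compensate();
        }
        return journal;
    }
}
```

With failLastStep set to true, the journal ends with "refund card" followed by "release seat". Each completed step is undone by its own compensating action, newest first, which is the key difference from an atomic rollback.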

WF includes a special activity known as the Transaction Context activity, which groups activities together for transactional execution, compensation, and so on. The activities inside a Transaction Context, as well as the Transaction Context itself, are synchronized to guarantee that the data accessed by one activity remains locked for writing and inaccessible to other activities.

In WF, the runtime transaction service is responsible for handling the transaction capabilities required by workflows.

WF Transaction Service

 

The WF runtime utilizes the DefaultWorkflowTransactionService by default if no other transaction service is specified. WF also utilizes the SharedConnectionWorkflowTransactionService which is intended to handle transactions directly associated with objects that share a connection string.  Both transaction services are built on top of the System.Transactions functionalities. You can add the Transaction Service to the Workflow runtime using inline code or using the application configuration file.

The following code adds the SharedConnectionWorkflowTransactionService to the WorkflowRuntime using inline code.

workflowRuntime.AddService(new SqlStatePersistenceService("Data Source=tc2003s\\xserver;Initial Catalog=WFState;Integrated Security=True"));

workflowRuntime.AddService(new SharedConnectionWorkflowTransactionService("Data Source=tc2003s\\xserver;Initial Catalog=testDB;Integrated Security=True"));

 

You can also add the SharedConnectionWorkflowTransactionService using the application configuration file.

<configuration>

    <configSections>

        <section name="WorkflowServiceContainer" type="System.Workflow.Runtime.Configuration.WorkflowRuntimeSection, System.Workflow.Runtime, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" />

    </configSections>

    <WorkflowServiceContainer Name="Container Name" UnloadOnIdle="true">

        <CommonParameters>

            <add name="ConnectionString" value="Initial Catalog=WorkFlowStore;Data Source=localhost;Integrated Security=SSPI;" />

        </CommonParameters>

        <Services>

            <add type="System.Workflow.Runtime.Hosting.SharedConnectionWorkflowTransactionService, System.Workflow.Runtime, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" />

            <add type="System.Workflow.Runtime.Hosting.SqlStatePersistenceService, System.Workflow.Runtime, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>

        </Services>

    </WorkflowServiceContainer>

    <system.diagnostics>

    </system.diagnostics>

</configuration>

 

Both DefaultWorkflowTransactionService and SharedConnectionWorkflowTransactionService inherit from WorkflowTransactionService which is the base transaction service class.

 

The following code shows the WorkflowTransactionService class definition.

public abstract class WorkflowTransactionService : WorkflowRuntimeService

{

      public WorkflowTransactionService();

      public virtual void Complete(Transaction transaction);

      public virtual Transaction CreatePromotableTransaction(TransactionOptions options);

      public virtual Transaction CreateTransaction();

}

 

 

Interacting with the transaction service

 

In the first article in this series I described how WF provides an extensible and open model that allows workflow developers to interact with the runtime services directly. Another example of this extensibility is that there are several ways to get a reference to the transaction service. One way is to call the GetService method of the WorkflowRuntime; another is to use the ActivityExecutionContext class. Activity creators can gain access to the ActivityExecutionContext via the Execute method of the Activity class.

 

The following code shows how to obtain a reference to the SharedConnectionWorkflowTransactionService inside a code Activity.

public void SampleCodeHandler(object sender, EventArgs e)

{

ActivityExecutionContext context = new ActivityExecutionContext((Activity)sender);

SharedConnectionWorkflowTransactionService tran_service = (SharedConnectionWorkflowTransactionService)context.GetService(typeof(SharedConnectionWorkflowTransactionService));

Transaction tran = tran_service.CreatePromotableTransaction(new TransactionOptions());

Console.WriteLine(tran_service.GetConnectionInfo(tran).DBConnection.ConnectionString);

}

 

The code above uses the CreatePromotableTransaction method of the SharedConnectionWorkflowTransactionService class, which creates a transaction that is initially handled by the Lightweight Transaction Manager (LTM). The LTM manages transactions within a single application domain and promotes them to distributed transactions (transactions handled by the OleTx Transaction Manager) only when that becomes necessary, for example when a second durable resource enlists in the transaction.

 

NOTE Transaction manager promotion is an innovative technique supported by System.Transactions. The idea behind promotion is simple: Developers should only decide on the desired programming model (either explicit or declarative transaction management), and System.Transactions will correctly assign the appropriate transaction manager. You can dive into more details around transaction promotion in this article from Juval Lowy.
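One way to observe promotion from code is to look at the transaction's DistributedIdentifier, which stays at Guid.Empty while the transaction has not been promoted. The following is a minimal sketch using only System.Transactions; the PromotionDemo class and its method names are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Transactions;

public static class PromotionDemo
{
    // A transaction that has not been promoted to a distributed
    // transaction reports an empty distributed identifier.
    public static bool IsPromoted(Transaction transaction)
    {
        return transaction.TransactionInformation.DistributedIdentifier != Guid.Empty;
    }

    // Creates a local transaction with no resources enlisted and
    // checks that it starts out unpromoted.
    public static bool LocalTransactionStartsUnpromoted()
    {
        using (CommittableTransaction tx = new CommittableTransaction())
        {
            bool promoted = IsPromoted(tx);
            tx.Rollback(); // nothing to commit in this sketch
            return !promoted;
        }
    }
}
```

A helper like IsPromoted can be useful in diagnostics when you want to confirm that a workflow which touches a single database is not paying for a distributed transaction.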

Transactions and State Batching

There are a number of scenarios in workflow solutions in which it makes sense to group a set of actions in a single transaction, thus keeping the workflow state consistent across all of the actions. A typical example is executing a set of actions sequentially while keeping a consistent state, and then committing all of them in a single transaction.

WF provides the System.Workflow.Runtime.IWorkBatch and System.Workflow.Runtime.IPendingWork interfaces to help address this scenario.

When the WF runtime engine calls services, it provides a System.Workflow.Runtime.IWorkBatch as part of its thread call context. Your service can add a pending work item to this work batch, so that the runtime engine can commit all related work items in a single transaction.

Sequence of Actions when a Component Is Invoked

 

1.      Before first invocation, the workflow creates a work batch.

2.      The workflow attaches the work batch to the invocation of a method on a component.

3.      The component creates a work item and appends it to the work batch.

4.      Steps 2 and 3 are repeated for other component invocations.

 

Sequence of Actions at Commit Point

 

1.      The workflow creates a transaction.

2.      The workflow iterates over the work batch and collects all work items for a component, maintaining order, to create a work batch. The workflow invokes the Commit method on the component, passing in the transaction and the work batch.

3.      The component adds the work in the work batch to the transaction.

4.      Steps 2 and 3 are repeated for all components with work items in work batches.

5.      On success of the Commit notifications, the workflow commits the corresponding transactions.

6.      On successful commit of a transaction, the workflow iterates over the work batch and collects all work items per component as in step 2. The workflow invokes the Complete method for each component, passing it  the corresponding transaction and work batch.
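The two sequences above can be simulated without the WF runtime. The sketch below uses simplified stand-ins that I made up for illustration: IPendingWorkSketch and WorkBatchSketch are not the WF interfaces, and the transaction is represented by a plain string identifier. I also assume that Complete is called with the outcome in both the success and failure cases, based on the succeeded parameter that IPendingWork.Complete takes in the sample later in this article.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for IPendingWork (not the WF interface).
public interface IPendingWorkSketch
{
    void Commit(string transactionId, IList<object> items);
    void Complete(bool succeeded, IList<object> items);
}

// Simplified stand-in for the work batch the runtime hands to components.
public sealed class WorkBatchSketch
{
    // Components and their items are kept in insertion order.
    private readonly List<IPendingWorkSketch> components = new List<IPendingWorkSketch>();
    private readonly Dictionary<IPendingWorkSketch, List<object>> items =
        new Dictionary<IPendingWorkSketch, List<object>>();

    // Invocation time: a component appends a work item to the batch.
    public void Add(IPendingWorkSketch component, object workItem)
    {
        List<object> list;
        if (!items.TryGetValue(component, out list))
        {
            list = new List<object>();
            items.Add(component, list);
            components.Add(component);
        }
        list.Add(workItem);
    }

    // Commit point: hand each component its own items, then report the outcome.
    public bool CommitAll(string transactionId)
    {
        bool succeeded = true;
        try
        {
            foreach (IPendingWorkSketch component in components)
            {
                component.Commit(transactionId, items[component]);
            }
        }
        catch (Exception)
        {
            succeeded = false; // a real runtime would abort the transaction here
        }
        foreach (IPendingWorkSketch component in components)
        {
            component.Complete(succeeded, items[component]);
        }
        components.Clear();
        items.Clear();
        return succeeded;
    }
}

// Hypothetical component that records what the batch asks it to do.
public sealed class RecordingComponent : IPendingWorkSketch
{
    public readonly List<string> Log = new List<string>();

    public void Commit(string transactionId, IList<object> batch)
    {
        foreach (object item in batch)
        {
            Log.Add(transactionId + ":commit:" + item);
        }
    }

    public void Complete(bool succeeded, IList<object> batch)
    {
        Log.Add("complete:" + succeeded + ":" + batch.Count);
    }
}

public static class WorkBatchDemo
{
    public static List<string> Run()
    {
        RecordingComponent component = new RecordingComponent();
        WorkBatchSketch batch = new WorkBatchSketch();
        batch.Add(component, "ADD 10 5");
        batch.Add(component, "SUB 10 5");
        batch.CommitAll("tx1");
        return component.Log;
    }
}
```

In this simulation both work items reach the component in a single Commit call, in the order they were added, followed by one Complete notification. That is the property the real work batch gives a service: many invocations, one transactional commit.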

 

Sample

Let’s explore an example that shows how to create a custom transaction service that uses IPendingWork to control the items present in the transaction. The example groups a set of invocations to a stored procedure in a work batch and commits them as part of a single transaction.

The database in our example represents a repository of mathematical operations. The following figure shows the DB diagram.

 

The following stored procedure inserts either an addition or a subtraction operation record in the MathOp table.

create procedure InsertOperation @op1 int, @op2 int, @operation nvarchar(50)

as

declare @operationid int

 

select @operationid= id

  from operations

  where operation= @operation

 

declare @operationresult int

if(@operationid = 1)

  set @operationresult= @op1 + @op2

else

 if(@operationid= 2)

    set @operationresult= @op1 - @op2

 

insert into MathOp

  select @op1, @op2, @operationid, @operationresult

GO

 

The following code shows a sample transaction service that uses IPendingWork to batch the requests to the InsertOperation stored procedure and group them as part of a single transaction.

namespace WF.Samples.SampleTransactionService

{

            public enum MathOperation { ADD, SUB}

 

            public class SampleTransactionService: IPendingWork

            {

                        #region Pending Work

                       

                        void IPendingWork.Complete(bool succeeded, ICollection items)

                        {

                        }

 

                        bool IPendingWork.MustCommit(ICollection items)

                        {

                                    return true;

                        }

 

                        void IPendingWork.Commit(System.Transactions.Transaction transaction, ICollection items)

                        {

                                    connection.EnlistTransaction(transaction);

                                    foreach (MathOpRequest request in items)

                                    {

                                                InsertRequest(transaction, request);

                                    }

                        }

 

        #endregion

 

                        private SqlConnection connection;

        //private WorkflowRuntime _runTime = null;

 

        public SampleTransactionService(NameValueCollection parameters)

        {

 

            if (parameters == null)

            {

                throw new ArgumentNullException("parameters", "The name value collection parameters cannot be null");

 

            }

            string connectionstring = parameters["ConnectionString"];

            if (connectionstring != null)

            {

                                                InitConnection(connectionstring);

            }

            else

            {

                throw new ArgumentNullException("ConnectionString","Connection string not found in the name value collection");

            }

        }

 

                        public SampleTransactionService(string ConnectionString)

        {

            if (ConnectionString == null)

            {

                throw new ArgumentNullException("ConnectionString", "ConnectionString cannot be null");

            }

 

                                    InitConnection(ConnectionString);

        }

 

        private void InitConnection(string ConnectionString)

        {

            try

            {

                                                connection = new SqlConnection(ConnectionString);

                connection.Open();

            }

            catch (Exception e)

            {

                throw; // rethrow without resetting the stack trace

            }

        }

 

                        private void InsertRequest(Transaction transaction, MathOpRequest request)

                        {

                                    try

                                    {

                                                SqlCommand command = new SqlCommand();

                                                command.CommandText= "InsertOperation";

                                                command.Connection = connection;

                                                command.CommandType = CommandType.StoredProcedure;

                                                SqlParameter op1_param = command.Parameters.Add("@op1", SqlDbType.Int);

                                                op1_param.Value = request.op1;

                                                SqlParameter op2_param= command.Parameters.Add("@op2", SqlDbType.Int);

                                                op2_param.Value= request.op2;

                                                SqlParameter operation_param= command.Parameters.Add("@operation", SqlDbType.NVarChar, 50);

                                                operation_param.Value= request.operation.ToString();

                                                command.ExecuteNonQuery();

                                    }

                                    catch (Exception)

                                    {

                                                // Rethrow so that a failed insert aborts the batch instead of being silently ignored.

                                                throw;

                                    }

                        }

 

                        public void Add(int op1, int op2)

                        {

                                    MathOpRequest request= new MathOpRequest(op1, op2, MathOperation.ADD);

                                    BatchEnvironment.CurrentBatch.Add(this, request);

                        }

 

                        public void Sub(int op1, int op2)

                        {

                                    MathOpRequest request = new MathOpRequest(op1, op2, MathOperation.SUB);

                                    BatchEnvironment.CurrentBatch.Add(this, request);

                        }

            }

 

            [Serializable]

            internal class MathOpRequest

            {

                        public int op1;

                        public int op2;

                        public MathOperation operation;

 

                        public MathOpRequest(int p_op1, int p_op2, MathOperation p_operation)

                        {

                                    op1 = p_op1;

                                    op2 = p_op2;

                                    operation = p_operation;

                        }

            }

}

 

The Add and Sub operations add the requests to the Work Batch and then all of the operations are committed when the Commit method is called.

Our sample sequential workflow consists of two code activities that invoke the Add and Sub operations of the SampleTransactionService runtime service.

            public sealed partial class Workflow1 : SequentialWorkflow

            {

                        public Workflow1()

                        {

                                    InitializeComponent();

                        }

 

                        public void FirstCodeHandler(object sender, EventArgs e)

                        {

                                    ActivityExecutionContext context = new ActivityExecutionContext((Activity)sender);

                                    object obj_service= context.GetService(typeof(SampleTransactionService));

                                    SampleTransactionService tran_service = (SampleTransactionService)obj_service;

                                    tran_service.Add(10, 5);

                        }

 

                        public void SecondCodeHandler(object sender, EventArgs e)

                        {

                                    ActivityExecutionContext context = new ActivityExecutionContext((Activity)sender);

                                    object obj_service= context.GetService(typeof(SampleTransactionService));

                                    SampleTransactionService tran_service = (SampleTransactionService)obj_service;

                                    tran_service.Sub(10, 5);

                        }

            }

 

Both activities use the technique described earlier to obtain a reference to the transaction service, and then invoke the Add or Sub operation respectively.

Finally, the host application must add the SampleTransactionService to the WorkflowRuntime. The following code shows how to do that.

class Program

    {

        static AutoResetEvent waitHandle = new AutoResetEvent(false);

 

        static void Main(string[] args)

        {

            WorkflowRuntime workflowRuntime = new WorkflowRuntime();

                                    workflowRuntime.AddService(new SqlStatePersistenceService("Data Source=tc2003s\\xserver;Initial Catalog=WFState;Integrated Security=True"));

                                    workflowRuntime.AddService(new SampleTransactionService("Data Source=tc2003s\\xserver;Initial Catalog=testDB;Integrated Security=True"));

            workflowRuntime.StartRuntime();

 

            workflowRuntime.WorkflowCompleted += OnWorkflowCompleted;

 

            Type type = typeof(TransactionServiceSample.Workflow1);

            workflowRuntime.StartWorkflow(type);

 

            waitHandle.WaitOne();

 

            workflowRuntime.StopRuntime();

        }

 

        static void OnWorkflowCompleted(object sender, WorkflowCompletedEventArgs e)

        {

            waitHandle.Set();

        }

    }

 

What Does It All Mean?

Windows Workflow Foundation supports atomic and long-running transactions, both of which are handled by the transaction runtime service. Workflow developers can use the existing transaction services to modify the default transactional behavior. Furthermore, by combining IWorkBatch and IPendingWork, they can create custom transaction services that address specific scenarios.
