Generic, Cancellable, Asynchronous operations? Yeah, I'll blog about that.

Just code today, my friends.  The design is simple: create a framework that runs an asynchronous delegate on the thread pool in such a way that it can be cancelled and any exceptions it throws can be handled.  We use the same BeginInvoke/EndInvoke model as the rest of the asynchronous programming models in the framework.  Our generic classes allow for the creation of a new generic asynchronous result object for each piece of work we want to do.  To fully define a generic async operation we need to specify some form of input and the type of the output, and we can even strongly type the async state object.

ar = GenericCancellableOperation<RegexInput, int, Match>.BeginInvoke(
    new RegexInput(regex, "555"),
    new GenericWorkDelegate<Match, RegexInput>(RegexWorkDelegate),
    new AsyncCallback(MatchComplete),
    counter++
);

The above is a sample usage that does the same thing as my previously defined asynchronous regular expression runner.  We can no longer pass an arbitrary list of input parameters, so we have to wrap them all in a single object.  Here I've used RegexInput to hold the Regex and the input to match against.  We still use the old AsyncCallback model, even though we could strongly type that as well.  In fact, BeginInvoke and EndInvoke still use IAsyncResult instead of more strongly typed equivalents.  The GenericWorkDelegate will always match the input and return types of the generic operation, basically allowing us to delegate whatever work needs to occur back to your code.  It gets wrapped with all of the protective stuff automatically.  You still can't safely run arbitrary code, because any old fool could wrap a try...catch in their delegate and handle the ThreadAbortException (with a call to Thread.ResetAbort) that would normally terminate the operation.
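For readers who missed the earlier post, here is a minimal sketch of what the input wrapper and the work method might look like.  The shape of RegexInput and the RegexWork holder class are assumptions made for illustration; only the names RegexInput and RegexWorkDelegate come from the sample above.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical input wrapper: bundles the Regex and the text to match,
// because the work delegate accepts exactly one input parameter.
public class RegexInput {
    private readonly Regex regex;
    private readonly string input;

    public RegexInput(Regex regex, string input) {
        this.regex = regex;
        this.input = input;
    }

    public Regex Regex { get { return this.regex; } }
    public string Input { get { return this.input; } }
}

// Hypothetical holder class for the work method.
public static class RegexWork {
    // Same shape as GenericWorkDelegate<Match, RegexInput>:
    // one RegexInput in, one Match out.
    public static Match RegexWorkDelegate(RegexInput input) {
        return input.Regex.Match(input.Input);
    }
}
```

The MatchComplete callback would then retrieve the Match by handing its IAsyncResult to EndInvoke.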

using System;
using System.Threading;

public delegate ReturnType GenericWorkDelegate<ReturnType, InputType>(InputType input);

public class GenericCancellableOperation<InputType, AsyncStateType, ReturnType> {
    public class GenericAsynchronousResult : IAsyncResult {
        // Generic State Parameters
        // default(T) yields null for reference types and zero for value types
        internal AsyncStateType asyncState = default(AsyncStateType);
        internal InputType input = default(InputType);
        internal ReturnType output = default(ReturnType);
        internal GenericWorkDelegate<ReturnType, InputType> workCallback = null;
       
        // Non generic state
        internal bool complete = false;
        internal bool completedSynchronously = false;
        internal bool cancelled = false;
        internal ManualResetEvent waitHandle = null;
        internal AsyncCallback callback = null;
        internal Thread currentThread = null;
        internal Exception innerException = null;
        internal Object lockObject = new Object();
       
        internal GenericAsynchronousResult(InputType input, GenericWorkDelegate<ReturnType, InputType> workCallback, AsyncCallback callback, AsyncStateType state) {
            this.asyncState = state;
            this.input = input;
            this.callback = callback;
            this.workCallback = workCallback;
            this.waitHandle = new ManualResetEvent(false);
        }

        public object AsyncState { get { return this.asyncState; } }
        public bool CompletedSynchronously { get { return this.completedSynchronously; } }
        public bool IsCompleted { get { return this.complete; } }
        public WaitHandle AsyncWaitHandle { get { return this.waitHandle; } }
       
        public AsyncStateType TrueAsyncState { get { return this.asyncState; } }
        public InputType Input { get { return this.input; } }
        public ReturnType Output { get { return this.output; } }
       
        public bool Cancel() {
            lock(lockObject) {
                if ( this.cancelled || this.complete ) {
                    return false;
                }
               
                this.cancelled = true;
                if ( this.currentThread != null ) {
                    this.currentThread.Abort();
                }
            }

            this.waitHandle.Set();
            return true;
        }
       
        internal void Complete() {
            lock(lockObject) {
                if ( this.complete || this.cancelled ) {
                    return;
                }
               
                this.complete = true;
            }
           
           
            this.waitHandle.Set();
            if ( callback != null ) {
                callback(this);
            }
        }
    }

    public static GenericAsynchronousResult BeginInvoke(
        InputType input,
        GenericWorkDelegate<ReturnType, InputType> workCallback,
        AsyncCallback callback,
        AsyncStateType state)
    {
        GenericAsynchronousResult result = new GenericAsynchronousResult(input, workCallback, callback, state);
        if ( !result.complete ) {
            ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadPool_WaitCallback), result);
        }
       
        return result;
    }
   
    private static void ThreadPool_WaitCallback(object state) {
        GenericAsynchronousResult result = state as GenericAsynchronousResult;
       
        if ( result != null && !result.cancelled ) {
            result.currentThread = Thread.CurrentThread;

            try {
                result.output = result.workCallback(result.input);
            } catch (ThreadAbortException) {
                // Cancel() aborted this thread; reset the abort so the pool thread survives.
                Thread.ResetAbort();
            } catch (Exception exc) {
                result.innerException = exc;
            } finally {
                result.currentThread = null;
                result.Complete();
            }
        }
    }
   
    public static ReturnType EndInvoke(IAsyncResult ar) {
        GenericAsynchronousResult result = ar as GenericAsynchronousResult;
       
        if ( result != null ) {
            result.AsyncWaitHandle.WaitOne();
            if ( result.innerException != null ) {
                throw result.innerException;
            }
            return result.output;
        } else {
            throw new ArgumentException("EndInvoke was called with the wrong async result", "ar");
        }
    }
}

I still don't find this as useful as the regex-only version.  I think adapting the code to the particular asynchronous operation you are working on is the better approach.  In the case of regular expressions, you could even embed the ability to abort the match within the lexical scanner, removing the need for thread aborts and exceptions.  In fact, for something like an interpreter, aborting via an embedded instruction is the best way to go about things.  For now, we can't modify how regular expressions are constructed under .NET and we can't embed such abort instructions, so we'll have to live with little hacks.
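To make the embedded-abort idea concrete, here is a toy sketch, not a real regex engine.  CancellableScanner, CountDigits, and the cancelRequested flag are all hypothetical names; the point is only that the scanning loop itself checks for cancellation at a spot where it is safe to stop, so no thread ever has to be aborted.

```csharp
using System;

// Hypothetical scanner that counts digits and polls a cancellation flag as it goes.
public class CancellableScanner {
    // volatile so that a write from another thread is seen by the scanning loop
    private volatile bool cancelRequested = false;

    // May be called from any thread to request that the scan stop.
    public void Cancel() {
        this.cancelRequested = true;
    }

    public int CountDigits(string input) {
        int count = 0;
        for (int i = 0; i < input.Length; i++) {
            // The "embedded abort instruction": a known safe point at which to stop.
            if (this.cancelRequested) {
                throw new OperationCanceledException("Scan cancelled at index " + i);
            }
            if (char.IsDigit(input[i])) {
                count++;
            }
        }
        return count;
    }
}
```

The cost is one flag check per iteration, and the scan only stops at points the scanner's author chose, which is exactly what a thread abort cannot promise.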

 

Published Saturday, May 22, 2004 5:57 PM by Justin Rogers

Comments

Wednesday, June 09, 2004 9:16 PM by Brian Grunkemeyer

# re: Generic, Cancellable, Asynchronous operations? Yeah, I'll blog about that.

This is unfortunately a very flawed attempt at a cancellation infrastructure. The design flaw here is you're relying on aborting threads. Short of calling methods with reliability contracts and/or constrained execution regions, you can't abort threads safely. You have no idea how the Regex code is implemented internally, but it might have an appdomain-wide cache of things that it edits while doing a lookup. If that's the case, there's no guarantee that collection update will succeed if you abort it. You can be corrupting appdomain and possibly process-wide state with this code, so you should follow this up by promptly unloading the current appdomain (and possibly the process, if you can't guarantee the process state won't be corrupted). We may change Thread.Abort to be more "polite" eventually in Whidbey, but you still run the risk of corrupting appdomain & process state, as we may not "harden" all the parts of the Framework you need.

What can you use? There are three techniques you should look at:

1) Cancellation regions & cancellation signals
2) ICancellableAsyncResult
3) Some cooperative polling technique in your code to support cancellation

If you're running on Longhorn, Whidbey's new CancellationRegion & CancellationSignal classes will allow you to cancel any disk-bound IO request (assuming the driver supports cancellation). For async IO requests, ICancellableAsyncResult provides a cancel method that uses some other Longhorn feature to cancel that particular async operation.

Short of that, the best technique you can use is cooperative cancellation. One of the changes we want to make for Beta 2 is to add a property to CancellationRegion indicating whether a cancellation was requested. You can change your CPU-intensive code to poll this property and throw an OperationCanceledException. Unfortunately, it has to be done in this cooperative manner if you want to avoid state corruption that would require you to pessimistically unload the appdomain.

For more info, Chris Brumme's discussions on Reliability may be interesting (I don't know if they go into this much detail though in publicly released material yet). Additionally, this same notion of avoiding corruption went into the design of Mark Gabarra's PoVBot (an AC2 game bot), with the introduction of a cooperative sleep (in addition to a normal sleep) in the bot's scripting language. The feature would allow you to interrupt the script at this point with an async handler of some sort (like a timer that fires every minute), do some operation, then recover to some known state. The script and/or the event would have to be written to deal with any state changes to the UI, etc.

(Yes, Mark & I were both surprised that a game bot would have any commonality with our reliability work. Fortunately, once he was working on the bot, we already had the hard conceptual part figured out.)

If you want more information about some of this stuff, look on the BCL web site (http://www.gotdotnet.com/team/clr/bcl/TechArticles/techarticles.aspx) and annoy us if you can't find what you want.

Wednesday, June 09, 2004 10:21 PM by Justin Rogers

# re: Generic, Cancellable, Asynchronous operations? Yeah, I'll blog about that.

To provide public commenting:

Contract structures are something I fully agree with. While you could use the above generic cancellation feature for any code, and it would work to some degree, there are code-paths that the remote thread might be in that would cause some form of failure. That is the reason I relied on my knowledge of the Regex codebase, gained through Rotor, to use it in that scenario, primarily as a reliability story for a high capacity web server that is trying to prevent a DoS attack by a user input expression. There are a number of ways to get to this point, since even a basic expression can be non-terminating given suitably crafted input.

All three of your solutions are a hand-shake pattern. I'm not sure that I'll ever be able to fit all of the code I need to cancel into the design paradigm of a hand-shake routine. Some I could, yes, but others are simply going to be impossible.

The reliability story here is that of BlackBox reliability. I can trace every code-path that I'm running and ensure a cancellable IL structure; what I can't see behind, however, is the BlackBox of the CLR. Sure, there may be some non-cancellable features back there that would cause my code to exit not so gracefully. If so, then mark them and tell me where they are so I can include those in my examination and determine if my reliability story is going to be good enough.

What I would propose for Thread.Abort, if you are making changes, is some awareness in the code of where it can gracefully stop and where it can't. Currently Thread.Abort instantly throws an exception (or actually I think in some cases it does wait a bit), while in the future it could have a timeout of some sort and a return value telling me whether or not the thread truly did abort. If it didn't, an enumeration telling me why would be nice, and I can try the abort a second or third time, again in the interest of a more reliable system.

What I don't want is to riddle my AI code with cancellation checks when I'm already granting it a small slice (5%) of the CPU, and said 5% is being shared between 1000 automatons.

Friday, June 11, 2004 6:13 AM by TrackBack

# re: Productivity -- but to what extreme?
