Parallel Programming with C++ – Part 1 – Asynchronous Procedure Calls

Who says you need to add additional threads to your application to keep it from becoming unresponsive? The golden rule for responsive client applications is to avoid blocking calls on window threads. A blocking function call on a window thread prevents the thread’s message loop from dispatching messages promptly, and the result is an unresponsive set of windows, since they cannot respond to input from the mouse, the keyboard, other applications or the operating system itself. A common solution is to make blocking calls on worker threads, but threads are costly and introduce complexity into your application, and such a thread would not be doing much of use other than managing some state and waiting for the blocking call to return. One simple and efficient solution to this problem is called alertable I/O. It makes use of asynchronous procedure calls (APCs), and that is the topic of this first part of the Parallel Programming with C++ series of articles.

Windows manages a queue of APCs for each thread and this allows user-mode as well as kernel-mode code to queue a function to be called at some point in the future. This feature allows you to build responsive applications with only a single thread, removing the need for background threads in many cases. Although it certainly doesn’t address every scenario, it does fit the bill quite nicely for many client applications. Let’s take a look at how this technique can be used in practice, but first I need to explain briefly how APCs work.

APCs come in kernel-mode and user-mode varieties. Kernel-mode APCs are queued by devices in the kernel and the kernel issues a software interrupt to give the APCs an opportunity to run in the context and address space associated with a thread. The primary reason for kernel-mode APCs is to allow code in the kernel access to the user-mode address space associated with a particular thread, in other words the virtual memory for a particular application.

User-mode APCs are queued in much the same way but, unlike the kernel-mode variety, don’t execute without a thread’s permission. A thread needs to enter an alertable state, at which point it automatically handles all APCs in the queue in first-in, first-out order. This becomes especially interesting when you realize that the kernel can queue a user-mode APC given the address of a function in the address space associated with a thread. Why is this interesting? Consider how I/O requests are fulfilled.
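To make the queuing rules concrete, here is a toy model of a single thread’s user-mode APC queue written in portable C++. It is only a sketch of the behavior described above, not how Windows implements it, and the names ToyApcQueue, Queue, EnterAlertableState, RecordArgument and g_order are all hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Toy model of one thread's user-mode APC queue. All names here are
// hypothetical; this mirrors the rules in the text and nothing more.
class ToyApcQueue
{
public:
    using Routine = void (*)(std::uintptr_t argument);

    // Queuing never runs the routine; it only records the request.
    void Queue(Routine routine, std::uintptr_t argument)
    {
        assert(routine != nullptr);
        m_pending.push_back({ routine, argument });
    }

    // Models the thread entering an alertable state: every pending routine
    // runs in first-in, first-out order, including any routine that is
    // queued while the queue is being drained. Returns how many ran.
    std::size_t EnterAlertableState()
    {
        std::size_t handled = 0;

        while (!m_pending.empty())
        {
            const Entry entry = m_pending.front();
            m_pending.pop_front();
            entry.routine(entry.argument);
            ++handled;
        }

        return handled;
    }

    // Number of routines waiting for the thread to become alertable.
    std::size_t Pending() const
    {
        return m_pending.size();
    }

private:
    struct Entry
    {
        Routine routine;
        std::uintptr_t argument;
    };

    std::deque<Entry> m_pending;
};

// Records the order in which the sample routine was called.
std::vector<std::uintptr_t> g_order;

void RecordArgument(std::uintptr_t argument)
{
    g_order.push_back(argument);
}
```

Queuing RecordArgument three times leaves g_order untouched until EnterAlertableState is called, at which point the arguments are recorded in the order they were queued. On Windows the corresponding real functions are QueueUserAPC, which queues a routine to a given thread, and the alertable wait functions such as SleepEx.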

Let’s say you have a file handle that you would like to read from. This file handle might be a file on a local disk. It might be a file on a file server. It might not even be a file at all but rather the client end of a named pipe. The good news is that the I/O manager abstracts away the differences. The bad news is that regardless of what the file handle really represents it’s almost guaranteed to be a lot slower to read from than from a page in your address space. What is useful to realize about this type of latency is that it is not processor-bound. You might need to wait for a disk controller or a network roundtrip but the processor is otherwise free to perform other tasks such as dispatching window messages.

Under the hood an I/O request is shipped off to the kernel’s I/O manager, which finds the appropriate device stack, submits an I/O request packet and then moves on. The device eventually finds the necessary data and notifies the I/O manager to complete the I/O request. At this point the I/O manager needs a way to tell the application that originally made the I/O request that it has been completed, and one way that it can do this is by using an APC. Assuming the application had associated an APC with the request, the kernel can easily queue a user-mode APC to the thread that made the request. The APC, also known as a completion routine in this case, is then called the next time the thread enters an alertable state. A thread enters an alertable state when it calls one of a handful of functions that suspend the thread, such as SleepEx, WaitForSingleObjectEx, WaitForMultipleObjectsEx or MsgWaitForMultipleObjectsEx, and asks for an alertable wait.

The explanation above is a considerable simplification but sufficient for our needs. Now let’s look at an example to make this more concrete. I had considered some more interesting samples such as a market feed or chat client but decided that they would just needlessly complicate things. Let’s imagine there’s a server providing a stream of DWORD values over a named pipe.

The first step is to open the client end of the named pipe:

CHandle pipe;

pipe.Attach(::CreateFile(L"\\\\.\\pipe\\TestServer",
                         FILE_READ_DATA,
                         0, // no sharing
                         0, // default security
                         OPEN_EXISTING,
                         FILE_FLAG_OVERLAPPED,
                         0)); // no template

if (INVALID_HANDLE_VALUE == pipe)
{
    // The pipe is not accessible.
}

CHandle is simply a wrapper class provided by ATL that ensures that the underlying HANDLE is automatically closed when the variable goes out of scope. The CreateFile function opens the client end of the named pipe returning a file handle that can be used to access the pipe. The parameters to this function are unimportant to this discussion. Basically they just ensure that the handle can be used to read from the pipe using asynchronous I/O. CreateFile returns INVALID_HANDLE_VALUE if the pipe is not accessible. This may be due to a variety of reasons. For example the client may not have permission to connect to the pipe, the server may not be running, etc. Call the GetLastError function for the actual reason.

The next step is to begin reading the first value from the pipe into a buffer asynchronously:

void CALLBACK ReadFileCompleted(DWORD errorCode,
                                DWORD bytesCopied,
                                OVERLAPPED* overlapped);

DWORD buffer = 0;
OVERLAPPED overlapped = { 0 };

if (!::ReadFileEx(pipe,
                  &buffer,
                  sizeof(buffer),
                  &overlapped,
                  ReadFileCompleted))
{
    // The server may have closed the pipe or the connection was lost.
}

Unlike ReadFile, the ReadFileEx function can only be used to read asynchronously, and it notifies the caller that the operation completed by queuing the caller-provided ReadFileCompleted function as an APC. The first parameter indicates the handle to read from. The second and third parameters indicate the address and size of the buffer that should receive any data that is read. The second-to-last parameter is the address of an OVERLAPPED structure. Typically OVERLAPPED is used to specify the file position for the operation since, unlike a synchronous file handle, an overlapped file handle does not keep track of this. Since we’re reading from a pipe, however, we don’t need to specify a position and simply provide the address of a zero-initialized OVERLAPPED structure. The structure is still quite useful since its address is passed to your completion routine, which lets you identify which I/O operation completed. Both the buffer and the OVERLAPPED structure must remain valid until the operation completes.

So far we’ve connected to a pipe using asynchronous I/O and begun reading the first value from the pipe into the buffer asynchronously. We can’t, however, get the results from the read operation until the same thread that called ReadFileEx enters an alertable state. The simplest approach is to use the SleepEx function:

const DWORD sleepResult = ::SleepEx(INFINITE,
                                    TRUE); // Alertable

ASSERT(WAIT_IO_COMPLETION == sleepResult);

SleepEx’s first parameter indicates the minimum number of milliseconds that the thread should be suspended. INFINITE indicates that the call should not time out. The second parameter indicates that the thread should enter an alertable state prior to being suspended, thus allowing it to handle any APCs in its queue before returning. If the APC queue is not empty when SleepEx is called, the thread will not be suspended but will instead handle all of the APCs immediately before returning. If the APC queue is empty, the thread will be suspended until an APC is queued, at which point it will again be schedulable and will return once the queue is empty. Keep in mind that it is possible for APCs to be queued more rapidly than the thread can handle them, in which case SleepEx may never return. SleepEx returns zero if the interval elapses. Alternatively it returns WAIT_IO_COMPLETION to indicate that one or more APCs were handled.

Of course suspending a thread isn’t a great way to build responsive applications. It is however a useful way to flush the APC queue. Simply use a value of zero for the timeout interval and it will not suspend the thread at all but simply handle any APCs that may have already been queued before returning. The performance-conscious developers out there may not like the sound of that as it can easily lead developers to “poll” the APC queue. We’ll take a look at an alternative approach that solves the problem in a far more efficient manner in part 2 of this series but first I want to wrap up this example. Here’s the complete example in a console application:

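#include <windows.h>
#include <atlbase.h> // CHandle, ATLASSERT
#include <iostream>

// ASSERT is not defined by ATL itself; map it to ATLASSERT for this sample.
#ifndef ASSERT
#define ASSERT ATLASSERT
#endif

void CALLBACK ReadFileCompleted(DWORD errorCode,
                                DWORD bytesCopied,
                                OVERLAPPED* overlapped);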

OVERLAPPED overlapped = { 0 };
CHandle pipe;
DWORD buffer = 0;
DWORD status = ERROR_SUCCESS;

int main()
{
    // Open the client end of a named pipe.
    pipe.Attach(::CreateFile(L"\\\\.\\pipe\\TestServer",
                             FILE_READ_DATA,
                             0, // no sharing
                             0, // default security
                             OPEN_EXISTING,
                             FILE_FLAG_OVERLAPPED,
                             0)); // no template

    if (INVALID_HANDLE_VALUE == pipe)
    {
        // The pipe is not accessible.
        status = ::GetLastError();
    }
    else
    {
        // Read the first value from the pipe into the buffer asynchronously.
        if (!::ReadFileEx(pipe,
                          &buffer,
                          sizeof(buffer),
                          &overlapped,
                          ReadFileCompleted))
        {
            // The server may have closed the pipe or the connection was lost.
            status = ::GetLastError();
        }
        else
        {
            while (ERROR_SUCCESS == status)
            {
                // Wait until one or more APCs are queued.
                const DWORD sleepResult = ::SleepEx(INFINITE, // Suspend indefinitely
                                                    TRUE); // Alertable

                // Since the thread is suspended indefinitely it will only return
                // after one or more APCs are called.
                ASSERT(WAIT_IO_COMPLETION == sleepResult);
            }
        }
    }

    return status;
}

void CALLBACK ReadFileCompleted(const DWORD errorCode,
                                const DWORD bytesCopied,
                                OVERLAPPED* overlapped)
{
    // The read request may have failed asynchronously.
    // The server may have closed the pipe or the connection was lost.
    status = errorCode;

    if (ERROR_SUCCESS == status)
    {
        // The read request completed successfully.
        ASSERT(sizeof(buffer) == bytesCopied);

        // The current value is available in the buffer.
        std::cout << buffer << std::endl;

        // Read the next value from the pipe into the buffer asynchronously.
        if (!::ReadFileEx(pipe,
                          &buffer,
                          sizeof(buffer),
                          overlapped,
                          ReadFileCompleted))
        {
            // The server may have closed the pipe or the connection was lost.
            status = ::GetLastError();
        }
    }
}

If you would like to avoid global variables you can take advantage of the fact that the system provides the same address for the OVERLAPPED structure that you specify in a call to ReadFileEx to the completion routine. This means that you can hang some extra state off the end of it. You could for example declare a structure as follows:

struct SampleOverlapped
{
    SampleOverlapped() :
        Overlapped(OVERLAPPED()),
        Buffer(0),
        Status(ERROR_SUCCESS)
    {
        // Do nothing
    }

    OVERLAPPED Overlapped;
    CHandle Pipe;
    DWORD Buffer;
    DWORD Status;
};

If you pass the address of SampleOverlapped::Overlapped to ReadFileEx then you can simply cast the OVERLAPPED pointer in the completion routine to a SampleOverlapped pointer and thereby gain access to this extra state without having to make the memory global to both functions. The cast relies on Overlapped being the first member of the structure, so that the member and the structure share the same address.
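As a rough sketch of that cast, the completion routine below recovers its per-request state from the OVERLAPPED pointer alone. The names ReadRequest, ReadRequestCompleted and ValuesRead are hypothetical, and the structure is a simplified stand-in for SampleOverlapped that omits the CHandle member to avoid the ATL dependency. The non-Windows branch only provides placeholder definitions so that the sketch compiles elsewhere; on Windows the real definitions from windows.h are used.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

#ifdef _WIN32
#include <windows.h>
#else
// Placeholder definitions so the sketch also compiles off Windows.
// They are assumptions for illustration, not the real Windows layout.
typedef std::uint32_t DWORD;
struct OVERLAPPED { void* Reserved[4]; };
#define CALLBACK
#define ERROR_SUCCESS 0U
#endif

// Hypothetical per-request state. The OVERLAPPED member comes first so
// that its address is also the address of the enclosing structure.
struct ReadRequest
{
    OVERLAPPED Overlapped;
    DWORD Buffer;     // receives the value read from the pipe
    DWORD Status;     // error code reported for the last completed read
    DWORD ValuesRead; // number of reads that completed successfully
};

static_assert(offsetof(ReadRequest, Overlapped) == 0,
              "Overlapped must be the first member for the cast to be valid");

void CALLBACK ReadRequestCompleted(DWORD errorCode,
                                   DWORD bytesCopied,
                                   OVERLAPPED* overlapped)
{
    assert(overlapped != nullptr);

    // Recover the enclosing structure from the address of its first member.
    ReadRequest* request = reinterpret_cast<ReadRequest*>(overlapped);

    request->Status = errorCode;

    if (ERROR_SUCCESS == errorCode && sizeof(request->Buffer) == bytesCopied)
    {
        // A complete value is available in request->Buffer.
        ++request->ValuesRead;
    }
}
```

To use it, you would pass &request.Overlapped and &request.Buffer to ReadFileEx along with ReadRequestCompleted, and keep the ReadRequest object alive until the completion routine has run. If the OVERLAPPED member could not be placed first, the CONTAINING_RECORD macro from the Windows headers is one way to compute the enclosing address instead.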

Read part 2 now: Asynchronous Procedure Calls and Window Messages

© 2007 Kenny Kerr
