Parallel Programming with C++ – Part 4 – I/O Completion Ports

So far in the Parallel Programming with C++ series I've talked about asynchronous procedure calls (APCs) and how they make it quite easy to build efficient and responsive client applications that wait for I/O requests to complete asynchronously without having to create additional worker threads.

Although APCs are the most efficient way to perform asynchronous I/O, they have one drawback: an APC will only ever complete on the same thread that initiated the operation. This usually isn't a problem for client applications but is woefully inadequate for server applications. Client applications may also find this unacceptable if they need to process a lot of I/O requests. The problem is that if only a single thread is being used then only a single processor is in use and any additional processors are likely sitting idle.

The challenge is coming up with a solution that scales to the available processing power. If only a single processor is available then there is little point in creating additional threads for the sake of scalability. On the other hand, if more processors are available then additional threads are needed to take advantage of them, and your application really needs to make this decision at runtime based on the number of processors that are available. Gone are the days when 99% of computers had only a single processor. A large percentage of computers sold today have two processors (e.g. dual core) and as of this writing four-processor (e.g. quad core) computers are already quite affordable.

A common solution used by server applications is to create one thread per client connection. The reason this technique is so popular is that it is very easy to implement using traditional synchronous programming techniques. It is also, unfortunately, widely promoted. This technique however does not scale beyond a handful of client connections (depending on your application's characteristics). As more threads are introduced, the operating system spends more and more time scheduling the different threads on the available processors and the overhead of this context switching quickly consumes much of the computer's time.

A common remedy that is often cited is to use a thread pool but thread pools only solve a small part of the problem. Thread pools amortize the cost of thread creation and destruction over the life of the application but they don’t in themselves solve the scheduling problem. Most thread pools will also happily create many more threads than there are available processors to service them. Although this is completely acceptable for applications that create threads to wait on various events and spend most of their time sleeping or blocked (common for client applications), it is usually unsuitable for server threads that have a lot of work to do in fulfilling client requests.

A good rule of thumb is to use one thread per processor, no matter how many client connections your server application may have to handle, and to avoid making blocking calls on those threads. Ideally each thread is busy handling a stream of I/O requests and is never suspended to allow another thread to execute. This avoids the context switching overhead and keeps all the available processors busy. This is however very hard to implement yourself, but fortunately the Windows kernel provides an excellent implementation in the form of I/O completion ports!

Whereas APCs allow asynchronous I/O requests to be fulfilled on a single thread, completion ports allow any thread to perform asynchronous I/O and have the results processed by an arbitrary thread. A completion port is a kernel object that you can associate with a number of file handles. As with APCs, these file handles may represent files on a local or remote file system or they could represent other communication mechanisms such as pipes or sockets. Once a file handle is associated with a completion port the results of any asynchronous I/O requests, known as completion packets, are queued to the completion port and can be dequeued by any available thread in the process.

The main challenge with completion ports is getting your head around the confusing API. There are a few operations involved in using a completion port:

  • Creating a completion port
  • Associating file handles
  • Dequeuing completion packets
  • Optionally queuing your own completion packets
  • Creating completion port threads

Creating a completion port

A completion port is created using the CreateIoCompletionPort function. It can get confusing since the same function is used to associate the completion port with a file handle. Here’s how to create a completion port object:

const DWORD threadCount = 0;

CHandle port(::CreateIoCompletionPort(INVALID_HANDLE_VALUE,
                                      0, // no existing port
                                      0, // ignored
                                      threadCount));

if (0 == port)
{
    // Call GetLastError for more information.
}

The only parameter that you should think about is the threadCount value. This defines the maximum number of threads that will be allowed to dequeue and process completion packets concurrently. If this is set to zero then the operating system allows one thread per processor. Once you're done with the completion port you must close the port handle using the CloseHandle function, which is exactly what ATL's CHandle wrapper class does for you in its destructor.

Associating file handles

To have the results of asynchronous I/O requests queued to the completion port you need to associate the file handles with the completion port. For this you need to call the CreateIoCompletionPort function as follows:

const ULONG_PTR completionKey = 0;

if (0 == ::CreateIoCompletionPort(file,
                                  port,
                                  completionKey,
                                  0)) // ignored
{
    // Call GetLastError for more information.
}

The first parameter is the file handle and the second is the completion port handle. The only parameter that you should think about in this case is the completionKey value. This can be any value and is included in the completion packet for any I/O requests for the given file handle.
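
The completion key is commonly used to carry a pointer to a per-file (or per-connection) context object so that the thread dequeuing a packet knows which handle, and which state, the packet belongs to. Here's a minimal sketch of that pattern; the FileContext structure is just a hypothetical example and must outlive any I/O on the handle:

// Hypothetical per-file state; it must remain valid for as long as
// completion packets for this handle can arrive.
struct FileContext
{
    HANDLE file;
    // ... any other per-file state ...
};

FileContext context = { file };

if (0 == ::CreateIoCompletionPort(file,
                                  port,
                                  reinterpret_cast<ULONG_PTR>(&context), // completion key
                                  0)) // ignored
{
    // Call GetLastError for more information.
}

// Later, on the dequeuing thread, the key can be converted back:
// FileContext* dequeued = reinterpret_cast<FileContext*>(completionKey);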

Dequeuing completion packets

As I’ve mentioned, you can dequeue completion packets on any thread in the process that created the completion port. All that the thread needs is the port handle. A completion packet consists of three values:

DWORD bytesCopied = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* overlapped = 0;

The completion port itself doesn’t use or interpret these values in any way. It simply provides an insanely efficient mechanism for queuing and dequeuing them across multiple threads. The GetQueuedCompletionStatus function is used to dequeue a completion packet:

DWORD milliseconds = 0;

if (!::GetQueuedCompletionStatus(port,
                                 &bytesCopied,
                                 &completionKey,
                                 &overlapped,
                                 milliseconds))
{
    // Call GetLastError for more information.
}

The first parameter is the port handle. The next three parameters provide the values that make up the completion packet. The last parameter indicates how long the caller is willing to wait for a completion packet to be queued. Normally it is set to INFINITE to wait indefinitely. You can also set it to zero in which case it will dequeue a completion packet if one is available and return immediately.
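
Keep in mind that a FALSE return doesn't always mean the wait itself failed. If the overlapped pointer comes back non-null then a completion packet for a failed I/O request was dequeued and GetLastError reports why that request failed; if it comes back null then no packet was dequeued at all, either because the wait timed out or because the call failed. Here's a minimal sketch of distinguishing the cases:

if (!::GetQueuedCompletionStatus(port,
                                 &bytesCopied,
                                 &completionKey,
                                 &overlapped,
                                 milliseconds))
{
    const DWORD error = ::GetLastError();

    if (0 != overlapped)
    {
        // A packet for a failed I/O request was dequeued;
        // 'error' describes why that request failed.
    }
    else if (WAIT_TIMEOUT == error)
    {
        // No packet arrived within the timeout.
    }
    else
    {
        // The call itself failed; 'error' describes why.
    }
}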

A common approach is to dequeue completion packets in an endless loop until a special completion packet is dequeued marking the end of the completion port's usage. Here's an example:

while (::GetQueuedCompletionStatus(port,
                                   &bytesCopied,
                                   &completionKey,
                                   &overlapped,
                                   INFINITE))
{
    if (0 == bytesCopied && 0 == completionKey && 0 == overlapped)
    {
        break;
    }
    else
    {
        // Process completion packet
    }
}

Queuing your own completion packets

Although completion packets are normally queued by the operating system as asynchronous I/O requests are completed, you can also queue your own completion packets. This is achieved using the PostQueuedCompletionStatus function. Here’s an example that sends the special completion packet that would instruct the loop in the previous section to come to an end:

if (!::PostQueuedCompletionStatus(port,
                                  0, // bytesCopied
                                  0, // completionKey
                                  0)) // overlapped
{
    // Call GetLastError for more information.
}

The first parameter is the port handle. The next three parameters make up the completion packet. As I’ve mentioned, these values are passed to the thread that calls the GetQueuedCompletionStatus function to dequeue the completion packet and have no meaning to the completion port itself.
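
Because the overlapped pointer is passed through untouched, a common trick is to embed the OVERLAPPED structure inside a larger per-operation structure and recover the enclosing structure on the dequeuing side with the CONTAINING_RECORD macro. Here's a minimal sketch; ReadOperation is just a hypothetical example and must remain valid until its completion packet has been processed:

struct ReadOperation
{
    OVERLAPPED overlapped;
    BYTE buffer[256];
    // ... any other per-operation state ...
};

// On the dequeuing thread, given the OVERLAPPED pointer carried by the
// completion packet (overlappedPointer), recover the enclosing structure:
ReadOperation* operation =
    CONTAINING_RECORD(overlappedPointer, ReadOperation, overlapped);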

Creating completion port threads

Before creating the threads that will dequeue and process the completion packets you need to determine how many processors are available. This is achieved using the GetNativeSystemInfo function:

SYSTEM_INFO info = { 0 };
::GetNativeSystemInfo(&info);

const DWORD threadCount = info.dwNumberOfProcessors;

You may want to experiment with different thread counts, or concurrency values, based on your application’s completion processing. For example if you cannot avoid making some blocking calls on your completion port threads you may want to increase the concurrency to allow more threads to be scheduled.
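
For example, one reasonable arrangement is to keep the port's concurrency value at the processor count while creating a few extra threads, so that when one thread blocks the port can release another to keep the processors busy. The factor of two below is just an assumption to experiment with:

SYSTEM_INFO info = { 0 };
::GetNativeSystemInfo(&info);

// Concurrency value passed when creating the port: at most one
// running thread per processor.
const DWORD concurrency = info.dwNumberOfProcessors;

// Number of threads actually created: a few spares so that a blocking
// call on one thread doesn't leave a processor idle.
const DWORD threadCount = info.dwNumberOfProcessors * 2;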

Next we need to create an array to store the thread handles. This is needed so that we can later wait for the threads to terminate. ATL’s CAtlArray can be used as follows but you’re free to use whatever container you wish:

CAtlArray<HANDLE> threads;

if (!threads.SetCount(threadCount))
{
    // Out of memory...
}

// Zero the handles up front so that the failure check further below
// remains valid even if CreateThread fails partway through the loop.
::ZeroMemory(threads.GetData(), threadCount * sizeof (HANDLE));

Finally we can create the threads:

for (DWORD index = 0; index < threadCount; ++index)
{
    threads[index] = ::CreateThread(0, // default security
                                    0, // default stack size
                                    CompletionThread, // start address
                                    port, // parameter
                                    CREATE_SUSPENDED,
                                    0); // don't return thread Id

    if (0 == threads[index])
    {
        break;
    }
}

The thread function is passed the port handle so that it can dequeue completion packets. If you're using the CRT then you may want to use the _beginthreadex function instead (a sketch follows after the code below). It's also a good idea to initially suspend the threads: should the CreateThread function fail at some point, we can back out of the operation much more easily without having to wait for any running threads to terminate:

if (0 == threads[threads.GetCount() - 1])
{
    // Call GetLastError for more information.
    // Close any created thread handles here.
}

for (DWORD index = 0; index < threadCount; ++index)
{
    ASSERT(0 != threads[index]);
    ::ResumeThread(threads[index]);
}
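
As mentioned above, CRT users may prefer _beginthreadex so that the CRT's per-thread state is set up and torn down correctly. Here's a minimal sketch of the same call; CrtCompletionThread is just a hypothetical name for a thread function with the signature the CRT expects:

#include <process.h>

unsigned __stdcall CrtCompletionThread(void* port)
{
    // Same dequeuing loop as the CompletionThread function shown below.
    return 0;
}

// In place of the CreateThread call:
threads[index] = reinterpret_cast<HANDLE>(
    ::_beginthreadex(0, // default security
                     0, // default stack size
                     CrtCompletionThread, // start address
                     port, // parameter
                     CREATE_SUSPENDED,
                     0)); // don't return thread Id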

A typical thread function is quite straightforward and uses the GetQueuedCompletionStatus function introduced earlier to dequeue completion packets:

DWORD WINAPI CompletionThread(HANDLE port)
{
    ASSERT(0 != port);

    DWORD bytesCopied = 0;
    ULONG_PTR completionKey = 0;
    OVERLAPPED* overlapped = 0;

    while (::GetQueuedCompletionStatus(port,
                                       &bytesCopied,
                                       &completionKey,
                                       &overlapped,
                                       INFINITE))
    {
        if (0 == bytesCopied && 0 == completionKey && 0 == overlapped)
        {
            break;
        }
        else
        {
            // Process completion packet.
        }
    }

    return 0;
}
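
When it's time to shut down, post one of the special completion packets for each thread you created so that every thread gets a chance to dequeue one and return from its thread function, then wait for the threads to terminate and close their handles. Here's a minimal sketch, reusing the thread handle array from above:

// Queue one sentinel packet per thread; each thread dequeues exactly
// one and breaks out of its loop.
for (DWORD index = 0; index < threadCount; ++index)
{
    if (!::PostQueuedCompletionStatus(port, 0, 0, 0))
    {
        // Call GetLastError for more information.
    }
}

// Wait for all of the threads to terminate. WaitForMultipleObjects handles
// at most 64 handles, which is plenty for one thread per processor.
::WaitForMultipleObjects(threadCount,
                         threads.GetData(),
                         TRUE, // wait for all
                         INFINITE);

// Finally, close the thread handles.
for (DWORD index = 0; index < threadCount; ++index)
{
    ::CloseHandle(threads[index]);
}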

Making life simpler with C++

As I’ve mentioned, the completion port API is a bit confusing but with a little help from C++ we can fix that. Here’s a wrapper class that clearly defines the main completion port operations and provides a simple way to create and use completion ports from C++:

class CompletionPort : public CHandle
{
public:

    CompletionPort() :
        m_closeHandle(true)
    {
        // Do nothing
    }

    explicit CompletionPort(__in bool closeHandle) :
        m_closeHandle(closeHandle)
    {
        // Do nothing
    }

    CompletionPort(__in bool closeHandle,
                   __in_opt HANDLE handle) :
        CHandle(handle),
        m_closeHandle(closeHandle)
    {
        // Do nothing
    }

    ~CompletionPort()
    {
        if (!m_closeHandle)
        {
            Detach();
        }
    }

    __checkReturn HRESULT Create(__in DWORD threadCount)
    {
        Attach(::CreateIoCompletionPort(INVALID_HANDLE_VALUE,
                                        0, // no existing port
                                        0, // ignored
                                        threadCount));

        if (0 == m_h)
        {
            return HRESULT_FROM_WIN32(::GetLastError());
        }

        return S_OK;
    }

    __checkReturn HRESULT AssociateFile(__in HANDLE file,
                                        __in ULONG_PTR completionKey)
    {
        ASSERT(0 != file && INVALID_HANDLE_VALUE != file);
        ASSERT(0 != m_h);

        if (0 == ::CreateIoCompletionPort(file,
                                          m_h,
                                          completionKey,
                                          0)) // ignored
        {
            return HRESULT_FROM_WIN32(::GetLastError());
        }

        return S_OK;
    }

    __checkReturn HRESULT QueuePacket(__in DWORD bytesCopied,
                                      __in ULONG_PTR completionKey,
                                      __in OVERLAPPED* overlapped)
    {
        ASSERT(0 != m_h);

        if (!::PostQueuedCompletionStatus(m_h,
                                          bytesCopied,
                                          completionKey,
                                          overlapped))
        {
            return HRESULT_FROM_WIN32(::GetLastError());
        }

        return S_OK;
    }

    __checkReturn HRESULT DequeuePacket(__in DWORD milliseconds,
                                        __out DWORD& bytesCopied,
                                        __out ULONG_PTR& completionKey,
                                        __out OVERLAPPED*& overlapped)
    {
        ASSERT(0 != m_h);

        if (!::GetQueuedCompletionStatus(m_h,
                                         &bytesCopied,
                                         &completionKey,
                                         &overlapped,
                                         milliseconds))
        {
            return HRESULT_FROM_WIN32(::GetLastError());
        }

        return S_OK;
    }

private:

    CompletionPort(CompletionPort&);
    CompletionPort& operator=(CompletionPort&);

    bool m_closeHandle;

};

A few examples

To keep the examples simple I've kept the queuing and dequeuing on a single thread. I described how to create completion port threads earlier in this article.

Here’s an example of creating a completion port, queuing a completion packet and then dequeuing it again:

CompletionPort port;

HRESULT result = port.Create(1);

if (FAILED(result))
{
    // Failed to create completion port. The HRESULT provides the reason.
}

result = port.QueuePacket(2, // bytes copied
                          1, // completion key
                          0); // overlapped

if (FAILED(result))
{
    // Failed to queue a completion packet. The HRESULT provides the reason.
}

DWORD bytesCopied = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* overlapped = 0;

result = port.DequeuePacket(0, // don't wait
                            bytesCopied,
                            completionKey,
                            overlapped);

if (FAILED(result))
{
    // Failed to dequeue a completion packet. The HRESULT provides the reason.
}

ASSERT(2 == bytesCopied);
ASSERT(1 == completionKey);
ASSERT(0 == overlapped);

Here’s an example of creating a completion port, associating it with a file handle, beginning to read from the file and then waiting for the completion packet to signal the request’s completion:

CompletionPort port;

HRESULT result = port.Create(1);

if (FAILED(result))
{
    // Failed to create completion port. The HRESULT provides the reason.
}

CHandle file(::CreateFile(L"some file path",
                          FILE_READ_DATA,
                          FILE_SHARE_READ,
                          0, // default security
                          OPEN_EXISTING,
                          FILE_FLAG_OVERLAPPED,
                          0)); // no template

if (INVALID_HANDLE_VALUE == file)
{
    // Call GetLastError for more information.
}

result = port.AssociateFile(file,
                            123); // completion key

if (FAILED(result))
{
    // Failed to associate file. The HRESULT provides the reason.
}

OVERLAPPED overlapped = { 0 };
BYTE buffer[256] = { 0 };

if (!::ReadFile(file,
                buffer,
                _countof(buffer),
                0, // not used for overlapped I/O
                &overlapped) &&
    ERROR_IO_PENDING != ::GetLastError())
{
    // The request failed outright. The last error value provides the reason.
}

DWORD bytesCopied = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* overlappedPointer = 0;

result = port.DequeuePacket(INFINITE,
                            bytesCopied,
                            completionKey,
                            overlappedPointer);

if (FAILED(result))
{
    // Failed to dequeue a completion packet. The HRESULT provides the reason.
}

ASSERT(&overlapped == overlappedPointer);
ASSERT(123 == completionKey);

Pretty straightforward. Just remember to include the FILE_FLAG_OVERLAPPED flag when creating the file handle to support overlapped I/O. Also keep in mind that ReadFile will typically return FALSE with GetLastError reporting ERROR_IO_PENDING, which simply means that the request was accepted and will complete through the port. When the DequeuePacket method returns, the bytesCopied variable indicates the number of bytes actually read from the file.

That's about it for completion ports. If you need to build a high performance server then completion ports are the way to go. I should mention that ATL also includes a wrapper for completion ports in its CThreadPool class. It's not ideal for every scenario but may come in handy nonetheless.

Stay tuned for part 5.

© 2008 Kenny Kerr

5 Comments

  • Thou shalt not...

    use SYSTEM_INFO.dwNumberOfProcessors to get the number of available processors.

    Better would be to count the bits in the affinity mask.

    Christian

  • Christian: There’s nothing inherently wrong with using SYSTEM_INFO. Calculating and maintaining the number of available processors is nontrivial and the discussion of completion ports was sufficiently long that I didn’t want to get into that.

    Sure, you can call GetProcessAffinityMask to calculate the number of processors in the system and the subset available to the process. This gets more complicated when you consider systems that support hot-swapping and hot-adding of processors as well as taking advantage of NUMA-capable hardware.

    The good news is that SYSTEM_INFO provides a reasonable starting point and the system is very good at optimizing the number of actual threads dequeuing completion packets. As I mentioned, developers should experiment with the concurrency value based on their own usage.

  • Christian: that’s what PostQueuedCompletionStatus is for. Once you’re done with the completion port you need to send one additional completion packet for each completion thread to give each thread a chance to return from GetQueuedCompletionStatus and, given the special completion packet, return from the thread function.

    To make this practical you should store an array of thread handles. You then queue the special completion packet as many times as you have handles in the array and then call WaitForMultipleObjects on the array to wait for all of the threads to terminate.

  • Thank you very much. I now pass NULL to signal the end of the job queue.

    I liked your async processing series! Keep on the good work.

    Christian

  • Thanks for writing this series. I've been doing a lot of research into multi-threaded asynchronous I/O and detailed information has been hard to come by. I've read Russinovich's very old piece on completion ports, but it left me confused as to how multiple threads would even be useful when dealing with overlapped I/O.

    I look forward to part 5.
