Parallel Programming with C++ – Part 4 – I/O Completion Ports
So far in the Parallel Programming with C++ series I've talked about asynchronous procedure calls (APCs) and how they can be used to build efficient and responsive client applications quite easily by waiting for I/O requests to complete asynchronously without having to create additional worker threads.
Although APCs are the most efficient way to perform asynchronous I/O they have one drawback: an APC will only ever complete on the same thread that initiated the operation. This usually isn’t a problem for client applications but is woefully inadequate for server applications. Client applications may also find this unacceptable if they need to process a lot of I/O requests. The problem is that if only a single thread is being used then only a single processor is in use and any additional processors are likely sitting idle.
The challenge is coming up with a solution that scales to the available processing power. If only a single processor is available then there is little point in creating additional threads for the sake of scalability. On the other hand, if more processors are available then additional threads are needed to take advantage of them, and your application really needs to make this decision at runtime based on the number of processors that are available. Gone are the days when 99% of computers had only a single processor. A large percentage of computers sold today have two processors (e.g. dual core) and as of this writing four-processor (e.g. quad core) computers are already quite affordable.
A common solution used by server applications is to create one thread per client connection. The reason why this technique is so popular is because it is very easy to implement using traditional synchronous programming techniques. It is also unfortunately widely promoted. This technique however does not scale to more than a handful of client connections (depending on your application’s characteristics). As more threads are introduced, the operating system ends up spending more and more time scheduling different threads on the available processors and the overhead of this context switching quickly consumes much of the computer’s time.
A common remedy that is often cited is to use a thread pool but thread pools only solve a small part of the problem. Thread pools amortize the cost of thread creation and destruction over the life of the application but they don’t in themselves solve the scheduling problem. Most thread pools will also happily create many more threads than there are available processors to service them. Although this is completely acceptable for applications that create threads to wait on various events and spend most of their time sleeping or blocked (common for client applications), it is usually unsuitable for server threads that have a lot of work to do in fulfilling client requests.
A good rule of thumb is to use one thread per processor no matter how many client connections your server application may have to handle, and to avoid making blocking calls on those threads. Ideally each thread is busy handling a stream of I/O requests and is never suspended to allow another thread to execute. This avoids the context switching overhead and all the available processors are used efficiently. This is, however, very hard to implement yourself, but fortunately the Windows kernel provides an excellent implementation in the form of I/O completion ports!
Whereas APCs allow asynchronous I/O requests to be fulfilled on a single thread, completion ports allow any thread to perform asynchronous I/O and have the results processed by an arbitrary thread. A completion port is a kernel object that you can associate with a number of file handles. As with APCs, these file handles may represent files on a local or remote file system or they could represent other communication mechanisms such as pipes or sockets. Once a file handle is associated with a completion port the results of any asynchronous I/O requests, known as completion packets, are queued to the completion port and can be dequeued by any available thread in the process.
The main challenge with completion ports is getting your head around the confusing API. There are a few operations involved in using a completion port:
- Creating a completion port
- Associating file handles
- Dequeuing completion packets
- Optionally queuing your own completion packets
- Creating completion port threads
Creating a completion port
A completion port is created using the CreateIoCompletionPort function. It can get confusing since the same function is used to associate the completion port with a file handle. Here’s how to create a completion port object:
const DWORD threadCount = 0;
CHandle port(::CreateIoCompletionPort(INVALID_HANDLE_VALUE,
0, // no existing port
0, // ignored
threadCount));
if (0 == port)
{
// Call GetLastError for more information.
}
The only parameter that you should think about is the threadCount value. This defines the maximum number of threads that will be allowed to dequeue and process completion packets concurrently. If this is set to zero then the operating system allows one thread per processor. Once you’re done with the completion port you must close the port handle using the CloseHandle function. That’s what ATL’s CHandle wrapper class provides.
Associating file handles
To have the results of asynchronous I/O requests queued to the completion port you need to associate the file handles with the completion port. For this you need to call the CreateIoCompletionPort function as follows:
const ULONG_PTR completionKey = 0;
if (0 == ::CreateIoCompletionPort(file,
port,
completionKey,
0)) // ignored
{
// Call GetLastError for more information.
}
The first parameter is the file handle and the second is the completion port handle. The only parameter that you should think about in this case is the completionKey value. This can be any value and is included in the completion packet for any I/O requests for the given file handle.
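A common pattern, shown here only as an illustrative sketch, is to use the completion key to carry per-handle context, such as a pointer to your own connection object, which a completion thread can later recover from the packet. The Connection type below is hypothetical:

struct Connection
{
    HANDLE file; // opened with FILE_FLAG_OVERLAPPED
};

Connection* connection = new Connection;
connection->file = file; // assume 'file' was opened as described above

if (0 == ::CreateIoCompletionPort(connection->file,
                                  port,
                                  reinterpret_cast<ULONG_PTR>(connection),
                                  0)) // ignored
{
    // Call GetLastError for more information.
}

// Later, a completion thread can recover the context from the packet:
// Connection* connection = reinterpret_cast<Connection*>(completionKey);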
Dequeuing completion packets
As I’ve mentioned, you can dequeue completion packets on any thread in the process that created the completion port. All that the thread needs is the port handle. A completion packet consists of three values:
DWORD bytesCopied = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* overlapped = 0;
The completion port itself doesn’t use or interpret these values in any way. It simply provides an insanely efficient mechanism for queuing and dequeuing them across multiple threads. The GetQueuedCompletionStatus function is used to dequeue a completion packet:
DWORD milliseconds = 0;
if (!::GetQueuedCompletionStatus(port,
&bytesCopied,
&completionKey,
&overlapped,
milliseconds))
{
// Call GetLastError for more information.
}
The first parameter is the port handle. The next three parameters provide the values that make up the completion packet. The last parameter indicates how long the caller is willing to wait for a completion packet to be queued. Normally it is set to INFINITE to wait indefinitely. You can also set it to zero in which case it will dequeue a completion packet if one is available and return immediately.
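As a minimal illustration, using the variables declared above, here is what polling with a zero timeout might look like. When no packet is available the function returns FALSE and GetLastError reports WAIT_TIMEOUT rather than a genuine failure:

if (!::GetQueuedCompletionStatus(port,
                                 &bytesCopied,
                                 &completionKey,
                                 &overlapped,
                                 0)) // don't wait
{
    if (WAIT_TIMEOUT == ::GetLastError())
    {
        // No completion packet was available at this moment.
    }
    else
    {
        // Call GetLastError for more information.
    }
}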
A common approach is to dequeue completion packets in an endless loop until a special completion packet is dequeued marking the end of the completion port’s usage. Here’s an example:
while (::GetQueuedCompletionStatus(port,
&bytesCopied,
&completionKey,
&overlapped,
INFINITE))
{
if (0 == bytesCopied && 0 == completionKey && 0 == overlapped)
{
break;
}
else
{
// Process completion packet
}
}
Queuing your own completion packets
Although completion packets are normally queued by the operating system as asynchronous I/O requests are completed, you can also queue your own completion packets. This is achieved using the PostQueuedCompletionStatus function. Here’s an example that sends the special completion packet that would instruct the loop in the previous section to come to an end:
if (!::PostQueuedCompletionStatus(port,
0, // bytesCopied
0, // completionKey
0)) // overlapped
{
// Call GetLastError for more information.
}
The first parameter is the port handle. The next three parameters make up the completion packet. As I’ve mentioned, these values are passed to the thread that calls the GetQueuedCompletionStatus function to dequeue the completion packet and have no meaning to the completion port itself.
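Keep in mind that each call to GetQueuedCompletionStatus dequeues a single packet, so if you rely on a sentinel packet to stop a pool of completion threads you need to post one packet per thread. Here is a minimal sketch, assuming threadCount holds the number of worker threads (choosing that number is covered in the next section):

for (DWORD index = 0; index < threadCount; ++index)
{
    if (!::PostQueuedCompletionStatus(port,
                                      0,  // bytesCopied
                                      0,  // completionKey
                                      0)) // overlapped
    {
        // Call GetLastError for more information.
    }
}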
Creating completion port threads
Before creating the threads that will dequeue and process the completion packets you need to determine how many processors are available. This is achieved using the GetNativeSystemInfo function:
SYSTEM_INFO info = { 0 };
::GetNativeSystemInfo(&info);
const DWORD threadCount = info.dwNumberOfProcessors;
You may want to experiment with different thread counts, or concurrency values, based on your application’s completion processing. For example if you cannot avoid making some blocking calls on your completion port threads you may want to increase the concurrency to allow more threads to be scheduled.
Next we need to create an array to store the thread handles. This is needed so that we can later wait for the threads to terminate. ATL’s CAtlArray can be used as follows but you’re free to use whatever container you wish:
CAtlArray<HANDLE> threads;
if (!threads.SetCount(threadCount))
{
// Out of memory...
}
Finally we can create the threads:
for (DWORD index = 0; index < threadCount; ++index)
{
threads[index] = ::CreateThread(0, // default security
0, // default stack size
CompletionThread, // start address
port, // parameter
CREATE_SUSPENDED,
0); // don't return thread Id
if (0 == threads[index])
{
break;
}
}
The thread function is passed the port handle so that it can dequeue completion packets. If you’re using the CRT then you may want to use the _beginthreadex function instead. It’s a good idea to initially suspend the threads. Should the CreateThread function fail at some point we can much more easily back out of the operation without having to wait for any running threads to terminate:
if (0 == threads[threads.GetCount() - 1])
{
// Call GetLastError for more information.
// Close any created thread handles here.
}
for (DWORD index = 0; index < threadCount; ++index)
{
ASSERT(0 != threads[index]);
::ResumeThread(threads[index]);
}
A typical thread function is quite straightforward and uses the GetQueuedCompletionStatus function introduced earlier to dequeue completion packets:
DWORD WINAPI CompletionThread(HANDLE port)
{
ASSERT(0 != port);
DWORD bytesCopied = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* overlapped = 0;
while (::GetQueuedCompletionStatus(port,
&bytesCopied,
&completionKey,
&overlapped,
INFINITE))
{
if (0 == bytesCopied && 0 == completionKey && 0 == overlapped)
{
break;
}
else
{
// Process completion packet.
}
}
return 0;
}
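To shut everything down cleanly you can post one sentinel packet per thread (as shown earlier), wait for the threads to exit, and then close the thread handles; this is why the handles were kept in the array. A minimal sketch, assuming the threads array and port from above:

for (DWORD index = 0; index < threadCount; ++index)
{
    // One sentinel packet per thread so that each loop breaks.
    ::PostQueuedCompletionStatus(port, 0, 0, 0);
}

::WaitForMultipleObjects(threadCount,
                         threads.GetData(),
                         TRUE, // wait for all threads
                         INFINITE);

for (DWORD index = 0; index < threadCount; ++index)
{
    ::CloseHandle(threads[index]);
}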
Making life simpler with C++
As I’ve mentioned, the completion port API is a bit confusing but with a little help from C++ we can fix that. Here’s a wrapper class that clearly defines the main completion port operations and provides a simple way to create and use completion ports from C++:
class CompletionPort : public CHandle
{
public:
CompletionPort() :
m_closeHandle(true)
{
// Do nothing
}
explicit CompletionPort(__in bool closeHandle) :
m_closeHandle(closeHandle)
{
// Do nothing
}
CompletionPort(__in bool closeHandle,
__in_opt HANDLE handle) :
CHandle(handle),
m_closeHandle(closeHandle)
{
// Do nothing
}
~CompletionPort()
{
if (!m_closeHandle)
{
Detach();
}
}
__checkReturn HRESULT Create(__in DWORD threadCount)
{
Attach(::CreateIoCompletionPort(INVALID_HANDLE_VALUE,
0, // no existing port
0, // ignored
threadCount));
if (0 == m_h)
{
return HRESULT_FROM_WIN32(::GetLastError());
}
return S_OK;
}
__checkReturn HRESULT AssociateFile(__in HANDLE file,
__in ULONG_PTR completionKey)
{
ASSERT(0 != file && INVALID_HANDLE_VALUE != file);
ASSERT(0 != m_h);
if (0 == ::CreateIoCompletionPort(file,
m_h,
completionKey,
0)) // ignored
{
return HRESULT_FROM_WIN32(::GetLastError());
}
return S_OK;
}
__checkReturn HRESULT QueuePacket(__in DWORD bytesCopied,
__in ULONG_PTR completionKey,
__in OVERLAPPED* overlapped)
{
ASSERT(0 != m_h);
if (!::PostQueuedCompletionStatus(m_h,
bytesCopied,
completionKey,
overlapped))
{
return HRESULT_FROM_WIN32(::GetLastError());
}
return S_OK;
}
__checkReturn HRESULT DequeuePacket(__in DWORD milliseconds,
__out DWORD& bytesCopied,
__out ULONG_PTR& completionKey,
__out OVERLAPPED*& overlapped)
{
ASSERT(0 != m_h);
if (!::GetQueuedCompletionStatus(m_h,
&bytesCopied,
&completionKey,
&overlapped,
milliseconds))
{
return HRESULT_FROM_WIN32(::GetLastError());
}
return S_OK;
}
private:
CompletionPort(CompletionPort&);
CompletionPort& operator=(CompletionPort&);
bool m_closeHandle;
};
A few examples
To keep the examples simple I’ve kept the queuing and dequeuing on a single thread. I described how to create completion port threads earlier in this article.
Here’s an example of creating a completion port, queuing a completion packet and then dequeuing it again:
CompletionPort port;
HRESULT result = port.Create(1);
if (FAILED(result))
{
// Failed to create completion port. The HRESULT provides the reason.
}
result = port.QueuePacket(2, // bytes copied
1, // completion key
0); // overlapped
if (FAILED(result))
{
// Failed to queue a completion packet. The HRESULT provides the reason.
}
DWORD bytesCopied = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* overlapped = 0;
result = port.DequeuePacket(0, // don't wait
bytesCopied,
completionKey,
overlapped);
if (FAILED(result))
{
// Failed to dequeue a completion packet. The HRESULT provides the reason.
}
ASSERT(2 == bytesCopied);
ASSERT(1 == completionKey);
ASSERT(0 == overlapped);
Here’s an example of creating a completion port, associating it with a file handle, beginning to read from the file and then waiting for the completion packet to signal the request’s completion:
CompletionPort port;
HRESULT result = port.Create(1);
if (FAILED(result))
{
// Failed to create completion port. The HRESULT provides the reason.
}
CHandle file(::CreateFile(L"some file path",
FILE_READ_DATA,
FILE_SHARE_READ,
0, // default security
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
0)); // no template
if (INVALID_HANDLE_VALUE == file)
{
// Call GetLastError for more information.
}
result = port.AssociateFile(file,
123); // completion key
if (FAILED(result))
{
// Failed to associate file. The HRESULT provides the reason.
}
OVERLAPPED overlapped = { 0 };
BYTE buffer[256] = { 0 };
// For overlapped I/O a FALSE return with ERROR_IO_PENDING simply means the
// request is still in progress and will complete through the port.
if (!::ReadFile(file,
                buffer,
                _countof(buffer),
                0, // ignored
                &overlapped) &&
    ERROR_IO_PENDING != ::GetLastError())
{
    // The read failed. Call GetLastError for more information.
}
DWORD bytesCopied = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* overlappedPointer = 0;
result = port.DequeuePacket(INFINITE,
bytesCopied,
completionKey,
overlappedPointer);
if (FAILED(result))
{
// Failed to dequeue a completion packet. The HRESULT provides the reason.
}
ASSERT(&overlapped == overlappedPointer);
ASSERT(123 == completionKey);
Pretty straightforward. Just remember to include the FILE_FLAG_OVERLAPPED flag when creating the file handle to support overlapped I/O. When the DequeuePacket method returns the bytesCopied variable indicates the number of bytes actually read from the file.
That’s about it for completion ports. If you need to build a high-performance server then completion ports are the way to go. I should mention that ATL also includes a wrapper for completion ports in its CThreadPool class. It’s not ideal for every scenario but may come in handy nonetheless.
Stay tuned for part 5.
© 2008 Kenny Kerr