Implementing an efficient worker thread pool in Windows can be achieved through the use of IOCP, which is the barbaric acronym for (just as barbaric) Input/Output Completion Port API introduced by Microsoft a while ago.
To make use of IOCP, one has to deal with further barbaric API functions, but the principles are (somewhat) more civilized, and the implementation is quite straightforward.
What are worker threads? They’re threads that don’t have any specific task to perform, but are designed to execute arbitrary “work units” in a threaded, asynchronous fashion.
Usually they come in pools, and you just give work units to the pool. The work units are executed in order by the workers that have nothing else to do.
This is a very simple model in which works threading aspects are abstracted and become just asynchronous work units. It also has very similar real-world analogies, and can often be easier to understand than other forms of multi-threading parallelization.
Implementation-wise, it revolves around a FIFO queue, that has to be thread-safe, along with thread signaling events. This is a mechanism IOCP wraps with just three API functions, with OS kernel support.
Setting up an IOCP queue
To use IOCP the first thing to do is create an IOCP queue, this is accomplished through the CreateIoCompletionPort function. This function was initially meant for I/O, but it can use any object handle, even INVALID_FILE_HANDLE.
FIOCP := CreateIoCompletionPort(INVALID_FILE_HANDLE, 0, 0, 0);
And that’s all you need to setup an IOCP queue.
Posting work to the queue
To post work you have use the PostQueuedCompletionStatus function, it takes four arguments:
- the first is the IOCP handle, which we obtained above
- lpNumberOfBytesTransferred (showing its I/O roots), an integer
- lpCompletionKey, an integer
- lpOverlapped, a pointer
And that’s all. The good thing is that for a queue, they’re not significant, so in 32 bits, that’s effectively 12 bytes we can use, and double that in 64 bits.
One of the simpler ways to define a work unit would be to use an anonymous method (reference to procedure). In Delphi, those are in practice just hidden interfaces, so they fit in a pointer. Let’s queue one:
type TAnonymousWorkUnit = reference to procedure; PAnonymousWorkUnit = ^TAnonymousWorkUnit; ... procedure TWorkerThreadPool.QueueWork(const workUnit : TAnonymousWorkUnit); var lpOverlapped : Pointer; begin lpOverlapped := nil; PAnonymousWorkUnit(@lpOverlapped)^ := workUnit; PostQueuedCompletionStatus(FIOCP, 1, 0, lpOverlapped); end;
In the code above we copy the workUnit into a pointer, it’s important to use a copy and not a cast so as to increment the reference count of the interface aka anonymous method. The destination pointer also explicitly be nil’ed before the assignment (as the compiler won’t initialize it, so it can contain just about anything).
The we post to the queue, using lpNumberOfBytesTransferred to pass a command (1) which will come in use below.
11 thoughts on “Using IOCP for Worker Threads”
This is exactly what is used in the TSynThreadPool object of our class SynCrtSock, for the “pure WinSock” HTTP server (in case http.sys kernel mode is not a solution for you).
Resulting performance is pretty good, but you are limited to a fixed number of threads, so for a HTTP server with a lot of active connections, it is less stable than the http.sys kernel mode.
You can take a look at this unit, which is part of our mORMot framework, at http://synopse.info/fossil/finfo?name=SynCrtSock.pas
“I’ve found it boils down to just two options in practice:”
I see a third. If the threads are isolated, like for instance session in a data or web server, you can just catch the exception and only terminate / finish the corrupted thread. No need to terminate the app. But it all depends on isolation level.
I am also curious what benefit does this approach for a thread pool have over more wide spreed task pool like the one in OmniThread library or the one I use in my code. There a free thread executes a task given to it and that is basically one cycle in a execute procedure inside the thread.
You do NOT need a file on disk in order to create an IOCP queue. You can pass INVALID_HANDLE_VALUE, or any object that supports IOCP (many objects do, not just files). The documentation you linked to says as much.
@A. Bouchez: you can easily adjust the number of threads dynamically. If you maintain an active threads counter, you can use it to periodically bump up or down the number of threads depending on load. To remove a thread from the pool, the simplest approach I’ve found is to support another “command”, f.i. if lpNumberOfBytesTransferred is 2 the worker thread terminates and removes itself from the pool.
@Remy: thanks I had overlooked that part, makes it even simpler! I’ll amend the article.
@Iztok: when an unexpected exception occurs, it could be anything, including something that’ll break your thread isolation (threads share the same memory space, so a bug can result in threads messing up each other, even though they shouldn’t).
The main advantage of IOCP against user-mode code is performance, it involves very few API calls, and is implemented kernel-side. An IOCP queue can easily handle hundreds of thousands of work units per second on my old quad core f.i., and you can queue millions of work units without problems.
IOCP are used in Delphi Web Sockets at http://code.google.com/p/dwsockets/
I guess they will not compile with current version of DWS WebServer
Any plans of implementing web socket support in DWS WebServer, even only on Windows8/Server2012 ?
@chapa This is something I haven’t had time to investigate yet (and didn’t have appropriate OS for dev), but yes, I anticipate to have them in, just need to migrate my main dev rig to Win8 or 2012 first.
Please, let me know once you are into this. Will rebuild dwsockets in order to work with latest DWS Web Server release, and be used as a base, if you find the code useful.
Would be glad if I can be in any other development help in this matter.
Polling for completion using sleep in TIOCPWorkerThreadPool.Shutdown method is not nice.
I would suggest using TCountdownEvent, waiting the object to be signalled in shutdown method 😉
When you create IOCP you use CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
The last param is NumberOfConcurrentThreads and it is zero.
But MSDN says: If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.
So my question is: if zero means max concurrent threads equals the count of the processors what is the meaning of WorkerCount in dwsIOCPWorkerThreadPool? If I create 100 threads, but have 4 CPUs only 4 threads will run in fact?
@Petar Good catch about the last parameter, fixed it.
I don’t think the signal would be nicer, as that would add the Event overhead for every work unit, along with special code to reset the signal. Calls to Shutdown will typically happen just once when the app terminates, and Shutdown only makes sense for non-GUI apps (GUI apps will want to monitor the shutdown progress to give feedback and generally avoid being non-responding)
Comments are closed.