The worker threads
We have a queue, we can post work units to it, now we need workers. These can be accomplished by subclassing TThread and overriding Execute, a minimal implementation could be
procedure TWorkerThread.Execute; procedure ExecuteAnonymousFunction(p : PAnonymousWorkUnit); begin try p^(); finally p^._Release; end; end; var lpNumberOfBytesTransferred, lpCompletionKey : NativeUInt; lpOverlapped : Pointer; begin while not Terminated do begin if not GetQueuedCompletionStatus(FIOCP, lpNumberOfBytesTransferred, lpCompletionKey, lpOverlapped, INFINITE) then Break; if lpNumberOfBytesTransferred=1 then ExecuteAnonymousFunction(PAnonymousWorkUnit(@lpOverlapped)) else break; end; end;
The worker thread is just a loop around GetQueuedCompletionStatus calls, which will have the thread wait (in the kernel) for work units, and the execute them if the command (lpNumberOfBytesTransferred) is 1.
ExecuteAnonymousFunction is where our anonymous work units are executed, and where the reference count we incremented in QueueWork is decremented.
You may have noticed that if the command is not 1, the thread is exited, this allows to reduce the size of the worker pool by one simply by posting to the queue
PostQueuedCompletionStatus(FIOCP, 0, 0, nil);
The check on GetQueuedCompletionStatus also mean that all worker threads will terminate automatically if you close the IOCP handle, the file handle or if the application terminate.
So basically, to grow the worker pool, you just create new TWorkerThread instances in a fire and forget fashion, and to reduce it, you post a zero to the queue. To clean up you just CloseHandle the FIOCP & the FFileHandle. You don’t have to keep track of the worker threads.
Going beyond bare-bones
The previous code gets the work done, but to go beyond it there are a few more things you might want to do:
- use NameThreadForDebugging to identify your worker threads more easily in the debugger
- wrap the Execute with CoInitialize & CoUninitialize calls if you’re going to use COM in your work units
- add support for other forms of work units, such as TNotifyEvent
- trap work unit exceptions and either log or re-raise them in the main thread
The last point, ie. what to do with exceptions in threads can be a thorny problem though, ideally you want to surface them so they don’t go hidden, but there is little you can do that would be smart with a thread exception re-surfaced in the main thread.
I’ve found it boils down to just two options in practice:
- you log the exception, notify the user, and then hope for the best (ie. that the exception wasn’t serious)
- you resurface an unexpected exception in the main thread, and then terminate the app ASAP, because something could have gone horribly wrong and you don’t want it to result in widespread data corruption
Choose your poison 🙂
Ready-to use implementation
You can find a ready-to use implementation of an IOCP thread pool in DWScript utility units, more particularly in dwsIOCPWorkerThreadPool. It supports several other types of work units (a simple procedure, a TNotifyEvent…) as well as several counters to monitor the state of the queue.
If you test it, you’ll find that IOCP is a very efficient mechanism, as it can handle hundreds of thousandths of work units per second, and queues with millions of units. This efficiency means you can break down to smaller, simpler work units, which can help extract parallelism or just simplify coding and design.