
Using IOCP for Worker Threads

An efficient worker thread pool in Windows can be implemented through the use of IOCP [1], which is the barbaric acronym for the (just as barbaric) Input/Output Completion Port API [2] introduced by Microsoft a while ago.

To make use of IOCP, one has to deal with further barbaric API functions, but the principles are (somewhat) more civilized, and the implementation is quite straightforward.

Worker Threads

What are worker threads? They’re threads that don’t have any specific task to perform, but are designed to execute arbitrary “work units” in a threaded, asynchronous fashion.

Usually they come in pools, and you just give work units to the pool. The work units are executed in order by the workers that have nothing else to do.

This is a very simple model in which the threading aspects of the work are abstracted away: work becomes just asynchronous work units. It also has close real-world analogies, and can often be easier to understand than other forms of multi-threaded parallelization.

Implementation-wise, it revolves around a FIFO queue [3] that has to be thread-safe, along with thread signaling events. This is a mechanism IOCP wraps in just three API functions, with OS kernel support.

Setting up an IOCP queue

To use IOCP, the first thing to do is create an IOCP queue; this is accomplished through the CreateIoCompletionPort [4] function. This function was initially meant for I/O, so it accepts a file handle to associate with the port, but you can pass INVALID_HANDLE_VALUE to get a stand-alone queue.

FIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

And that's all you need to set up an IOCP queue.
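
For reference, the FIOCP variable used above (and in the rest of this article) can simply live as a field of the pool class. Below is a hypothetical sketch of such a skeleton; only the TWorkerThreadPool and FIOCP names come from the article, the constructor and destructor are assumptions:

type
   TWorkerThreadPool = class
      private
         FIOCP : THandle;   // handle of the completion port backing the queue
      public
         constructor Create;
         destructor Destroy; override;
         // the QueueWork method is added in the next section
   end;

constructor TWorkerThreadPool.Create;
begin
   inherited Create;
   // requires Winapi.Windows in the uses clause; a last parameter of 0 lets the
   // kernel allow as many concurrently running threads as there are processors
   FIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
end;

destructor TWorkerThreadPool.Destroy;
begin
   // closing the port makes GetQueuedCompletionStatus fail in the worker threads,
   // which terminates them (see "The worker threads" below)
   CloseHandle(FIOCP);
   inherited;
end;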

Posting work to the queue

To post work you use the PostQueuedCompletionStatus [5] function, which takes four arguments:

  - the IOCP handle returned by CreateIoCompletionPort,
  - dwNumberOfBytesTransferred, a 32 bit integer,
  - dwCompletionKey, a pointer-sized integer,
  - lpOverlapped, a pointer.

And that's all. The good thing is that apart from the handle, the arguments are not significant for a custom queue, so in 32 bits that's effectively 12 bytes we can use for our own purposes, and 20 bytes in 64 bits (the byte count stays a 32 bit value, the other two grow to 8 bytes each).

One of the simpler ways to define a work unit would be to use an anonymous method (reference to procedure). In Delphi, those are in practice just hidden interfaces, so they fit in a pointer. Let’s queue one:

type
   TAnonymousWorkUnit = reference to procedure;
   PAnonymousWorkUnit = ^TAnonymousWorkUnit;
...
procedure TWorkerThreadPool.QueueWork(const workUnit : TAnonymousWorkUnit);
var
   lpOverlapped : Pointer;
begin
   lpOverlapped := nil;
   PAnonymousWorkUnit(@lpOverlapped)^ := workUnit;
   PostQueuedCompletionStatus(FIOCP, 1, 0, lpOverlapped);
end;

In the code above we copy the workUnit into a pointer; it's important to use a copy and not a cast, so as to increment the reference count of the interface (aka the anonymous method). The destination pointer must also be explicitly nil'ed before the assignment, as the compiler won't initialize it, so it could contain just about anything.
Then we post to the queue, using the NumberOfBytesTransferred argument to pass a command (1), which will come into play below.
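
For illustration, assuming a pool instance named Pool (a hypothetical variable), queuing a work unit then looks like this:

Pool.QueueWork(procedure
   begin
      // this runs later, in one of the worker threads
      DoSomethingExpensive;   // hypothetical long-running task
   end);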


The worker threads

We have a queue and we can post work units to it; now we need workers. These can be implemented by subclassing TThread and overriding Execute. A minimal implementation could be:

procedure TWorkerThread.Execute;

   procedure ExecuteAnonymousFunction(p : PAnonymousWorkUnit);
   begin
      try
         p^();
      finally
         p^._Release;
      end;
   end;

var
   // types matching the GetQueuedCompletionStatus declaration in Winapi.Windows
   lpNumberOfBytesTransferred : DWORD;
   lpCompletionKey : ULONG_PTR;
   lpOverlapped : POverlapped;
begin
   while not Terminated do begin
      if not GetQueuedCompletionStatus(FIOCP,
                                       lpNumberOfBytesTransferred,
                                       lpCompletionKey,
                                       lpOverlapped, INFINITE) then Break;
      if lpNumberOfBytesTransferred=1 then
         ExecuteAnonymousFunction(PAnonymousWorkUnit(@lpOverlapped))
      else Break;   // any other command value makes the worker thread exit
   end;
end;

The worker thread is just a loop around GetQueuedCompletionStatus [8] calls, which will have the thread wait (in the kernel) for work units, and then execute them if the command (lpNumberOfBytesTransferred) is 1.

ExecuteAnonymousFunction is where our anonymous work units are executed, and where the reference count we incremented in QueueWork is decremented.

You may have noticed that if the command is not 1, the thread exits; this allows reducing the size of the worker pool by one, simply by posting to the queue:

PostQueuedCompletionStatus(FIOCP, 0, 0, nil);

The check on the GetQueuedCompletionStatus result also means that all worker threads will terminate automatically if you close the IOCP handle, close the associated file handle (if any), or if the application terminates.

So basically, to grow the worker pool, you just create new TWorkerThread instances in a fire-and-forget fashion, and to reduce it, you post a zero command to the queue. To clean up, you just CloseHandle [9] the FIOCP handle (and the associated file handle, if you used one). You don't have to keep track of the worker threads.
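
Below is a hypothetical sketch of what such grow and shrink helpers could look like; the Grow/Shrink names, the TWorkerThread constructor and the FreeOnTerminate choice are assumptions (TThread comes from System.Classes), only the posting of a zero command comes from the article:

type
   TWorkerThread = class(TThread)
      private
         FIOCP : THandle;   // shared completion port handle
      protected
         procedure Execute; override;   // as shown in the previous section
      public
         constructor Create(aIOCP : THandle);
   end;

constructor TWorkerThread.Create(aIOCP : THandle);
begin
   inherited Create(False);
   FreeOnTerminate := True;   // fire-and-forget: the thread frees itself after Execute returns
   FIOCP := aIOCP;
end;

procedure TWorkerThreadPool.Grow(count : Integer);
var
   i : Integer;
begin
   for i := 1 to count do
      TWorkerThread.Create(FIOCP);   // no reference is kept, the pool doesn't track workers
end;

procedure TWorkerThreadPool.Shrink(count : Integer);
var
   i : Integer;
begin
   // a zero command makes one worker exit its loop (see Execute above)
   for i := 1 to count do
      PostQueuedCompletionStatus(FIOCP, 0, 0, nil);
end;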

Going beyond bare-bones

The previous code gets the work done, but to go beyond it there are a few more things you might want to do: monitor the state of the queue, support other kinds of work units, and deal with exceptions raised inside work units.

The last point, i.e. what to do with exceptions in threads, can be a thorny problem though. Ideally you want to surface them so they don't go unnoticed, but there is little you can do that would be smart with a thread exception re-surfaced in the main thread.

I’ve found it boils down to just two options in practice:

  1. you log the exception, notify the user, and then hope for the best (i.e. that the exception wasn't serious)
  2. you resurface an unexpected exception in the main thread, and then terminate the app ASAP, because something could have gone horribly wrong and you don’t want it to result in widespread data corruption

Choose your poison 🙂
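
As an illustration of the first option, the ExecuteAnonymousFunction helper from the worker thread could be wrapped like this (a sketch only; Exception requires System.SysUtils, and OutputDebugString stands in for whatever logging and user notification you actually use):

procedure ExecuteAnonymousFunction(p : PAnonymousWorkUnit);
begin
   try
      try
         p^();
      except
         // option 1: log it and keep the worker alive; option 2 would instead capture
         // the exception (AcquireExceptionObject), re-raise it in the main thread and
         // terminate the application
         on E : Exception do
            OutputDebugString(PChar('Work unit failed: ' + E.ClassName + ': ' + E.Message));
      end;
   finally
      p^._Release;   // balances the reference count taken in QueueWork
   end;
end;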

Ready-to-use implementation

You can find a ready-to-use implementation of an IOCP thread pool in the DWScript utility units, more specifically in dwsIOCPWorkerThreadPool [10]. It supports several other types of work units (a simple procedure, a TNotifyEvent…) as well as several counters to monitor the state of the queue.

If you test it, you'll find that IOCP is a very efficient mechanism: it can handle hundreds of thousands of work units per second, and queues with millions of units. This efficiency means you can break work down into smaller, simpler work units, which can help extract parallelism or simply make coding and design easier.