When to use concurrent.futures.wait versus Queue.get (blocking) in Python


I had some multi-threaded code that was actually slower than the single-threaded version. Multiple worker threads published their results to a queue, and the main thread processed them. Because of how I was waiting for the work to finish, calls to put results onto the queue were blocking.

This post originally appeared as a comment on a GitHub pull request.

When you have worker threads pushing to a queue, there are two things to wait for in the main (reader) method (a minimal sketch of this setup follows the list):

  1. the queue - results arriving from each worker thread, and
  2. the threads - every worker thread finishing its processing or raising an error.
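
Here is a minimal sketch of that setup, with worker threads putting results onto a bounded queue for the main thread to read. The names (worker, worker_queue, streams) and the queue size are placeholders rather than the real code:

import concurrent.futures
import queue

streams = [range(3), range(3, 6)]  # placeholder data sources
worker_queue = queue.Queue(maxsize=10)  # bounded, so put() can block

def worker(stream):
    for frame in stream:
        worker_queue.put(frame)  # blocks while the queue is full

executor = concurrent.futures.ThreadPoolExecutor()
futures = [executor.submit(worker, stream) for stream in streams]
# The main thread must now wait on both worker_queue and futures.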

For the best performance, getting results from the queue takes precedence. If the queue fills up, processing stalls: the worker threads are stuck until the main reader thread pulls an item off the queue. I actually encountered this behavior when I first implemented this code. The multi-threaded version was slower than the single-threaded version! See the following picture:

[Figure: Timelines for three worker threads and the reader thread. Wide ovals show that calls to the queue's put method block often because the queue is full. The reader thread pulls only one message from the queue each progress interval.]
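
Based on that timeline, the original loop was shaped roughly like the sketch below: block on concurrent.futures.wait for the threads, then pull at most one item from the queue per pass. This is a simplified reconstruction, not the exact code; the function name is hypothetical and _PROGRESS_INTERVAL is a placeholder value.

import concurrent.futures
import queue

_PROGRESS_INTERVAL = 0.2  # seconds; placeholder value

def _read_waiting_on_threads(not_done, worker_queue):
    """Sketch of the original loop: block on the threads, then pull one item."""
    while not_done:
        # Blocks for up to _PROGRESS_INTERVAL waiting on the futures,
        # even while results pile up in the queue.
        done, not_done = concurrent.futures.wait(
            not_done, timeout=_PROGRESS_INTERVAL
        )
        for future in done:
            future.result()  # raise any exceptions from the workers

        try:
            # At most one result is pulled per pass through the loop.
            yield worker_queue.get_nowait()
        except queue.Empty:
            continue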

Notice that most of the put calls block because the queue is full. The first way I tried to address the full queue was to add an inner loop that drains all results from the queue on each pass. This was still slow: it resulted in bursty reads, and many calls to the put method were still blocked.

[Figure: As in the previous image, wide ovals mark blocked calls to the queue's put method. Blocked calls are less frequent but still present. Every progress interval, all messages are pulled from the queue at once.]
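
The inner drain loop was roughly the helper below (hypothetical; the real code inlined it). Each pass of the wait loop then used yield from _drain(worker_queue) in place of the single get call, so all buffered results came out in one burst per interval.

import queue

def _drain(worker_queue):
    """Yield everything currently in the queue without blocking."""
    while True:
        try:
            yield worker_queue.get_nowait()
        except queue.Empty:
            break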

Instead, I do not block waiting for the threads to finish. I check the thread status with a _nowait helper that returns immediately.

def _nowait(futures):
    """Separate finished and unfinished threads, much like
    :func:`concurrent.futures.wait`, but don't wait.
    """
    done = []
    not_done = []
    for future in futures:
        if future.done():
            done.append(future)
        else:
            not_done.append(future)
    return done, not_done

The final implementation blocks on getting results from the queue instead of on the threads.

# not_done starts as the list of futures for all worker threads.
while not_done:
    done, not_done = _nowait(not_done)
    for future in done:
        # Call result() on any finished threads to raise any
        # exceptions encountered.
        future.result()

    try:
        frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
        yield frame
    except queue.Empty:  # pragma: NO COVER
        continue

In this version, no calls to the put method block (though they could if the main thread can't keep up with the worker threads). There are no large gaps between calls to the get method.

The tradeoff is that this implementation waits longer than it otherwise could at the end of the work. It can wait up to a full _PROGRESS_INTERVAL before noticing that all streams have finished. Since these streams are expected to last much longer than the _PROGRESS_INTERVAL, I'm not too concerned about that. My main motivation is to ensure that calls to the put method do not block.
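
To close, here is a self-contained sketch of the whole pattern with placeholder names and values. The queue size, the _PROGRESS_INTERVAL value, and the worker body are illustrative only, and a final drain is included so that results still sitting in the queue when the last worker finishes are not dropped.

import concurrent.futures
import queue
import time

_PROGRESS_INTERVAL = 0.2  # seconds; placeholder value

def _nowait(futures):
    """Separate finished and unfinished futures without blocking."""
    done = []
    not_done = []
    for future in futures:
        if future.done():
            done.append(future)
        else:
            not_done.append(future)
    return done, not_done

def _worker(worker_queue, items):
    for item in items:
        time.sleep(0.01)  # stand-in for real work
        worker_queue.put(item)  # may block if the reader falls behind

def read_results(streams):
    """Yield results as workers produce them, blocking only on the queue."""
    worker_queue = queue.Queue(maxsize=10)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        not_done = [
            executor.submit(_worker, worker_queue, stream) for stream in streams
        ]
        while not_done:
            done, not_done = _nowait(not_done)
            for future in done:
                future.result()  # raise any exceptions from the workers

            try:
                yield worker_queue.get(timeout=_PROGRESS_INTERVAL)
            except queue.Empty:
                continue

        # All workers have finished; drain anything left in the queue.
        while True:
            try:
                yield worker_queue.get_nowait()
            except queue.Empty:
                break

if __name__ == "__main__":
    for frame in read_results([range(5), range(5, 10)]):
        print(frame)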