When to use concurrent.futures.wait versus Queue.get (blocking) in Python

I had some multi-threaded code that was actually slower than the single-threaded version. Multiple worker threads published their results to a queue, and the main thread processed them. Because of how the main thread waited for the work to finish, calls to put results onto the queue were blocking.
This post originally appeared as a comment on a GitHub pull request.
When worker threads push to a queue, the main (reader) function has two things to wait for:
- the queue: all the results, as each worker thread produces them, and
- the threads: all of the threads have finished processing or raised an error.
For the best performance, getting results from the queue takes precedence. If the queue fills up, calls to put block, and the worker threads are stuck until the main reader thread pulls an item from the queue. I encountered exactly this behavior when I first implemented this code: the multi-threaded version was slower than the single-threaded version! See the following picture:
[Figure: most put calls blocked on a full queue]
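To see the blocking in isolation, here is a minimal standard-library sketch. The names `results`, `worker`, and `put_wait_seconds` are hypothetical, and the 0.1-second sleep stands in for a slow reader: with `queue.Queue(maxsize=2)`, the worker's third `put` has to wait until the reader frees a slot.

```python
import queue
import threading
import time

# Hypothetical bounded queue: put() blocks once it holds maxsize items.
results = queue.Queue(maxsize=2)
put_wait_seconds = []

def worker(n_items):
    for i in range(n_items):
        start = time.monotonic()
        results.put(i)  # Blocks while the queue is full.
        put_wait_seconds.append(time.monotonic() - start)

thread = threading.Thread(target=worker, args=(4,))
thread.start()

# Simulated slow reader: the worker fills the queue, then stalls
# in put() until the reader frees a slot.
received = []
for _ in range(4):
    time.sleep(0.1)
    received.append(results.get())
thread.join()

print(received)  # [0, 1, 2, 3]
print(f"longest put() wait: {max(put_wait_seconds):.2f} s")
```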
Notice that most of the put calls block because of a full queue. My first attempt to address the full queue was an inner loop that drained all available results from the queue each time through the main loop. This was still slow: it resulted in bursty reads, and many calls to the put method were still blocked.
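A minimal sketch of that first approach, assuming it waited on the threads with `concurrent.futures.wait` and a timeout, then drained the queue with `get_nowait`. `produce`, `read_results_waiting_on_threads`, and `PROGRESS_INTERVAL` are illustrative names, not the original code. While the main thread sits in `wait`, nothing calls `get`, so a full queue leaves every worker stuck in `put` until the next drain, which is where the bursty reads come from.

```python
import concurrent.futures
import queue

PROGRESS_INTERVAL = 0.05  # Hypothetical polling interval, in seconds.

def produce(worker_queue, worker_id, n_items):
    # Hypothetical worker: pushes n_items results onto the shared queue.
    for i in range(n_items):
        worker_queue.put((worker_id, i))

def read_results_waiting_on_threads(n_workers=3, n_items=10):
    worker_queue = queue.Queue(maxsize=4)
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as pool:
        not_done = {
            pool.submit(produce, worker_queue, w, n_items)
            for w in range(n_workers)
        }
        while not_done:
            # Blocks on the threads (up to the timeout), not on the queue,
            # so workers sit in put() whenever the queue is full.
            done, not_done = concurrent.futures.wait(
                not_done, timeout=PROGRESS_INTERVAL
            )
            for future in done:
                future.result()  # Re-raise any worker exception.
            # Inner loop: drain whatever is queued right now. Once every
            # worker is done, this picks up all remaining results.
            while True:
                try:
                    yield worker_queue.get_nowait()
                except queue.Empty:
                    break
```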
Instead, I don't wait for the threads to finish. I get the thread status with a `_nowait` function that doesn't block.
```python
def _nowait(futures):
    """Separate finished and unfinished threads, much like
    :func:`concurrent.futures.wait`, but don't wait.
    """
    done = []
    not_done = []
    for future in futures:
        if future.done():
            done.append(future)
        else:
            not_done.append(future)
    return done, not_done
```
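As a quick check of how `_nowait` behaves, here is a minimal sketch, assuming a `ThreadPoolExecutor` and a hypothetical `threading.Event` named `release` that keeps one task running. Passing `timeout=0` to `concurrent.futures.wait` also returns immediately, but it returns sets (in a named tuple) rather than lists.

```python
import concurrent.futures
import threading

def _nowait(futures):
    """Split futures into (done, not_done) lists without blocking."""
    done = []
    not_done = []
    for future in futures:
        if future.done():
            done.append(future)
        else:
            not_done.append(future)
    return done, not_done

release = threading.Event()  # Hypothetical: holds the slow task open.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    quick = pool.submit(lambda: "quick")
    slow = pool.submit(release.wait)  # Runs until release is set.
    quick.result()  # Make sure the quick task has finished.

    done, not_done = _nowait([quick, slow])
    print(done == [quick], not_done == [slow])  # True True

    # The standard-library call with timeout=0 also returns at once.
    done_set, not_done_set = concurrent.futures.wait([quick, slow], timeout=0)
    print(done_set == {quick}, not_done_set == {slow})  # True True

    release.set()  # Let the slow task finish so the pool can shut down.
```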
The final implementation blocks only on getting results from the queue, with a timeout, never on the threads themselves.
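```python
import queue

# Inside the reader generator: `not_done` holds the worker futures.
while not_done:
    done, not_done = _nowait(not_done)
    for future in done:
        # Call result() on any finished threads to raise any
        # exceptions encountered.
        future.result()
    try:
        frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
        yield frame
    except queue.Empty:  # pragma: NO COVER
        continue

# Workers can finish with results still queued, so drain the rest.
while True:
    try:
        yield worker_queue.get_nowait()
    except queue.Empty:
        break
```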
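Putting the pieces together, here is a self-contained sketch of the final approach. `produce`, `read_results`, and the 0.05-second `_PROGRESS_INTERVAL` are hypothetical stand-ins for the real stream readers and interval. The reader blocks only on `worker_queue.get(timeout=...)`, checks the threads with `_nowait`, and drains anything left once every worker is done.

```python
import concurrent.futures
import queue

_PROGRESS_INTERVAL = 0.05  # Hypothetical value, in seconds.

def _nowait(futures):
    """Split futures into (done, not_done) lists without blocking."""
    done = []
    not_done = []
    for future in futures:
        if future.done():
            done.append(future)
        else:
            not_done.append(future)
    return done, not_done

def produce(worker_queue, worker_id, n_items):
    # Hypothetical worker: stands in for reading one stream.
    for i in range(n_items):
        worker_queue.put((worker_id, i))

def read_results(n_workers=3, n_items=10, maxsize=4):
    worker_queue = queue.Queue(maxsize=maxsize)
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as pool:
        not_done = [
            pool.submit(produce, worker_queue, w, n_items)
            for w in range(n_workers)
        ]
        while not_done:
            done, not_done = _nowait(not_done)
            for future in done:
                future.result()  # Re-raise any worker exception.
            try:
                # Block on the queue, not the threads, so the reader
                # frees a slot as soon as any worker puts an item.
                item = worker_queue.get(timeout=_PROGRESS_INTERVAL)
            except queue.Empty:
                continue
            yield item
        # The last workers may finish with results still queued.
        while True:
            try:
                yield worker_queue.get_nowait()
            except queue.Empty:
                break
```

For example, `list(read_results())` collects all 30 items from the three hypothetical workers.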
The tradeoff is that this implementation waits longer than it otherwise could at the end of the work: it can wait up to a full `_PROGRESS_INTERVAL` before noticing that all streams are finished. Since these streams are expected to last much longer than `_PROGRESS_INTERVAL`, I'm not too concerned about that. My main motivation is to ensure that calls to the put method do not block.