I had some multi-threaded code that was actually slower
than the single-threaded version. Multiple worker threads published their
results to a queue, and the main thread processed them. Because of how I
was waiting for the work to finish, calls to put the results onto the queue
were blocking.
I originally wrote this post as a comment on a GitHub pull request.
When you have worker threads pushing to a queue, there are two things to
wait for in the main (reader) method:
- queue - all the results, as each worker thread produces them, and
- threads - all of the threads, until each has completed processing or errored.
For the best performance, getting results from the queue should take
precedence. If the queue fills up, calls to put block, and the worker threads
are stuck until the main reader thread pulls an item from the queue. I actually
encountered this behavior when I first implemented this code. The
multi-threaded version was slower than the single-threaded version! See the
following picture:
Notice that most of the put calls block because of a full queue. My first
attempt at addressing the full queue was an inner loop that drained all
results from the queue on each pass. This was still slow. It resulted in
bursty reads, and many calls to the put method were still blocked.
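To make the blocking behavior concrete, here is a minimal sketch using the standard library's queue.Queue. The queue size and the item names are arbitrary choices for illustration, not values from the original code. A timeout on put is used only so that the example raises queue.Full instead of hanging.

```python
import queue

# A bounded queue with room for two items (an arbitrary size for this sketch).
results = queue.Queue(maxsize=2)
results.put("frame-0")
results.put("frame-1")

# The queue is now full. A plain put() would block until a reader calls
# get(); adding a timeout turns that wait into a queue.Full exception.
try:
    results.put("frame-2", timeout=0.1)
    put_blocked = False
except queue.Full:
    put_blocked = True

# Once the reader removes an item, the same put succeeds right away.
first = results.get()
results.put("frame-2", timeout=0.1)
```

In the worker threads there is no timeout, so a full queue simply stalls the worker until the reader catches up.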
Instead, I do not wait for the threads to finish. I get the thread status
with a _nowait helper that doesn't block.

    def _nowait(futures):
        """Separate finished and unfinished threads, much like
        :func:`concurrent.futures.wait`, but don't wait.
        """
        done = []
        not_done = []
        for future in futures:
            if future.done():
                done.append(future)
            else:
                not_done.append(future)
        return done, not_done

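As a quick usage sketch, the following shows what a helper like this returns when one task has finished and another is still running. The thread pool, the lambda, and the release event are hypothetical scaffolding added for the example; only the done/not_done split mirrors the helper above.

```python
import concurrent.futures
import threading


def _nowait(futures):
    """Split futures into (done, not_done) without blocking."""
    done, not_done = [], []
    for future in futures:
        (done if future.done() else not_done).append(future)
    return done, not_done


release = threading.Event()  # hypothetical gate that keeps one task running

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    quick = pool.submit(lambda: "quick")
    slow = pool.submit(release.wait)

    quick.result()  # make sure the quick task has finished
    # Returns immediately, even though the slow task is still running.
    done, not_done = _nowait([quick, slow])

    release.set()  # let the slow task finish so the pool can shut down
    slow.result()
    done_after, not_done_after = _nowait([quick, slow])
```

Unlike concurrent.futures.wait, which blocks until its return condition is met or its timeout expires, this check only inspects the current state of each future.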
The final implementation blocks on getting the results from the queue.

    while not_done:
        done, not_done = _nowait(not_done)
        for future in done:
            # Call result() on any finished threads to raise any
            # exceptions encountered.
            future.result()

        try:
            frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
            yield frame
        except queue.Empty:  # pragma: NO COVER
            continue

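For readers who want to run the pattern end to end, here is a self-contained sketch of the same loop inside a generator. The worker function _produce, the read_all wrapper, the queue size, and the _PROGRESS_INTERVAL value are all assumptions made for this example. The final drain loop is also an assumption that is not part of the excerpt above: it picks up any results still sitting in the queue after the last worker finishes.

```python
import concurrent.futures
import queue

_PROGRESS_INTERVAL = 0.2  # seconds; value chosen only for this sketch


def _nowait(futures):
    """Split futures into (done, not_done) without blocking."""
    done, not_done = [], []
    for future in futures:
        (done if future.done() else not_done).append(future)
    return done, not_done


def _produce(worker_queue, stream_id, count):
    """Hypothetical worker: put `count` results onto the shared queue."""
    for item in range(count):
        worker_queue.put((stream_id, item))


def read_all(num_streams=3, per_stream=5):
    """Yield results from all workers, blocking on the queue, not the threads."""
    worker_queue = queue.Queue(maxsize=num_streams)
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_streams) as pool:
        not_done = [
            pool.submit(_produce, worker_queue, stream_id, per_stream)
            for stream_id in range(num_streams)
        ]
        while not_done:
            done, not_done = _nowait(not_done)
            for future in done:
                future.result()  # re-raise any exception from the worker
            try:
                yield worker_queue.get(timeout=_PROGRESS_INTERVAL)
            except queue.Empty:
                continue

        # Assumed extra step: drain whatever is left once every worker is done.
        while True:
            try:
                yield worker_queue.get_nowait()
            except queue.Empty:
                break


frames = list(read_all())
```

Because the reader spends its time in get rather than waiting on the futures, a worker that is blocked in put is released as soon as the next item is read.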
The tradeoff is that this implementation waits longer than it otherwise
could at the end of work. It waits up to a full _PROGRESS_INTERVAL before
noticing that all streams are finished. Since these streams are expected to
last quite long compared to the _PROGRESS_INTERVAL, I'm not as concerned about
that. My main motivation is to ensure that calls to the put method do not
block.