◐ Shell
reader mode source ↗
Skip to content
Closed
Hide file tree
Changes from all commits
File filter
Conversations
Jump to
Diff view
Apply and reload
Show whitespace
Diff view
Apply and reload
14 changes: 14 additions & 0 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,12 @@ with the :class:`Pool` class.
.. versionadded:: 3.4
*context*

.. note::

Worker processes within a :class:`Pool` typically live for the complete
Expand Down Expand Up @@ -2225,6 +2231,14 @@ with the :class:`Pool` class.
:ref:`typecontextmanager`. :meth:`~contextmanager.__enter__` returns the
pool object, and :meth:`~contextmanager.__exit__` calls :meth:`terminate`.


.. class:: AsyncResult

Expand Down
98 changes: 81 additions & 17 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,29 @@
RUN = "RUN"
CLOSE = "CLOSE"
TERMINATE = "TERMINATE"

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
return list(map(*args))

def starmapstar(args):
return list(itertools.starmap(args[0], args[1]))

#
# Hack to embed stringification of remote traceback in local traceback
#
Expand Down Expand Up @@ -104,6 +114,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
if initializer is not None:
initializer(*initargs)

completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
Expand Down Expand Up @@ -189,6 +200,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()


Expand Down Expand Up @@ -225,17 +237,31 @@ def __repr__(self):

def _join_exited_workers(self):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
for i in reversed(range(len(self._pool))):
worker = self._pool[i]
if worker.exitcode is not None:
# worker exited
util.debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
del self._pool[i]
return cleaned

def _repopulate_pool(self):
Expand All @@ -256,11 +282,21 @@ def _repopulate_pool(self):
util.debug('added worker')

def _maintain_pool(self):
"""Clean up any exited workers and start replacements for them.
"""
if self._join_exited_workers():
self._repopulate_pool()

def _setup_queues(self):
self._inqueue = self._ctx.SimpleQueue()
self._outqueue = self._ctx.SimpleQueue()
Expand Down Expand Up @@ -419,6 +455,7 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
@staticmethod
def _handle_workers(pool):
thread = threading.current_thread()

# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
Expand All @@ -432,6 +469,7 @@ def _handle_workers(pool):
@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool, cache):
thread = threading.current_thread()

for taskseq, set_length in iter(taskqueue.get, None):
task = None
Expand Up @@ -477,6 +515,7 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):

@staticmethod
def _handle_results(outqueue, get, cache):
thread = threading.current_thread()

while 1:
Expand Down Expand Up @@ -553,7 +592,10 @@ def close(self):
util.debug('closing pool')
if self._state == RUN:
self._state = CLOSE
self._worker_handler._state = CLOSE

def terminate(self):
util.debug('terminating pool')
Expand Up @@ -586,13 +628,21 @@ def _help_stuff_finish(inqueue, task_handler, size):
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
worker_handler, task_handler, result_handler, cache):
# this is guaranteed to only be called once
util.debug('finalizing pool')

worker_handler._state = TERMINATE
task_handler._state = TERMINATE

util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))

if (not result_handler.is_alive()) and (len(cache) != 0):
raise AssertionError(
Expand All @@ -603,8 +653,8 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,

# We must wait for the worker handler to exit before terminating
# workers because we don't want workers to be restarted behind our back.
util.debug('joining worker handler')
if threading.current_thread() is not worker_handler:
worker_handler.join()

# Terminate workers which haven't already finished.
Expand All @@ -614,12 +664,12 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
if p.exitcode is None:
p.terminate()

util.debug('joining task handler')
if threading.current_thread() is not task_handler:
task_handler.join()

util.debug('joining result handler')
if threading.current_thread() is not result_handler:
result_handler.join()

if pool and hasattr(pool[0], 'terminate'):
Expand All @@ -629,6 +679,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
# worker has not yet exited
util.debug('cleaning up worker %d' % p.pid)
p.join()

def __enter__(self):
self._check_running()
Expand Down Expand Up @@ -680,6 +731,9 @@ def _set(self, i, obj):
self._event.set()
del self._cache[self._job]

AsyncResult = ApplyResult # create alias -- see #17805

#
Expand Up @@ -723,6 +777,12 @@ def _set(self, i, success_result):
del self._cache[self._job]
self._event.set()

#
# Class whose instances are returned by `Pool.imap()`
#
Expand Down @@ -780,6 +840,10 @@ def _set(self, i, obj):
if self._index == self._length:
del self._cache[self._job]

def _set_length(self, length):
with self._cond:
self._length = length
Expand Down
108 changes: 108 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2584,6 +2584,18 @@ def raising():
def unpickleable_result():
return lambda: 42

class _TestPoolWorkerErrors(BaseTestCase):
ALLOWED_TYPES = ('processes', )

Expand Down Expand Up @@ -2624,6 +2636,102 @@ def errback(exc):
p.close()
p.join()

class _TestPoolWorkerLifetime(BaseTestCase):
ALLOWED_TYPES = ('processes', )

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Toggle all file notes Toggle all file annotations