Skip to content

Commit

Permalink
bpo-29861: release references to multiprocessing Pool tasks (python#743)
Browse files Browse the repository at this point in the history
* bpo-29861: release references to multiprocessing Pool tasks

Release references to tasks, their arguments and their results as soon
as they are finished, instead of keeping them alive until another task
arrives.

* Comments in test

(cherry picked from commit 8988945)
  • Loading branch information
pitrou committed Mar 24, 2017
1 parent 90eafdb commit 7494554
Show file tree
Hide file tree
Showing 3 changed files with 931 additions and 1 deletion.
7 changes: 6 additions & 1 deletion Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
util.debug("Possible encoding error while sending result: %s" % (
wrapped))
put((job, i, (False, wrapped)))

task = job = result = func = args = kwds = None
completed += 1
util.debug('worker exiting after %d tasks' % completed)

Expand Down Expand Up @@ -402,10 +404,11 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
if set_length:
util.debug('doing set_length()')
set_length(i+1)
finally:
task = taskseq = job = None
else:
util.debug('task handler got sentinel')


try:
# tell result handler to finish when cache is empty
util.debug('task handler sending sentinel to result handler')
Expand Down Expand Up @@ -445,6 +448,7 @@ def _handle_results(outqueue, get, cache):
cache[job]._set(i, obj)
except KeyError:
pass
task = job = obj = None

while cache and thread._state != TERMINATE:
try:
Expand All @@ -461,6 +465,7 @@ def _handle_results(outqueue, get, cache):
cache[job]._set(i, obj)
except KeyError:
pass
task = job = obj = None

if hasattr(outqueue, '_reader'):
util.debug('ensuring that outqueue is not full')
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
import struct
import operator
import weakref
import test.support
import test.support.script_helper

Expand Down Expand Up @@ -1738,6 +1739,19 @@ def raise_large_valuerror(wait):
time.sleep(wait)
raise ValueError("x" * 1024**2)

def identity(x):
return x

class CountedObject(object):
n_instances = 0

def __new__(cls):
cls.n_instances += 1
return object.__new__(cls)

def __del__(self):
type(self).n_instances -= 1

class SayWhenError(ValueError): pass

def exception_throwing_generator(total, when):
Expand All @@ -1746,6 +1760,7 @@ def exception_throwing_generator(total, when):
raise SayWhenError("Somebody said when")
yield i


class _TestPool(BaseTestCase):

@classmethod
Expand Down Expand Up @@ -2000,6 +2015,19 @@ def test_map_no_failfast(self):
# check that we indeed waited for all jobs
self.assertGreater(time.time() - t_start, 0.9)

def test_release_task_refs(self):
# Issue #29861: task arguments and results should not be kept
# alive after we are done with them.
objs = [CountedObject() for i in range(10)]
refs = [weakref.ref(o) for o in objs]
self.pool.map(identity, objs)

del objs
self.assertEqual(set(wr() for wr in refs), {None})
# With a process pool, copies of the objects are returned, check
# they were released too.
self.assertEqual(CountedObject.n_instances, 0)


def raising():
raise KeyError("key")
Expand Down
Loading

0 comments on commit 7494554

Please sign in to comment.