Skip to content

Commit

Permalink
bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage co…
Browse files Browse the repository at this point in the history
…llection and explicit close (python#31913)
  • Loading branch information
geryogam committed May 3, 2022
1 parent f629dcf commit dfb1b9d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
23 changes: 11 additions & 12 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,10 @@ def put_nowait(self, obj):

def close(self):
self._closed = True
try:
self._reader.close()
finally:
close = self._close
if close:
self._close = None
close()
close = self._close
if close:
self._close = None
close()

def join_thread(self):
debug('Queue.join_thread()')
Expand All @@ -169,8 +166,9 @@ def _start_thread(self):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._writer.close, self._ignore_epipe,
self._on_queue_feeder_error, self._sem),
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
name='QueueFeederThread'
)
self._thread.daemon = True
Expand Down Expand Up @@ -211,8 +209,8 @@ def _finalize_close(buffer, notempty):
notempty.notify()

@staticmethod
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
onerror, queue_sem):
def _feed(buffer, notempty, send_bytes, writelock, reader_close,
writer_close, ignore_epipe, onerror, queue_sem):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
Expand All @@ -238,7 +236,8 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
obj = bpopleft()
if obj is sentinel:
debug('feeder thread got sentinel -- exiting')
close()
reader_close()
writer_close()
return

# serialize the data before acquiring the lock
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Always close the read end of the pipe used by :class:`multiprocessing.Queue`
*after* the last write of buffered data to the write end of the pipe to avoid
:exc:`BrokenPipeError` at garbage collection and at
:meth:`multiprocessing.Queue.close` calls. Patch by Géry Ogam.

0 comments on commit dfb1b9d

Please sign in to comment.