Skip to content

Commit

Permalink
pythongh-114271: Fix race in Thread.join() (python#114839)
Browse files Browse the repository at this point in the history
There is a race between when `Thread._tstate_lock` is released[^1] in `Thread._wait_for_tstate_lock()`
and when `Thread._stop()` asserts[^2] that it is unlocked. Consider the following execution
involving threads A, B, and C:

1. A starts.
2. B joins A, blocking on its `_tstate_lock`.
3. C joins A, blocking on its `_tstate_lock`.
4. A finishes and releases its `_tstate_lock`.
5. B acquires A's `_tstate_lock` in `_wait_for_tstate_lock()`, releases it, but is swapped
   out before calling `_stop()`.
6. C is scheduled, acquires A's `_tstate_lock` in `_wait_for_tstate_lock()` but is swapped
   out before releasing it.
7. B is scheduled, calls `_stop()`, which asserts that A's `_tstate_lock` is not held.
   However, C holds it, so the assertion fails.

The race can be reproduced[^3] by inserting sleeps at the appropriate points in
the threading code. To do so, run the `repro_join_race.py` from the linked repo.

There are two main parts to this PR:

1. `_tstate_lock` is replaced with an event that is attached to `PyThreadState`.
   The event is set by the runtime prior to the thread being cleared (in the same
   place that `_tstate_lock` was released). `Thread.join()` blocks waiting for the
   event to be set.
2. `_PyInterpreterState_WaitForThreads()` provides the ability to wait for all
   non-daemon threads to exit. To do so, an `is_daemon` predicate was added to
   `PyThreadState`. This field is set each time a thread is created. `threading._shutdown()`
   now calls into `_PyInterpreterState_WaitForThreads()` instead of waiting on
   `_tstate_lock`s.

[^1]: https://github.com/python/cpython/blob/441affc9e7f419ef0b68f734505fa2f79fe653c7/Lib/threading.py#L1201
[^2]: https://github.com/python/cpython/blob/441affc9e7f419ef0b68f734505fa2f79fe653c7/Lib/threading.py#L1115
[^3]: mpage@8194653

---------

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
3 people committed Mar 16, 2024
1 parent 86bc40d commit 33da0e8
Show file tree
Hide file tree
Showing 12 changed files with 767 additions and 639 deletions.
26 changes: 0 additions & 26 deletions Include/cpython/pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,32 +161,6 @@ struct _ts {
*/
uintptr_t critical_section;

/* Called when a thread state is deleted normally, but not when it
* is destroyed after fork().
* Pain: to prevent rare but fatal shutdown errors (issue 18808),
* Thread.join() must wait for the join'ed thread's tstate to be unlinked
* from the tstate chain. That happens at the end of a thread's life,
* in pystate.c.
* The obvious way doesn't quite work: create a lock which the tstate
* unlinking code releases, and have Thread.join() wait to acquire that
* lock. The problem is that we _are_ at the end of the thread's life:
* if the thread holds the last reference to the lock, decref'ing the
* lock will delete the lock, and that may trigger arbitrary Python code
* if there's a weakref, with a callback, to the lock. But by this time
* _PyRuntime.gilstate.tstate_current is already NULL, so only the simplest
* of C code can be allowed to run (in particular it must not be possible to
* release the GIL).
* So instead of holding the lock directly, the tstate holds a weakref to
* the lock: that's the value of on_delete_data below. Decref'ing a
* weakref is harmless.
* on_delete points to _threadmodule.c's static release_sentinel() function.
* After the tstate is unlinked, release_sentinel is called with the
* weakref-to-lock (on_delete_data) argument, and release_sentinel releases
* the indirectly held lock.
*/
void (*on_delete)(void *);
void *on_delete_data;

int coroutine_origin_tracking_depth;

PyObject *async_gen_firstiter;
Expand Down
10 changes: 0 additions & 10 deletions Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,6 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt);
// and 0 if the timeout expired or thread was interrupted.
PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns);

// A one-time event notification with reference counting.
typedef struct _PyEventRc {
PyEvent event;
Py_ssize_t refcount;
} _PyEventRc;

_PyEventRc *_PyEventRc_New(void);
void _PyEventRc_Incref(_PyEventRc *erc);
void _PyEventRc_Decref(_PyEventRc *erc);

// _PyRawMutex implements a word-sized mutex that that does not depend on the
// parking lot API, and therefore can be used in the parking lot
// implementation.
Expand Down
2 changes: 1 addition & 1 deletion Include/internal/pycore_pythread.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct _pythread_runtime_state {
} stubs;
#endif

// Linked list of ThreadHandleObjects
// Linked list of ThreadHandles
struct llist_node handles;
};

Expand Down
2 changes: 1 addition & 1 deletion Lib/test/test_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def test_threading(self):
expected = [
("_thread.start_new_thread", "(<test_func>, (), None)"),
("test.test_func", "()"),
("_thread.start_joinable_thread", "(<test_func>,)"),
("_thread.start_joinable_thread", "(<test_func>, 1, None)"),
("test.test_func", "()"),
]

Expand Down
4 changes: 2 additions & 2 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,13 @@ def test_python_finalization_error(self):
# QueueFeederThread.
orig_start_new_thread = threading._start_joinable_thread
nthread = 0
def mock_start_new_thread(func, *args):
def mock_start_new_thread(func, *args, **kwargs):
nonlocal nthread
if nthread >= 1:
raise RuntimeError("can't create new thread at "
"interpreter shutdown")
nthread += 1
return orig_start_new_thread(func, *args)
return orig_start_new_thread(func, *args, **kwargs)

with support.swap_attr(threading, '_start_joinable_thread',
mock_start_new_thread):
Expand Down
48 changes: 48 additions & 0 deletions Lib/test/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,54 @@ def joiner():
with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
raise error

def test_join_with_timeout(self):
lock = thread.allocate_lock()
lock.acquire()

def thr():
lock.acquire()

with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(thr)
handle.join(0.1)
self.assertFalse(handle.is_done())
lock.release()
handle.join()
self.assertTrue(handle.is_done())

def test_join_unstarted(self):
handle = thread._ThreadHandle()
with self.assertRaisesRegex(RuntimeError, "thread not started"):
handle.join()

def test_set_done_unstarted(self):
handle = thread._ThreadHandle()
with self.assertRaisesRegex(RuntimeError, "thread not started"):
handle._set_done()

def test_start_duplicate_handle(self):
lock = thread.allocate_lock()
lock.acquire()

def func():
lock.acquire()

handle = thread._ThreadHandle()
with threading_helper.wait_threads_exit():
thread.start_joinable_thread(func, handle=handle)
with self.assertRaisesRegex(RuntimeError, "thread already started"):
thread.start_joinable_thread(func, handle=handle)
lock.release()
handle.join()

def test_start_with_none_handle(self):
def func():
pass

with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(func, handle=None)
handle.join()


class Barrier:
def __init__(self, num_threads):
Expand Down
61 changes: 1 addition & 60 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ def run(self):

def test_limbo_cleanup(self):
# Issue 7481: Failure to start thread should cleanup the limbo map.
def fail_new_thread(*args):
def fail_new_thread(*args, **kwargs):
raise threading.ThreadError()
_start_joinable_thread = threading._start_joinable_thread
threading._start_joinable_thread = fail_new_thread
Expand Down Expand Up @@ -912,41 +912,6 @@ def f():
rc, out, err = assert_python_ok("-c", code)
self.assertEqual(err, b"")

def test_tstate_lock(self):
# Test an implementation detail of Thread objects.
started = _thread.allocate_lock()
finish = _thread.allocate_lock()
started.acquire()
finish.acquire()
def f():
started.release()
finish.acquire()
time.sleep(0.01)
# The tstate lock is None until the thread is started
t = threading.Thread(target=f)
self.assertIs(t._tstate_lock, None)
t.start()
started.acquire()
self.assertTrue(t.is_alive())
# The tstate lock can't be acquired when the thread is running
# (or suspended).
tstate_lock = t._tstate_lock
self.assertFalse(tstate_lock.acquire(timeout=0), False)
finish.release()
# When the thread ends, the state_lock can be successfully
# acquired.
self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
# But is_alive() is still True: we hold _tstate_lock now, which
# prevents is_alive() from knowing the thread's end-of-life C code
# is done.
self.assertTrue(t.is_alive())
# Let is_alive() find out the C code is done.
tstate_lock.release()
self.assertFalse(t.is_alive())
# And verify the thread disposed of _tstate_lock.
self.assertIsNone(t._tstate_lock)
t.join()

def test_repr_stopped(self):
# Verify that "stopped" shows up in repr(Thread) appropriately.
started = _thread.allocate_lock()
Expand Down Expand Up @@ -1112,30 +1077,6 @@ def checker():
self.assertEqual(threading.getprofile(), old_profile)
self.assertEqual(sys.getprofile(), old_profile)

@cpython_only
def test_shutdown_locks(self):
for daemon in (False, True):
with self.subTest(daemon=daemon):
event = threading.Event()
thread = threading.Thread(target=event.wait, daemon=daemon)

# Thread.start() must add lock to _shutdown_locks,
# but only for non-daemon thread
thread.start()
tstate_lock = thread._tstate_lock
if not daemon:
self.assertIn(tstate_lock, threading._shutdown_locks)
else:
self.assertNotIn(tstate_lock, threading._shutdown_locks)

# unblock the thread and join it
event.set()
thread.join()

# Thread._stop() must remove tstate_lock from _shutdown_locks.
# Daemon threads must never add it to _shutdown_locks.
self.assertNotIn(tstate_lock, threading._shutdown_locks)

def test_locals_at_exit(self):
# bpo-19466: thread locals must not be deleted before destructors
# are called
Expand Down
Loading

0 comments on commit 33da0e8

Please sign in to comment.