◐ Shell
clean mode source ↗

[3.6] bpo-31234: Add test.support.wait_threads_exit() (GH-3578) by vstinner · Pull Request #7315 · python/cpython

Expand Up @@ -59,12 +59,13 @@ def task(self, ident): self.done_mutex.release()
def test_starting_threads(self): # Basic test for thread creation. for i in range(NUMTASKS): self.newtask() verbose_print("waiting for tasks to complete...") self.done_mutex.acquire() verbose_print("all tasks done") with support.wait_threads_exit(): # Basic test for thread creation. for i in range(NUMTASKS): self.newtask() verbose_print("waiting for tasks to complete...") self.done_mutex.acquire() verbose_print("all tasks done")
def test_stack_size(self): # Various stack size tests. Expand Down Expand Up @@ -94,12 +95,13 @@ def test_nt_and_posix_stack_size(self): verbose_print("trying stack_size = (%d)" % tss) self.next_ident = 0 self.created = 0 for i in range(NUMTASKS): self.newtask() with support.wait_threads_exit(): for i in range(NUMTASKS): self.newtask()
verbose_print("waiting for all tasks to complete") self.done_mutex.acquire() verbose_print("all tasks done") verbose_print("waiting for all tasks to complete") self.done_mutex.acquire() verbose_print("all tasks done")
thread.stack_size(0)
Expand All @@ -109,25 +111,28 @@ def test__count(self): mut = thread.allocate_lock() mut.acquire() started = []
def task(): started.append(None) mut.acquire() mut.release() thread.start_new_thread(task, ()) while not started: time.sleep(POLL_SLEEP) self.assertEqual(thread._count(), orig + 1) # Allow the task to finish. mut.release() # The only reliable way to be sure that the thread ended from the # interpreter's point of view is to wait for the function object to be # destroyed. done = [] wr = weakref.ref(task, lambda _: done.append(None)) del task while not done: time.sleep(POLL_SLEEP) self.assertEqual(thread._count(), orig)
with support.wait_threads_exit(): thread.start_new_thread(task, ()) while not started: time.sleep(POLL_SLEEP) self.assertEqual(thread._count(), orig + 1) # Allow the task to finish. mut.release() # The only reliable way to be sure that the thread ended from the # interpreter's point of view is to wait for the function object to be # destroyed. done = [] wr = weakref.ref(task, lambda _: done.append(None)) del task while not done: time.sleep(POLL_SLEEP) self.assertEqual(thread._count(), orig)
def test_save_exception_state_on_error(self): # See issue #14474 Expand All @@ -140,16 +145,14 @@ def mywrite(self, *args): except ValueError: pass real_write(self, *args) c = thread._count() started = thread.allocate_lock() with support.captured_output("stderr") as stderr: real_write = stderr.write stderr.write = mywrite started.acquire() thread.start_new_thread(task, ()) started.acquire() while thread._count() > c: time.sleep(POLL_SLEEP) with support.wait_threads_exit(): thread.start_new_thread(task, ()) started.acquire() self.assertIn("Traceback", stderr.getvalue())

Expand Down Expand Up @@ -181,13 +184,14 @@ def enter(self): class BarrierTest(BasicThreadTest):
def test_barrier(self): self.bar = Barrier(NUMTASKS) self.running = NUMTASKS for i in range(NUMTASKS): thread.start_new_thread(self.task2, (i,)) verbose_print("waiting for tasks to end") self.done_mutex.acquire() verbose_print("tasks done") with support.wait_threads_exit(): self.bar = Barrier(NUMTASKS) self.running = NUMTASKS for i in range(NUMTASKS): thread.start_new_thread(self.task2, (i,)) verbose_print("waiting for tasks to end") self.done_mutex.acquire() verbose_print("tasks done")
def task2(self, ident): for i in range(NUMTRIPS): Expand Down Expand Up @@ -225,11 +229,10 @@ def setUp(self): @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork') @support.reap_threads def test_forkinthread(self): running = True status = "not set"
def thread1(): nonlocal running, status nonlocal status
# fork in a thread pid = os.fork() Expand All @@ -244,13 +247,11 @@ def thread1(): # parent os.close(self.write_fd) pid, status = os.waitpid(pid, 0) running = False
thread.start_new_thread(thread1, ()) self.assertEqual(os.read(self.read_fd, 2), b"OK", "Unable to fork() in thread") while running: time.sleep(POLL_SLEEP) with support.wait_threads_exit(): thread.start_new_thread(thread1, ()) self.assertEqual(os.read(self.read_fd, 2), b"OK", "Unable to fork() in thread") self.assertEqual(status, 0)
def tearDown(self): Expand Down