[3.6] bpo-31234: Add test.support.wait_threads_exit() (GH-3578) by vstinner · Pull Request #7315 · python/cpython
def test_starting_threads(self): # Basic test for thread creation. for i in range(NUMTASKS): self.newtask() verbose_print("waiting for tasks to complete...") self.done_mutex.acquire() verbose_print("all tasks done") with support.wait_threads_exit(): # Basic test for thread creation. for i in range(NUMTASKS): self.newtask() verbose_print("waiting for tasks to complete...") self.done_mutex.acquire() verbose_print("all tasks done")
def test_stack_size(self): # Various stack size tests.
verbose_print("waiting for all tasks to complete") self.done_mutex.acquire() verbose_print("all tasks done") verbose_print("waiting for all tasks to complete") self.done_mutex.acquire() verbose_print("all tasks done")
thread.stack_size(0)
def task(): started.append(None) mut.acquire() mut.release() thread.start_new_thread(task, ()) while not started: time.sleep(POLL_SLEEP) self.assertEqual(thread._count(), orig + 1) # Allow the task to finish. mut.release() # The only reliable way to be sure that the thread ended from the # interpreter's point of view is to wait for the function object to be # destroyed. done = [] wr = weakref.ref(task, lambda _: done.append(None)) del task while not done: time.sleep(POLL_SLEEP) self.assertEqual(thread._count(), orig)
with support.wait_threads_exit(): thread.start_new_thread(task, ()) while not started: time.sleep(POLL_SLEEP) self.assertEqual(thread._count(), orig + 1) # Allow the task to finish. mut.release() # The only reliable way to be sure that the thread ended from the # interpreter's point of view is to wait for the function object to be # destroyed. done = [] wr = weakref.ref(task, lambda _: done.append(None)) del task while not done: time.sleep(POLL_SLEEP) self.assertEqual(thread._count(), orig)
def test_save_exception_state_on_error(self): # See issue #14474
def test_barrier(self): self.bar = Barrier(NUMTASKS) self.running = NUMTASKS for i in range(NUMTASKS): thread.start_new_thread(self.task2, (i,)) verbose_print("waiting for tasks to end") self.done_mutex.acquire() verbose_print("tasks done") with support.wait_threads_exit(): self.bar = Barrier(NUMTASKS) self.running = NUMTASKS for i in range(NUMTASKS): thread.start_new_thread(self.task2, (i,)) verbose_print("waiting for tasks to end") self.done_mutex.acquire() verbose_print("tasks done")
def task2(self, ident): for i in range(NUMTRIPS):
def thread1(): nonlocal running, status nonlocal status
# fork in a thread pid = os.fork()
thread.start_new_thread(thread1, ()) self.assertEqual(os.read(self.read_fd, 2), b"OK", "Unable to fork() in thread") while running: time.sleep(POLL_SLEEP) with support.wait_threads_exit(): thread.start_new_thread(thread1, ()) self.assertEqual(os.read(self.read_fd, 2), b"OK", "Unable to fork() in thread") self.assertEqual(status, 0)
def tearDown(self):