◐ Shell
clean mode source ↗

[3.6] bpo-31234: fork_wait tests now join threads (#3139) by vstinner · Pull Request #3187 · python/cpython

Expand Up @@ -11,7 +11,8 @@
import os, sys, time, unittest import test.support as support _thread = support.import_module('_thread')
threading = support.import_module('threading')
LONGSLEEP = 2 SHORTSLEEP = 0.5 Expand All @@ -20,8 +21,19 @@ class ForkWait(unittest.TestCase):
def setUp(self): self._threading_key = support.threading_setup() self.alive = {} self.stop = 0 self.threads = []
def tearDown(self): # Stop threads self.stop = 1 for thread in self.threads: thread.join() thread = None self.threads.clear() support.threading_cleanup(*self._threading_key)
def f(self, id): while not self.stop: Expand All @@ -43,10 +55,11 @@ def wait_impl(self, cpid): self.assertEqual(spid, cpid) self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))
@support.reap_threads def test_wait(self): for i in range(NUM_THREADS): _thread.start_new(self.f, (i,)) thread = threading.Thread(target=self.f, args=(i,)) thread.start() self.threads.append(thread)
# busy-loop to wait for threads deadline = time.monotonic() + 10.0 Expand Down Expand Up @@ -75,8 +88,4 @@ def test_wait(self): os._exit(n) else: # Parent try: self.wait_impl(cpid) finally: # Tell threads to die self.stop = 1 self.wait_impl(cpid)