[3.6] bpo-31234: fork_wait tests now join threads (#3139) by vstinner · Pull Request #3187 · python/cpython
import os, sys, time, unittest import test.support as support _thread = support.import_module('_thread')
threading = support.import_module('threading')
LONGSLEEP = 2 SHORTSLEEP = 0.5
def setUp(self): self._threading_key = support.threading_setup() self.alive = {} self.stop = 0 self.threads = []
def tearDown(self): # Stop threads self.stop = 1 for thread in self.threads: thread.join() thread = None self.threads.clear() support.threading_cleanup(*self._threading_key)
def f(self, id): while not self.stop:
@support.reap_threads def test_wait(self): for i in range(NUM_THREADS): _thread.start_new(self.f, (i,)) thread = threading.Thread(target=self.f, args=(i,)) thread.start() self.threads.append(thread)
# busy-loop to wait for threads deadline = time.monotonic() + 10.0