[3.10] bpo-45212: Fix dangling threads in skipped tests in test_socke… by serhiy-storchaka · Pull Request #28409 · python/cpython
def serverExplicitReady(self): """This method allows the server to explicitly indicate that
self.server_ready = threading.Event() self.client_ready = threading.Event() self.done = threading.Event() self.queue = queue.Queue(1) self.server_crashed = False
def raise_queued_exception(): if self.queue.qsize(): raise self.queue.get() self.addCleanup(raise_queued_exception)
# Do some munging to start the client test. methodname = self.id() i = methodname.rfind('.')
def _tearDown(self): self.__tearDown() self.done.wait() self.wait_threads.__exit__(None, None, None)
if self.queue.qsize(): exc = self.queue.get() raise exc self.addCleanup(self.done.wait)
def clientRun(self, test_func): self.server_ready.wait()
# errors