[3.7] bpo-37531: Fix regrtest timeout for subprocesses (GH-15072) by vstinner · Pull Request #15280 · python/cpython
from test.libregrtest.runtest import ( runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, format_test_result, TestResult, is_failed) format_test_result, TestResult, is_failed, TIMEOUT) from test.libregrtest.setup import setup_tests from test.libregrtest.utils import format_duration
class MultiprocessThread(threading.Thread): def __init__(self, pending, output, ns): def __init__(self, pending, output, ns, timeout): super().__init__() self.pending = pending self.output = output self.ns = ns self.timeout = timeout self.current_test_name = None self.start_time = None self._popen = None
def kill(self): """ Kill the current process (if any).
This method can be called by the thread running the process, or by another thread. """ self._killed = True
popen = self._popen
def mp_result_error(self, test_name, error_type, stdout='', stderr='', err_msg=None): test_time = time.monotonic() - self.start_time result = TestResult(test_name, error_type, test_time, None) return MultiprocessResult(result, stdout, stderr, err_msg)
def _runtest(self, test_name): try:
try: stdout, stderr = popen.communicate(timeout=self.timeout) except subprocess.TimeoutExpired: if self._killed: # kill() has been called: communicate() fails # on reading closed stdout/stderr raise ExitThread
popen.kill() stdout, stderr = popen.communicate() self.kill()
return self.mp_result_error(test_name, TIMEOUT, stdout, stderr) except OSError: if self._killed: # kill() has been called: communicate() fails
retcode = popen.wait()
if err_msg is not None: test_time = time.monotonic() - self.start_time result = TestResult(test_name, CHILD_ERROR, test_time, None) return self.mp_result_error(test_name, CHILD_ERROR, stdout, stderr, err_msg)
return MultiprocessResult(result, stdout, stderr, err_msg)
def start_workers(self): self.workers = [MultiprocessThread(self.pending, self.output, self.ns) self.workers = [MultiprocessThread(self.pending, self.output, self.ns, self.worker_timeout) for _ in range(self.ns.use_mp)] print("Run tests in parallel using %s child processes" % len(self.workers))
while True: if self.test_timeout is not None: faulthandler.dump_traceback_later(self.test_timeout, exit=True) if self.main_timeout is not None: faulthandler.dump_traceback_later(self.main_timeout, exit=True)
# wait for a thread timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
# a test failed (and --failfast is set) or all tests completed