Use subprocess timeout by ngie-eign · Pull Request #2127 · gitpython-developers/GitPython
:param kill_after_timeout: :class:`float` or ``None``, Default = ``None`` :class:``int``, ``float``, or ``None`` (block indefinitely), Default = ``None``.
To specify a timeout in seconds for the git command, after which the process should be killed.
__slots__ = ("proc", "args", "status") __slots__ = ("proc", "args", "status", "timeout")
# If this is non-zero it will override any status code during _terminate, used # to prevent race conditions in testing. _status_code_if_terminate: int = 0
def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None: def __init__( self, proc: subprocess.Popen | None, args: Any, timeout: Optional[Union[float, int]] = None, ) -> None: self.proc = proc self.args = args self.status: Union[int, None] = None self.timeout = timeout
def _terminate(self) -> None: """Terminate the underlying process."""
self.status = self._status_code_if_terminate or status except subprocess.TimeoutExpired: _logger.warning("Process did not complete successfully in %s seconds; will forcefully kill", exc_info=True) proc.kill() status = proc.wait() # Ensure the process goes away by blocking with `timeout=None`. self.status = self._status_code_if_terminate or status except (OSError, AttributeError) as ex: # On interpreter shutdown (notably on Windows), parts of the stdlib used by # subprocess can already be torn down (e.g. `subprocess._winapi` becomes None), # which can cause AttributeError during terminate(). In that case, we prefer # to silently ignore to avoid noisy "Exception ignored in: __del__" messages. _logger.info("Ignored error while terminating process: %r", ex)
# END exception handling
def __del__(self) -> None:
# TODO: Bad choice to mimic `proc.wait()` but with different args. def wait(self, stderr: Union[None, str, bytes] = b"") -> int: def wait(self, stderr: Optional[Union[str, bytes]] = b"", timeout: Optional[Union[int, float]] = None) -> int: """Wait for the process and return its status code.
:param stderr:
1. This feature is not supported at all on Windows. 2. Effectiveness may vary by operating system. ``ps --ppid`` is used to enumerate child processes, which is available on most GNU/Linux systems but not most others. 3. Deeper descendants do not receive signals, though they may sometimes 1. Deeper descendants do not receive signals, though they may sometimes terminate as a consequence of their parent processes being killed. 4. `kill_after_timeout` uses ``SIGKILL``, which can have negative side effects on a repository. For example, stale locks in case of :manpage:`git-gc(1)` could render the repository incapable of accepting changes until the lock is manually removed.
:param with_stdout: If ``True``, default ``True``, we open stdout on the created process.
if sys.platform == "win32": if kill_after_timeout is not None: raise GitCommandError( redacted_command, '"kill_after_timeout" feature is not supported on Windows.', ) cmd_not_found_exception = OSError else: cmd_not_found_exception = FileNotFoundError cmd_not_found_exception = OSError if sys.platform == "win32" else FileNotFoundError # END handle
stdout_sink = PIPE if with_stdout else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb")
if sys.platform != "win32" and kill_after_timeout is not None: # Help mypy figure out this is not None even when used inside communicate(). timeout = kill_after_timeout
def kill_process(pid: int) -> None: """Callback to kill a process.
This callback implementation would be ineffective and unsafe on Windows. """ p = Popen(["ps", "--ppid", str(pid)], stdout=PIPE) child_pids = [] if p.stdout is not None: for line in p.stdout: if len(line.split()) > 0: local_pid = (line.split())[0] if local_pid.isdigit(): child_pids.append(int(local_pid)) try: os.kill(pid, signal.SIGKILL) for child_pid in child_pids: try: os.kill(child_pid, signal.SIGKILL) except OSError: pass # Tell the main routine that the process was killed. kill_check.set() except OSError: # It is possible that the process gets completed in the duration # after timeout happens and before we try to kill the process. pass return
def communicate() -> Tuple[AnyStr, AnyStr]: watchdog.start() out, err = proc.communicate() watchdog.cancel() if kill_check.is_set(): err = 'Timeout: the command "%s" did not complete in %d secs.' % ( " ".join(redacted_command), timeout, ) if not universal_newlines: err = err.encode(defenc) return out, err
# END helpers
kill_check = threading.Event() watchdog = threading.Timer(timeout, kill_process, args=(proc.pid,)) else: communicate = proc.communicate
# Wait for the process to return. status = 0 stdout_value: Union[str, bytes] = b"" stderr_value: Union[str, bytes] = b"" newline = "\n" if universal_newlines else b"\n" try: if output_stream is None: stdout_value, stderr_value = communicate() stdout_value, stderr_value = proc.communicate(timeout=kill_after_timeout) # Strip trailing "\n". if stdout_value is not None and stdout_value.endswith(newline) and strip_newline_in_stdout: # type: ignore[arg-type] stdout_value = stdout_value[:-1]