From 784ac588553cd1024e8f9c99c99bbd70e80644cc Mon Sep 17 00:00:00 2001 From: Harvey Lynden Date: Mon, 1 Dec 2025 15:33:47 +0100 Subject: [PATCH 1/2] Migration of Process Module Migration of Process Module Improved Unit/Functional / RST Docstrings Reference: https://github.com/avocado-framework/aautils/issues/52 Signed-off-by: Harvey Lynden --- autils/devel/process.py | 1462 +++++++++++++++++++++ docs/source/utils.rst | 4 + metadata/devel/process.yml | 19 + tests/functional/modules/devel/process.py | 287 ++++ tests/unit/modules/devel/process.py | 953 ++++++++++++++ tests/utils.py | 41 + 6 files changed, 2766 insertions(+) create mode 100644 autils/devel/process.py create mode 100644 metadata/devel/process.yml create mode 100644 tests/functional/modules/devel/process.py create mode 100644 tests/unit/modules/devel/process.py diff --git a/autils/devel/process.py b/autils/devel/process.py new file mode 100644 index 0000000..bf6117a --- /dev/null +++ b/autils/devel/process.py @@ -0,0 +1,1462 @@ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# See LICENSE for more details. +# +# Copyright: Red Hat Inc. 2013-2014 +# Author: Lucas Meneghel Rodrigues + +# pylint: disable=C0302 + +"""Functions dedicated to find and run external commands.""" + +import contextlib +import errno +import glob +import logging +import os +import re +import select +import shlex +import signal +import subprocess +import threading +import time +from io import BytesIO, UnsupportedOperation + +from autils.devel import astring +from autils.devel.wait import wait_for +from autils.file import path + +LOG = logging.getLogger(__name__) + +# variable=value bash assignment +_RE_BASH_SET_VARIABLE = re.compile(r"[a-zA-Z]\w*=.*") + + +class CmdError(Exception): + """Exception raised when a command fails execution.""" + + def __init__( + self, command=None, result=None, additional_text=None + ): # pylint: disable=W0231 + self.command = command + self.result = result + self.additional_text = additional_text + + def __str__(self): + return ( + f"Command '{self.command}' failed.\nstdout: " + f"{self.result.stdout!r}\nstderr: " + f"{self.result.stderr!r}\nadditional_info: " + f"{self.additional_text}" + ) + + +class CmdInputError(Exception): + """Raised when the command given is invalid, such as an empty command.""" + + +def can_sudo(cmd=None): + """Check whether sudo is available or if running as root. + + This function checks if the current process has the ability to run commands + with elevated privileges. It first checks if the process is running as root + (UID 0), then checks if sudo is installed and functional. + + :param cmd: Optional command to test sudo capabilities with. If provided, + tests whether this specific command can be run with sudo. + If not provided, tests basic sudo functionality. + :type cmd: str or None + :return: True if sudo is available or running as root, False otherwise. + :rtype: bool + + Example:: + + >>> can_sudo() + True + >>> can_sudo("ls /root") + True + """ + if not os.getuid(): # Root + return True + + try: # Does sudo binary exists? + path.find_command("sudo") + except path.CmdNotFoundError: + return False + + try: + if cmd: # Am I able to run the cmd or plain sudo id? + return not system(cmd, ignore_status=True, sudo=True) + if system_output("id -u", ignore_status=True, sudo=True).strip() == "0": + return True + return False + except OSError: # Broken sudo binary + return False + + +def get_capabilities(pid=None): + """Gets a list of all capabilities for a process. + + In case the getpcaps command is not available, and empty list will be + returned. + + It supports getpcaps' two different formats, the current and the so + called legacy/ugly. + + :param pid: the process ID (PID), if one is not given, the current + PID is used (given by :func:`os.getpid`) + :type pid: int + :returns: all capabilities + :rtype: list + """ + if pid is None: + pid = os.getpid() + try: + result = run(f"getpcaps {int(pid)}", ignore_status=True) + except FileNotFoundError: + return [] + if result.exit_status: + return [] + if result.stderr_text.startswith("Capabilities "): + info = result.stderr_text + separator = "=" + else: + info = result.stdout_text + separator = ":" + return info.split(separator, 1)[1].strip().split(",") + + +def has_capability(capability, pid=None): + """Check if a process has a given Linux capability. + + This is a simple wrapper around getpcaps, part of the libcap package. + In case the getpcaps command is not available, the capability will be + considered *not* to be available. + + :param capability: The name of the capability (e.g., "cap_sys_admin"). + Refer to capabilities(7) man page for more information. + Note: capability names are UPPERCASE in capabilities(7) + (e.g., CAP_SYS_ADMIN) but must be lowercase in Python + (e.g., "cap_sys_admin"). + :type capability: str + :param pid: The process ID to check. If None, checks the current process. + :type pid: int or None + :return: True if the capability is available, False otherwise. + :rtype: bool + + Example:: + + >>> has_capability("cap_chown") + True + >>> has_capability("cap_sys_admin", pid=1234) + False + """ + return capability in get_capabilities(pid) + + +def pid_exists(pid): + """Check if a process with the given PID exists. + + This function uses os.kill with signal 0 to check if a process exists + without actually sending a signal to it. + + :param pid: The process ID number to check. + :type pid: int + :return: True if the process exists, False otherwise. + :rtype: bool + + Example:: + + >>> pid_exists(1) + True + >>> pid_exists(999999) + False + """ + try: + os.kill(pid, 0) + except OSError as detail: + if detail.errno == errno.ESRCH: + return False + return True + + +def safe_kill(pid, signal): # pylint: disable=W0621 + """Attempt to send a signal to a process that may or may not exist. + + This function safely sends a signal to a process, handling cases where + the process might not exist or require elevated privileges. + + :param pid: The process ID to send the signal to. + :type pid: int + :param signal: The signal number to send (e.g., signal.SIGTERM). + :type signal: int + :return: True if signal was sent successfully, False otherwise. + :rtype: bool + + Example:: + + >>> safe_kill(1234, signal.SIGTERM) + True + """ + if not get_owner_id(int(pid)): + kill_cmd = f"kill -{int(signal)} {int(pid)}" + try: + run(kill_cmd, sudo=True) + return True + except CmdError: + return False + + try: + os.kill(pid, signal) + return True + except Exception: # pylint: disable=W0703 + return False + + +def get_parent_pid(pid): + """Get the parent process ID for a given process. + + This function reads the /proc filesystem to determine the parent PID. + + .. note:: This is currently Linux specific. + + :param pid: The PID of the child process. + :type pid: int + :return: The parent process ID. + :rtype: int + :raises IOError: If the /proc entry cannot be read. + + Example:: + + >>> get_parent_pid(1234) + 1 + """ + with open(f"/proc/{int(pid)}/stat", "rb") as proc_stat: + parent_pid = proc_stat.read().split(b" ")[-49] + return int(parent_pid) + + +def _get_pid_from_proc_pid_stat(proc_path): + match = re.match(r"\/proc\/([0-9]+)\/.*", proc_path) + if match is not None: + return int(match.group(1)) + return None + + +def get_children_pids(parent_pid, recursive=False): + """Get the list of child process IDs for a given parent process. + + This function scans the /proc filesystem to find all child processes + of the specified parent PID. + + .. note:: This is currently Linux specific. + + :param parent_pid: The PID of the parent process. + :type parent_pid: int + :param recursive: If True, also returns grandchildren and all descendants. + If False, only returns direct children. + :type recursive: bool + :return: List of child process IDs. + :rtype: list of int + + Example:: + + >>> get_children_pids(1) + [234, 456, 789] + >>> get_children_pids(1, recursive=True) + [234, 456, 789, 1011, 1213] + """ + proc_stats = glob.glob("/proc/[123456789]*/stat") + children = [] + for proc_stat in proc_stats: + try: + with open(proc_stat, "rb") as proc_stat_fp: + this_parent_pid = int(proc_stat_fp.read().split(b" ")[-49]) + except IOError: + continue + + if this_parent_pid == parent_pid: + children.append(_get_pid_from_proc_pid_stat(proc_stat)) + + if recursive: + for child in children: + children.extend(get_children_pids(child)) + return children + + +def kill_process_tree(pid, sig=None, send_sigcont=True, timeout=0): + """Signal a process and all of its children. + + If the process does not exist -- return. + + :param pid: The pid of the process to signal. + :type pid: int + :param sig: The signal to send to the processes, defaults to + :data:`signal.SIGKILL` + :type sig: int or None + :param send_sigcont: Send SIGCONT to allow killing stopped processes. + :type send_sigcont: bool + :param timeout: How long to wait for the pid(s) to die + (negative=infinity, 0=don't wait, + positive=number_of_seconds). + :type timeout: int or float + :return: List of all PIDs we sent signal to. + :rtype: list + :raises RuntimeError: If timeout is reached waiting for processes to die. + """ + + def _all_pids_dead(killed_pids): + for pid in killed_pids: + if pid_exists(pid): + return False + return True + + if sig is None: + sig = signal.SIGKILL + + if timeout > 0: + start = time.monotonic() + + if not safe_kill(pid, signal.SIGSTOP): + return [pid] + killed_pids = [pid] + for child in get_children_pids(pid): + killed_pids.extend(kill_process_tree(int(child), sig, False)) + safe_kill(pid, sig) + if send_sigcont: + for killed_pid in killed_pids: + safe_kill(killed_pid, signal.SIGCONT) + if not timeout: + return killed_pids + if timeout > 0: + if not wait_for( + _all_pids_dead, + timeout + start - time.monotonic(), + step=0.01, + args=(killed_pids[::-1],), + ): + raise RuntimeError( + f"Timeout reached when waiting for pid {pid} " + f"and children to die ({timeout})" + ) + else: + while not _all_pids_dead(killed_pids[::-1]): + time.sleep(0.01) + return killed_pids + + +def kill_process_by_pattern(pattern): + """Send SIGTERM signal to processes matching a pattern. + + This function uses the pkill command to terminate processes whose + command line matches the given pattern. + + :param pattern: Pattern to match against process command lines. + This is matched using pkill's -f flag, which matches + against the full command line. + :type pattern: str + + Example:: + + >>> kill_process_by_pattern("firefox") + >>> kill_process_by_pattern("python.*test_script") + """ + cmd = f"pkill -f {pattern}" + result = run(cmd, ignore_status=True) + if result.exit_status: + LOG.error("Failed to run '%s': %s", cmd, result) + else: + LOG.info("Succeed to run '%s'.", cmd) + + +def process_in_ptree_is_defunct(ppid): + """Verify if any processes deriving from PPID are in the defunct state. + + Attempt to verify if parent process and any children from PPID is defunct + (zombie) or not. + + This relies on the GNU version of "ps" and is not guaranteed to work in + MacOS. + + :param ppid: The parent PID of the process to verify. + :type ppid: int + :return: True if any process in the tree is defunct, False otherwise. + :rtype: bool + """ + defunct = False + try: + pids = get_children_pids(ppid) + except CmdError: # Process doesn't exist + return True + for pid in pids: + cmd = f"ps --no-headers -o cmd {int(pid)}" + proc_name = system_output(cmd, ignore_status=True, verbose=False) + if "" in proc_name: + defunct = True + break + return defunct + + +def binary_from_shell_cmd(cmd): + """Extract the first binary path from a shell-like command string. + + This function parses a shell command and returns the first binary/executable + found, skipping environment variable assignments. + + .. note:: This is a naive implementation that handles common patterns like + environment variable assignments before the binary name. + + :param cmd: A shell-like command string to parse. + :type cmd: str + :return: The first binary/executable found in the command. + :rtype: str + :raises ValueError: If no binary can be extracted from the command. + + Example:: + + >>> binary_from_shell_cmd("binary") + 'binary' + >>> binary_from_shell_cmd("VAR=VAL binary -args") + 'binary' + >>> binary_from_shell_cmd("FOO=bar ./script.py") + './script.py' + """ + cmds = shlex.split(cmd) + for item in cmds: + if not _RE_BASH_SET_VARIABLE.match(item): + return item + raise ValueError(f"Unable to parse first binary from '{cmd}'") + + +#: This is kept for compatibility purposes, but is now deprecated and +#: will be removed in later versions. Please use :func:`shlex.split` +#: instead. +cmd_split = shlex.split + + +class CmdResult: + """Command execution result. + + :param command: the command line itself + :type command: str + :param exit_status: exit code of the process + :type exit_status: int + :param stdout: content of the process stdout + :type stdout: bytes + :param stderr: content of the process stderr + :type stderr: bytes + :param duration: elapsed wall clock time running the process + :type duration: float + :param pid: ID of the process + :type pid: int + :param encoding: the encoding to use for the text version + of stdout and stderr, by default + :data:`avocado.utils.astring.ENCODING` + :type encoding: str + """ + + # pylint: disable=R0913, R0902 + def __init__( + self, + command="", + stdout=b"", + stderr=b"", + exit_status=None, + duration=0, + pid=None, + encoding=None, + ): + self.command = command + self.exit_status = exit_status + #: The raw stdout (bytes) + self.stdout = stdout + #: The raw stderr (bytes) + self.stderr = stderr + self.duration = duration + self.interrupted = False + self.pid = pid + if encoding is None: + encoding = astring.ENCODING + self.encoding = encoding + + def __str__(self): + return "\n".join( + f"{key}: {getattr(self, key, 'MISSING')!r}" + for key in ( + "command", + "exit_status", + "duration", + "interrupted", + "pid", + "encoding", + "stdout", + "stderr", + ) + ) + + @property + def stdout_text(self): + """Return stdout decoded as text. + + :return: The stdout content as a string. + :rtype: str + :raises TypeError: If stdout cannot be decoded. + """ + if hasattr(self.stdout, "decode"): + return self.stdout.decode(self.encoding) + if isinstance(self.stdout, str): + return self.stdout + raise TypeError("Unable to decode stdout into a string-like type") + + @property + def stderr_text(self): + """Return stderr decoded as text. + + :return: The stderr content as a string. + :rtype: str + :raises TypeError: If stderr cannot be decoded. + """ + if hasattr(self.stderr, "decode"): + return self.stderr.decode(self.encoding) + if isinstance(self.stderr, str): + return self.stderr + raise TypeError("Unable to decode stderr into a string-like type") + + +class FDDrainer: + """Reads data from a file descriptor in a thread, storing locally.""" + + # pylint: disable=R0913, R0902 + def __init__( + self, + fd, + result, + name=None, + logger=None, + logger_prefix="%s", + stream_logger=None, + ignore_bg_processes=False, + verbose=False, + ): + """Initialize FDDrainer to read from a file descriptor in a thread. + + Stores data locally in a file-like :attr:`data` object. + + :param fd: a file descriptor that will be read (drained) from + :type fd: int + :param result: a :class:`CmdResult` instance associated with the process + used to detect if the process is still running and + if there's still data to be read. + :type result: CmdResult + :param name: a descriptive name that will be passed to the Thread name + :type name: str + :param logger: the logger that will be used to (interactively) write + the content from the file descriptor + :type logger: logging.Logger + :param logger_prefix: the prefix used when logging the data + :type logger_prefix: str + :param stream_logger: a logger for streaming output + :type stream_logger: logging.Logger + :param ignore_bg_processes: When True the process does not wait for + child processes which keep opened stdout/stderr streams + after the main process finishes (eg. forked daemon which + did not closed the stdout/stderr). Note this might result + in missing output produced by those daemons after the + main thread finishes and also it allows those daemons + to be running after the process finishes. + :type ignore_bg_processes: bool + :param verbose: whether to log in both the logger and stream_logger + :type verbose: bool + """ + self.fd = fd + self.name = name + self.data = BytesIO() + self._result = result + self._thread = None + self._logger = logger + self._logger_prefix = logger_prefix + self._stream_logger = stream_logger + self._ignore_bg_processes = ignore_bg_processes + self._verbose = verbose + + def _log_line(self, line, newline_for_stream="\n"): + line = astring.to_text(line, self._result.encoding, "replace") + if self._logger is not None: + self._logger.debug(self._logger_prefix, line) + if self._stream_logger is not None: + self._stream_logger.debug(line + newline_for_stream) + + def _drainer(self): + """Read from fd, storing and optionally logging the output.""" + bfr = b"" + while True: + if self._ignore_bg_processes: + has_io = select.select([self.fd], [], [], 1)[0] + if not has_io and self._result.exit_status is not None: + # Exit if no new data and main process has finished + break + if not has_io: + # Don't read unless there are new data available + continue + try: + tmp = os.read(self.fd, 8192) + except OSError: + break + if not tmp: + break + self.data.write(tmp) + if self._verbose: + bfr += tmp + lines = bfr.splitlines() + for line in lines[:-1]: + self._log_line(line) + if bfr.endswith(b"\n"): + self._log_line(lines[-1]) + else: + self._log_line(lines[-1], "") + bfr = b"" + + def start(self): + """Start the drainer thread to read from the file descriptor.""" + self._thread = threading.Thread(target=self._drainer, name=self.name) + self._thread.daemon = True + self._thread.start() + + def flush(self): + """Wait for drainer thread to complete and flush stream handlers.""" + self._thread.join() + if self._stream_logger is not None: + for handler in self._stream_logger.handlers: + # FileHandler has a close() method, which we expect will + # flush the file on disk. SocketHandler, MemoryHandler + # and other logging handlers (custom ones?) also have + # the same interface, so let's try to use it if available + stream = getattr(handler, "stream", None) + if (stream is not None) and (not stream.closed): + if hasattr(stream, "fileno"): + try: + fileno = stream.fileno() + os.fsync(fileno) + except UnsupportedOperation: + pass + if hasattr(handler, "close"): + handler.close() + + +class SubProcess: + """Run a subprocess in the background, collecting stdout/stderr streams.""" + + # pylint: disable=R0913, R0902 + def __init__( + self, + cmd, + verbose=True, + shell=False, + env=None, + sudo=False, + ignore_bg_processes=False, + encoding=None, + logger=None, + ): + """Create the subprocess object, stdout/err, reader threads and locks. + + :param cmd: Command line to run. + :type cmd: str + :param verbose: Whether to log the command run and stdout/stderr. + :type verbose: bool + :param shell: Whether to run the subprocess in a subshell. + :type shell: bool + :param env: Use extra environment variables. + :type env: dict + :param sudo: Whether the command requires admin privileges to run, + so that sudo will be prepended to the command. + The assumption here is that the user running the command + has a sudo configuration such that a password won't be + prompted. If that's not the case, the command will + straight out fail. + :type sudo: bool + :param ignore_bg_processes: When True the process does not wait for + child processes which keep opened stdout/stderr streams + after the main process finishes (eg. forked daemon which + did not closed the stdout/stderr). Note this might result + in missing output produced by those daemons after the + main thread finishes and also it allows those daemons + to be running after the process finishes. + :type ignore_bg_processes: bool + :param encoding: the encoding to use for the text representation + of the command result stdout and stderr, by default + :data:`avocado.utils.astring.ENCODING` + :type encoding: str + :param logger: User's custom logger, which will be logging the subprocess + outputs. When this parameter is not set, the + `avocado.utils.process` logger will be used. + :type logger: logging.Logger + :raises ValueError: If incorrect values are given to parameters. + """ + if encoding is None: + encoding = astring.ENCODING + if sudo: + self.cmd = self._prepend_sudo(cmd, shell) + else: + self.cmd = cmd + self.verbose = verbose + self.result = CmdResult(self.cmd, encoding=encoding) + self.shell = shell + if env: + self.env = os.environ.copy() + self.env.update(env) + else: + self.env = None + self._popen = None + + self.logger = logger or LOG + self.stdout_logger = self.logger.getChild("stdout") + self.stderr_logger = self.logger.getChild("stderr") + self.output_logger = self.logger.getChild("output") + # Drainers used when reading from the PIPEs and writing to + # files and logs + self._stdout_drainer = None + self._stderr_drainer = None + + self._ignore_bg_processes = ignore_bg_processes + + def __repr__(self): + if self._popen is None: + rc = "(not started)" + elif self.result.exit_status is None: + rc = "(running)" + else: + rc = self.result.exit_status + return f"{self.__class__.__name__}(cmd={self.cmd!r}, rc={rc!r})" + + def __str__(self): + if self._popen is None: + rc = "(not started)" + elif self.result.exit_status is None: + rc = "(running)" + else: + rc = f"(finished with exit status={int(self.result.exit_status)})" + return f"{self.cmd} {rc}" + + @staticmethod + def _prepend_sudo(cmd, shell): + if os.getuid(): + try: + sudo_cmd = f"{path.find_command('sudo', check_exec=False)} -n" + except path.CmdNotFoundError as details: + LOG.error(details) + LOG.error( + "Parameter sudo=True provided, but sudo was " + "not found. Please consider adding sudo to " + "your OS image" + ) + return cmd + if shell: + if " -s" not in sudo_cmd: + sudo_cmd = f"{sudo_cmd} -s" + cmd = f"{sudo_cmd} {cmd}" + return cmd + + def _init_subprocess(self): + def signal_handler(*args): + self.result.interrupted = "signal/ctrl+c" + self.wait() + signal.default_int_handler(*args) + + if self._popen is not None: + return + + if self.verbose: + LOG.info("Running '%s'", self.cmd) + if self.shell is False: + cmd = shlex.split(self.cmd) + else: + cmd = self.cmd + try: + self._popen = subprocess.Popen( # pylint: disable=R1732 + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=self.shell, + env=self.env, + ) + except OSError as details: + details.strerror += f" ({self.cmd})" + raise details + + self.start_time = time.monotonic() # pylint: disable=W0201 + + # prepare fd drainers + self._stdout_drainer = FDDrainer( + self._popen.stdout.fileno(), + self.result, + name=f"{self.cmd}-stdout", + logger=self.logger, + logger_prefix="[stdout] %s", + stream_logger=None, + ignore_bg_processes=self._ignore_bg_processes, + verbose=self.verbose, + ) + self._stderr_drainer = FDDrainer( + self._popen.stderr.fileno(), + self.result, + name=f"{self.cmd}-stderr", + logger=self.logger, + logger_prefix="[stderr] %s", + stream_logger=None, + ignore_bg_processes=self._ignore_bg_processes, + verbose=self.verbose, + ) + + # start stdout/stderr threads + self._stdout_drainer.start() + self._stderr_drainer.start() + + try: + signal.signal(signal.SIGINT, signal_handler) + except ValueError: + if self.verbose: + LOG.info("Command %s running on a thread", self.cmd) + + def _fill_results(self, rc): + self._init_subprocess() + self.result.exit_status = rc + if not self.result.duration: + self.result.duration = time.monotonic() - self.start_time + if self.verbose: + LOG.info( + "Command '%s' finished with %s after %.9fs", + self.cmd, + rc, + self.result.duration, + ) + self.result.pid = self._popen.pid + self._fill_streams() + + def _fill_streams(self): + """Close subprocess stdout and stderr, and put values into result obj.""" + # Cleaning up threads + if self._stdout_drainer is not None: + self._stdout_drainer.flush() + if self._stderr_drainer is not None: + self._stderr_drainer.flush() + # Clean subprocess pipes and populate stdout/err + self.result.stdout = self.get_stdout() + self.result.stderr = self.get_stderr() + + def start(self): + """Start running the subprocess. + + This method is particularly useful for background processes, since + you can start the subprocess and not block your test flow. + + :return: Subprocess PID. + :rtype: int + """ + self._init_subprocess() + return self._popen.pid + + def get_stdout(self): + """Get the full stdout of the subprocess so far. + + :return: Standard output of the process. + :rtype: bytes + """ + self._init_subprocess() + return self._stdout_drainer.data.getvalue() + + def get_stderr(self): + """Get the full stderr of the subprocess so far. + + :return: Standard error of the process. + :rtype: bytes + """ + self._init_subprocess() + return self._stderr_drainer.data.getvalue() + + def terminate(self): + """Send a :attr:`signal.SIGTERM` to the process. + + Please consider using :meth:`stop` instead if you want to + do all that's possible to finalize the process and wait for it to finish. + """ + self._init_subprocess() + self.send_signal(signal.SIGTERM) + + def kill(self): + """Send a :attr:`signal.SIGKILL` to the process. + + Please consider using :meth:`stop` instead if you want to + do all that's possible to finalize the process and wait for it to finish. + """ + self._init_subprocess() + self.send_signal(signal.SIGKILL) + + def send_signal(self, sig): + """Send the specified signal to the process. + + :param sig: Signal to send. + :type sig: int + """ + self._init_subprocess() + if self.is_sudo_enabled(): + pids = get_children_pids(self.get_pid()) + pids.append(self.get_pid()) + for pid in pids: + kill_cmd = f"kill -{int(sig)} {int(pid)}" + with contextlib.suppress(Exception): + run(kill_cmd, sudo=True) + else: + self._popen.send_signal(sig) + + def poll(self): + """Call the subprocess poll() method, fill results if rc is not None. + + :return: Return code if process has finished, None otherwise. + :rtype: int or None + """ + self._init_subprocess() + rc = self._popen.poll() + if rc is not None: + self._fill_results(rc) + return rc + + def wait(self, timeout=None, sig=signal.SIGTERM): + """Wait for subprocess to complete, fill results when done. + + :param timeout: Time (seconds) we'll wait until the process is + finished. If it's not, we'll try to terminate it + and it's children using ``sig`` and get a + status. When the process refuses to die + within 1s we use SIGKILL and report the status + (be it exit_code or zombie). + :type timeout: float or None + :param sig: Signal to send to the process in case it did not end after + the specified timeout. + :type sig: int + :return: Exit status of the process. + :rtype: int + :raises AssertionError: If the process becomes a zombie. + """ + + def nuke_myself(): + timeout = time.monotonic() - self.start_time + self.result.interrupted = f"timeout after {timeout:.9f}s" + try: + kill_process_tree(self.get_pid(), sig, timeout=1) + except RuntimeError: + try: + kill_process_tree(self.get_pid(), signal.SIGKILL, timeout=1) + LOG.warning( + "Process '%s' refused to die in 1s after " + "sending %s to, destroyed it successfully " + "using SIGKILL.", + self.cmd, + sig, + ) + except RuntimeError: + LOG.error( + "Process '%s' refused to die in 1s after " + "sending %s, followed by SIGKILL, probably " + "dealing with a zombie process.", + self.cmd, + sig, + ) + + self._init_subprocess() + rc = None + + if timeout is None: + rc = self._popen.wait() + elif timeout > 0.0: + timer = threading.Timer(timeout, nuke_myself) + try: + timer.start() + rc = self._popen.wait() + finally: + timer.cancel() + + if rc is None: + stop_time = time.monotonic() + 1 + while time.monotonic() < stop_time: + rc = self._popen.poll() + if rc is not None: + break + else: + nuke_myself() + rc = self._popen.poll() + + if rc is None: + # If all this work fails, we're dealing with a zombie process. + raise AssertionError(f"Zombie Process {self._popen.pid}") + self._fill_results(rc) + return rc + + def stop(self, timeout=None): + """Stop background subprocess. + + Call this method to terminate the background subprocess and + wait for its results. + + :param timeout: Time (seconds) we'll wait until the process is + finished. If it's not, we'll try to terminate it + and its children using ``sig`` and get a + status. When the process refuses to die + within 1s we use SIGKILL and report the status + (be it exit_code or zombie). + :type timeout: float or None + :return: Exit status of the process. + :rtype: int + """ + self._init_subprocess() + if self.result.exit_status is None: + self.terminate() + return self.wait(timeout) + + def get_pid(self): + """Report PID of this process. + + :return: Process ID. + :rtype: int + """ + self._init_subprocess() + return self._popen.pid + + def get_user_id(self): + """Report user id of this process. + + :return: User ID of the process owner. + :rtype: int or None + """ + self._init_subprocess() + return get_owner_id(self.get_pid()) + + def is_sudo_enabled(self): + """Return whether the subprocess is running with sudo enabled. + + :return: True if running as root (UID 0), False otherwise. + :rtype: bool + """ + self._init_subprocess() + return not self.get_user_id() + + def run(self, timeout=None, sig=signal.SIGTERM): + """Start a process and wait for it to end, returning the result attr. + + If the process was already started using .start(), this will simply + wait for it to end. + + :param timeout: Time (seconds) we'll wait until the process is + finished. If it's not, we'll try to terminate it + and its children using ``sig`` and get a + status. When the process refuses to die + within 1s we use SIGKILL and report the status + (be it exit_code or zombie). + :type timeout: float or None + :param sig: Signal to send to the process in case it did not end after + the specified timeout. + :type sig: int + :return: The command result object. + :rtype: CmdResult + """ + self._init_subprocess() + self.wait(timeout, sig) + return self.result + + +# pylint: disable=R0913 +def run( + cmd, + timeout=None, + verbose=True, + ignore_status=False, + shell=False, + env=None, + sudo=False, + ignore_bg_processes=False, + encoding=None, + logger=None, +): + """Run a subprocess, returning a CmdResult object. + + :param cmd: Command line to run. + :type cmd: str + :param timeout: Time limit in seconds before attempting to kill the + running process. This function will take a few seconds + longer than 'timeout' to complete if it has to kill the + process. + :type timeout: float or None + :param verbose: Whether to log the command run and stdout/stderr. + :type verbose: bool + :param ignore_status: Whether to raise an exception when command returns + =! 0 (False), or not (True). + :type ignore_status: bool + :param shell: Whether to run the command on a subshell. + :type shell: bool + :param env: Use extra environment variables. + :type env: dict + :param sudo: Whether the command requires admin privileges to run, + so that sudo will be prepended to the command. + The assumption here is that the user running the command + has a sudo configuration such that a password won't be + prompted. If that's not the case, the command will + straight out fail. + :type sudo: bool + :param ignore_bg_processes: Whether to ignore background processes. + :type ignore_bg_processes: bool + :param encoding: the encoding to use for the text representation + of the command result stdout and stderr, by default + :data:`avocado.utils.astring.ENCODING` + :type encoding: str + :param logger: User's custom logger, which will be logging the subprocess + outputs. When this parameter is not set, the + `avocado.utils.process` logger will be used. + :type logger: logging.Logger + :return: A CmdResult object. + :rtype: CmdResult + :raises CmdInputError: If the command is empty. + :raises CmdError: If ``ignore_status=False`` and command fails. + """ + if not cmd: + raise CmdInputError("Invalid empty command") + if encoding is None: + encoding = astring.ENCODING + sp = SubProcess( + cmd=cmd, + verbose=verbose, + shell=shell, + env=env, + sudo=sudo, + ignore_bg_processes=ignore_bg_processes, + encoding=encoding, + logger=logger, + ) + cmd_result = sp.run(timeout=timeout) + fail_condition = cmd_result.exit_status or cmd_result.interrupted + if fail_condition and not ignore_status: + raise CmdError(cmd, sp.result) + return cmd_result + + +# pylint: disable=R0913 +def system( + cmd, + timeout=None, + verbose=True, + ignore_status=False, + shell=False, + env=None, + sudo=False, + ignore_bg_processes=False, + encoding=None, + logger=None, +): + """Run a subprocess, returning its exit code. + + :param cmd: Command line to run. + :type cmd: str + :param timeout: Time limit in seconds before attempting to kill the + running process. This function will take a few seconds + longer than 'timeout' to complete if it has to kill the + process. + :type timeout: float or None + :param verbose: Whether to log the command run and stdout/stderr. + :type verbose: bool + :param ignore_status: Whether to raise an exception when command returns + =! 0 (False), or not (True). + :type ignore_status: bool + :param shell: Whether to run the command on a subshell. + :type shell: bool + :param env: Use extra environment variables. + :type env: dict + :param sudo: Whether the command requires admin privileges to run, + so that sudo will be prepended to the command. + The assumption here is that the user running the command + has a sudo configuration such that a password won't be + prompted. If that's not the case, the command will + straight out fail. + :type sudo: bool + :param ignore_bg_processes: Whether to ignore background processes. + :type ignore_bg_processes: bool + :param encoding: the encoding to use for the text representation + of the command result stdout and stderr, by default + :data:`avocado.utils.astring.ENCODING` + :type encoding: str + :param logger: User's custom logger, which will be logging the subprocess + outputs. When this parameter is not set, the + `avocado.utils.process` logger will be used. + :type logger: logging.Logger + :return: Exit code. + :rtype: int + :raises CmdError: If ``ignore_status=False`` and command fails. + """ + cmd_result = run( + cmd=cmd, + timeout=timeout, + verbose=verbose, + ignore_status=ignore_status, + shell=shell, + env=env, + sudo=sudo, + ignore_bg_processes=ignore_bg_processes, + encoding=encoding, + logger=logger, + ) + return cmd_result.exit_status + + +# pylint: disable=R0913 +def system_output( + cmd, + timeout=None, + verbose=True, + ignore_status=False, + shell=False, + env=None, + sudo=False, + ignore_bg_processes=False, + strip_trail_nl=True, + encoding=None, + logger=None, +): + """Run a subprocess, returning its output. + + :param cmd: Command line to run. + :type cmd: str + :param timeout: Time limit in seconds before attempting to kill the + running process. This function will take a few seconds + longer than 'timeout' to complete if it has to kill the + process. + :type timeout: float or None + :param verbose: Whether to log the command run and stdout/stderr. + :type verbose: bool + :param ignore_status: Whether to raise an exception when command returns + =! 0 (False), or not (True). + :type ignore_status: bool + :param shell: Whether to run the command on a subshell. + :type shell: bool + :param env: Use extra environment variables. + :type env: dict + :param sudo: Whether the command requires admin privileges to run, + so that sudo will be prepended to the command. + The assumption here is that the user running the command + has a sudo configuration such that a password won't be + prompted. If that's not the case, the command will + straight out fail. + :type sudo: bool + :param ignore_bg_processes: Whether to ignore background processes. + :type ignore_bg_processes: bool + :param strip_trail_nl: Whether to strip the trailing newline. + :type strip_trail_nl: bool + :param encoding: the encoding to use for the text representation + of the command result stdout and stderr, by default + :data:`avocado.utils.astring.ENCODING` + :type encoding: str + :param logger: User's custom logger, which will be logging the subprocess + outputs. When this parameter is not set, the + `avocado.utils.process` logger will be used. + :type logger: logging.Logger + :return: Command output. + :rtype: bytes + :raises CmdError: If ``ignore_status=False`` and command fails. + """ + cmd_result = run( + cmd=cmd, + timeout=timeout, + verbose=verbose, + ignore_status=ignore_status, + shell=shell, + env=env, + sudo=sudo, + ignore_bg_processes=ignore_bg_processes, + encoding=encoding, + logger=logger, + ) + if strip_trail_nl: + return cmd_result.stdout.rstrip(b"\n\r") + return cmd_result.stdout + + +# pylint: disable=R0913 +def getoutput( + cmd, + timeout=None, + verbose=False, + ignore_status=True, + shell=True, + env=None, + sudo=False, + ignore_bg_processes=False, + logger=None, +): + """Return output (stdout or stderr) of executing cmd in a shell. + + Because commands module is removed in Python3 and it redirect stderr + to stdout, we port commands.getoutput to make code compatible. + + :param cmd: Command line to run. + :type cmd: str + :param timeout: Time limit in seconds before attempting to kill the + running process. This function will take a few seconds + longer than 'timeout' to complete if it has to kill the + process. + :type timeout: float or None + :param verbose: Whether to log the command run and stdout/stderr. + :type verbose: bool + :param ignore_status: Whether to raise an exception when command returns + =! 0 (False), or not (True). + :type ignore_status: bool + :param shell: Whether to run the command on a subshell. + :type shell: bool + :param env: Use extra environment variables. + :type env: dict + :param sudo: Whether the command requires admin privileges to run, + so that sudo will be prepended to the command. + The assumption here is that the user running the command + has a sudo configuration such that a password won't be + prompted. If that's not the case, the command will + straight out fail. + :type sudo: bool + :param ignore_bg_processes: Whether to ignore background processes. + :type ignore_bg_processes: bool + :param logger: User's custom logger, which will be logging the subprocess + outputs. When this parameter is not set, the + `avocado.utils.process` logger will be used. + :type logger: logging.Logger + :return: Command output (stdout or stderr). + :rtype: str + """ + return getstatusoutput( + cmd=cmd, + timeout=timeout, + verbose=verbose, + ignore_status=ignore_status, + shell=shell, + env=env, + sudo=sudo, + ignore_bg_processes=ignore_bg_processes, + logger=logger, + )[1] + + +# pylint: disable=R0913 +def getstatusoutput( + cmd, + timeout=None, + verbose=False, + ignore_status=True, + shell=True, + env=None, + sudo=False, + ignore_bg_processes=False, + logger=None, +): + """Return (status, output) of executing cmd in a shell. + + Because commands module is removed in Python3 and it redirect stderr + to stdout, we port commands.getstatusoutput to make code compatible. + + :param cmd: Command line to run. + :type cmd: str + :param timeout: Time limit in seconds before attempting to kill the + running process. This function will take a few seconds + longer than 'timeout' to complete if it has to kill the + process. + :type timeout: float or None + :param verbose: Whether to log the command run and stdout/stderr. + :type verbose: bool + :param ignore_status: Whether to raise an exception when command returns + =! 0 (False), or not (True). + :type ignore_status: bool + :param shell: Whether to run the command on a subshell. + :type shell: bool + :param env: Use extra environment variables. + :type env: dict + :param sudo: Whether the command requires admin privileges to run, + so that sudo will be prepended to the command. + The assumption here is that the user running the command + has a sudo configuration such that a password won't be + prompted. If that's not the case, the command will + straight out fail. + :type sudo: bool + :param ignore_bg_processes: Whether to ignore background processes. + :type ignore_bg_processes: bool + :param logger: User's custom logger, which will be logging the subprocess + outputs. When this parameter is not set, the + `avocado.utils.process` logger will be used. + :type logger: logging.Logger + :return: Exit status and command output (stdout and stderr). + :rtype: tuple + """ + cmd_result = run( + cmd=cmd, + timeout=timeout, + verbose=verbose, + ignore_status=ignore_status, + shell=shell, + env=env, + sudo=sudo, + ignore_bg_processes=ignore_bg_processes, + logger=logger, + ) + text = cmd_result.stdout_text + sts = cmd_result.exit_status + if text[-1:] == "\n": + text = text[:-1] + return (sts, text) + + +def get_owner_id(pid): + """Get the user ID of the process owner. + + This function reads the /proc filesystem to determine the user ID + that owns the specified process. + + .. note:: This is currently Linux specific. + + :param pid: The process ID to query. + :type pid: int + :return: The user ID of the process owner, or None if not found. + :rtype: int or None + + Example:: + + >>> get_owner_id(1) + 0 + >>> get_owner_id(999999) + None + """ + try: + return os.stat(f"/proc/{int(pid)}/").st_uid + except OSError: + return None + + +def get_command_output_matching(command, pattern): + """Run a command and return lines matching a pattern. + + This function executes a command and searches its output for lines + containing the specified pattern, returning all matching lines. + + :param command: The command to execute. + :type command: str + :param pattern: Pattern to search for in the output. Matching is done + on a line-by-line basis using substring matching. + :type pattern: str + :return: List of lines from the command output that contain the pattern. + :rtype: list of str + + Example:: + + >>> get_command_output_matching("ls -la", "txt") + ['file1.txt', 'file2.txt'] + """ + return [line for line in run(command).stdout_text.splitlines() if pattern in line] diff --git a/docs/source/utils.rst b/docs/source/utils.rst index 04a8b89..99b79be 100644 --- a/docs/source/utils.rst +++ b/docs/source/utils.rst @@ -39,6 +39,10 @@ Output ------ .. automodule:: autils.devel.output +Process +------- +.. automodule:: autils.devel.process + Script ------ .. automodule:: autils.devel.script diff --git a/metadata/devel/process.yml b/metadata/devel/process.yml new file mode 100644 index 0000000..5db68db --- /dev/null +++ b/metadata/devel/process.yml @@ -0,0 +1,19 @@ +name: process +description: Functions dedicated to find and run external commands + +categories: + - OS + - Misc +maintainers: + - name: Harvey James Lynden + email: hlynden@redhat.com + github_usr_name: harvey0100 +supported_platforms: + - CentOS Stream 9 + - Fedora 36 + - Fedora 37 +tests: + - tests/unit/modules/devel/process.py + - tests/functional/modules/devel/process.py +remote: false + diff --git a/tests/functional/modules/devel/process.py b/tests/functional/modules/devel/process.py new file mode 100644 index 0000000..e9ce4ef --- /dev/null +++ b/tests/functional/modules/devel/process.py @@ -0,0 +1,287 @@ +import os +import stat +import sys +import time + +from autils.devel import process +from tests.utils import TestCaseTmpDir, skipOnLevelsInferiorThan + +# What is commonly known as "0775" or "u=rwx,g=rwx,o=rx" +DEFAULT_MODE = ( + stat.S_IRUSR + | stat.S_IWUSR + | stat.S_IXUSR + | stat.S_IRGRP + | stat.S_IWGRP + | stat.S_IXGRP + | stat.S_IROTH + | stat.S_IXOTH +) + +FAKE_VMSTAT_CONTENTS = f"""#!{sys.executable} +import time +import random +import signal +import sys + +class FakeVMStat: + def __init__(self, interval, interrupt_delay=0): + self.interval = interval + self._sysrand = random.SystemRandom() + def interrupt_handler(signum, frame): + time.sleep(interrupt_delay) + sys.exit(0) + signal.signal(signal.SIGINT, interrupt_handler) + signal.signal(signal.SIGTERM, interrupt_handler) + + def get_r(self): + return self._sysrand.randint(0, 2) + + def get_b(self): + return 0 + + def get_swpd(self): + return 0 + + def get_free(self): + return self._sysrand.randint(1500000, 1600000) + + def get_buff(self): + return self._sysrand.randint(290000, 300000) + + def get_cache(self): + return self._sysrand.randint(2900000, 3000000) + + def get_si(self): + return 0 + + def get_so(self): + return 0 + + def get_bi(self): + return self._sysrand.randint(0, 50) + + def get_bo(self): + return self._sysrand.randint(0, 500) + + def get_in(self): + return self._sysrand.randint(200, 3000) + + def get_cs(self): + return self._sysrand.randint(1000, 4000) + + def get_us(self): + return self._sysrand.randint(0, 40) + + def get_sy(self): + return self._sysrand.randint(1, 5) + + def get_id(self): + return self._sysrand.randint(50, 100) + + def get_wa(self): + return 0 + + def get_st(self): + return 0 + + def start(self): + print("procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----") + print(" r b swpd free buff cache si so bi bo in cs us sy id wa st") + while True: + r = self.get_r() + b = self.get_b() + swpd = self.get_swpd() + free = self.get_free() + buff = self.get_buff() + cache = self.get_cache() + si = self.get_si() + so = self.get_so() + bi = self.get_bi() + bo = self.get_bo() + m_in = self.get_in() + cs = self.get_cs() + us = self.get_us() + sy = self.get_sy() + m_id = self.get_id() + wa = self.get_wa() + st = self.get_st() + print("%2d %2d %2d %7d %6d %7d %1d %1d %2d %3d %4d %2d %2d %1d %3d %1d %1d" % + (r, b, swpd, free, buff, cache, si, so, bi, bo, m_in, cs, + us, sy, m_id, wa, st)) + time.sleep(self.interval) + +if __name__ == '__main__': + vmstat = FakeVMStat(interval=float(sys.argv[1]), interrupt_delay=float(sys.argv[2])) + vmstat.start() +""" + +FAKE_UPTIME_CONTENTS = f"""#!{sys.executable} +if __name__ == '__main__': + print("17:56:34 up 8:06, 7 users, load average: 0.26, 0.20, 0.21") + +""" + +ENV_TEST_SCRIPT = """#!{interpreter} +import os +print(os.environ.get('TEST_VAR', 'NOT_SET')) +""" + +SUCCESS_SCRIPT = """#!{interpreter} +print('success') +""" + +LARGE_OUTPUT_SCRIPT = """#!{interpreter} +for i in range(1000): + print(f'Line {{i}}' * 100) +""" + +UNICODE_TEST_SCRIPT = """#!{interpreter} +# -*- coding: utf-8 -*- +print('Avokádo') +""" + + +class ProcessTest(TestCaseTmpDir): + def setUp(self): + super().setUp() + self.fake_vmstat = os.path.join(self.tmpdir.name, "vmstat") + with open(self.fake_vmstat, "w", encoding="utf-8") as fake_vmstat_obj: + fake_vmstat_obj.write(FAKE_VMSTAT_CONTENTS) + os.chmod(self.fake_vmstat, DEFAULT_MODE) + + self.fake_uptime = os.path.join(self.tmpdir.name, "uptime") + with open(self.fake_uptime, "w", encoding="utf-8") as fake_uptime_obj: + fake_uptime_obj.write(FAKE_UPTIME_CONTENTS) + os.chmod(self.fake_uptime, DEFAULT_MODE) + + def _create_script(self, name, contents): + """Create a script file from a template with the interpreter substituted. + + :param name: Name for the script file + :param contents: Script contents template with {interpreter} placeholder + :returns: Path to the created script + """ + script_path = os.path.join(self.tmpdir.name, name) + with open(script_path, "w", encoding="utf-8") as script_file: + script_file.write(contents.format(interpreter=sys.executable)) + os.chmod(script_path, DEFAULT_MODE) + return script_path + + @skipOnLevelsInferiorThan(2) + def test_process_start(self): + """ + :avocado: tags=parallel:1 + """ + proc = process.SubProcess(f"{self.fake_vmstat} 1 0") + proc.start() + time.sleep(3) + proc.terminate() + proc.wait(timeout=1) + stdout = proc.get_stdout().decode() + self.assertIn("memory", stdout, f"result: {stdout}") + self.assertRegex(stdout, "[0-9]+") + + @skipOnLevelsInferiorThan(2) + def test_process_stop_interrupted(self): + """ + :avocado: tags=parallel:1 + """ + proc = process.SubProcess(f"{self.fake_vmstat} 1 3") + proc.start() + time.sleep(3) + proc.stop(2) + result = proc.result + self.assertIn("timeout after", result.interrupted, "Process wasn't interrupted") + + @skipOnLevelsInferiorThan(2) + def test_process_stop_uninterrupted(self): + """ + :avocado: tags=parallel:1 + """ + proc = process.SubProcess(f"{self.fake_vmstat} 1 3") + proc.start() + time.sleep(3) + proc.stop(4) + result = proc.result + self.assertFalse(result.interrupted, "Process was interrupted to early") + + def test_process_run(self): + proc = process.SubProcess(self.fake_uptime) + result = proc.run() + self.assertEqual(result.exit_status, 0, f"result: {result}") + self.assertIn(b"load average", result.stdout) + + def test_run_and_system_output_with_environment(self): + """Test process.run() and process.system_output() with environment variables""" + env_script = self._create_script("env_test.py", ENV_TEST_SCRIPT) + + # Test process.run() with custom environment variable + result = process.run(env_script, env={"TEST_VAR": "custom_value"}) + self.assertEqual(result.stdout.strip(), b"custom_value") + self.assertEqual(result.exit_status, 0) + + # Test process.system_output() also respects environment + output = process.system_output( + env_script, env={"TEST_VAR": "from_system_output"} + ) + self.assertEqual(output.strip(), b"from_system_output") + + def test_getstatusoutput_integration(self): + """Test getstatusoutput function with real commands""" + success_script = self._create_script("success.py", SUCCESS_SCRIPT) + + status, output = process.getstatusoutput(success_script) + self.assertEqual(status, 0) + self.assertEqual(output, "success") + + def test_multiple_subprocess_instances(self): + """Test running multiple subprocess instances concurrently""" + procs = [] + for i in range(3): + proc = process.SubProcess( + f"{sys.executable} -c 'import time; time.sleep(1); print({i})'" + ) + proc.start() + procs.append(proc) + + # Wait for all processes to complete + for i, proc in enumerate(procs): + proc.wait() + self.assertEqual(proc.result.exit_status, 0) + self.assertIn(str(i).encode(), proc.result.stdout) + + @skipOnLevelsInferiorThan(2) + def test_process_with_large_output(self): + """Test process captures complete large stdout output""" + large_output_script = self._create_script( + "large_output.py", LARGE_OUTPUT_SCRIPT + ) + + result = process.run(large_output_script) + self.assertEqual(result.exit_status, 0) + + # Verify we captured EXACTLY the expected output + lines = result.stdout.decode().strip().split("\n") + self.assertEqual(len(lines), 1000, "Should capture exactly 1000 lines") + + # Verify format of first and last lines to ensure no corruption + self.assertEqual(lines[0], "Line 0" * 100) + self.assertEqual(lines[999], "Line 999" * 100) + + def test_cmdresult_encoding(self): + """Test CmdResult with different character encoding""" + unicode_script = self._create_script("unicode_test.py", UNICODE_TEST_SCRIPT) + + result = process.run(unicode_script, encoding="utf-8") + self.assertEqual(result.encoding, "utf-8") + self.assertIn("Avokádo", result.stdout_text) + + def test_binary_from_shell_cmd_realworld(self): + """Test binary_from_shell_cmd with real-world command patterns""" + # Simulating common command patterns + cmd1 = "FOO=bar BAR=baz /usr/bin/python script.py --arg value" + self.assertEqual(process.binary_from_shell_cmd(cmd1), "/usr/bin/python") + + cmd2 = "sudo -u user /bin/bash -c 'echo test'" + self.assertEqual(process.binary_from_shell_cmd(cmd2), "sudo") diff --git a/tests/unit/modules/devel/process.py b/tests/unit/modules/devel/process.py new file mode 100644 index 0000000..942b67d --- /dev/null +++ b/tests/unit/modules/devel/process.py @@ -0,0 +1,953 @@ +# pylint: disable=too-many-arguments +import errno +import io +import logging +import os +import sys +import time +import unittest.mock + +from autils.devel import process, script +from autils.file import path +from tests.utils import ( + setup_autils_loggers, + skipOnLevelsInferiorThan, + skipUnlessPathExists, +) + +setup_autils_loggers() + + +def probe_binary(binary): + try: + return path.find_command(binary) + except path.CmdNotFoundError: + return None + + +ECHO_CMD = probe_binary("echo") +FICTIONAL_CMD = "/usr/bin/fictional_cmd" + +REFUSE_TO_DIE = """import signal +import time + +for sig in range(64): + try: + signal.signal(sig, signal.SIG_IGN) + except: + pass + +end = time.monotonic() + 120 + +while time.monotonic() < end: + time.sleep(1)""" + + +class TestSubProcess(unittest.TestCase): + @unittest.mock.patch("autils.devel.process.SubProcess._init_subprocess") + @unittest.mock.patch("autils.devel.process.SubProcess.is_sudo_enabled") + @unittest.mock.patch("autils.devel.process.SubProcess.get_pid") + @unittest.mock.patch("autils.devel.process.get_children_pids") + @unittest.mock.patch("autils.devel.process.run") + def test_send_signal_sudo_enabled(self, run, get_children, get_pid, sudo, _): + signal = 1 + pid = 122 + child_pid = 123 + sudo.return_value = True + get_pid.return_value = pid + get_children.return_value = [child_pid] + + subprocess = process.SubProcess(FICTIONAL_CMD) + subprocess.send_signal(signal) + + kill_cmd = "kill -%d %d" + calls = [ + unittest.mock.call(kill_cmd % (signal, child_pid), sudo=True), + unittest.mock.call(kill_cmd % (signal, pid), sudo=True), + ] + run.assert_has_calls(calls) + + @unittest.mock.patch("autils.devel.process.SubProcess._init_subprocess") + @unittest.mock.patch("autils.devel.process.SubProcess.is_sudo_enabled") + @unittest.mock.patch("autils.devel.process.SubProcess.get_pid") + @unittest.mock.patch("autils.devel.process.get_children_pids") + @unittest.mock.patch("autils.devel.process.run") + def test_send_signal_sudo_enabled_with_exception( + self, run, get_children, get_pid, sudo, _ + ): + signal = 1 + pid = 122 + child_pid = 123 + sudo.return_value = True + get_pid.return_value = pid + get_children.return_value = [child_pid] + run.side_effect = Exception() + + subprocess = process.SubProcess(FICTIONAL_CMD) + subprocess.send_signal(signal) + + kill_cmd = "kill -%d %d" + calls = [ + unittest.mock.call(kill_cmd % (signal, child_pid), sudo=True), + unittest.mock.call(kill_cmd % (signal, pid), sudo=True), + ] + run.assert_has_calls(calls) + + @unittest.mock.patch("autils.devel.process.SubProcess._init_subprocess") + @unittest.mock.patch("autils.devel.process.SubProcess.get_pid") + @unittest.mock.patch("autils.devel.process.get_owner_id") + def test_get_user_id(self, get_owner, get_pid, _): + user_id = 1 + process_id = 123 + get_pid.return_value = process_id + get_owner.return_value = user_id + + subprocess = process.SubProcess(FICTIONAL_CMD) + + self.assertEqual(subprocess.get_user_id(), user_id) + get_owner.assert_called_with(process_id) + + @unittest.mock.patch("autils.devel.process.SubProcess._init_subprocess") + @unittest.mock.patch("autils.devel.process.SubProcess.get_pid") + @unittest.mock.patch("autils.devel.process.get_owner_id") + def test_is_sudo_enabled_false(self, get_owner, get_pid, _): + user_id = 1 + process_id = 123 + get_pid.return_value = process_id + get_owner.return_value = user_id + + subprocess = process.SubProcess(FICTIONAL_CMD) + + self.assertFalse(subprocess.is_sudo_enabled()) + get_owner.assert_called_with(process_id) + + @unittest.mock.patch("autils.devel.process.SubProcess._init_subprocess") + @unittest.mock.patch("autils.devel.process.SubProcess.get_pid") + @unittest.mock.patch("autils.devel.process.get_owner_id") + def test_is_sudo_enabled_true(self, get_owner, get_pid, _): + user_id = 0 + process_id = 123 + get_pid.return_value = process_id + get_owner.return_value = user_id + + subprocess = process.SubProcess(FICTIONAL_CMD) + + self.assertTrue(subprocess.is_sudo_enabled()) + get_owner.assert_called_with(process_id) + + +def mock_fail_find_cmd(cmd, default=None, check_exec=True): # pylint: disable=W0613 + path_paths = [ + "/usr/libexec", + "/usr/local/sbin", + "/usr/local/bin", + "/usr/sbin", + "/usr/bin", + "/sbin", + "/bin", + ] + raise path.CmdNotFoundError(cmd, path_paths) + + +class TestProcessRun(unittest.TestCase): + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_subprocess_nosudo(self): + expected_command = "ls -l" + p = process.SubProcess(cmd="ls -l") + self.assertEqual(p.cmd, expected_command) + + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=0)) + def test_subprocess_nosudo_uid_0(self): + expected_command = "ls -l" + p = process.SubProcess(cmd="ls -l") + self.assertEqual(p.cmd, expected_command) + + @unittest.mock.patch.object( + path, "find_command", unittest.mock.Mock(return_value="/bin/sudo") + ) + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_subprocess_sudo(self): + expected_command = "/bin/sudo -n ls -l" + p = process.SubProcess(cmd="ls -l", sudo=True) + path.find_command.assert_called_once_with("sudo", check_exec=False) + self.assertEqual(p.cmd, expected_command) + + @unittest.mock.patch.object(path, "find_command", mock_fail_find_cmd) + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_subprocess_sudo_no_sudo_installed(self): + expected_command = "ls -l" + p = process.SubProcess(cmd="ls -l", sudo=True) + self.assertEqual(p.cmd, expected_command) + + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=0)) + def test_subprocess_sudo_uid_0(self): + expected_command = "ls -l" + p = process.SubProcess(cmd="ls -l", sudo=True) + self.assertEqual(p.cmd, expected_command) + + @unittest.mock.patch.object( + path, "find_command", unittest.mock.Mock(return_value="/bin/sudo") + ) + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_subprocess_sudo_shell(self): + expected_command = "/bin/sudo -n -s ls -l" + p = process.SubProcess(cmd="ls -l", sudo=True, shell=True) + path.find_command.assert_called_once_with("sudo", check_exec=False) + self.assertEqual(p.cmd, expected_command) + + @unittest.mock.patch.object(path, "find_command", mock_fail_find_cmd) + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_subprocess_sudo_shell_no_sudo_installed(self): + expected_command = "ls -l" + p = process.SubProcess(cmd="ls -l", sudo=True, shell=True) + self.assertEqual(p.cmd, expected_command) + + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=0)) + def test_subprocess_sudo_shell_uid_0(self): + expected_command = "ls -l" + p = process.SubProcess(cmd="ls -l", sudo=True, shell=True) + self.assertEqual(p.cmd, expected_command) + + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_run_nosudo(self): + expected_command = "ls -l" + p = process.run(cmd="ls -l", ignore_status=True) + self.assertEqual(p.command, expected_command) + + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=0)) + def test_run_nosudo_uid_0(self): + expected_command = "ls -l" + p = process.run(cmd="ls -l", ignore_status=True) + self.assertEqual(p.command, expected_command) + + @skipUnlessPathExists("/bin/sudo") + @unittest.mock.patch.object( + path, "find_command", unittest.mock.Mock(return_value="/bin/sudo") + ) + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_run_sudo(self): + expected_command = "/bin/sudo -n ls -l" + p = process.run(cmd="ls -l", sudo=True, ignore_status=True) + path.find_command.assert_called_once_with("sudo", check_exec=False) + self.assertEqual(p.command, expected_command) + + @unittest.mock.patch.object(path, "find_command", mock_fail_find_cmd) + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_run_sudo_no_sudo_installed(self): + expected_command = "ls -l" + p = process.run(cmd="ls -l", sudo=True, ignore_status=True) + self.assertEqual(p.command, expected_command) + + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=0)) + def test_run_sudo_uid_0(self): + expected_command = "ls -l" + p = process.run(cmd="ls -l", sudo=True, ignore_status=True) + self.assertEqual(p.command, expected_command) + + @unittest.mock.patch.object( + path, "find_command", unittest.mock.Mock(return_value="/bin/sudo") + ) + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_run_sudo_shell(self): + expected_command = "/bin/sudo -n -s ls -l" + p = process.run(cmd="ls -l", sudo=True, shell=True, ignore_status=True) + path.find_command.assert_called_once_with("sudo", check_exec=False) + self.assertEqual(p.command, expected_command) + + @unittest.mock.patch.object(path, "find_command", mock_fail_find_cmd) + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=1000)) + def test_run_sudo_shell_no_sudo_installed(self): + expected_command = "ls -l" + p = process.run(cmd="ls -l", sudo=True, shell=True, ignore_status=True) + self.assertEqual(p.command, expected_command) + + @unittest.mock.patch.object(os, "getuid", unittest.mock.Mock(return_value=0)) + def test_run_sudo_shell_uid_0(self): + expected_command = "ls -l" + p = process.run(cmd="ls -l", sudo=True, shell=True, ignore_status=True) + self.assertEqual(p.command, expected_command) + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_run_unicode_output(self): + # Using encoded string as shlex does not support decoding + # but the behavior is exactly the same as if shell binary + # produced unicode + text = "Avok\xe1do" + # Even though code point used is "LATIN SMALL LETTER A WITH ACUTE" + # (http://unicode.scarfboy.com/?s=u%2B00e1) when encoded to proper + # utf-8, it becomes two bytes because it is >= 0x80 + # See https://en.wikipedia.org/wiki/UTF-8 + encoded_text = b"Avok\xc3\xa1do" + self.assertEqual(text.encode("utf-8"), encoded_text) + self.assertEqual(encoded_text.decode("utf-8"), text) + cmd = f"{ECHO_CMD} -n {text}" + result = process.run(cmd, encoding="utf-8") + self.assertEqual(result.stdout, encoded_text) + self.assertEqual(result.stdout_text, text) + + @skipOnLevelsInferiorThan(2) + def test_run_with_timeout_ugly_cmd(self): + """ + :avocado: tags=parallel:1 + """ + with script.TemporaryScript("refuse_to_die", REFUSE_TO_DIE) as exe: + cmd = f"{sys.executable} '{exe.path}'" + # Wait 1s to set the traps + res = process.run(cmd, timeout=1, ignore_status=True) + self.assertLess( + res.duration, + 100, + ( + f"Took longer than expected, process probably " + f"not interrupted by Avocado.\n{res}" + ), + ) + self.assertNotEqual( + res.exit_status, + 0, + ( + f"Command finished without reporting " + f"failure but should be killed.\n{res}" + ), + ) + + @skipOnLevelsInferiorThan(2) + def test_run_with_negative_timeout_ugly_cmd(self): + """ + :avocado: tags=parallel:1 + """ + with script.TemporaryScript("refuse_to_die", REFUSE_TO_DIE) as exe: + cmd = f"{sys.executable} '{exe.path}'" + # Wait 1s to set the traps + proc = process.SubProcess(cmd) + proc.start() + time.sleep(1) + proc.wait(-1) + res = proc.result + self.assertLess( + res.duration, + 100, + ( + f"Took longer than expected, process probably " + f"not interrupted by Avocado.\n{res}" + ), + ) + self.assertNotEqual( + res.exit_status, + 0, + ( + f"Command finished without reporting " + f"failure but should be killed.\n{res}" + ), + ) + + +class MiscProcessTests(unittest.TestCase): + def test_binary_from_shell(self): + self.assertEqual("binary", process.binary_from_shell_cmd("binary")) + res = process.binary_from_shell_cmd( + "MY_VAR=foo myV4r=123 " + "quote='a b c' QUOTE=\"1 2 && b\" " + "QuOtE=\"1 2\"foo' 3 4' first_bin " + "second_bin VAR=xyz" + ) + self.assertEqual("first_bin", res) + res = process.binary_from_shell_cmd( + "VAR=VALUE 1st_binary var=value second_binary" + ) + self.assertEqual("1st_binary", res) + res = process.binary_from_shell_cmd("FOO=bar ./bin var=value") + self.assertEqual("./bin", res) + + def test_binary_from_shell_cmd_invalid(self): + """Test binary_from_shell_cmd with invalid input""" + with self.assertRaises(ValueError): + process.binary_from_shell_cmd("VAR=value OTHER=value") + with self.assertRaises(ValueError): + process.binary_from_shell_cmd("") + + def test_cmd_split(self): + self.assertEqual(process.cmd_split(""), []) + self.assertEqual( + process.cmd_split("avok\xe1do_test_runner arguments"), + ["avok\xe1do_test_runner", "arguments"], + ) + + def test_get_parent_pid(self): + stat = b"18405 (bash) S 24139 18405 18405 34818 8056 4210688 9792 170102 0 7 11 4 257 84 20 0 1 0 44336493 235409408 4281 18446744073709551615 94723230367744 94723231442728 140723100226000 0 0 0 65536 3670020 1266777851 0 0 0 17 1 0 0 0 0 0 94723233541456 94723233588580 94723248717824 140723100229613 140723100229623 140723100229623 140723100233710 0" + with unittest.mock.patch("builtins.open", return_value=io.BytesIO(stat)): + self.assertTrue(process.get_parent_pid(0), 24139) + + @unittest.skipUnless( + sys.platform.startswith("linux"), "Linux specific feature and test" + ) + def test_get_children_pids(self): + """ + Gets the list of children process. Linux only. + """ + self.assertGreaterEqual(len(process.get_children_pids(os.getppid())), 1) + + @unittest.mock.patch("autils.devel.process.os.kill") + @unittest.mock.patch("autils.devel.process.get_owner_id") + def test_safe_kill(self, owner_mocked, kill_mocked): + owner_id = 1 + process_id = 123 + signal = 1 + owner_mocked.return_value = owner_id + + killed = process.safe_kill(process_id, signal) + self.assertTrue(killed) + kill_mocked.assert_called_with(process_id, signal) + + @unittest.mock.patch("autils.devel.process.os.kill") + @unittest.mock.patch("autils.devel.process.get_owner_id") + def test_safe_kill_with_exception(self, owner_mocked, kill_mocked): + owner_id = 1 + process_id = 123 + signal = 1 + owner_mocked.return_value = owner_id + kill_mocked.side_effect = Exception() + + killed = process.safe_kill(process_id, signal) + self.assertFalse(killed) + kill_mocked.assert_called_with(process_id, signal) + + @unittest.mock.patch("autils.devel.process.run") + @unittest.mock.patch("autils.devel.process.get_owner_id") + def test_safe_kill_sudo_enabled(self, owner_mocked, run_mocked): + owner_id = 0 + process_id = 123 + signal = 1 + owner_mocked.return_value = owner_id + expected_cmd = f"kill -{signal} {process_id}" + + killed = process.safe_kill(process_id, signal) + self.assertTrue(killed) + run_mocked.assert_called_with(expected_cmd, sudo=True) + + @unittest.mock.patch("autils.devel.process.run") + @unittest.mock.patch("autils.devel.process.get_owner_id") + def test_safe_kill_sudo_enabled_with_exception(self, owner_mocked, run_mocked): + owner_id = 0 + process_id = 123 + signal = 1 + owner_mocked.return_value = owner_id + expected_cmd = f"kill -{signal} {process_id}" + run_mocked.side_effect = process.CmdError() + + killed = process.safe_kill(process_id, signal) + self.assertFalse(killed) + run_mocked.assert_called_with(expected_cmd, sudo=True) + + @unittest.mock.patch("autils.devel.process.os.stat") + def test_process_get_owner_id(self, stat_mock): + process_id = 123 + owner_user_id = 13 + stat_mock.return_value = unittest.mock.Mock(st_uid=owner_user_id) + + returned_owner_id = process.get_owner_id(process_id) + + self.assertEqual(returned_owner_id, owner_user_id) + stat_mock.assert_called_with(f"/proc/{process_id}/") + + @unittest.mock.patch("autils.devel.process.os.stat") + def test_process_get_owner_id_with_pid_not_found(self, stat_mock): + process_id = 123 + stat_mock.side_effect = OSError() + + returned_owner_id = process.get_owner_id(process_id) + + self.assertIsNone(returned_owner_id) + stat_mock.assert_called_with(f"/proc/{process_id}/") + + @unittest.mock.patch("autils.devel.process.time.sleep") + @unittest.mock.patch("autils.devel.process.safe_kill") + @unittest.mock.patch("autils.devel.process.get_children_pids") + def test_kill_process_tree_nowait(self, get_children_pids, safe_kill, sleep): + safe_kill.return_value = True + get_children_pids.return_value = [] + self.assertEqual([1], process.kill_process_tree(1)) + self.assertEqual(sleep.call_count, 0) + + @unittest.mock.patch("autils.devel.process.safe_kill") + @unittest.mock.patch("autils.devel.process.get_children_pids") + @unittest.mock.patch("autils.devel.process.time.time") + @unittest.mock.patch("autils.devel.process.time.sleep") + @unittest.mock.patch("autils.devel.process.pid_exists") + def test_kill_process_tree_timeout_3s( + self, pid_exists, sleep, p_time, get_children_pids, safe_kill + ): + safe_kill.return_value = True + get_children_pids.return_value = [] + p_time.side_effect = [ + 500, + 502, + 502, + 502, + 502, + 502, + 502, + 504, + 504, + 504, + 520, + 520, + 520, + ] + sleep.return_value = None + pid_exists.return_value = True + self.assertRaises(RuntimeError, process.kill_process_tree, 17, timeout=3) + self.assertLess(p_time.call_count, 10) + + @unittest.mock.patch("autils.devel.process.safe_kill") + @unittest.mock.patch("autils.devel.process.get_children_pids") + @unittest.mock.patch("autils.devel.process.time.time") + @unittest.mock.patch("autils.devel.process.time.sleep") + @unittest.mock.patch("autils.devel.process.pid_exists") + def test_kill_process_tree_dont_timeout_3s( + self, pid_exists, sleep, p_time, get_children_pids, safe_kill + ): + safe_kill.return_value = True + get_children_pids.return_value = [] + p_time.side_effect = [500, 502, 502, 502, 502, 502, 502, 502, 502, 503] + sleep.return_value = None + pid_exists.side_effect = [True, False] + self.assertEqual([76], process.kill_process_tree(76, timeout=3)) + self.assertLess(p_time.call_count, 10) + + @unittest.mock.patch("autils.devel.process.safe_kill") + @unittest.mock.patch("autils.devel.process.get_children_pids") + @unittest.mock.patch("autils.devel.process.time.sleep") + @unittest.mock.patch("autils.devel.process.pid_exists") + def test_kill_process_tree_dont_timeout_infinity( + self, pid_exists, sleep, get_children_pids, safe_kill + ): + safe_kill.return_value = True + get_children_pids.return_value = [] + sleep.return_value = None + pid_exists.side_effect = [True, True, True, True, True, False] + + self.assertEqual([31], process.kill_process_tree(31, timeout=-7.354)) + + self.assertEqual(pid_exists.call_count, 6) + self.assertEqual(sleep.call_count, 5) + + @unittest.mock.patch("autils.devel.process.time.sleep") + @unittest.mock.patch("autils.devel.process.safe_kill") + @unittest.mock.patch("autils.devel.process.get_children_pids") + def test_kill_process_tree_children(self, get_children_pids, safe_kill, sleep): + safe_kill.return_value = True + get_children_pids.side_effect = [[53, 12], [78, 58, 41], [], [13], [], [], []] + self.assertEqual([31, 53, 78, 58, 13, 41, 12], process.kill_process_tree(31)) + self.assertEqual(sleep.call_count, 0) + + def test_empty_command(self): + with self.assertRaises(process.CmdInputError): + process.run("") + + @unittest.mock.patch("autils.devel.process.os.getuid") + def test_can_sudo_as_root(self, getuid_mock): + """Test can_sudo when running as root""" + getuid_mock.return_value = 0 + self.assertTrue(process.can_sudo()) + + @unittest.mock.patch("autils.file.path.find_command") + @unittest.mock.patch("autils.devel.process.os.getuid") + def test_can_sudo_no_sudo_installed(self, getuid_mock, find_cmd_mock): + """Test can_sudo when sudo is not installed""" + getuid_mock.return_value = 1000 + find_cmd_mock.side_effect = path.CmdNotFoundError("sudo", []) + self.assertFalse(process.can_sudo()) + + @unittest.mock.patch("autils.devel.process.system") + @unittest.mock.patch("autils.file.path.find_command") + @unittest.mock.patch("autils.devel.process.os.getuid") + def test_can_sudo_with_specific_cmd(self, getuid_mock, find_cmd_mock, system_mock): + """Test can_sudo with a specific command""" + getuid_mock.return_value = 1000 + find_cmd_mock.return_value = "/usr/bin/sudo" + system_mock.return_value = 0 + self.assertTrue(process.can_sudo("ls -l")) + + @unittest.mock.patch("autils.devel.process.system") + @unittest.mock.patch("autils.file.path.find_command") + @unittest.mock.patch("autils.devel.process.os.getuid") + def test_can_sudo_oserror(self, getuid_mock, find_cmd_mock, system_mock): + """Test can_sudo when OSError occurs""" + getuid_mock.return_value = 1000 + find_cmd_mock.return_value = "/usr/bin/sudo" + system_mock.side_effect = OSError() + self.assertFalse(process.can_sudo()) + + @unittest.mock.patch("autils.devel.process.get_capabilities") + def test_has_capability_true(self, get_caps_mock): + """Test has_capability when capability exists""" + get_caps_mock.return_value = ["cap_chown", "cap_dac_override", "cap_kill"] + self.assertTrue(process.has_capability("cap_kill")) + + @unittest.mock.patch("autils.devel.process.get_capabilities") + def test_has_capability_false(self, get_caps_mock): + """Test has_capability when capability doesn't exist""" + get_caps_mock.return_value = ["cap_chown", "cap_dac_override"] + self.assertFalse(process.has_capability("cap_kill")) + + @unittest.mock.patch("autils.devel.process.get_capabilities") + def test_has_capability_with_pid(self, get_caps_mock): + """Test has_capability with specific PID""" + get_caps_mock.return_value = ["cap_sys_admin"] + self.assertTrue(process.has_capability("cap_sys_admin", pid=1234)) + get_caps_mock.assert_called_with(1234) + + @unittest.mock.patch("autils.devel.process.os.kill") + def test_pid_exists_true(self, kill_mock): + """Test pid_exists when PID exists""" + kill_mock.return_value = None + self.assertTrue(process.pid_exists(1234)) + kill_mock.assert_called_with(1234, 0) + + @unittest.mock.patch("autils.devel.process.os.kill") + def test_pid_exists_false(self, kill_mock): + """Test pid_exists when PID doesn't exist""" + kill_mock.side_effect = OSError(errno.ESRCH, "No such process") + self.assertFalse(process.pid_exists(99999)) + + @unittest.mock.patch("autils.devel.process.os.kill") + def test_pid_exists_permission_denied(self, kill_mock): + """Test pid_exists when permission denied (PID still exists)""" + kill_mock.side_effect = OSError(errno.EPERM, "Operation not permitted") + self.assertTrue(process.pid_exists(1)) + + @unittest.mock.patch("autils.devel.process.run") + def test_kill_process_by_pattern(self, run_mock): + """Test kill_process_by_pattern""" + run_mock.return_value = process.CmdResult(exit_status=0) + process.kill_process_by_pattern("test_pattern") + run_mock.assert_called_with("pkill -f test_pattern", ignore_status=True) + + @unittest.mock.patch("autils.devel.process.run") + def test_kill_process_by_pattern_failure(self, run_mock): + """Test kill_process_by_pattern when it fails""" + run_mock.return_value = process.CmdResult(exit_status=1) + # Should not raise exception + process.kill_process_by_pattern("nonexistent") + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_system_function(self): + """Test system() function returns exit code""" + exit_code = process.system(f"{ECHO_CMD} test", ignore_status=True) + self.assertEqual(exit_code, 0) + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_system_with_failure(self): + """Test system() with failing command""" + exit_code = process.system("false", ignore_status=True, shell=True) + self.assertNotEqual(exit_code, 0) + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_system_output_function(self): + """Test system_output() function returns output""" + output = process.system_output(f"{ECHO_CMD} -n hello", ignore_status=True) + self.assertEqual(output, b"hello") + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_system_output_strip_newline(self): + """Test system_output() strips trailing newline""" + output = process.system_output(f"{ECHO_CMD} hello", ignore_status=True) + self.assertEqual(output, b"hello") + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_system_output_no_strip(self): + """Test system_output() without stripping newline""" + output = process.system_output( + f"{ECHO_CMD} hello", ignore_status=True, strip_trail_nl=False + ) + self.assertEqual(output, b"hello\n") + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_getoutput_function(self): + """Test getoutput() function""" + output = process.getoutput(f"{ECHO_CMD} -n test") + self.assertEqual(output, "test") + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_getstatusoutput_function(self): + """Test getstatusoutput() function""" + status, output = process.getstatusoutput(f"{ECHO_CMD} hello") + self.assertEqual(status, 0) + self.assertEqual(output, "hello") + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_getstatusoutput_with_failure(self): + """Test getstatusoutput() with failing command""" + status, _output = process.getstatusoutput("false") + self.assertNotEqual(status, 0) + + +class CmdResultTests(unittest.TestCase): + def test_nasty_str(self): + result = process.CmdResult( + "ls", + b"unicode_follows: \xc5\xa1", + b"cp1250 follows: \xfd", + 1, + 2, + 3, + "wrong_encoding", + ) + self.assertEqual( + str(result), + "command: 'ls'\nexit_status: 1" + "\nduration: 2\ninterrupted: False\npid: " + "3\nencoding: 'wrong_encoding'\nstdout: " + "b'unicode_follows: \\xc5\\xa1'\nstderr: " + "b'cp1250 follows: \\xfd'", + ) + + def test_cmd_result_stdout_stderr_bytes(self): + result = process.CmdResult() + self.assertTrue(isinstance(result.stdout, bytes)) + self.assertTrue(isinstance(result.stderr, bytes)) + + def test_cmd_result_stdout_stderr_text(self): + result = process.CmdResult() + self.assertTrue(isinstance(result.stdout_text, str)) + self.assertTrue(isinstance(result.stderr_text, str)) + + def test_cmd_result_stdout_stderr_already_text(self): + result = process.CmdResult() + result.stdout = "supposed command output, but not as bytes" + result.stderr = "supposed command error, but not as bytes" + self.assertEqual(result.stdout, result.stdout_text) + self.assertEqual(result.stderr, result.stderr_text) + + def test_cmd_result_stdout_stderr_other_type(self): + result = process.CmdResult() + result.stdout = None + result.stderr = None + self.assertRaises(TypeError, lambda x: result.stdout_text) + self.assertRaises(TypeError, lambda x: result.stderr_text) + + +class CmdErrorTests(unittest.TestCase): + def test_nasty_str(self): + result = process.CmdResult( + "ls", + b"unicode_follows: \xc5\xa1", + b"cp1250 follows: \xfd", + 1, + 2, + 3, + "wrong_encoding", + ) + err = process.CmdError("ls", result, "please don't crash") + self.assertEqual( + str(err), + "Command 'ls' failed.\nstdout: " + "b'unicode_follows: \\xc5\\xa1'\nstderr: " + "b'cp1250 follows: \\xfd'\nadditional_info: " + "please don't crash", + ) + + +class FDDrainerTests(unittest.TestCase): + def test_drain_from_pipe_fd(self): + read_fd, write_fd = os.pipe() + result = process.CmdResult() + fd_drainer = process.FDDrainer(read_fd, result, "test") + fd_drainer.start() + for content in (b"foo", b"bar", b"baz", b"foo\nbar\nbaz\n\n"): + os.write(write_fd, content) + os.write(write_fd, b"finish") + os.close(write_fd) + fd_drainer.flush() + self.assertEqual( + fd_drainer.data.getvalue(), b"foobarbazfoo\nbar\nbaz\n\nfinish" + ) + + def test_log(self): + class CatchHandler(logging.NullHandler): + """ + Handler used just to confirm that a logging event happened + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.caught_record = False + + def handle(self, record): + self.caught_record = True + + read_fd, write_fd = os.pipe() + result = process.CmdResult() + logger = logging.getLogger("FDDrainerTests.test_log") + handler = CatchHandler() + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + fd_drainer = process.FDDrainer( + read_fd, result, "test", logger=logger, verbose=True + ) + fd_drainer.start() + os.write(write_fd, b"should go to the log\n") + os.close(write_fd) + fd_drainer.flush() + self.assertEqual(fd_drainer.data.getvalue(), b"should go to the log\n") + self.assertTrue(handler.caught_record) + + def test_flush_on_closed_handler(self): + handler = logging.StreamHandler(io.StringIO()) + log = logging.getLogger("test_flush_on_closed_handler") + log.addHandler(handler) + read_fd, write_fd = os.pipe() + result = process.CmdResult() + fd_drainer = process.FDDrainer(read_fd, result, name="test", stream_logger=log) + fd_drainer.start() + os.close(write_fd) + self.assertIsNotNone(fd_drainer._stream_logger) + one_stream_closed = False + for handler in fd_drainer._stream_logger.handlers: + stream = getattr(handler, "stream", None) + if stream is not None: + if hasattr(stream, "close"): + # force closing the handler's stream to check if + # flush will adapt to it + stream.close() + one_stream_closed = True + self.assertTrue(one_stream_closed) + fd_drainer.flush() + + def test_flush_on_handler_with_no_fileno(self): + handler = logging.StreamHandler(io.StringIO()) + log = logging.getLogger("test_flush_on_handler_with_no_fileno") + log.addHandler(handler) + read_fd, write_fd = os.pipe() + result = process.CmdResult() + fd_drainer = process.FDDrainer(read_fd, result, name="test", stream_logger=log) + fd_drainer.start() + os.close(write_fd) + fd_drainer.flush() + + def test_replace_incorrect_characters_in_log(self): + data = io.StringIO() + handler = logging.StreamHandler(data) + log = logging.getLogger("test_replace_incorrect_characters_in_log") + log.addHandler(handler) + log.setLevel(logging.DEBUG) + read_fd, write_fd = os.pipe() + result = process.CmdResult(encoding="ascii") + fd_drainer = process.FDDrainer( + read_fd, result, name="test", stream_logger=log, verbose=True + ) + fd_drainer.start() + os.write(write_fd, b"Avok\xc3\xa1do") + os.close(write_fd) + fd_drainer._thread.join(60) + self.assertFalse(fd_drainer._thread.is_alive()) + # \n added by StreamLogger + self.assertEqual(data.getvalue(), "Avok\ufffd\ufffddo\n") + + +class GetCommandOutputPattern(unittest.TestCase): + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_matches(self): + res = process.get_command_output_matching("echo foo", "foo") + self.assertEqual(res, ["foo"]) + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + @unittest.skipIf( + sys.platform.startswith("darwin"), + "Darwin implementation of echo lacks -e behavior", + ) + def test_matches_multiple(self): + res = process.get_command_output_matching("echo -e 'foo\nfoo\n'", "foo") + self.assertEqual(res, ["foo", "foo"]) + + @unittest.skipUnless(ECHO_CMD, "Echo command not available in system") + def test_does_not_match(self): + res = process.get_command_output_matching("echo foo", "bar") + self.assertEqual(res, []) + + +class GetCapabilities(unittest.TestCase): + def test_get_capabilities(self): + stdout = b"""1: cap_chown,cap_dac_override,cap_fowner,cap_fsetid,cap_kill,cap_setgid,cap_setuid,cap_setpcap,cap_net_bind_service,cap_net_raw,cap_sys_chroot,cap_mknod,cap_audit_write,cap_setfcap=eip""" + cmd_result = process.CmdResult(stdout=stdout, exit_status=0) + expected = [ + "cap_chown", + "cap_dac_override", + "cap_fowner", + "cap_fsetid", + "cap_kill", + "cap_setgid", + "cap_setuid", + "cap_setpcap", + "cap_net_bind_service", + "cap_net_raw", + "cap_sys_chroot", + "cap_mknod", + "cap_audit_write", + "cap_setfcap=eip", + ] + with unittest.mock.patch("autils.devel.process.run", return_value=cmd_result): + capabilities = process.get_capabilities() + self.assertEqual(capabilities, expected) + + def test_get_capabilities_legacy(self): + stderr = b"""Capabilities for `3114520': = cap_chown,cap_dac_override,cap_dac_read_search,cap_fowner,cap_fsetid,cap_kill,cap_setgid,cap_setuid,cap_setpcap,cap_linux_immutable,cap_net_bind_service,cap_net_broadcast,cap_net_admin,cap_net_raw,cap_ipc_lock,cap_ipc_owner,cap_sys_module,cap_sys_rawio,cap_sys_chroot,cap_sys_ptrace,cap_sys_pacct,cap_sys_admin,cap_sys_boot,cap_sys_nice,cap_sys_resource,cap_sys_time,cap_sys_tty_config,cap_mknod,cap_lease,cap_audit_write,cap_audit_control,cap_setfcap,cap_mac_override,cap_mac_admin,cap_syslog,cap_wake_alarm,cap_block_suspend,cap_audit_read,38,39+ep""" + cmd_result = process.CmdResult(stderr=stderr, exit_status=0) + expected = [ + "cap_chown", + "cap_dac_override", + "cap_dac_read_search", + "cap_fowner", + "cap_fsetid", + "cap_kill", + "cap_setgid", + "cap_setuid", + "cap_setpcap", + "cap_linux_immutable", + "cap_net_bind_service", + "cap_net_broadcast", + "cap_net_admin", + "cap_net_raw", + "cap_ipc_lock", + "cap_ipc_owner", + "cap_sys_module", + "cap_sys_rawio", + "cap_sys_chroot", + "cap_sys_ptrace", + "cap_sys_pacct", + "cap_sys_admin", + "cap_sys_boot", + "cap_sys_nice", + "cap_sys_resource", + "cap_sys_time", + "cap_sys_tty_config", + "cap_mknod", + "cap_lease", + "cap_audit_write", + "cap_audit_control", + "cap_setfcap", + "cap_mac_override", + "cap_mac_admin", + "cap_syslog", + "cap_wake_alarm", + "cap_block_suspend", + "cap_audit_read", + "38", + "39+ep", + ] + with unittest.mock.patch("autils.devel.process.run", return_value=cmd_result): + capabilities = process.get_capabilities() + self.assertEqual(capabilities, expected) + + def test_failure_no_capabilities(self): + stdout = b"1: cap_chown,cap_dac_override" + cmd_result = process.CmdResult(stdout=stdout, exit_status=1) + with unittest.mock.patch("autils.devel.process.run", return_value=cmd_result): + capabilities = process.get_capabilities() + self.assertEqual(capabilities, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/utils.py b/tests/utils.py index e157aff..342edb1 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,8 +1,24 @@ +import logging import os import tempfile import unittest +def setup_autils_loggers(): + """ + Setup autils loggers to contain at least one logger. + + This is required for tests that directly utilize autils modules + because they require those loggers to be configured. Without this + it might result in infinite recursion while attempting to log + "No handlers could be found for logger ..." message. + """ + for name in ("", "autils", "autils.test"): + logger = logging.getLogger(name) + if not logger.handlers: + logger.handlers.append(logging.NullHandler()) + + def temp_dir_prefix(klass): """ Returns a standard name for the temp dir prefix used by the tests @@ -27,7 +43,32 @@ def tearDown(self): self.tmpdir.cleanup() +def skipOnLevelsInferiorThan(level): + """ + Skip tests based on the AUTILS_CHECK_LEVEL environment variable. + + This is useful for skipping long-running, resource-intensive, + or time-sensitive tests during quick test runs. + + :param level: The minimum check level required to run the test + :type level: int + :return: unittest.skipIf decorator + """ + return unittest.skipIf( + int(os.environ.get("AUTILS_CHECK_LEVEL", 0)) < level, + "Skipping test that take a long time to run, are " + "resource intensive or time sensitive", + ) + + def skipUnlessPathExists(path): + """ + Skip tests unless a specific path exists on the system. + + :param path: The path to check for existence + :type path: str + :return: unittest.skipUnless decorator + """ return unittest.skipUnless( os.path.exists(path), ( From a80adc6095df43755a095a5c5fe4da0fb49fb672 Mon Sep 17 00:00:00 2001 From: Harvey Lynden Date: Wed, 17 Dec 2025 17:13:06 +0100 Subject: [PATCH 2/2] tests: fix test_can_sudo_oserror mocking wrong function The test was mocking 'system' but can_sudo() without a cmd argument calls getoutput() internally. --- tests/unit/modules/devel/process.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/modules/devel/process.py b/tests/unit/modules/devel/process.py index 942b67d..91734b2 100644 --- a/tests/unit/modules/devel/process.py +++ b/tests/unit/modules/devel/process.py @@ -569,14 +569,14 @@ def test_can_sudo_with_specific_cmd(self, getuid_mock, find_cmd_mock, system_moc system_mock.return_value = 0 self.assertTrue(process.can_sudo("ls -l")) - @unittest.mock.patch("autils.devel.process.system") + @unittest.mock.patch("autils.devel.process.getoutput") @unittest.mock.patch("autils.file.path.find_command") @unittest.mock.patch("autils.devel.process.os.getuid") - def test_can_sudo_oserror(self, getuid_mock, find_cmd_mock, system_mock): + def test_can_sudo_oserror(self, getuid_mock, find_cmd_mock, getoutput_mock): """Test can_sudo when OSError occurs""" getuid_mock.return_value = 1000 find_cmd_mock.return_value = "/usr/bin/sudo" - system_mock.side_effect = OSError() + getoutput_mock.side_effect = OSError() self.assertFalse(process.can_sudo()) @unittest.mock.patch("autils.devel.process.get_capabilities")