diff --git a/README.md b/README.md index aa2e0dd..997ccb5 100644 --- a/README.md +++ b/README.md @@ -38,24 +38,47 @@ My application development modules. -## Installation +## Start developing -Install this via pip (or your favourite package manager): +The project uses UV for dependencies management and packaging and the [pypeline](https://github.com/cuinixam/pypeline) for streamlining the development workflow. +Use pipx (or your favorite package manager) to install the `pypeline` in an isolated environment: -`pip install py-app-dev` - -## Usage +```shell +pipx install pypeline-runner +``` -Start by importing it: +To bootstrap the project and run all the steps configured in the `pypeline.yaml` file, execute the following command: -```python -import py_app_dev +```shell +pypeline run ``` +For those using [VS Code](https://code.visualstudio.com/) there are tasks defined for the most common commands: -## Credits +- run tests +- run pre-commit checks (linters, formatters, etc.) +- generate documentation + +See the `.vscode/tasks.json` for more details. + +## Committing changes + +This repository uses [commitlint](https://github.com/conventional-changelog/commitlint) for checking if the commit message meets the [conventional commit format](https://www.conventionalcommits.org/en). -[![Copier](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/copier-org/copier/master/img/badge/badge-grayscale-inverted-border-orange.json)](https://github.com/copier-org/copier) +## Contributors ✨ + +Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)): + + + + + + + + +This project follows the [all-contributors](https://github.com/all-contributors/all-contributors) specification. Contributions of any kind welcome! + +## Credits This package was created with [Copier](https://copier.readthedocs.io/) and the diff --git a/docs/conf.py b/docs/conf.py index d9fe74a..2684750 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -66,9 +66,7 @@ traceability_collapse_links = True # The suffix of source filenames. -source_suffix = [ - ".md", -] +source_suffix = [".md", ".rst"] templates_path = ["_templates"] exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] diff --git a/src/py_app_dev/core/subprocess.py b/src/py_app_dev/core/subprocess.py index ae9e66f..124ba8c 100644 --- a/src/py_app_dev/core/subprocess.py +++ b/src/py_app_dev/core/subprocess.py @@ -1,88 +1,99 @@ -import shutil -import subprocess # nosec -from pathlib import Path -from typing import Any - -from .exceptions import UserNotificationException -from .logging import logger - - -def which(app_name: str) -> Path | None: - """Return the path to the app if it is in the PATH, otherwise return None.""" - app_path = shutil.which(app_name) - return Path(app_path) if app_path else None - - -class SubprocessExecutor: - """ - Execute a command in a subprocess. - - Args: - ---- - capture_output: If True, the output of the command will be captured. - print_output: If True, the output of the command will be printed to the logger. - One can set this to false in order to get the output in the returned CompletedProcess object. - - """ - - def __init__( - self, - command: str | list[str | Path], - cwd: Path | None = None, - capture_output: bool = True, - env: dict[str, str] | None = None, - shell: bool = False, - print_output: bool = True, - ): - self.logger = logger.bind() - self.command = command - self.current_working_directory = cwd - self.capture_output = capture_output - self.env = env - self.shell = shell - self.print_output = print_output - - @property - def command_str(self) -> str: - if isinstance(self.command, str): - return self.command - return " ".join(str(arg) if not isinstance(arg, str) else arg for arg in self.command) - - def execute(self, handle_errors: bool = True) -> subprocess.CompletedProcess[Any] | None: - """Execute the command and return the CompletedProcess object if handle_errors is False.""" - try: - completed_process = None - stdout = "" - stderr = "" - self.logger.info(f"Running command: {self.command_str}") - cwd_path = (self.current_working_directory or Path.cwd()).as_posix() - with subprocess.Popen( - args=self.command, - cwd=cwd_path, - stdout=(subprocess.PIPE if self.capture_output else subprocess.DEVNULL), - stderr=(subprocess.STDOUT if self.capture_output else subprocess.DEVNULL), - text=True, - env=self.env, - shell=self.shell, - ) as process: # nosec - if self.capture_output and process.stdout is not None: - if self.print_output: - for line in iter(process.stdout.readline, ""): - self.logger.info(line.strip()) - process.wait() - else: - stdout, stderr = process.communicate() - - if handle_errors: - # Check return code - if process.returncode != 0: - raise subprocess.CalledProcessError(process.returncode, self.command_str) - else: - completed_process = subprocess.CompletedProcess(process.args, process.returncode, stdout, stderr) - except subprocess.CalledProcessError as e: - raise UserNotificationException(f"Command '{self.command_str}' execution failed with return code {e.returncode}") from None - except FileNotFoundError as e: - raise UserNotificationException(f"Command '{self.command_str}' could not be executed. Failed with error {e}") from None - except KeyboardInterrupt: - raise UserNotificationException(f"Command '{self.command_str}' execution interrupted by user") from None - return completed_process +import locale +import shutil +import subprocess # nosec +from pathlib import Path +from typing import Any + +from .exceptions import UserNotificationException +from .logging import logger + + +def which(app_name: str) -> Path | None: + """Return the path to the app if it is in the PATH, otherwise return None.""" + app_path = shutil.which(app_name) + return Path(app_path) if app_path else None + + +class SubprocessExecutor: + """ + Execute a command in a subprocess. + + Args: + ---- + capture_output: If True, the output of the command will be captured. + print_output: If True, the output of the command will be printed to the logger. + One can set this to false in order to get the output in the returned CompletedProcess object. + + """ + + def __init__( + self, + command: str | list[str | Path], + cwd: Path | None = None, + capture_output: bool = True, + env: dict[str, str] | None = None, + shell: bool = False, + print_output: bool = True, + ): + self.logger = logger.bind() + self.command = command + self.current_working_directory = cwd + self.capture_output = capture_output + self.env = env + self.shell = shell + self.print_output = print_output + + @property + def command_str(self) -> str: + if isinstance(self.command, str): + return self.command + return " ".join(str(arg) if not isinstance(arg, str) else arg for arg in self.command) + + def execute(self, handle_errors: bool = True) -> subprocess.CompletedProcess[Any] | None: + """Execute the command and return the CompletedProcess object if handle_errors is False.""" + try: + completed_process = None + stdout = "" + stderr = "" + self.logger.info(f"Running command: {self.command_str}") + cwd_path = (self.current_working_directory or Path.cwd()).as_posix() + with subprocess.Popen( + args=self.command, + cwd=cwd_path, + # Combine both streams to stdout (when captured) + stdout=(subprocess.PIPE if self.capture_output else subprocess.DEVNULL), + stderr=(subprocess.STDOUT if self.capture_output else subprocess.DEVNULL), + # enables line buffering, line is flushed after each \n + bufsize=1, + text=True, + # every new line is a \n + universal_newlines=True, + # decode bytes to str using current locale/system encoding + encoding=locale.getpreferredencoding(False), + # replace unknown characters with � + errors="replace", + env=self.env, + shell=self.shell, + ) as process: # nosec + if self.capture_output and process.stdout is not None: + if self.print_output: + for line in iter(process.stdout.readline, ""): + self.logger.info(line.strip()) + stdout += line + process.wait() + else: + stdout, stderr = process.communicate() + + if handle_errors: + # Check return code + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, self.command_str) + else: + completed_process = subprocess.CompletedProcess(process.args, process.returncode, stdout, stderr) + except subprocess.CalledProcessError as e: + raise UserNotificationException(f"Command '{self.command_str}' execution failed with return code {e.returncode}") from None + except FileNotFoundError as e: + raise UserNotificationException(f"Command '{self.command_str}' could not be executed. Failed with error {e}") from None + except KeyboardInterrupt: + raise UserNotificationException(f"Command '{self.command_str}' execution interrupted by user") from None + return completed_process diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index 002240c..d578432 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -1,20 +1,147 @@ -from pathlib import Path - -from py_app_dev.core.subprocess import SubprocessExecutor, which - - -def test_get_app_path(): - assert which("python") - - -def test_subprocess_executor(tmp_path: Path) -> None: - SubprocessExecutor(["python", "-V"], cwd=tmp_path, capture_output=True).execute() - - -def test_subprocess_executor_no_error_handling() -> None: - process = SubprocessExecutor(["python", "-V"], capture_output=True).execute(handle_errors=False) - assert process and process.returncode == 0 - assert process.stdout == "" - process = SubprocessExecutor(["python", "-V"], capture_output=True, print_output=False).execute(handle_errors=False) - assert process and process.returncode == 0 - assert "Python" in process.stdout +import os +import platform +import tempfile +from pathlib import Path +from unittest.mock import Mock, patch + +import pytest + +from py_app_dev.core.subprocess import SubprocessExecutor, which + + +def test_get_app_path(): + """Test the which function for finding executables in PATH.""" + assert which("python") + + +class TestSubprocessExecutor: + """Test class for SubprocessExecutor functionality.""" + + @patch("loguru._logger.Logger.info") + @pytest.mark.parametrize( + "capture_output,print_output,expected_stdout_empty,expected_log_count", + [ + (True, True, False, ">=2"), # Capture and print - should have stdout + log command + output + (True, False, False, 1), # Capture but don't print - should have stdout + log only command + (False, True, True, 1), # Don't capture but print - should have empty stdout + log only command + (False, False, True, 1), # Don't capture or print - should have empty stdout + log only command + ], + ) + def test_capture_output_and_logging_combinations( + self, mock_info: Mock, capture_output: bool, print_output: bool, expected_stdout_empty: bool, expected_log_count: str | int + ) -> None: + """Test different combinations of capture_output and print_output parameters with logger verification.""" + # Arrange & Act + mock_info.reset_mock() + process = SubprocessExecutor(["python", "-V"], capture_output=capture_output, print_output=print_output).execute(handle_errors=False) + + # Assert process execution + assert process and process.returncode == 0 + + # Assert stdout behavior + if expected_stdout_empty: + assert process.stdout == "" + else: + assert "Python" in process.stdout + + # Assert logger behavior + if expected_log_count == ">=2": + # Should log both command execution and Python version output + assert mock_info.call_count >= 2 + # Check that the command execution was logged + command_logged = any("Running command: python -V" in str(call) for call in mock_info.call_args_list) + assert command_logged, "Command execution should be logged" + # Check that Python version output was logged + python_output_logged = any("Python" in str(call) and "Running command" not in str(call) for call in mock_info.call_args_list) + assert python_output_logged, "Python version output should be logged when print_output=True" + else: + # Should only log the command execution, not the output + assert mock_info.call_count == expected_log_count + assert "Running command: python -V" in str(mock_info.call_args_list[0]) + + @pytest.mark.parametrize( + "command, exp_stdout, exp_returncode", + [ + (["python", "-c", "print('Hello World!')"], "Hello World!\n", 0), + # SubprocessExecutor redirects stderr to stdout when capture_output=True + ( + [ + "python", + "-c", + "import sys; print('Hello World!', file=sys.stderr)", + ], + "Hello World!\n", + 0, + ), + (["python", "-c", "exit(0)"], "", 0), + (["python", "-c", "exit(1)"], "", 1), + (["python", "-c", "exit(42)"], "", 42), + ], + ) + def test_command_execution_scenarios(self, command, exp_stdout, exp_returncode): + """Test various command execution scenarios adapted from CommandLineExecutor tests.""" + # Arrange + executor = SubprocessExecutor(command, capture_output=True, print_output=False) + + # Act + result = executor.execute(handle_errors=False) + + # Assert + assert result is not None + assert result.stdout == exp_stdout + # Note: SubprocessExecutor redirects stderr to stdout, so stderr is always None + # This is different from CommandLineExecutor which returned empty string for stderr + assert result.stderr is None + assert result.returncode == exp_returncode + + @pytest.mark.skipif(platform.system() != "Windows", reason="Junction creation test is Windows-specific") + def test_junction_creation(self, tmp_path: Path) -> None: + """Test creating a junction link (Windows-specific test adapted from CommandLineExecutor).""" + # Arrange + test_path = tmp_path.joinpath("test") + test_path.mkdir() + link_path = test_path.joinpath("link") + command: list[str | Path] = ["cmd", "/c", "mklink", "/J", str(link_path), str(test_path)] + executor = SubprocessExecutor(command, capture_output=True, print_output=False) + + # Act + result = executor.execute(handle_errors=False) + + # Assert + assert result is not None + assert result.returncode == 0 + + @pytest.mark.parametrize( + "stream_type, test_data, expected_text_parts", + [ + ("stdout", b"Hello\x85World\n", ["Hello", "World"]), + ("stderr", b"Error\x85Message\n", ["Error", "Message"]), + ], + ) + def test_undecodable_bytes_handling(self, stream_type: str, test_data: bytes, expected_text_parts: list[str]) -> None: + """Test that undecodable bytes in stdout/stderr are handled gracefully.""" + # Arrange + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp: + # Write bytes that are invalid in UTF-8 (e.g., 0x85) + tmp.write(test_data) + tmp_path = tmp.name + + try: + if stream_type == "stdout": + py_cmd: list[str | Path] = ["python", "-c", f"import sys; sys.stdout.buffer.write(open(r'{tmp_path}', 'rb').read())"] + else: # stderr + py_cmd = ["python", "-c", f"import sys; sys.stderr.buffer.write(open(r'{tmp_path}', 'rb').read())"] + + executor = SubprocessExecutor(py_cmd, capture_output=True, print_output=False) + + # Act + result = executor.execute(handle_errors=False) + + # Assert + assert result is not None + for expected_part in expected_text_parts: + assert expected_part in result.stdout + # Should not raise UnicodeDecodeError due to errors="replace" in subprocess.py + assert result.returncode == 0 + finally: + os.remove(tmp_path)