Skip to content

Commit 7e4652a

Browse files
committed
feat: add timeout support for subprocess execution
1 parent f2a6a96 commit 7e4652a

File tree

7 files changed

+323
-1978
lines changed

7 files changed

+323
-1978
lines changed

AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ This document provides guidance for AI agents and contributors working on the `p
99
- **Package name**: `py-app-dev`
1010
- **Python version**: 3.10+
1111
- **Package manager**: [uv](https://docs.astral.sh/uv/)
12-
- **Build backend**: Poetry
12+
- **Build backend**: uv
1313
- **CI/CD**: GitHub Actions with semantic-release
1414

1515
## Repository Structure

poetry.lock

Lines changed: 0 additions & 1827 deletions
This file was deleted.

poetry.toml

Lines changed: 0 additions & 2 deletions
This file was deleted.

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ classifiers = [
2323
dependencies = [
2424
"mashumaro<4.0,>=3.5",
2525
"loguru<1.0,>=0.7",
26+
"psutil>=7.1,<8.0",
2627
]
2728
urls."Bug Tracker" = "https://github.com/cuinixam/py-app-dev/issues"
2829
urls.Changelog = "https://github.com/cuinixam/py-app-dev/blob/main/CHANGELOG.md"
@@ -95,6 +96,9 @@ addopts = """\
9596
--cov-report=xml
9697
"""
9798
pythonpath = [ "src" ]
99+
markers = [
100+
"exploratory: local-only exploratory tests; skipped by default (run with --run-exploratory)",
101+
]
98102

99103
[tool.coverage.run]
100104
branch = true

src/py_app_dev/core/subprocess.py

Lines changed: 108 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1+
import contextlib
12
import locale
3+
import queue
24
import shutil
35
import subprocess # nosec
6+
import threading
7+
import time
48
from pathlib import Path
59
from typing import Any
610

11+
import psutil
12+
713
from .exceptions import UserNotificationException
814
from .logging import logger
915

@@ -23,6 +29,7 @@ class SubprocessExecutor:
2329
capture_output: If True, the output of the command will be captured.
2430
print_output: If True, the output of the command will be printed to the logger.
2531
One can set this to false in order to get the output in the returned CompletedProcess object.
32+
timeout: Maximum runtime in seconds before the subprocess is forcefully terminated. If None, there is no timeout.
2633
2734
"""
2835

@@ -34,6 +41,7 @@ def __init__(
3441
env: dict[str, str] | None = None,
3542
shell: bool = False,
3643
print_output: bool = True,
44+
timeout: int | None = None,
3745
):
3846
self.logger = logger.bind()
3947
self.command = command
@@ -42,22 +50,64 @@ def __init__(
4250
self.env = env
4351
self.shell = shell
4452
self.print_output = print_output
53+
self.timeout = timeout
4554

4655
@property
4756
def command_str(self) -> str:
4857
if isinstance(self.command, str):
4958
return self.command
5059
return " ".join(str(arg) if not isinstance(arg, str) else arg for arg in self.command)
5160

61+
def _finalize_process(
62+
self,
63+
process: subprocess.Popen[Any] | None,
64+
) -> None:
65+
"""
66+
Clean up a subprocess and its children.
67+
68+
If the process is still running it is forcibly killed (children first,
69+
then parent). This is a no-op when the process has already finished.
70+
"""
71+
if process is None:
72+
return
73+
wait_timeout_seconds: float = 5.0
74+
try:
75+
if process.poll() is None:
76+
try:
77+
parent = psutil.Process(process.pid)
78+
except (psutil.NoSuchProcess, psutil.AccessDenied):
79+
parent = None
80+
81+
if parent:
82+
for child in parent.children(recursive=True):
83+
with contextlib.suppress(Exception):
84+
child.kill()
85+
with contextlib.suppress(Exception):
86+
parent.kill()
87+
except Exception as exc:
88+
self.logger.error(f"Failed to kill process tree for PID {getattr(process, 'pid', 'unknown')}: {exc}")
89+
90+
with contextlib.suppress(BaseException):
91+
process.wait(timeout=wait_timeout_seconds)
92+
93+
for pipe in (process.stdin, process.stdout, process.stderr):
94+
if pipe:
95+
with contextlib.suppress(Exception):
96+
pipe.close()
97+
5298
def execute(self, handle_errors: bool = True) -> subprocess.CompletedProcess[Any] | None:
5399
"""Execute the command and return the CompletedProcess object if handle_errors is False."""
100+
start_time = time.monotonic()
101+
completed_process: subprocess.CompletedProcess[Any] | None = None
102+
stdout = ""
103+
stderr = ""
104+
self.logger.info(f"Running command: {self.command_str}")
105+
cwd_path = (self.current_working_directory or Path.cwd()).as_posix()
106+
process: subprocess.Popen[str] | None = None
107+
streaming_active = False
108+
54109
try:
55-
completed_process = None
56-
stdout = ""
57-
stderr = ""
58-
self.logger.info(f"Running command: {self.command_str}")
59-
cwd_path = (self.current_working_directory or Path.cwd()).as_posix()
60-
with subprocess.Popen(
110+
process = subprocess.Popen(
61111
args=self.command,
62112
cwd=cwd_path,
63113
# Combine both streams to stdout (when captured)
@@ -74,26 +124,68 @@ def execute(self, handle_errors: bool = True) -> subprocess.CompletedProcess[Any
74124
errors="replace",
75125
env=self.env,
76126
shell=self.shell,
77-
) as process: # nosec
78-
if self.capture_output and process.stdout is not None:
79-
if self.print_output:
80-
for line in iter(process.stdout.readline, ""):
81-
self.logger.info(line.strip())
82-
stdout += line
83-
process.wait()
84-
else:
85-
stdout, stderr = process.communicate()
127+
)
128+
# STREAMING OUTPUT MODE
129+
if self.capture_output and self.print_output and process.stdout is not None:
130+
# Drain stdout on a background thread so the main loop is never
131+
# blocked by readline() when the process is silent. A sentinel
132+
# None is pushed after EOF so the consumer knows when to stop.
133+
streaming_active = True
134+
line_queue: queue.Queue[str | None] = queue.Queue()
135+
136+
def _reader(src: Any, dest: queue.Queue[str | None]) -> None:
137+
for ln in src:
138+
dest.put(ln)
139+
dest.put(None)
86140

141+
reader_thread = threading.Thread(target=_reader, args=(process.stdout, line_queue), daemon=True)
142+
reader_thread.start()
143+
144+
while True:
145+
# TIMEOUT CHECK (runs even when the process writes nothing)
146+
if self.timeout is not None and (time.monotonic() - start_time) > self.timeout:
147+
# Kill NOW so Popen.__exit__ → process.wait() returns immediately.
148+
raise subprocess.TimeoutExpired(
149+
cmd=self.command_str,
150+
timeout=self.timeout,
151+
output=stdout,
152+
)
153+
try:
154+
line = line_queue.get(timeout=0.05)
155+
except queue.Empty:
156+
continue
157+
if line is None: # EOF sentinel
158+
break
159+
self.logger.info(line.strip())
160+
stdout += line
161+
162+
reader_thread.join()
163+
process.wait()
164+
# NON-STREAMING MODE (communicate with native timeout)
165+
elif self.capture_output and not self.print_output:
166+
stdout, stderr = process.communicate(timeout=self.timeout)
167+
else:
168+
# No output capturing; just wait for completion or timeout
169+
process.wait(timeout=self.timeout)
170+
# HANDLE RETURN CODES AND ERROR INTERPRETATION
87171
if handle_errors:
88172
# Check return code
89173
if process.returncode != 0:
90-
raise subprocess.CalledProcessError(process.returncode, self.command_str)
174+
raise subprocess.CalledProcessError(process.returncode, self.command_str, stdout, stderr)
91175
else:
92176
completed_process = subprocess.CompletedProcess(process.args, process.returncode, stdout, stderr)
177+
except subprocess.TimeoutExpired:
178+
raise UserNotificationException(f"Command '{self.command_str}' timed out after {self.timeout} seconds and was forcefully terminated.") from None
93179
except subprocess.CalledProcessError as e:
94180
raise UserNotificationException(f"Command '{self.command_str}' execution failed with return code {e.returncode}") from None
95181
except FileNotFoundError as e:
96182
raise UserNotificationException(f"Command '{self.command_str}' could not be executed. Failed with error {e}") from None
97183
except KeyboardInterrupt:
98184
raise UserNotificationException(f"Command '{self.command_str}' execution interrupted by user") from None
185+
finally:
186+
# Kill the process first so pipe EOF unblocks the reader thread
187+
self._finalize_process(process=process)
188+
if streaming_active:
189+
with contextlib.suppress(Exception):
190+
reader_thread.join(timeout=2.0)
99191
return completed_process

0 commit comments

Comments
 (0)