1+ import contextlib
12import locale
3+ import queue
24import shutil
35import subprocess # nosec
6+ import threading
7+ import time
48from pathlib import Path
59from typing import Any
610
11+ import psutil
12+
713from .exceptions import UserNotificationException
814from .logging import logger
915
@@ -23,6 +29,7 @@ class SubprocessExecutor:
2329 capture_output: If True, the output of the command will be captured.
2430 print_output: If True, the output of the command will be printed to the logger.
2531 One can set this to false in order to get the output in the returned CompletedProcess object.
32+ timeout: Maximum runtime in seconds before the subprocess is forcefully terminated. If None, there is no timeout.
2633
2734 """
2835
@@ -34,6 +41,7 @@ def __init__(
3441 env : dict [str , str ] | None = None ,
3542 shell : bool = False ,
3643 print_output : bool = True ,
44+ timeout : int | None = None ,
3745 ):
3846 self .logger = logger .bind ()
3947 self .command = command
@@ -42,58 +50,136 @@ def __init__(
4250 self .env = env
4351 self .shell = shell
4452 self .print_output = print_output
53+ self .timeout = timeout
4554
4655 @property
4756 def command_str (self ) -> str :
4857 if isinstance (self .command , str ):
4958 return self .command
5059 return " " .join (str (arg ) if not isinstance (arg , str ) else arg for arg in self .command )
5160
61+ def _finalize_process (
62+ self ,
63+ process : subprocess .Popen [Any ] | None ,
64+ ) -> None :
65+ """
66+ Clean up a subprocess and its children.
67+
68+ If the process is still running it is forcibly killed (children first,
69+ then parent). This is a no-op when the process has already finished.
70+ """
71+ if process is None :
72+ return
73+ wait_timeout_seconds : float = 5.0
74+ try :
75+ if process .poll () is None :
76+ try :
77+ parent = psutil .Process (process .pid )
78+ except (psutil .NoSuchProcess , psutil .AccessDenied ):
79+ parent = None
80+
81+ if parent :
82+ for child in parent .children (recursive = True ):
83+ with contextlib .suppress (Exception ):
84+ child .kill ()
85+ with contextlib .suppress (Exception ):
86+ parent .kill ()
87+ except Exception as exc :
88+ self .logger .error (f"Failed to kill process tree for PID { getattr (process , 'pid' , 'unknown' )} : { exc } " )
89+
90+ with contextlib .suppress (BaseException ):
91+ process .wait (timeout = wait_timeout_seconds )
92+
93+ for pipe in (process .stdin , process .stdout , process .stderr ):
94+ if pipe :
95+ with contextlib .suppress (Exception ):
96+ pipe .close ()
97+
5298 def execute (self , handle_errors : bool = True ) -> subprocess .CompletedProcess [Any ] | None :
5399 """Execute the command and return the CompletedProcess object if handle_errors is False."""
100+ start_time = time .monotonic ()
101+ completed_process : subprocess .CompletedProcess [Any ] | None = None
102+ stdout = ""
103+ stderr = ""
104+ self .logger .info (f"Running command: { self .command_str } " )
105+ cwd_path = (self .current_working_directory or Path .cwd ()).as_posix ()
106+ process : subprocess .Popen [str ] | None = None
107+ streaming_active = False
108+
54109 try :
55- completed_process = None
56- stdout = ""
57- stderr = ""
58- self .logger .info (f"Running command: { self .command_str } " )
59- cwd_path = (self .current_working_directory or Path .cwd ()).as_posix ()
60- with subprocess .Popen (
110+ process = subprocess .Popen (
61111 args = self .command ,
62112 cwd = cwd_path ,
63113 # Combine both streams to stdout (when captured)
64114 stdout = (subprocess .PIPE if self .capture_output else subprocess .DEVNULL ),
65115 stderr = (subprocess .STDOUT if self .capture_output else subprocess .DEVNULL ),
66- # enables line buffering, line is flushed after each \n
67- bufsize = 1 ,
116+ bufsize = 1 , # line buffering, flushed after each \n
68117 text = True ,
69- # every new line is a \n
70- universal_newlines = True ,
71- # decode bytes to str using current locale/system encoding
118+ universal_newlines = True , # normalize line endings to \n
72119 encoding = locale .getpreferredencoding (False ),
73- # replace unknown characters with �
74- errors = "replace" ,
120+ errors = "replace" , # replace undecodable bytes with �
75121 env = self .env ,
76122 shell = self .shell ,
77- ) as process : # nosec
78- if self .capture_output and process .stdout is not None :
79- if self .print_output :
80- for line in iter (process .stdout .readline , "" ):
81- self .logger .info (line .strip ())
82- stdout += line
83- process .wait ()
84- else :
85- stdout , stderr = process .communicate ()
123+ )
124+ # STREAMING OUTPUT MODE
125+ if self .capture_output and self .print_output and process .stdout is not None :
126+ # Drain stdout on a background thread so the main loop is never
127+ # blocked by readline() when the process is silent. A sentinel
128+ # None is pushed after EOF so the consumer knows when to stop.
129+ streaming_active = True
130+ line_queue : queue .Queue [str | None ] = queue .Queue ()
131+
132+ def _reader (src : Any , dest : queue .Queue [str | None ]) -> None :
133+ for ln in src :
134+ dest .put (ln )
135+ dest .put (None )
136+
137+ reader_thread = threading .Thread (target = _reader , args = (process .stdout , line_queue ), daemon = True )
138+ reader_thread .start ()
139+
140+ while True :
141+ # Timeout check runs even when the process writes nothing
142+ if self .timeout is not None and (time .monotonic () - start_time ) > self .timeout :
143+ raise subprocess .TimeoutExpired (
144+ cmd = self .command_str ,
145+ timeout = self .timeout ,
146+ output = stdout ,
147+ )
148+ try :
149+ line = line_queue .get (timeout = 0.05 )
150+ except queue .Empty :
151+ continue
152+ if line is None : # EOF sentinel
153+ break
154+ self .logger .info (line .strip ())
155+ stdout += line
156+
157+ reader_thread .join ()
158+ process .wait ()
159+ # NON-STREAMING MODE
160+ elif self .capture_output and not self .print_output :
161+ stdout , stderr = process .communicate (timeout = self .timeout )
162+ else :
163+ # No output capturing; just wait for completion
164+ process .wait (timeout = self .timeout )
86165
87166 if handle_errors :
88- # Check return code
89167 if process .returncode != 0 :
90- raise subprocess .CalledProcessError (process .returncode , self .command_str )
168+ raise subprocess .CalledProcessError (process .returncode , self .command_str , stdout , stderr )
91169 else :
92170 completed_process = subprocess .CompletedProcess (process .args , process .returncode , stdout , stderr )
171+ except subprocess .TimeoutExpired :
172+ raise UserNotificationException (f"Command '{ self .command_str } ' timed out after { self .timeout } seconds and was forcefully terminated." ) from None
93173 except subprocess .CalledProcessError as e :
94174 raise UserNotificationException (f"Command '{ self .command_str } ' execution failed with return code { e .returncode } " ) from None
95175 except FileNotFoundError as e :
96176 raise UserNotificationException (f"Command '{ self .command_str } ' could not be executed. Failed with error { e } " ) from None
97177 except KeyboardInterrupt :
98178 raise UserNotificationException (f"Command '{ self .command_str } ' execution interrupted by user" ) from None
179+ finally :
180+ # Kill the process first so pipe EOF unblocks the reader thread
181+ self ._finalize_process (process = process )
182+ if streaming_active :
183+ with contextlib .suppress (Exception ):
184+ reader_thread .join (timeout = 2.0 )
99185 return completed_process
0 commit comments