Skip to content

Commit 54a8377

Browse files
committed
Fix event handler leak in ExecuteLocal by unregistering handlers on cleanup
ExecuteLocal registers 6 event handlers in execute() but never unregisters them when the process exits. This causes a memory leak proportional to the number of processes launched over the lifetime of a LaunchService, as the handlers and all objects they reference remain in context._event_handlers indefinitely. This change: - Stores registered event handlers and context as instance variables - Adds __unregister_event_handlers() helper that safely removes all handlers registered by this action - Unregisters handlers in __flush_buffers/__flush_cached_buffers after all ProcessExited handlers (including on_exit) have fired - Unregisters handlers when process fails to start (no ProcessExited) - Fixes _shutdown_process() deferred handler accumulation by tracking and replacing the OnProcessStart handler instead of creating new ones - Adds tests verifying handler cleanup and stability during respawn Fixes #565 Signed-off-by: Koki Shinjo <kshinjo@pfrobotics.jp>
1 parent 84935de commit 54a8377

2 files changed

Lines changed: 142 additions & 6 deletions

File tree

launch/launch/actions/execute_local.py

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from ..conditions import evaluate_condition_expression
4545
from ..descriptions import Executable
4646
from ..event import Event
47+
from ..event_handler import BaseEventHandler
4748
from ..event_handler import EventHandler
4849
from ..event_handlers import OnProcessExit
4950
from ..event_handlers import OnProcessIO
@@ -223,6 +224,9 @@ def __init__(
223224
self.__stderr_buffer = io.StringIO()
224225

225226
self.__executed = False
227+
self.__registered_event_handlers: List[BaseEventHandler] = []
228+
self.__context: Optional[LaunchContext] = None
229+
self.__deferred_shutdown_handler: Optional[BaseEventHandler] = None
226230

227231
@property
228232
def process_description(self) -> Executable:
@@ -283,10 +287,16 @@ def _shutdown_process(self, context: LaunchContext, *, send_sigint: bool
283287
# Defer shut down if the process is scheduled to be started
284288
if (self.process_details is None or self._subprocess_transport is None):
285289
# Do not set shutdown result, as event is postponed
286-
context.register_event_handler(
287-
OnProcessStart(
288-
on_start=lambda event, context:
289-
self._shutdown_process(context, send_sigint=send_sigint)))
290+
# Remove previously registered deferred shutdown handler to avoid accumulation
291+
if self.__deferred_shutdown_handler is not None:
292+
try:
293+
context.unregister_event_handler(self.__deferred_shutdown_handler)
294+
except ValueError:
295+
pass
296+
self.__deferred_shutdown_handler = OnProcessStart(
297+
on_start=lambda event, context:
298+
self._shutdown_process(context, send_sigint=send_sigint))
299+
context.register_event_handler(self.__deferred_shutdown_handler)
290300
return None
291301

292302
self.__shutdown_future.set_result(None)
@@ -413,6 +423,13 @@ def __flush_buffers(self, event, context):
413423
self.__stderr_buffer.seek(0)
414424
self.__stderr_buffer.truncate(0)
415425

426+
# Unregister event handlers if the action is complete (not respawning).
427+
# This is done here rather than in __cleanup() because __cleanup() runs
428+
# before the ProcessExited event is processed from the queue, and
429+
# unregistering there would prevent on_exit callbacks from firing.
430+
if self.__completed_future is not None and self.__completed_future.done():
431+
self.__unregister_event_handlers()
432+
416433
def __on_process_output_cached(
417434
self, event: ProcessIO, buffer, logger
418435
) -> None:
@@ -442,6 +459,10 @@ def __flush_cached_buffers(self, event, context):
442459
self.__output_format.format(line=line, this=self)
443460
)
444461

462+
# Unregister event handlers if the action is complete (not respawning).
463+
if self.__completed_future is not None and self.__completed_future.done():
464+
self.__unregister_event_handlers()
465+
445466
def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeEntitiesType]:
446467
due_to_sigint = cast(Shutdown, event).due_to_sigint
447468
return self._shutdown_process(
@@ -504,6 +525,24 @@ def __get_sigint_event(self) -> EmitEvent:
504525
process_matcher=matches_action(self),
505526
))
506527

528+
def __unregister_event_handlers(self) -> None:
529+
"""Unregister all event handlers registered by this action."""
530+
context = self.__context
531+
if context is None:
532+
return
533+
for event_handler in self.__registered_event_handlers:
534+
try:
535+
context.unregister_event_handler(event_handler)
536+
except ValueError:
537+
pass
538+
self.__registered_event_handlers.clear()
539+
if self.__deferred_shutdown_handler is not None:
540+
try:
541+
context.unregister_event_handler(self.__deferred_shutdown_handler)
542+
except ValueError:
543+
pass
544+
self.__deferred_shutdown_handler = None
545+
507546
def __cleanup(self) -> None:
508547
# Cancel any pending timers we started.
509548
if self.__sigterm_timer is not None:
@@ -514,6 +553,8 @@ def __cleanup(self) -> None:
514553
if self._subprocess_transport is not None:
515554
self._subprocess_transport.close()
516555
# Signal that we're done to the launch system.
556+
# Event handlers are unregistered in __flush_buffers/__flush_cached_buffers
557+
# after all ProcessExited handlers (including on_exit) have had a chance to fire.
517558
if self.__completed_future is not None:
518559
self.__completed_future.set_result(None)
519560

@@ -583,6 +624,8 @@ async def __execute_process(self, context: LaunchContext) -> None:
583624
self.__logger.error('exception occurred while executing process:\n{}'.format(
584625
traceback.format_exc()
585626
))
627+
# No ProcessExited event will be emitted, so unregister handlers directly.
628+
self.__unregister_event_handlers()
586629
self.__cleanup()
587630
return
588631

@@ -703,6 +746,8 @@ def execute(self, context: LaunchContext) -> None:
703746
]
704747
for event_handler in event_handlers:
705748
context.register_event_handler(event_handler)
749+
self.__registered_event_handlers = list(event_handlers)
750+
self.__context = context
706751

707752
try:
708753
self.__completed_future = context.asyncio_loop.create_future()
@@ -720,8 +765,7 @@ def execute(self, context: LaunchContext) -> None:
720765
launch.logging.get_output_loggers(name, self.__output)
721766
context.asyncio_loop.create_task(self.__execute_process(context))
722767
except Exception:
723-
for event_handler in event_handlers:
724-
context.unregister_event_handler(event_handler)
768+
self.__unregister_event_handlers()
725769
raise
726770
return None
727771

launch/test/launch/test_execute_local.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,95 @@ def test_execute_process_with_output_dictionary():
178178
ls = LaunchService()
179179
ls.include_launch_description(ld)
180180
assert 0 == ls.run()
181+
182+
183+
def test_event_handlers_cleaned_up_after_process_exit():
184+
"""Test that event handlers are unregistered after a process exits."""
185+
ls = LaunchService()
186+
initial_handler_count = len(ls.context._event_handlers)
187+
188+
executable = ExecuteLocal(
189+
process_description=Executable(
190+
cmd=[sys.executable, '-c', "print('hello')"]
191+
),
192+
output='screen'
193+
)
194+
ld = LaunchDescription([executable])
195+
ls.include_launch_description(ld)
196+
assert 0 == ls.run()
197+
198+
final_handler_count = len(ls.context._event_handlers)
199+
assert final_handler_count == initial_handler_count, (
200+
f'Expected {initial_handler_count} handlers after cleanup, '
201+
f'but found {final_handler_count}'
202+
)
203+
204+
205+
def test_event_handlers_stable_during_respawn():
206+
"""Test that event handler count stays stable across respawn cycles."""
207+
handler_counts = []
208+
209+
def on_exit_callback(event, context):
210+
handler_counts.append(len(context._event_handlers))
211+
212+
shutdown_time = 4.0
213+
respawn_delay = 1.0
214+
215+
executable = ExecuteLocal(
216+
process_description=Executable(
217+
cmd=[sys.executable, '-c', "print('respawn test')"]
218+
),
219+
respawn=True,
220+
respawn_delay=respawn_delay,
221+
on_exit=on_exit_callback,
222+
output='screen'
223+
)
224+
225+
ls = LaunchService()
226+
227+
ld = LaunchDescription([
228+
executable,
229+
TimerAction(
230+
period=shutdown_time,
231+
actions=[
232+
Shutdown(reason='Timer expired')
233+
]
234+
)
235+
])
236+
ls.include_launch_description(ld)
237+
assert 0 == ls.run()
238+
239+
# Handler count should remain stable during respawn (not growing)
240+
assert len(handler_counts) >= 2, (
241+
f'Expected at least 2 process exits, got {len(handler_counts)}'
242+
)
243+
assert all(c == handler_counts[0] for c in handler_counts), (
244+
f'Handler counts should be stable across respawns, got: {handler_counts}'
245+
)
246+
247+
248+
def test_multiple_processes_handler_cleanup():
249+
"""Test that multiple processes don't leak event handlers."""
250+
ls = LaunchService()
251+
initial_handler_count = len(ls.context._event_handlers)
252+
253+
executables = [
254+
ExecuteLocal(
255+
process_description=Executable(
256+
cmd=[sys.executable, '-c', f"print('process {i}')"]
257+
),
258+
output='screen'
259+
)
260+
for i in range(5)
261+
]
262+
263+
ld = LaunchDescription(executables)
264+
ls.include_launch_description(ld)
265+
assert 0 == ls.run()
266+
267+
final_handler_count = len(ls.context._event_handlers)
268+
assert final_handler_count == initial_handler_count, (
269+
f'Expected {initial_handler_count} handlers after cleanup, '
270+
f'but found {final_handler_count}. '
271+
f'Leaked {final_handler_count - initial_handler_count} handlers from 5 processes.'
272+
)

0 commit comments

Comments
 (0)