From 52c939de4b3291e762eac0d71f0a8d275ed4ea6e Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Thu, 23 May 2024 15:06:00 +0900 Subject: [PATCH 1/3] fix: enhanced kernel termination handling --- changes/2178.fix.md | 1 + src/ai/backend/agent/agent.py | 218 ++++++++++++++++----------- src/ai/backend/agent/docker/agent.py | 13 +- 3 files changed, 141 insertions(+), 91 deletions(-) create mode 100644 changes/2178.fix.md diff --git a/changes/2178.fix.md b/changes/2178.fix.md new file mode 100644 index 00000000000..cc3e5bd4629 --- /dev/null +++ b/changes/2178.fix.md @@ -0,0 +1 @@ +Keep `sync_container_lifecycles()` bgtask alive in a loop. diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index 73c5440a5f0..016dc157e19 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -153,7 +153,13 @@ known_slot_types, ) from .stats import StatContext, StatModes -from .types import Container, ContainerLifecycleEvent, ContainerStatus, LifecycleEvent, MountInfo +from .types import ( + Container, + ContainerLifecycleEvent, + ContainerStatus, + LifecycleEvent, + MountInfo, +) from .utils import generate_local_instance_id, get_arch_name if TYPE_CHECKING: @@ -1011,7 +1017,9 @@ async def _handle_destroy_event(self, ev: ContainerLifecycleEvent) -> None: kernel_obj = self.kernel_registry.get(ev.kernel_id) if kernel_obj is None: log.warning( - "destroy_kernel(k:{0}) kernel missing (already dead?)", ev.kernel_id + "destroy_kernel(k:{0}, c:{1}) kernel missing (already dead?)", + ev.kernel_id, + ev.container_id, ) if ev.container_id is None: await self.reconstruct_resource_usage() @@ -1039,18 +1047,17 @@ async def _handle_destroy_event(self, ev: ContainerLifecycleEvent) -> None: ev.done_future.set_exception(e) raise finally: - if ev.container_id is not None: - await self.container_lifecycle_queue.put( - ContainerLifecycleEvent( - ev.kernel_id, - ev.session_id, - ev.container_id, - LifecycleEvent.CLEAN, - ev.reason, - suppress_events=ev.suppress_events, - done_future=ev.done_future, - ), - ) + await self.container_lifecycle_queue.put( + ContainerLifecycleEvent( + ev.kernel_id, + ev.session_id, + ev.container_id, + LifecycleEvent.CLEAN, + ev.reason, + suppress_events=ev.suppress_events, + done_future=ev.done_future, + ), + ) except asyncio.CancelledError: pass except Exception: @@ -1262,75 +1269,109 @@ async def sync_container_lifecycles(self, interval: float) -> None: for cases when we miss the container lifecycle events from the underlying implementation APIs due to the agent restarts or crashes. """ - known_kernels: Dict[KernelId, ContainerId] = {} + known_kernels: Dict[KernelId, ContainerId | None] = {} alive_kernels: Dict[KernelId, ContainerId] = {} kernel_session_map: Dict[KernelId, SessionId] = {} own_kernels: dict[KernelId, ContainerId] = {} - terminated_kernels = {} + terminated_kernels: dict[KernelId, ContainerLifecycleEvent] = {} - async with self.registry_lock: + def _get_session_id(container: Container) -> SessionId | None: + _session_id = container.labels.get("ai.backend.session-id") try: - # Check if: there are dead containers - for kernel_id, container in await self.enumerate_containers(DEAD_STATUS_SET): - if ( - kernel_id in self.restarting_kernels - or kernel_id in self.terminating_kernels - ): - continue - log.info( - "detected dead container during lifeycle sync (k:{}, c:{})", - kernel_id, - container.id, - ) - session_id = SessionId(UUID(container.labels["ai.backend.session-id"])) - terminated_kernels[kernel_id] = ContainerLifecycleEvent( - kernel_id, - session_id, - known_kernels[kernel_id], - LifecycleEvent.CLEAN, - KernelLifecycleEventReason.SELF_TERMINATED, - ) - for kernel_id, container in await self.enumerate_containers(ACTIVE_STATUS_SET): - alive_kernels[kernel_id] = container.id - session_id = SessionId(UUID(container.labels["ai.backend.session-id"])) - kernel_session_map[kernel_id] = session_id - own_kernels[kernel_id] = container.id - for kernel_id, kernel_obj in self.kernel_registry.items(): - known_kernels[kernel_id] = kernel_obj["container_id"] - session_id = kernel_obj.session_id - kernel_session_map[kernel_id] = session_id - # Check if: kernel_registry has the container but it's gone. - for kernel_id in known_kernels.keys() - alive_kernels.keys(): - if ( - kernel_id in self.restarting_kernels - or kernel_id in self.terminating_kernels - ): - continue - terminated_kernels[kernel_id] = ContainerLifecycleEvent( - kernel_id, - kernel_session_map[kernel_id], - known_kernels[kernel_id], - LifecycleEvent.CLEAN, - KernelLifecycleEventReason.SELF_TERMINATED, - ) - # Check if: there are containers not spawned by me. - for kernel_id in alive_kernels.keys() - known_kernels.keys(): - if kernel_id in self.restarting_kernels: - continue - terminated_kernels[kernel_id] = ContainerLifecycleEvent( - kernel_id, - kernel_session_map[kernel_id], - alive_kernels[kernel_id], - LifecycleEvent.DESTROY, - KernelLifecycleEventReason.TERMINATED_UNKNOWN_CONTAINER, - ) - finally: - # Enqueue the events. - for kernel_id, ev in terminated_kernels.items(): - await self.container_lifecycle_queue.put(ev) + return SessionId(UUID(_session_id)) + except ValueError: + log.warning( + f"sync_container_lifecycles() invalid session-id (cid: {container.id}, sid:{_session_id})" + ) + return None - # Set container count - await self.set_container_count(len(own_kernels.keys())) + try: + _containers = await self.enumerate_containers(ACTIVE_STATUS_SET | DEAD_STATUS_SET) + async with self.registry_lock: + try: + # Check if: there are dead containers + dead_containers = [ + (kid, container) + for kid, container in _containers + if container.status in DEAD_STATUS_SET + ] + for kernel_id, container in dead_containers: + if kernel_id in self.restarting_kernels: + continue + log.info( + "detected dead container during lifeycle sync (k:{}, c:{})", + kernel_id, + container.id, + ) + session_id = _get_session_id(container) + if session_id is None: + continue + terminated_kernels[kernel_id] = ContainerLifecycleEvent( + kernel_id, + session_id, + container.id, + LifecycleEvent.CLEAN, + KernelLifecycleEventReason.SELF_TERMINATED, + ) + active_containers = [ + (kid, container) + for kid, container in _containers + if container.status in ACTIVE_STATUS_SET + ] + for kernel_id, container in active_containers: + alive_kernels[kernel_id] = container.id + session_id = _get_session_id(container) + if session_id is None: + continue + kernel_session_map[kernel_id] = session_id + own_kernels[kernel_id] = container.id + for kernel_id, kernel_obj in self.kernel_registry.items(): + known_kernels[kernel_id] = ( + ContainerId(kernel_obj.container_id) + if kernel_obj.container_id is not None + else None + ) + session_id = kernel_obj.session_id + kernel_session_map[kernel_id] = session_id + # Check if: kernel_registry has the container but it's gone. + for kernel_id in known_kernels.keys() - alive_kernels.keys(): + if ( + kernel_id in self.restarting_kernels + or kernel_id in self.terminating_kernels + ): + continue + terminated_kernels[kernel_id] = ContainerLifecycleEvent( + kernel_id, + kernel_session_map[kernel_id], + known_kernels[kernel_id], + LifecycleEvent.CLEAN, + KernelLifecycleEventReason.SELF_TERMINATED, + ) + # Check if: there are containers already deleted from my registry or not spawned by me. + for kernel_id in alive_kernels.keys() - known_kernels.keys(): + if kernel_id in self.restarting_kernels: + continue + terminated_kernels[kernel_id] = ContainerLifecycleEvent( + kernel_id, + kernel_session_map[kernel_id], + alive_kernels[kernel_id], + LifecycleEvent.DESTROY, + KernelLifecycleEventReason.TERMINATED_UNKNOWN_CONTAINER, + ) + finally: + # Enqueue the events. + for kernel_id, ev in terminated_kernels.items(): + await self.container_lifecycle_queue.put(ev) + + # Set container count + await self.set_container_count(len(own_kernels.keys())) + except asyncio.CancelledError: + pass + except asyncio.TimeoutError: + log.warning("sync_container_lifecycles() timeout, continuing") + except Exception as e: + log.exception(f"sync_container_lifecycles() failure, continuing (detail: {repr(e)})") + await self.produce_error_event() async def set_container_count(self, container_count: int) -> None: await redis_helper.execute( @@ -1946,7 +1987,7 @@ async def create_kernel( service_ports, ) async with self.registry_lock: - self.kernel_registry[ctx.kernel_id] = kernel_obj + self.kernel_registry[kernel_id] = kernel_obj try: container_data = await ctx.start_container( kernel_obj, @@ -1958,7 +1999,7 @@ async def create_kernel( msg = e.message or "unknown" log.error( "Kernel failed to create container. Kernel is going to be destroyed." - f" (k:{ctx.kernel_id}, detail:{msg})", + f" (k:{kernel_id}, detail:{msg})", ) cid = e.container_id async with self.registry_lock: @@ -1973,17 +2014,22 @@ async def create_kernel( raise AgentError( f"Kernel failed to create container (k:{str(ctx.kernel_id)}, detail:{msg})" ) - except Exception: + except Exception as e: log.warning( - "Kernel failed to create container (k:{}). Kernel is going to be" - " unregistered.", + "Kernel failed to create container (k:{}). Kernel is going to be destroyed.", kernel_id, ) - async with self.registry_lock: - del self.kernel_registry[kernel_id] - raise + await self.inject_container_lifecycle_event( + kernel_id, + session_id, + LifecycleEvent.DESTROY, + KernelLifecycleEventReason.FAILED_TO_CREATE, + ) + raise AgentError( + f"Kernel failed to create container (k:{str(kernel_id)}, detail: {str(e)})" + ) async with self.registry_lock: - self.kernel_registry[ctx.kernel_id].data.update(container_data) + self.kernel_registry[kernel_id].data.update(container_data) await kernel_obj.init(self.event_producer) current_task = asyncio.current_task() diff --git a/src/ai/backend/agent/docker/agent.py b/src/ai/backend/agent/docker/agent.py index 59a8b609352..904b9923b74 100644 --- a/src/ai/backend/agent/docker/agent.py +++ b/src/ai/backend/agent/docker/agent.py @@ -10,6 +10,7 @@ import signal import struct import sys +from collections.abc import Mapping from decimal import Decimal from functools import partial from io import StringIO @@ -22,13 +23,13 @@ FrozenSet, List, Literal, - Mapping, MutableMapping, Optional, Sequence, Set, Tuple, Union, + cast, ) from uuid import UUID @@ -860,7 +861,7 @@ async def start_container( config=container_config, name=kernel_name ) assert container is not None - cid = container._id + cid = cast(str, container._id) resource_spec.container_id = cid # Write resource.txt again to update the container id. with open(self.config_dir / "resource.txt", "w") as f: @@ -896,10 +897,10 @@ async def start_container( except asyncio.CancelledError: if container is not None: raise ContainerCreationError( - container_id=cid, message="Container creation cancelled" + container_id=container._id, message="Container creation cancelled" ) raise - except Exception: + except Exception as e: # Oops, we have to restore the allocated resources! scratch_type = self.local_config["container"]["scratch-type"] scratch_root = self.local_config["container"]["scratch-root"] @@ -917,7 +918,9 @@ async def start_container( for dev_name, device_alloc in resource_spec.allocations.items(): self.computers[dev_name].alloc_map.free(device_alloc) if container is not None: - raise ContainerCreationError(container_id=cid, message="unknown") + raise ContainerCreationError( + container_id=container._id, message=f"unknown. {repr(e)}" + ) raise additional_network_names: Set[str] = set() From 59cbbc92de810c78811e0f9af6f18ff940fecfba Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 5 Jul 2024 17:14:01 +0900 Subject: [PATCH 2/3] save registry when shutting down agent and more logging --- src/ai/backend/agent/agent.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index 016dc157e19..e5ff9f86e37 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -743,6 +743,7 @@ async def shutdown(self, stop_signal: signal.Signals) -> None: if kernel_obj.runner is not None: await kernel_obj.runner.close() await kernel_obj.close() + await self.save_last_registry(force=True) if stop_signal == signal.SIGTERM: await self.clean_all_kernels(blocking=True) @@ -1285,6 +1286,7 @@ def _get_session_id(container: Container) -> SessionId | None: ) return None + log.debug("sync_container_lifecycles(): triggered") try: _containers = await self.enumerate_containers(ACTIVE_STATUS_SET | DEAD_STATUS_SET) async with self.registry_lock: @@ -1295,6 +1297,9 @@ def _get_session_id(container: Container) -> SessionId | None: for kid, container in _containers if container.status in DEAD_STATUS_SET ] + log.debug( + f"detected dead containers: {[container.id[:12] for _, container in dead_containers]}" + ) for kernel_id, container in dead_containers: if kernel_id in self.restarting_kernels: continue @@ -1318,6 +1323,9 @@ def _get_session_id(container: Container) -> SessionId | None: for kid, container in _containers if container.status in ACTIVE_STATUS_SET ] + log.debug( + f"detected active containers: {[container.id[:12] for _, container in active_containers]}" + ) for kernel_id, container in active_containers: alive_kernels[kernel_id] = container.id session_id = _get_session_id(container) @@ -1340,6 +1348,7 @@ def _get_session_id(container: Container) -> SessionId | None: or kernel_id in self.terminating_kernels ): continue + log.debug(f"kernel with no container (kid: {kernel_id})") terminated_kernels[kernel_id] = ContainerLifecycleEvent( kernel_id, kernel_session_map[kernel_id], @@ -1347,10 +1356,11 @@ def _get_session_id(container: Container) -> SessionId | None: LifecycleEvent.CLEAN, KernelLifecycleEventReason.SELF_TERMINATED, ) - # Check if: there are containers already deleted from my registry or not spawned by me. + # Check if: there are containers already deleted from my registry. for kernel_id in alive_kernels.keys() - known_kernels.keys(): if kernel_id in self.restarting_kernels: continue + log.debug(f"kernel not found in registry (kid:{kernel_id})") terminated_kernels[kernel_id] = ContainerLifecycleEvent( kernel_id, kernel_session_map[kernel_id], From ccdc29bbcca011e103369968dca619011df8131d Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 5 Jul 2024 19:28:39 +0900 Subject: [PATCH 3/3] more logging and more event type --- src/ai/backend/agent/agent.py | 6 +++++- src/ai/backend/common/events.py | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index e5ff9f86e37..4a373eaea0d 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -1354,7 +1354,7 @@ def _get_session_id(container: Container) -> SessionId | None: kernel_session_map[kernel_id], known_kernels[kernel_id], LifecycleEvent.CLEAN, - KernelLifecycleEventReason.SELF_TERMINATED, + KernelLifecycleEventReason.CONTAINER_NOT_FOUND, ) # Check if: there are containers already deleted from my registry. for kernel_id in alive_kernels.keys() - known_kernels.keys(): @@ -1370,6 +1370,10 @@ def _get_session_id(container: Container) -> SessionId | None: ) finally: # Enqueue the events. + terminated_kernel_ids = ",".join([ + str(kid) for kid in terminated_kernels.keys() + ]) + log.debug(f"Terminating kernels(ids:[{terminated_kernel_ids}])") for kernel_id, ev in terminated_kernels.items(): await self.container_lifecycle_queue.put(ev) diff --git a/src/ai/backend/common/events.py b/src/ai/backend/common/events.py index 7b4ae3acb61..c6ed9fe0538 100644 --- a/src/ai/backend/common/events.py +++ b/src/ai/backend/common/events.py @@ -235,6 +235,7 @@ class KernelLifecycleEventReason(enum.StrEnum): UNKNOWN = "unknown" USER_REQUESTED = "user-requested" NOT_FOUND_IN_MANAGER = "not-found-in-manager" + CONTAINER_NOT_FOUND = "container-not-found" @classmethod def from_value(cls, value: Optional[str]) -> Optional[KernelLifecycleEventReason]: