Skip to content

Commit dd46895

Browse files
fregataakyujin-cho
andcommitted
fix: Omit cleaning containerless kernels which are still creating its container (#2317)
Co-authored-by: Kyujin Cho <kyujin.cho@lablup.com> Backported-from: main (24.09) Backported-to: 24.03 Backport-of: 2317
1 parent 82c2276 commit dd46895

4 files changed

Lines changed: 38 additions & 9 deletions

File tree

changes/2317.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Omit to clean containerless kernels which are still creating its container.

src/ai/backend/agent/agent.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@
157157
Container,
158158
ContainerLifecycleEvent,
159159
ContainerStatus,
160+
KernelLifecycleStatus,
160161
LifecycleEvent,
161162
MountInfo,
162163
)
@@ -560,7 +561,6 @@ class AbstractAgent(
560561
redis: Redis
561562

562563
restarting_kernels: MutableMapping[KernelId, RestartTracker]
563-
terminating_kernels: Set[KernelId]
564564
timer_tasks: MutableSequence[asyncio.Task]
565565
container_lifecycle_queue: asyncio.Queue[ContainerLifecycleEvent | Sentinel]
566566

@@ -600,7 +600,6 @@ def __init__(
600600
self.computers = {}
601601
self.images = {} # repoTag -> digest
602602
self.restarting_kernels = {}
603-
self.terminating_kernels = set()
604603
self.stat_ctx = StatContext(
605604
self,
606605
mode=StatModes(local_config["container"]["stats-type"]),
@@ -969,7 +968,10 @@ async def collect_container_stat(self, interval: float):
969968
container_ids = []
970969
async with self.registry_lock:
971970
for kernel_id, kernel_obj in [*self.kernel_registry.items()]:
972-
if not kernel_obj.stats_enabled:
971+
if (
972+
not kernel_obj.stats_enabled
973+
or kernel_obj.state != KernelLifecycleStatus.RUNNING
974+
):
973975
continue
974976
container_ids.append(kernel_obj["container_id"])
975977
await self.stat_ctx.collect_container_stat(container_ids)
@@ -987,7 +989,10 @@ async def collect_process_stat(self, interval: float):
987989
container_ids = []
988990
async with self.registry_lock:
989991
for kernel_id, kernel_obj in [*self.kernel_registry.items()]:
990-
if not kernel_obj.stats_enabled:
992+
if (
993+
not kernel_obj.stats_enabled
994+
or kernel_obj.state != KernelLifecycleStatus.RUNNING
995+
):
991996
continue
992997
updated_kernel_ids.append(kernel_id)
993998
container_ids.append(kernel_obj["container_id"])
@@ -1012,14 +1017,14 @@ async def _handle_start_event(self, ev: ContainerLifecycleEvent) -> None:
10121017
kernel_obj = self.kernel_registry.get(ev.kernel_id)
10131018
if kernel_obj is not None:
10141019
kernel_obj.stats_enabled = True
1020+
kernel_obj.state = KernelLifecycleStatus.RUNNING
10151021

10161022
async def _handle_destroy_event(self, ev: ContainerLifecycleEvent) -> None:
10171023
try:
10181024
current_task = asyncio.current_task()
10191025
assert current_task is not None
10201026
if ev.kernel_id not in self._ongoing_destruction_tasks:
10211027
self._ongoing_destruction_tasks[ev.kernel_id] = current_task
1022-
self.terminating_kernels.add(ev.kernel_id)
10231028
async with self.registry_lock:
10241029
kernel_obj = self.kernel_registry.get(ev.kernel_id)
10251030
if kernel_obj is None:
@@ -1042,6 +1047,7 @@ async def _handle_destroy_event(self, ev: ContainerLifecycleEvent) -> None:
10421047
ev.done_future.set_result(None)
10431048
return
10441049
else:
1050+
kernel_obj.state = KernelLifecycleStatus.TERMINATING
10451051
kernel_obj.stats_enabled = False
10461052
kernel_obj.termination_reason = ev.reason
10471053
if kernel_obj.runner is not None:
@@ -1115,7 +1121,6 @@ async def _handle_clean_event(self, ev: ContainerLifecycleEvent) -> None:
11151121
self.port_pool.update(restored_ports)
11161122
await kernel_obj.close()
11171123
finally:
1118-
self.terminating_kernels.discard(ev.kernel_id)
11191124
if restart_tracker := self.restarting_kernels.get(ev.kernel_id, None):
11201125
restart_tracker.destroy_event.set()
11211126
else:
@@ -1349,9 +1354,10 @@ def _get_session_id(container: Container) -> SessionId | None:
13491354
kernel_session_map[kernel_id] = session_id
13501355
# Check if: kernel_registry has the container but it's gone.
13511356
for kernel_id in known_kernels.keys() - alive_kernels.keys():
1357+
kernel_obj = self.kernel_registry[kernel_id]
13521358
if (
13531359
kernel_id in self.restarting_kernels
1354-
or kernel_id in self.terminating_kernels
1360+
or kernel_obj.state != KernelLifecycleStatus.RUNNING
13551361
):
13561362
continue
13571363
log.debug(f"kernel with no container (kid: {kernel_id})")
@@ -1379,7 +1385,8 @@ def _get_session_id(container: Container) -> SessionId | None:
13791385
terminated_kernel_ids = ",".join([
13801386
str(kid) for kid in terminated_kernels.keys()
13811387
])
1382-
log.debug(f"Terminating kernels(ids:[{terminated_kernel_ids}])")
1388+
if terminated_kernel_ids:
1389+
log.debug(f"Terminate kernels(ids:[{terminated_kernel_ids}])")
13831390
for kernel_id, ev in terminated_kernels.items():
13841391
await self.container_lifecycle_queue.put(ev)
13851392

@@ -2141,6 +2148,8 @@ async def create_kernel(
21412148
},
21422149
),
21432150
)
2151+
async with self.registry_lock:
2152+
kernel_obj.state = KernelLifecycleStatus.RUNNING
21442153

21452154
# The startup command for the batch-type sessions will be executed by the manager
21462155
# upon firing of the "session_started" event.

src/ai/backend/agent/kernel.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555

5656
from .exception import UnsupportedBaseDistroError
5757
from .resources import KernelResourceSpec
58-
from .types import AgentEventData
58+
from .types import AgentEventData, KernelLifecycleStatus
5959

6060
log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined]
6161

@@ -177,6 +177,7 @@ class AbstractKernel(UserDict, aobject, metaclass=ABCMeta):
177177
stats_enabled: bool
178178
# FIXME: apply TypedDict to data in Python 3.8
179179
environ: Mapping[str, Any]
180+
status: KernelLifecycleStatus
180181

181182
_tasks: Set[asyncio.Task]
182183

@@ -213,6 +214,7 @@ def __init__(
213214
self.environ = environ
214215
self.runner = None
215216
self.container_id = None
217+
self.state = KernelLifecycleStatus.PREPARING
216218

217219
async def init(self, event_producer: EventProducer) -> None:
218220
log.debug(
@@ -233,6 +235,9 @@ def __getstate__(self) -> Mapping[str, Any]:
233235
return props
234236

235237
def __setstate__(self, props) -> None:
238+
# Used when a `Kernel` object is loaded from pickle data.
239+
if "state" not in props:
240+
props["state"] = KernelLifecycleStatus.RUNNING
236241
self.__dict__.update(props)
237242
# agent_config is set by the pickle.loads() caller.
238243
self.clean_event = None

src/ai/backend/agent/types.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,20 @@ class Container:
6464
backend_obj: Any # used to keep the backend-specific data
6565

6666

67+
class KernelLifecycleStatus(enum.StrEnum):
68+
"""
69+
The lifecycle status of `AbstractKernel` object.
70+
71+
By default, the state of a newly created kernel is `PREPARING`.
72+
The state of a kernel changes from `PREPARING` to `RUNNING` after the kernel starts a container successfully.
73+
It changes from `RUNNING` to `TERMINATING` before destroy kernel.
74+
"""
75+
76+
PREPARING = enum.auto()
77+
RUNNING = enum.auto()
78+
TERMINATING = enum.auto()
79+
80+
6781
class LifecycleEvent(enum.IntEnum):
6882
DESTROY = 0
6983
CLEAN = 1

0 commit comments

Comments
 (0)