Skip to content

Commit eedfa4c

Browse files
committed
handle many kernel status
1 parent f6b31e4 commit eedfa4c

2 files changed

Lines changed: 62 additions & 16 deletions

File tree

src/ai/backend/agent/server.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,8 @@ async def sync_kernel_registry(
483483
@collect_error
484484
async def sync_and_get_kernels(
485485
self,
486+
preparing_kernels: Collection[str],
487+
pulling_kernels: Collection[str],
486488
running_kernels: Collection[str],
487489
terminating_kernels: Collection[str],
488490
) -> dict[str, Any]:
@@ -547,10 +549,30 @@ async def sync_and_get_kernels(
547549
or KernelLifecycleEventReason.NOT_FOUND_IN_MANAGER,
548550
suppress_events=False,
549551
)
550-
elif kernel_id not in running_kernels:
551-
# The kernel status is not 'running' or 'terminating' in truth.
552-
# It should be terminated.
553-
if kernel_id not in self.agent.terminating_kernels:
552+
elif kernel_id in running_kernels:
553+
pass
554+
elif kernel_id in preparing_kernels:
555+
# kernel_registry may not have `preparing` state kernels.
556+
pass
557+
elif kernel_id in pulling_kernels:
558+
# kernel_registry does not have `pulling` state kernels.
559+
# Let's just skip it.
560+
pass
561+
else:
562+
# This kernel is not alive according to the truth data.
563+
# The kernel should be destroyed or cleaned
564+
if kernel_id in self.agent.terminating_kernels:
565+
await self.agent.inject_container_lifecycle_event(
566+
kernel_id,
567+
kernel_obj.session_id,
568+
LifecycleEvent.CLEAN,
569+
kernel_obj.termination_reason
570+
or KernelLifecycleEventReason.NOT_FOUND_IN_MANAGER,
571+
suppress_events=True,
572+
)
573+
elif kernel_id in self.agent.restarting_kernels:
574+
pass
575+
else:
554576
await self.agent.inject_container_lifecycle_event(
555577
kernel_id,
556578
kernel_obj.session_id,

src/ai/backend/manager/registry.py

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3089,11 +3089,15 @@ async def sync_agent_kernel_registry(self, agent_id: AgentId) -> None:
30893089
async def _sync_agent_resource_and_get_kerenels(
30903090
self,
30913091
agent_id: AgentId,
3092+
preparing_kernels: Collection[KernelId],
3093+
pulling_kernels: Collection[KernelId],
30923094
running_kernels: Collection[KernelId],
30933095
terminating_kernels: Collection[KernelId],
30943096
) -> AgentKernelRegistryByStatus:
30953097
async with self.agent_cache.rpc_context(agent_id) as rpc:
30963098
resp: dict[str, Any] = await rpc.call.sync_and_get_kernels(
3099+
preparing_kernels,
3100+
pulling_kernels,
30973101
running_kernels,
30983102
terminating_kernels,
30993103
)
@@ -3112,31 +3116,49 @@ async def sync_agent_resource(
31123116
.options(
31133117
selectinload(
31143118
AgentRow.kernels.and_(
3115-
KernelRow.status.in_([KernelStatus.RUNNING, KernelStatus.TERMINATING])
3119+
KernelRow.status.in_([
3120+
KernelStatus.PREPARING,
3121+
KernelStatus.PULLING,
3122+
KernelStatus.RUNNING,
3123+
KernelStatus.TERMINATING,
3124+
])
31163125
),
31173126
).options(load_only(KernelRow.id, KernelRow.status))
31183127
)
31193128
)
31203129
async with SASession(bind=db_connection) as db_session:
31213130
for _agent_row in await db_session.scalars(stmt):
31223131
agent_row = cast(AgentRow, _agent_row)
3132+
preparing_kernels: list[KernelId] = []
3133+
pulling_kernels: list[KernelId] = []
3134+
running_kernels: list[KernelId] = []
3135+
terminating_kernels: list[KernelId] = []
3136+
for kernel in agent_row.kernels:
3137+
kernel_status = cast(KernelStatus, kernel.status)
3138+
match kernel_status:
3139+
case KernelStatus.PREPARING:
3140+
preparing_kernels.append(KernelId(kernel.id))
3141+
case KernelStatus.PULLING:
3142+
pulling_kernels.append(KernelId(kernel.id))
3143+
case KernelStatus.RUNNING:
3144+
running_kernels.append(KernelId(kernel.id))
3145+
case KernelStatus.TERMINATING:
3146+
terminating_kernels.append(KernelId(kernel.id))
3147+
case _:
3148+
continue
31233149
agent_kernel_by_status[AgentId(agent_row.id)] = {
3124-
"running_kernels": [
3125-
KernelId(kern.id)
3126-
for kern in agent_row.kernels
3127-
if kern.status == KernelStatus.RUNNING
3128-
],
3129-
"terminating_kernels": [
3130-
KernelId(kern.id)
3131-
for kern in agent_row.kernels
3132-
if kern.status == KernelStatus.TERMINATING
3133-
],
3150+
"preparing_kernels": preparing_kernels,
3151+
"pulling_kernels": pulling_kernels,
3152+
"running_kernels": running_kernels,
3153+
"terminating_kernels": terminating_kernels,
31343154
}
31353155
tasks = []
31363156
for agent_id in agent_ids:
31373157
tasks.append(
31383158
self._sync_agent_resource_and_get_kerenels(
31393159
agent_id,
3160+
agent_kernel_by_status[agent_id]["preparing_kernels"],
3161+
agent_kernel_by_status[agent_id]["pulling_kernels"],
31403162
agent_kernel_by_status[agent_id]["running_kernels"],
31413163
agent_kernel_by_status[agent_id]["terminating_kernels"],
31423164
)
@@ -3154,7 +3176,9 @@ async def sync_agent_resource(
31543176
agent_errors,
31553177
)
31563178
else:
3157-
assert isinstance(resp, AgentKernelRegistryByStatus)
3179+
assert isinstance(
3180+
resp, AgentKernelRegistryByStatus
3181+
), f"response should be `AgentKernelRegistryByStatus`, not {type(resp)}"
31583182
result[aid] = resp
31593183
return result
31603184

0 commit comments

Comments
 (0)