@@ -302,12 +302,15 @@ def _pipeline(r: Redis) -> RedisPipeline:
302302 schedulable_scaling_groups = [row .scaling_group for row in result .fetchall ()]
303303
304304 async with self .db .begin () as db_conn :
305+ produce_do_prepare = False
305306 for sgroup_name in schedulable_scaling_groups :
306307 try :
307308 kernel_agent_bindings = await self ._schedule_in_sgroup (
308309 sched_ctx ,
309310 sgroup_name ,
310311 )
312+ if kernel_agent_bindings :
313+ produce_do_prepare = True
311314 await redis_helper .execute (
312315 self .redis_live ,
313316 lambda r : r .hset (
@@ -340,6 +343,8 @@ def _pipeline(r: Redis) -> RedisPipeline:
340343 datetime .now (tzutc ()).isoformat (),
341344 ),
342345 )
346+ if produce_do_prepare :
347+ await self .event_producer .produce_event (DoPrepareEvent ())
343348 except DBAPIError as e :
344349 if getattr (e .orig , "pgcode" , None ) == "55P03" :
345350 log .info (
@@ -444,7 +449,6 @@ async def _update():
444449 len (cancelled_sessions ),
445450 )
446451 zero = ResourceSlot ()
447- num_scheduled = 0
448452 kernel_agent_bindings_in_sgroup : list [KernelAgentBinding ] = []
449453
450454 while len (pending_sessions ) > 0 :
@@ -717,9 +721,6 @@ async def _update_session_status_data() -> None:
717721 continue
718722 else :
719723 kernel_agent_bindings_in_sgroup .extend (kernel_agent_bindings )
720- num_scheduled += 1
721- if num_scheduled > 0 :
722- await self .event_producer .produce_event (DoPrepareEvent ())
723724 return kernel_agent_bindings_in_sgroup
724725
725726 async def _filter_agent_by_container_limit (
0 commit comments