1616from tesp_api .repository .task_repository import task_repository
1717from tesp_api .service .file_transfer_service import file_transfer_service
1818from tesp_api .service .error import pulsar_event_handle_error , TaskNotFoundError , TaskExecutorError
19- from tesp_api .service .pulsar_operations import PulsarRestOperations , PulsarAmpqOperations , DataType
19+ from tesp_api .service .pulsar_operations import PulsarRestOperations , PulsarAmqpOperations , DataType
2020from tesp_api .repository .model .task import (
2121 TesTaskState ,
2222 TesTaskExecutor ,
2929
3030CONTAINER_TYPE = os .getenv ("CONTAINER_TYPE" , "docker" )
3131
32+
3233@local_handler .register (event_name = "queued_task" )
3334def handle_queued_task (event : Event ) -> None :
3435 """
@@ -39,8 +40,9 @@ def handle_queued_task(event: Event) -> None:
3940 match pulsar_service .get_operations ():
4041 case PulsarRestOperations () as pulsar_rest_operations :
4142 dispatch_event ('queued_task_rest' , {** payload , 'pulsar_operations' : pulsar_rest_operations })
42- case PulsarAmpqOperations () as pulsar_ampq_operations :
43- dispatch_event ('queued_task_ampq' , {** payload , 'pulsar_operations' : pulsar_ampq_operations })
43+ case PulsarAmqpOperations () as pulsar_amqp_operations :
44+ dispatch_event ('queued_task_amqp' , {** payload , 'pulsar_operations' : pulsar_amqp_operations })
45+
4446
4547@local_handler .register (event_name = "queued_task_rest" )
4648async def handle_queued_task_rest (event : Event ):
@@ -53,12 +55,37 @@ async def handle_queued_task_rest(event: Event):
5355
5456 print (f"Queued task rest: { task_id } " )
5557
56- await Promise (lambda resolve , reject : resolve (None ))\
57- .then (lambda nothing : pulsar_operations .setup_job (task_id ))\
58- .map (lambda setup_job_result : dispatch_event ('initialize_task' , {** payload , 'task_config' : setup_job_result }))\
59- .catch (lambda error : pulsar_event_handle_error (error , task_id , event_name , pulsar_operations ))\
58+ await Promise (lambda resolve , reject : resolve (None )) \
59+ .then (lambda nothing : pulsar_operations .setup_job (task_id )) \
60+ .map (lambda setup_job_result : dispatch_event ('initialize_task' , {** payload , 'task_config' : setup_job_result })) \
61+ .catch (lambda error : pulsar_event_handle_error (error , task_id , event_name , pulsar_operations )) \
6062 .then (lambda x : x ) # Invokes promise, potentially from error handler
6163
64+
65+ @local_handler .register (event_name = "queued_task_amqp" )
66+ async def handle_queued_task_amqp (event : Event ):
67+ """
68+ Sets up the job in Pulsar via AMQP operations and dispatches an 'initialize_task' event.
69+ """
70+ event_name , payload = event
71+ task_id : ObjectId = payload ['task_id' ]
72+ pulsar_operations : PulsarAmqpOperations = payload ['pulsar_operations' ]
73+
74+ print (f"Queued task AMQP: { task_id } " )
75+
76+ try :
77+ # Setup job via AMQP
78+ setup_job_result = await pulsar_operations .setup_job (task_id )
79+
80+ # Dispatch initialize event
81+ await dispatch_event ('initialize_task' , {
82+ ** payload ,
83+ 'task_config' : setup_job_result
84+ })
85+ except Exception as error :
86+ await pulsar_event_handle_error (error , task_id , event_name , pulsar_operations )
87+
88+
6289@local_handler .register (event_name = "initialize_task" )
6390async def handle_initializing_task (event : Event ) -> None :
6491 """
@@ -69,12 +96,11 @@ async def handle_initializing_task(event: Event) -> None:
6996 task_id : ObjectId = payload ['task_id' ]
7097 pulsar_operations : PulsarRestOperations = payload ['pulsar_operations' ]
7198
72- # Merged Logic: Using the feature-complete setup_data from the new version
7399 async def setup_data (job_id : ObjectId ,
74- resources : TesTaskResources ,
75- volumes : List [str ],
76- inputs : List [TesTaskInput ],
77- outputs : List [TesTaskOutput ]):
100+ resources : TesTaskResources ,
101+ volumes : List [str ],
102+ inputs : List [TesTaskInput ],
103+ outputs : List [TesTaskOutput ]):
78104 resource_conf : dict
79105 volume_confs : List [dict ] = []
80106 input_confs : List [dict ] = []
@@ -109,28 +135,29 @@ async def setup_data(job_id: ObjectId,
109135 return resource_conf , volume_confs , input_confs , output_confs
110136
111137 print (f"Initializing task: { task_id } " )
112- await Promise (lambda resolve , reject : resolve (None ))\
138+ await Promise (lambda resolve , reject : resolve (None )) \
113139 .then (lambda nothing : task_repository .update_task_state (
114- task_id ,
115- TesTaskState .QUEUED ,
116- TesTaskState .INITIALIZING
117- )).map (lambda updated_task : get_else_throw (
118- updated_task , TaskNotFoundError (task_id , Just (TesTaskState .QUEUED ))
119- )).then (lambda updated_task : setup_data (
120- task_id ,
121- maybe_of (updated_task .resources ).maybe (None , lambda x : x ),
122- maybe_of (updated_task .volumes ).maybe ([], lambda x : x ),
123- maybe_of (updated_task .inputs ).maybe ([], lambda x : x ),
124- maybe_of (updated_task .outputs ).maybe ([], lambda x : x )
125- )).map (lambda res_input_output_confs : dispatch_event ('run_task' , {
126- ** payload ,
127- 'resource_conf' : res_input_output_confs [0 ],
128- 'volume_confs' : res_input_output_confs [1 ],
129- 'input_confs' : res_input_output_confs [2 ],
130- 'output_confs' : res_input_output_confs [3 ]
131- })).catch (lambda error : pulsar_event_handle_error (error , task_id , event_name , pulsar_operations ))\
140+ task_id ,
141+ TesTaskState .QUEUED ,
142+ TesTaskState .INITIALIZING
143+ )).map (lambda updated_task : get_else_throw (
144+ updated_task , TaskNotFoundError (task_id , Just (TesTaskState .QUEUED ))
145+ )).then (lambda updated_task : setup_data (
146+ task_id ,
147+ maybe_of (updated_task .resources ).maybe (None , lambda x : x ),
148+ maybe_of (updated_task .volumes ).maybe ([], lambda x : x ),
149+ maybe_of (updated_task .inputs ).maybe ([], lambda x : x ),
150+ maybe_of (updated_task .outputs ).maybe ([], lambda x : x )
151+ )).map (lambda res_input_output_confs : dispatch_event ('run_task' , {
152+ ** payload ,
153+ 'resource_conf' : res_input_output_confs [0 ],
154+ 'volume_confs' : res_input_output_confs [1 ],
155+ 'input_confs' : res_input_output_confs [2 ],
156+ 'output_confs' : res_input_output_confs [3 ]
157+ })).catch (lambda error : pulsar_event_handle_error (error , task_id , event_name , pulsar_operations )) \
132158 .then (lambda x : x )
133159
160+
134161@local_handler .register (event_name = "run_task" )
135162async def handle_run_task (event : Event ) -> None :
136163 """
@@ -146,8 +173,8 @@ async def handle_run_task(event: Event) -> None:
146173 input_confs : List [dict ] = payload ['input_confs' ]
147174 output_confs : List [dict ] = payload ['output_confs' ]
148175 pulsar_operations : PulsarRestOperations = payload ['pulsar_operations' ]
149-
150- run_command_str = None
176+
177+ run_command_str = None
151178 command_start_time = datetime .datetime .now (datetime .timezone .utc )
152179
153180 try :
@@ -175,7 +202,7 @@ async def handle_run_task(event: Event) -> None:
175202 )
176203
177204 stage_exec = TesTaskExecutor (image = "willdockerhub/curl-wget:latest" , command = [], workdir = Path ("/downloads" ))
178-
205+
179206 # Stage-in command
180207 stage_in_cmd = ""
181208 stage_in_mount = ""
@@ -211,7 +238,6 @@ async def handle_run_task(event: Event) -> None:
211238 non_empty_parts = [p .strip () for p in parts if p and p .strip ()]
212239 run_command_str = " && " .join (non_empty_parts ) if non_empty_parts else None
213240
214- # Resume with the polished version's logic for execution and state management
215241 command_start_time = datetime .datetime .now (datetime .timezone .utc )
216242 command_status : dict
217243
@@ -231,27 +257,28 @@ async def handle_run_task(event: Event) -> None:
231257 command_status .get ('returncode' , - 1 )
232258 )
233259
234- current_task_monad = await task_repository .get_task (maybe_of (author ), {'_id' : task_id })
235- current_task_obj = get_else_throw (current_task_monad , TaskNotFoundError (task_id ))
260+ current_task_monad = await task_repository .get_task (maybe_of (author ), {'_id' : task_id })
261+ current_task_obj = get_else_throw (current_task_monad , TaskNotFoundError (task_id ))
236262
237263 if current_task_obj .state == TesTaskState .CANCELED :
238264 print (f"Task { task_id } found CANCELED after job completion polling. Aborting state changes." )
239- return
265+ return
240266
241267 if command_status .get ('returncode' , - 1 ) != 0 :
242- print (f"Task { task_id } executor error (return code: { command_status .get ('returncode' , - 1 )} ). Setting state to EXECUTOR_ERROR." )
268+ print (
269+ f"Task { task_id } executor error (return code: { command_status .get ('returncode' , - 1 )} ). Setting state to EXECUTOR_ERROR." )
243270 await task_repository .update_task_state (task_id , TesTaskState .RUNNING , TesTaskState .EXECUTOR_ERROR )
244271 await pulsar_operations .erase_job (task_id )
245- return
272+ return
246273
247274 print (f"Task { task_id } completed successfully. Setting state to COMPLETE." )
248275 await Promise (lambda resolve , reject : resolve (None )) \
249276 .then (lambda ignored : task_repository .update_task_state (
250- task_id , TesTaskState .RUNNING , TesTaskState .COMPLETE
251- )) \
277+ task_id , TesTaskState .RUNNING , TesTaskState .COMPLETE
278+ )) \
252279 .map (lambda task_after_complete_update : get_else_throw (
253- task_after_complete_update , TaskNotFoundError (task_id , Just (TesTaskState .RUNNING ))
254- )) \
280+ task_after_complete_update , TaskNotFoundError (task_id , Just (TesTaskState .RUNNING ))
281+ )) \
255282 .then (lambda ignored : pulsar_operations .erase_job (task_id )) \
256283 .catch (lambda error : pulsar_event_handle_error (error , task_id , event_name , pulsar_operations )) \
257284 .then (lambda x : x )
@@ -262,22 +289,24 @@ async def handle_run_task(event: Event) -> None:
262289 await pulsar_operations .kill_job (task_id )
263290 await pulsar_operations .erase_job (task_id )
264291 print (f"Task { task_id } Pulsar job cleanup attempted after asyncio cancellation." )
265-
292+
266293 except Exception as error :
267294 print (f"Exception in handle_run_task for task { task_id } : { type (error ).__name__ } - { error } " )
268295
269296 task_state_after_error_monad = await task_repository .get_task (maybe_of (author ), {'_id' : task_id })
270297 if task_state_after_error_monad .is_just () and task_state_after_error_monad .value .state == TesTaskState .CANCELED :
271- print (f"Task { task_id } is already CANCELED. Exception '{ type (error ).__name__ } ' likely due to this. No further error processing by handler." )
272- return
298+ print (
299+ f"Task { task_id } is already CANCELED. Exception '{ type (error ).__name__ } ' likely due to this. No further error processing by handler." )
300+ return
273301
274302 print (f"Task { task_id } not CANCELED; proceeding with pulsar_event_handle_error for '{ type (error ).__name__ } '." )
275303 error_handler_result = pulsar_event_handle_error (error , task_id , event_name , pulsar_operations )
276304 if asyncio .iscoroutine (error_handler_result ) or isinstance (error_handler_result , _Promise ):
277305 await error_handler_result
278-
279- try :
280- print (f"Ensuring Pulsar job for task { task_id } is erased after general error handling in run_task." )
281- await pulsar_operations .erase_job (task_id )
282- except Exception as final_erase_error :
283- print (f"Error during final Pulsar erase attempt for task { task_id } after general error: { final_erase_error } " )
306+
307+ # try:
308+ # print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling in run_task.")
309+ # await pulsar_operations.erase_job(task_id)
310+ # except Exception as final_erase_error:
311+ # print(
312+ # f"Error during final Pulsar erase attempt for task {task_id} after general error: {final_erase_error}")
0 commit comments