1616from diem .jsonrpc import AsyncClient
1717from diem .diem_types import AccountAddress
1818
19- import logging , numpy , asyncio , uuid
19+ import logging , numpy , asyncio , uuid , functools
2020
2121
2222class App :
@@ -131,20 +131,19 @@ async def start_worker(self, delay: float = 0.05) -> asyncio.Task:
131131 await self .event_puller .head ()
132132 self .add_bg_task (self .event_puller .process )
133133 self .add_bg_task (self ._send_pending_payments )
134+ self .add_bg_task (functools .partial (asyncio .sleep , delay ))
134135
135136 async def worker () -> None :
136137 while True :
137- for t in self .bg_tasks :
138- try :
139- await t ()
140- await asyncio .sleep (delay )
141- except asyncio .CancelledError :
138+ try :
139+ await asyncio .gather (* [t () for t in self .bg_tasks ])
140+ except asyncio .CancelledError :
141+ return
142+ except Exception as e :
143+ if "cannot schedule new futures" in str (e ):
144+ # ignore unexpected shutdown RuntimeError
142145 return
143- except Exception as e :
144- if "cannot schedule new futures" in str (e ):
145- # ignore unexpected shutdown RuntimeError
146- return
147- self .logger .exception (e )
146+ self .logger .exception (e )
148147
149148 return asyncio .create_task (worker ())
150149
@@ -162,23 +161,26 @@ async def txn_metadata(self, txn: Transaction) -> Tuple[bytes, bytes]:
162161 raise ValueError ("could not create diem payment transacton metadata: %s" % txn )
163162
164163 async def _send_pending_payments (self ) -> None :
165- for txn in self .store .find_all (Transaction , status = Transaction .Status .pending ):
166- if self .store .find (Account , id = txn .account_id ).disable_background_tasks :
167- self .logger .debug ("account bg tasks disabled, ignore %s" , txn )
168- continue
169- self .logger .info ("processing %s" , txn )
170- try :
171- if txn .payee_account_id :
172- self ._send_internal_payment_txn (txn )
173- else :
174- await self ._send_external_payment_txn (txn )
175- except jsonrpc .JsonRpcError as e :
176- msg = "ignore error %s when sending transaction %s, retry later"
177- self .logger .info (msg , e , txn , exc_info = True )
178- except Exception as e :
179- msg = "send pending transaction failed with %s, cancel transaction %s."
180- self .logger .error (msg , e , txn , exc_info = True )
181- self .store .update (txn , status = Transaction .Status .canceled , cancel_reason = str (e ))
164+ txns = self .store .find_all (Transaction , status = Transaction .Status .pending )
165+ await asyncio .gather (* [self ._send_pending_payment (txn ) for txn in txns ])
166+
167+ async def _send_pending_payment (self , txn : Transaction ) -> None :
168+ if self .store .find (Account , id = txn .account_id ).disable_background_tasks :
169+ self .logger .debug ("account bg tasks disabled, ignore %s" , txn )
170+ return
171+ self .logger .info ("send pending payment %s" , txn )
172+ try :
173+ if txn .payee_account_id :
174+ self ._send_internal_payment_txn (txn )
175+ else :
176+ await self ._send_external_payment_txn (txn )
177+ except jsonrpc .JsonRpcError as e :
178+ msg = "ignore error %s when sending transaction %s, retry later"
179+ self .logger .info (msg , e , txn , exc_info = True )
180+ except Exception as e :
181+ msg = "send pending transaction failed with %s, cancel transaction %s."
182+ self .logger .error (msg , e , txn , exc_info = True )
183+ self .store .update (txn , status = Transaction .Status .canceled , cancel_reason = str (e ))
182184
183185 def _send_internal_payment_txn (self , txn : Transaction ) -> None :
184186 self .store .create (
0 commit comments