@@ -157,13 +157,21 @@ def __init__(
157157 self ._worker_semaphore = asyncio .Semaphore (max_workers )
158158 self ._scheduler_task : Optional [asyncio .Task ] = None
159159 self ._db_initialized = False
160+ self ._conn : Optional [aiosqlite .Connection ] = None
161+
162+ async def _get_db (self ) -> aiosqlite .Connection :
163+ """Get or create a persistent database connection."""
164+ if self ._conn is None :
165+ self ._conn = await aiosqlite .connect (self .db_path )
166+ await self ._conn .execute ("PRAGMA journal_mode=WAL" )
167+ return self ._conn
160168
161169 async def _init_db (self ) -> None :
162170 """Initialize SQLite database schema using aiosqlite."""
163171 if self ._db_initialized :
164172 return
165- async with aiosqlite . connect ( self .db_path ) as db :
166- await db . execute ( "PRAGMA journal_mode=WAL" )
173+ db = await self ._get_db ()
174+ if True : # preserve indentation
167175 # Scheduled tasks table
168176 await db .execute (
169177 """
@@ -214,7 +222,7 @@ async def _init_db(self) -> None:
214222 """
215223 )
216224 await db .commit ()
217- self ._db_initialized = True
225+ self ._db_initialized = True
218226
219227 async def schedule_task (
220228 self ,
@@ -272,31 +280,30 @@ async def schedule_task(
272280 async def _store_task (self , task : ScheduledTask ) -> None :
273281 """Store task in database using aiosqlite."""
274282 await self ._init_db ()
275- async with aiosqlite .connect (self .db_path ) as db :
276- await db .execute ("PRAGMA journal_mode=WAL" )
277- await db .execute (
278- """
279- INSERT OR REPLACE INTO scheduled_tasks
280- (task_id, name, schedule_type, run_at, interval_seconds,
281- max_retries, timeout_seconds, created_at, last_run, next_run,
282- is_active)
283- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
284- """ ,
285- (
286- task .task_id ,
287- task .name ,
288- task .schedule_type .value ,
289- task .run_at .isoformat () if task .run_at else None ,
290- task .interval_seconds ,
291- task .max_retries ,
292- task .timeout_seconds ,
293- task .created_at .isoformat (),
294- task .last_run .isoformat () if task .last_run else None ,
295- task .next_run .isoformat () if task .next_run else None ,
296- task .is_active ,
297- ),
298- )
299- await db .commit ()
283+ db = await self ._get_db ()
284+ await db .execute (
285+ """
286+ INSERT OR REPLACE INTO scheduled_tasks
287+ (task_id, name, schedule_type, run_at, interval_seconds,
288+ max_retries, timeout_seconds, created_at, last_run, next_run,
289+ is_active)
290+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
291+ """ ,
292+ (
293+ task .task_id ,
294+ task .name ,
295+ task .schedule_type .value ,
296+ task .run_at .isoformat () if task .run_at else None ,
297+ task .interval_seconds ,
298+ task .max_retries ,
299+ task .timeout_seconds ,
300+ task .created_at .isoformat (),
301+ task .last_run .isoformat () if task .last_run else None ,
302+ task .next_run .isoformat () if task .next_run else None ,
303+ task .is_active ,
304+ ),
305+ )
306+ await db .commit ()
300307
301308 async def cancel_task (self , task_id : str ) -> bool :
302309 """
@@ -460,7 +467,7 @@ async def _scheduler_loop(self) -> None:
460467 except asyncio .CancelledError :
461468 pass
462469 except Exception as e :
463- logger .error (f "Error in scheduler loop: { e } " )
470+ logger .error ("Error in scheduler loop" , exc_info = True )
464471
465472 async def _execute_task (self , task : ScheduledTask ) -> None :
466473 """
@@ -508,7 +515,7 @@ async def _execute_task(self, task: ScheduledTask) -> None:
508515 )
509516
510517 except Exception as e :
511- logger .error (f "Error executing task { task .task_id } : { e } " )
518+ logger .error ("Error executing task {task.task_id}" , exc_info = True )
512519 result = TaskResult (
513520 task_id = task .task_id ,
514521 task_name = task .name ,
@@ -589,12 +596,15 @@ async def _run_task_with_retries(
589596 execution_count = attempt ,
590597 )
591598
592- except Exception as e :
593- error_msg = str ( e )
594- if attempt < task .max_retries and "retryable" in error_msg . lower () :
595- logger .warning (f"Task error: { error_msg } , retrying... (attempt { attempt + 1 } )" )
599+ except sqlite3 . OperationalError :
600+ # Retry on database lock errors (e.g. "database is locked" )
601+ if attempt < task .max_retries :
602+ logger .warning ("Database lock error , retrying (attempt %d)" , attempt + 1 , exc_info = True )
596603 await asyncio .sleep (2 ** attempt )
597604 return await self ._run_task_with_retries (task , attempt + 1 )
605+ error_msg = "Database lock error"
606+ except Exception as e :
607+ error_msg = str (e )
598608
599609 completed_at = datetime .now ()
600610 return TaskResult (
@@ -611,45 +621,44 @@ async def _run_task_with_retries(
611621 async def _store_result (self , result : TaskResult ) -> None :
612622 """Store task result in database using aiosqlite."""
613623 await self ._init_db ()
614- async with aiosqlite .connect (self .db_path ) as db :
615- await db .execute ("PRAGMA journal_mode=WAL" )
616- await db .execute (
617- """
618- INSERT OR REPLACE INTO task_results
619- (task_id, task_name, status, started_at, completed_at,
620- duration_seconds, output, error, execution_count, next_scheduled)
621- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
622- """ ,
623- (
624- result .task_id ,
625- result .task_name ,
626- result .status .value ,
627- result .started_at .isoformat (),
628- result .completed_at .isoformat () if result .completed_at else None ,
629- result .duration_seconds ,
630- result .output ,
631- result .error ,
632- result .execution_count ,
633- result .next_scheduled .isoformat () if result .next_scheduled else None ,
634- ),
635- )
636- # Also store in history
637- await db .execute (
638- """
639- INSERT INTO task_history
640- (task_id, timestamp, status, output, error, duration_seconds)
641- VALUES (?, ?, ?, ?, ?, ?)
642- """ ,
643- (
644- result .task_id ,
645- result .completed_at .isoformat () if result .completed_at else datetime .now ().isoformat (),
646- result .status .value ,
647- result .output ,
648- result .error ,
649- result .duration_seconds ,
650- ),
651- )
652- await db .commit ()
624+ db = await self ._get_db ()
625+ await db .execute (
626+ """
627+ INSERT OR REPLACE INTO task_results
628+ (task_id, task_name, status, started_at, completed_at,
629+ duration_seconds, output, error, execution_count, next_scheduled)
630+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
631+ """ ,
632+ (
633+ result .task_id ,
634+ result .task_name ,
635+ result .status .value ,
636+ result .started_at .isoformat (),
637+ result .completed_at .isoformat () if result .completed_at else None ,
638+ result .duration_seconds ,
639+ result .output ,
640+ result .error ,
641+ result .execution_count ,
642+ result .next_scheduled .isoformat () if result .next_scheduled else None ,
643+ ),
644+ )
645+ # Also store in history
646+ await db .execute (
647+ """
648+ INSERT INTO task_history
649+ (task_id, timestamp, status, output, error, duration_seconds)
650+ VALUES (?, ?, ?, ?, ?, ?)
651+ """ ,
652+ (
653+ result .task_id ,
654+ result .completed_at .isoformat () if result .completed_at else datetime .now ().isoformat (),
655+ result .status .value ,
656+ result .output ,
657+ result .error ,
658+ result .duration_seconds ,
659+ ),
660+ )
661+ await db .commit ()
653662
654663 async def get_task_history (
655664 self ,
@@ -667,33 +676,33 @@ async def get_task_history(
667676 List of history records
668677 """
669678 await self ._init_db ()
670- async with aiosqlite .connect (self .db_path ) as db :
671- db .row_factory = aiosqlite .Row
672- if task_id :
673- cursor = await db .execute (
674- """
675- SELECT task_id, timestamp, status, output, error,
676- duration_seconds
677- FROM task_history
678- WHERE task_id = ?
679- ORDER BY timestamp DESC
680- LIMIT ?
681- """ ,
682- (task_id , limit ),
683- )
684- else :
685- cursor = await db .execute (
679+ db = await self ._get_db ()
680+ db .row_factory = aiosqlite .Row
681+ if task_id :
682+ cursor = await db .execute (
686683 """
687- SELECT task_id, timestamp, status, output, error,
688- duration_seconds
689- FROM task_history
690- ORDER BY timestamp DESC
691- LIMIT ?
692- """ ,
693- (limit ,),
694- )
695- rows = await cursor .fetchall ()
696- return [dict (row ) for row in rows ]
684+ SELECT task_id, timestamp, status, output, error,
685+ duration_seconds
686+ FROM task_history
687+ WHERE task_id = ?
688+ ORDER BY timestamp DESC
689+ LIMIT ?
690+ """ ,
691+ (task_id , limit ),
692+ )
693+ else :
694+ cursor = await db .execute (
695+ """
696+ SELECT task_id, timestamp, status, output, error,
697+ duration_seconds
698+ FROM task_history
699+ ORDER BY timestamp DESC
700+ LIMIT ?
701+ """ ,
702+ (limit ,),
703+ )
704+ rows = await cursor .fetchall ()
705+ return [dict (row ) for row in rows ]
697706
698707 async def get_statistics (self ) -> dict :
699708 """
@@ -1214,7 +1223,7 @@ async def _worker_loop(self, worker_id: int) -> None:
12141223 except asyncio .CancelledError :
12151224 pass
12161225 except Exception as e :
1217- logger .error (f "Worker { worker_id } error: { e } " )
1226+ logger .error ("Worker {worker_id} error" , exc_info = True )
12181227
12191228 logger .debug (f"Worker { worker_id } stopped" )
12201229
0 commit comments