11import asyncio
22import os
33import json
4+ import time
5+ from functools import wraps
6+ from typing import Callable , TypeVar
47from sqlmodel import create_engine , Session , select , update , delete , text
5- from sqlalchemy import func
6- from sqlalchemy .engine import Connection
8+ from sqlalchemy import func , event
9+ from sqlalchemy .engine import Connection , Engine
10+ from sqlite3 import OperationalError as SQLiteOperationalError
711from datetime import datetime , timezone , timedelta
812from loguru import logger
913from redis import Redis # type: ignore
# Database URL; defaults to a local SQLite file when DATABASE_URL is unset.
_db_url = os.getenv("DATABASE_URL", "sqlite:///data/db.sqlite")

ENGINE = create_engine(
    _db_url,
    # os.getenv returns a *string*, so bool(os.getenv(...)) was True for any
    # non-empty value, including "false" and "0". Parse it explicitly instead.
    echo=os.getenv("DEBUG", "").strip().lower() in {"1", "true", "yes"},
    # check_same_thread / timeout are SQLite-specific DBAPI arguments and
    # would raise on other backends (e.g. Postgres), so gate them on the URL.
    # timeout=30 tells the sqlite3 driver to wait up to 30 s on a locked DB.
    connect_args=(
        {"check_same_thread": False, "timeout": 30}
        if _db_url.startswith("sqlite")
        else {}
    ),
)
3034
35+
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):  # type: ignore[no-untyped-def]
    """Configure each new SQLite connection for better concurrency.

    - journal_mode=WAL allows readers to proceed concurrently with a writer.
    - busy_timeout=30000 makes SQLite wait up to 30 s on a locked database
      instead of failing immediately with "database is locked".

    This hook is registered on the Engine *class*, i.e. for every engine the
    process creates, so it must no-op for non-SQLite backends — the PRAGMA
    statements would raise on e.g. a PostgreSQL DBAPI connection.
    """
    import sqlite3  # local import: only needed for the backend guard

    if not isinstance(dbapi_connection, sqlite3.Connection):
        return  # not SQLite (DATABASE_URL may point elsewhere): nothing to do

    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA journal_mode=WAL")
        cursor.execute("PRAGMA busy_timeout=30000")
    finally:
        cursor.close()  # always release the cursor, even if a PRAGMA fails
43+
44+
T = TypeVar("T")


def with_db_retry(
    max_retries: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Decorator that retries a DB operation on SQLite lock errors.

    Uses exponential backoff: attempt ``i`` sleeps ``min(base_delay * 2**i,
    max_delay)`` seconds before the next try. Any exception that is not a
    lock error is re-raised immediately.

    Args:
        max_retries: Total number of attempts (must be >= 1).
        base_delay: Backoff delay in seconds before the second attempt.
        max_delay: Upper bound on a single backoff delay, in seconds.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (the wrapped function
            would otherwise never be called at all).
    """
    if max_retries < 1:
        # Fail fast at decoration time; with the old code max_retries=0
        # silently skipped the call and then raised a bare None.
        raise ValueError("max_retries must be >= 1")

    # Name the parameter `fn`, not `func`, to avoid shadowing the
    # module-level `sqlalchemy.func` import.
    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @wraps(fn)
        def wrapper(*args, **kwargs) -> T:  # type: ignore[no-untyped-def]
            last_exception: Exception | None = None
            for attempt in range(max_retries):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    # A lock error surfaces either in the message or as a
                    # wrapped sqlite3.OperationalError cause.
                    is_locked = "database is locked" in str(e) or isinstance(
                        e.__cause__, SQLiteOperationalError
                    )
                    if not is_locked:
                        raise
                    last_exception = e
                    # Only back off when another attempt remains; the old
                    # code slept once more after the final failure for
                    # nothing.
                    if attempt + 1 < max_retries:
                        delay = min(base_delay * (2 ** attempt), max_delay)
                        logger.warning(
                            f"Database locked on attempt {attempt + 1}/{max_retries} "
                            f"for {fn.__name__}, retrying in {delay:.2f}s"
                        )
                        time.sleep(delay)
            logger.error(
                f"Database still locked after {max_retries} retries for {fn.__name__}"
            )
            raise last_exception  # type: ignore[misc]

        return wrapper

    return decorator
84+
85+
# Shared Redis connection for the task queues; defaults to a local instance
# when REDIS_URL is unset.
redis_conn = Redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379"))
# Priority queues backed by the shared connection; default_timeout=60 gives
# each enqueued job a 60-second budget (presumably RQ semantics — the Queue
# import is outside this chunk; TODO confirm).
low_queue = Queue("low", connection=redis_conn, default_timeout=60)
medium_queue = Queue("medium", connection=redis_conn, default_timeout=60)
@@ -53,6 +108,7 @@ def compute_article_embedding(article_id: int) -> None:
53108 logger .error (f"Error computing embedding for article { article_id } : { e } " )
54109
55110
111+ @with_db_retry (max_retries = 3 , base_delay = 0.1 , max_delay = 1.0 )
56112def recompute_user_clusters (user_id : str ) -> None :
57113 with Session (ENGINE ) as session :
58114 user = session .get (User , user_id )
@@ -286,6 +342,7 @@ def run_full_maintenance() -> dict:
286342BATCH_SIZE = int (os .getenv ("FEED_FETCH_BATCH_SIZE" , "10" ))
287343
288344
345+ @with_db_retry (max_retries = 3 , base_delay = 0.2 , max_delay = 2.0 )
289346def fetch_feed_batch (feed_ids : list [int ]) -> None :
290347 async def fetch_single_feed (feed : Feed ) -> Feed | None :
291348 try :
@@ -382,6 +439,7 @@ def fetch_all_feeds() -> None:
382439 )
383440
384441
442+ @with_db_retry (max_retries = 5 , base_delay = 0.1 , max_delay = 2.0 )
385443def log_user_action (user_id : str , article_id : int , link_url : str ) -> None :
386444 with Session (ENGINE ) as session :
387445 user = session .exec (select (User ).where (User .id == user_id )).first ()
0 commit comments