Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion diracx-db/src/diracx/db/os/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,17 @@ class BaseOSDB(metaclass=ABCMeta):
@abstractmethod
def index_name(self, vo: str, doc_id: int) -> str: ...

def __init__(self, connection_kwargs: dict[str, Any]) -> None:
def __init__(
self, connection_kwargs: dict[str, Any], *, global_prefix: str = ""
) -> None:
self._client: AsyncOpenSearch | None = None
self._connection_kwargs = connection_kwargs
# We use a ContextVar to make sure that self._conn
# is specific to each context, and avoid parallel
# route executions to overlap
self._conn: ContextVar[bool] = ContextVar("_conn", default=False)
if global_prefix:
self.index_prefix = f"{global_prefix}_{self.index_prefix}"

@classmethod
def available_implementations(cls, db_name: str) -> list[type[BaseOSDB]]:
Expand Down
5 changes: 4 additions & 1 deletion diracx-routers/src/diracx/routers/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,14 @@ def create_app_inner(
raise Exception("No SQL database could be initialized, aborting")

# Add the OpenSearch DBs to the application
os_global_prefix = os.environ.get("DIRACX_FACTORY_OS_GLOBAL_PREFIX", "")
available_os_db_classes: set[type[BaseOSDB]] = set()
for db_name, connection_kwargs in os_database_conn_kwargs.items():
os_db_classes = BaseOSDB.available_implementations(db_name)
# The first DB is the highest priority one
os_db = os_db_classes[0](connection_kwargs=connection_kwargs)
os_db = os_db_classes[0](
connection_kwargs=connection_kwargs, global_prefix=os_global_prefix
)
app.lifetime_functions.append(os_db.client_context)
# Add overrides for all the DB classes, including those from extensions
# This means vanilla DiracX routers get an instance of the extension's DB
Expand Down
5 changes: 4 additions & 1 deletion diracx-tasks/src/diracx/tasks/plumbing/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,12 @@ async def setup_dependency_overrides(
)

# --- OS databases ---
os_global_prefix = os.environ.get("DIRACX_FACTORY_OS_GLOBAL_PREFIX", "")
for db_name, conn_kwargs in BaseOSDB.available_urls().items():
os_db_classes = BaseOSDB.available_implementations(db_name)
os_db = os_db_classes[0](connection_kwargs=conn_kwargs)
os_db = os_db_classes[0](
connection_kwargs=conn_kwargs, global_prefix=os_global_prefix
)
await stack.enter_async_context(os_db.client_context())
for os_db_class in os_db_classes:
overrides[os_db_class.session] = partial(_db_context, os_db)
Expand Down
4 changes: 3 additions & 1 deletion diracx-testing/src/diracx/testing/mock_osdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ class JobParametersDB(MockOSDBMixin, JobParametersDB):
JobParametersDB = type("JobParametersDB", (MockOSDBMixin, JobParametersDB), {})
"""

def __init__(self, connection_kwargs: dict[str, Any]) -> None:
def __init__(
self, connection_kwargs: dict[str, Any], global_prefix: str = ""
) -> None:
from sqlalchemy import JSON, Column, DateTime, Integer, MetaData, String, Table

# Dynamically create a subclass of BaseSQLDB so we get clearer errors
Expand Down
Empty file added toto
Empty file.
Loading