Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7958c56
Install postgres in build workflow for testing
JeremyMcCormick Mar 27, 2026
079d963
Update installed system packages in Dockerfile
JeremyMcCormick Mar 27, 2026
e1b3a08
Update project dependencies
JeremyMcCormick Mar 27, 2026
b060955
Use dax_apdb DM-54070 ticket branch
JeremyMcCormick Apr 2, 2026
a9bc154
Rename `config` module to `ppdb_config`
JeremyMcCormick Mar 27, 2026
3cb7a77
Adjust a few log levels
JeremyMcCormick Mar 27, 2026
458a0cf
Refactor test utilities
JeremyMcCormick Mar 27, 2026
3e63636
Add `query_runner` module
JeremyMcCormick Mar 27, 2026
41ca0b7
Update `Manifest` class with new file name and utility methods
JeremyMcCormick Mar 27, 2026
193d766
Add support for Google Secrets Manager
JeremyMcCormick Mar 27, 2026
5cdc3a8
Move test schema to resources directory
JeremyMcCormick Mar 27, 2026
d2cf968
Rename test module
JeremyMcCormick Mar 27, 2026
1e7bead
Add package for handling APDB update records in BigQuery
JeremyMcCormick Apr 2, 2026
c7ba79a
Add utility module for generating test update records
JeremyMcCormick Mar 27, 2026
ce0aee8
Update model for managing extended replica chunk
JeremyMcCormick Mar 27, 2026
7ec55c7
Add module for handling the replica chunk promotion process
JeremyMcCormick Apr 2, 2026
8c00ffa
Integrate support for handling replica chunk updates
JeremyMcCormick Apr 2, 2026
4298c3e
Add module with miscellaneous test utilities for BigQuery
JeremyMcCormick Mar 27, 2026
60ddaa4
Add tests of APDB update record handling
JeremyMcCormick Mar 27, 2026
7ccf56f
Rename methods and variables related to getting latest updates only
JeremyMcCormick Apr 6, 2026
da74885
Make all fields in updates table required
JeremyMcCormick Apr 7, 2026
d498a5b
Skip updates of type close_diaobject_validity when nDiaSources is None
JeremyMcCormick Apr 7, 2026
b7648ca
Add specialized exception class and error handling
JeremyMcCormick Apr 7, 2026
9709ec3
Move test of chunk uploader to its own test module
JeremyMcCormick Apr 7, 2026
1b9196a
Serialize the update records to parquet instead of JSON
JeremyMcCormick Apr 7, 2026
3e42518
Add specialized exception type and catch errors during promotion
JeremyMcCormick Apr 7, 2026
3e29097
Store update count in the chunk manifest
JeremyMcCormick Apr 8, 2026
2a2a1d9
Make ignore of egg-info more generic
JeremyMcCormick Apr 8, 2026
7454dd8
Add update count field to the `PpdbReplicaChunk` table
JeremyMcCormick Apr 8, 2026
8c55611
Adjust some log levels and messages for replication
JeremyMcCormick Apr 8, 2026
7d7fde1
Add job stat logging to the updates manager
JeremyMcCormick Apr 10, 2026
e632600
Remove unused variable
JeremyMcCormick Apr 14, 2026
876b635
Remove unneeded check on nDiaSources being None
JeremyMcCormick Apr 14, 2026
952ae0e
Fix a few minor docstring issues
JeremyMcCormick Apr 14, 2026
baba54f
Use dax_apdb main branch
JeremyMcCormick Apr 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@ jobs:
mamba install -y -q pip wheel
pip install uv

- name: Install Postgres for testing
shell: bash -l {0}
run: |
mamba install -y -q postgresql

- name: Install dependencies
shell: bash -l {0}
run: |
uv pip install -r requirements.txt

# We have two cores so we can speed up the testing with xdist
- name: Install pytest packages
shell: bash -l {0}
run: |
Expand Down
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ bin/

# pip
build/
python/lsst_dax_ppdb.egg-info/
python/lsst_dax_ppdb.dist-info/
*.dist-info/
*.egg-info/

# Pytest
tests/.tests
Expand Down
13 changes: 8 additions & 5 deletions docker/Dockerfile.replication
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ FROM python:3.12-slim-bookworm
ENV DEBIAN_FRONTEND=noninteractive

# Update and install OS dependencies
RUN apt-get -y update && \
apt-get -y upgrade && \
apt-get -y install --no-install-recommends git && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
build-essential \
python3-dev \
pkg-config \
git \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

# Install required python build dependencies
RUN pip install --upgrade --no-cache-dir pip setuptools wheel uv
Expand Down
11 changes: 7 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ classifiers = [
keywords = ["lsst"]
dependencies = [
"astropy",
"google-cloud-bigquery",
"pyarrow",
"pydantic >=2,<3",
"pyyaml >= 5.1",
"sqlalchemy",
"lsst-dax-ppdbx-gcp",
"lsst-felis",
"lsst-sdm-schemas",
"lsst-utils",
Expand All @@ -44,9 +46,6 @@ test = [
"pytest >= 3.2",
"pytest-openfiles >= 0.5.0"
]
gcp = [
"lsst-dax-ppdbx-gcp"
]

[tool.setuptools.packages.find]
where = ["python"]
Expand All @@ -55,7 +54,11 @@ where = ["python"]
zip-safe = true

[tool.setuptools.package-data]
"lsst.dax.ppdb" = ["py.typed"]
"lsst.dax.ppdb" = [
"py.typed",
"resources/config/schemas/*.yaml",
"resources/config/sql/*.sql",
]

[tool.setuptools.dynamic]
version = { attr = "lsst_versions.get_lsst_version" }
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/dax/ppdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from .config import *
from .ppdb_config import *
from .ppdb import *
from .replicator import *
from .version import * # Generated by sconsUtils
2 changes: 1 addition & 1 deletion python/lsst/dax/ppdb/_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from .config import PpdbConfig
from .ppdb import Ppdb
from .ppdb_config import PpdbConfig


def config_type_for_name(type_name: str) -> type[PpdbConfig]:
Expand Down
11 changes: 8 additions & 3 deletions python/lsst/dax/ppdb/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from .manifest import Manifest
from .ppdb_bigquery import PpdbBigQuery, PpdbBigQueryConfig
from .ppdb_replica_chunk_extended import ChunkStatus, PpdbReplicaChunkExtended
from .chunk_promoter import *
from .chunk_uploader import *
from .manifest import *
from .ppdb_bigquery import *
from .ppdb_replica_chunk_extended import *
from .query_runner import *
from .sql_resource import *
from .table_refs import *
247 changes: 247 additions & 0 deletions python/lsst/dax/ppdb/bigquery/chunk_promoter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
# This file is part of dax_ppdbx_gcp
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

# Export all public names, including ChunkPromoterError: promote_chunks()
# raises it on any phase failure, so callers importing via
# ``from .chunk_promoter import *`` must be able to catch it.
__all__ = [
    "ChunkPromoter",
    "ChunkPromoterError",
    "NoPromotableChunksError",
]

import logging

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

from lsst.dax.apdb import ApdbTables

from .ppdb_bigquery import PpdbBigQuery, PpdbBigQueryConfig
from .query_runner import QueryRunner
from .table_refs import TableRefs
from .updates.updates_manager import UpdatesManager


class NoPromotableChunksError(Exception):
    """Raised when no replica chunks are available for promotion.

    This does not indicate a genuine failure; it exists so that callers can
    cleanly short-circuit the promotion workflow when there is nothing to
    promote.
    """


class ChunkPromoterError(Exception):
    """Signals a failure during the replica chunk promotion process.

    Wraps the underlying cause of any phase failure so callers can catch a
    single promotion-specific exception type.
    """


class ChunkPromoter:
    """Class to promote replica chunks in BigQuery.

    Promotion moves rows for a set of replica chunk IDs from per-table
    staging tables into the production tables via a temporary
    ``_{table}_promoted_tmp`` table, applying any pending APDB update
    records along the way.

    Parameters
    ----------
    ppdb : `PpdbBigQuery`
        Interface to the PPDB in BigQuery.
    table_names : `list`[`str`], optional
        List of table names to promote or if None a default list will be used.
    """

    # Default production tables participating in promotion.
    _DEFAULT_TABLE_NAMES = [
        ApdbTables.DiaObject.value,
        ApdbTables.DiaSource.value,
        ApdbTables.DiaForcedSource.value,
    ]

    def __init__(
        self,
        ppdb: PpdbBigQuery,
        table_names: list[str] | None = None,
    ) -> None:
        self._ppdb = ppdb
        # The runner executes parameterized query jobs against the dataset.
        self._runner = QueryRunner(self.config.project_id, self.config.dataset_id)
        self._table_names = table_names if table_names is not None else self._DEFAULT_TABLE_NAMES
        self._bq_client = bigquery.Client(project=self.config.project_id)

        # Resolves fully-qualified prod/staging/tmp table references for the
        # selected tables.
        self._table_refs = TableRefs(
            project_id=self.config.project_id,
            dataset_id=self.config.dataset_id,
            table_names=tuple(self._table_names),
        )

        # Build a mapping of phases to run during the promotion process, not
        # including cleanup, which is executed separately
        _phase_methods = [
            self._copy_to_promoted_tmp,
            self._apply_record_updates,
            self._promote_tmp_to_prod,
            self._delete_staged_chunks,
            self._mark_chunks_promoted,
        ]
        # Phase name (method name without its leading underscore) -> bound
        # method; dict insertion order defines execution order.
        self._phases = {m.__name__.lstrip("_"): m for m in _phase_methods}

        # Chunk IDs for the current promotion run; set by promote_chunks().
        self._promotable_chunks: list[int] = []

    @property
    def config(self) -> PpdbBigQueryConfig:
        """Config associated with this instance (`PpdbBigQueryConfig`)."""
        return self._ppdb.config

    @property
    def promotable_chunks(self) -> list[int]:
        """List of promotable chunks (`list` [ `int` ], read-only)."""
        return self._promotable_chunks

    @property
    def table_refs(self) -> TableRefs:
        """Table references (`TableRefs`, read-only)."""
        return self._table_refs

    def _copy_to_promoted_tmp(self) -> None:
        """Build ``_{table_name}_promoted_tmp`` efficiently by cloning prod and
        inserting only staged rows for the given replica chunk IDs.
        """
        # Bind the current chunk IDs once; reused for every table's INSERT.
        job_cfg = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ArrayQueryParameter("ids", "INT64", self.promotable_chunks)]
        )

        for prod_ref, tmp_ref, stage_ref in zip(
            self._table_refs.prod, self._table_refs.promoted_tmp, self._table_refs.staging, strict=True
        ):
            # Drop any existing tmp table (should not exist but just to be
            # safe)
            self._runner.run_job("drop_tmp", f"DROP TABLE IF EXISTS `{tmp_ref}`")

            # Clone prod table structure and data (zero-copy)
            self._runner.run_job("clone_prod", f"CREATE TABLE `{tmp_ref}` CLONE `{prod_ref}`")

            # Build ordered target list from the cloned tmp schema
            tmp_schema = self._bq_client.get_table(tmp_ref).schema
            target_names = [f.name for f in tmp_schema if f.name != "apdb_replica_chunk"]
            target_list_sql = ", ".join(f"`{n}`" for n in target_names)

            # Build source list, handling geo_point conversion
            source_list_sql = ", ".join(
                "ST_GEOGPOINT(s.`ra`, s.`dec`)" if n == "geo_point" else f"s.`{n}`" for n in target_names
            )

            # Insert staged rows into tmp, excluding apdb_replica_chunk column
            sql = f"""
                INSERT INTO `{tmp_ref}` ({target_list_sql})
                SELECT {source_list_sql}
                FROM `{stage_ref}` AS s
                WHERE s.apdb_replica_chunk IN UNNEST(@ids)
            """
            logging.debug("SQL for inserting staged rows into %s: %s", tmp_ref, sql)
            self._runner.run_job("insert_staged_to_tmp", sql, job_config=job_cfg)

    def _promote_tmp_to_prod(self) -> None:
        """Swap each prod table with its corresponding *_promoted_tmp by
        replacing prod contents in a single atomic copy job. This preserves
        schema, partitioning, and clustering with zero-copy when in the same
        dataset.

        Raises
        ------
        RuntimeError
            Raised if a tmp table expected from the copy phase is missing.
        """
        for prod_ref, tmp_ref in zip(self._table_refs.prod, self._table_refs.promoted_tmp, strict=True):
            # Ensure tmp exists
            try:
                self._bq_client.get_table(tmp_ref)
            except NotFound as e:
                raise RuntimeError(f"Missing tmp table for promotion: {tmp_ref}") from e

            # Atomic zero-copy replacement of prod with tmp
            copy_cfg = bigquery.CopyJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
            job = self._bq_client.copy_table(
                tmp_ref, prod_ref, job_config=copy_cfg, location=self._runner.location
            )
            # Block until the copy completes so failures surface here.
            job.result()
            QueryRunner.log_job(job, "promote_tmp_to_prod")

    def _cleanup(self) -> None:
        """Drop the promotion temporary tables."""
        for tmp_ref in self._table_refs.promoted_tmp:
            # not_found_ok makes cleanup idempotent if a tmp was never created.
            self._bq_client.delete_table(tmp_ref, not_found_ok=True)
            logging.debug("Dropped %s (if it existed)", tmp_ref)

    def _delete_staged_chunks(self) -> None:
        """Delete only rows for the promoted replica chunk IDs from each
        staging table.
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ArrayQueryParameter("ids", "INT64", self.promotable_chunks)]
        )

        for staging_ref in self._table_refs.staging:
            try:
                sql = f"DELETE FROM `{staging_ref}` WHERE apdb_replica_chunk IN UNNEST(@ids)"
                self._runner.run_job("delete_staged_chunks", sql, job_config=job_config)
                logging.debug(
                    "Deleted %d chunk(s) from staging table %s", len(self.promotable_chunks), staging_ref
                )
            except NotFound:
                # A missing staging table means there is nothing to delete.
                logging.warning("Staging table %s does not exist, skipping delete", staging_ref)

    def _apply_record_updates(self) -> None:
        """Apply record updates to the promoted temporary tables."""
        updates_manager = UpdatesManager(
            self._ppdb.config,
            table_name_format="_{}_promoted_tmp",  # FIXME: Should use the table refs formatting instead.
        )

        # Get the replica chunks and the total update count.
        replica_chunks = self._ppdb.get_replica_chunks_ext_by_ids(self.promotable_chunks)

        # Apply the updates for the chunks. The manager will skip the process
        # entirely if there are no updates, so we don't need to check that
        # here.
        updates_manager.apply_updates(replica_chunks)

    def _mark_chunks_promoted(self) -> None:
        """Mark the replica chunks as promoted in the database."""
        self._ppdb.mark_chunks_promoted(self.promotable_chunks)

    # TODO: It would be preferable if this method received a list of
    # `PpdbReplicaChunkExtended` objects rather than integer IDs. This could
    # be easily provided with the `PpdbBigquery` interface.
    def promote_chunks(self, chunks: list[int]) -> None:
        """Promote APDB replica chunks into production by executing a series of
        phases.

        Parameters
        ----------
        chunks : `list` [ `int` ]
            Replica chunk IDs to promote.

        Raises
        ------
        NoPromotableChunksError
            Raised if ``chunks`` is empty.
        ChunkPromoterError
            Raised if any promotion phase fails; the original exception is
            chained as the cause.
        """
        if not chunks:
            raise NoPromotableChunksError("No promotable chunks provided for promotion")

        logging.info("Starting promotion of %d chunk(s): %s", len(chunks), chunks)

        # Set the list of promotable chunks for use in the promotion phases.
        self._promotable_chunks = chunks

        # Execute the promotion phases in order.
        try:
            for name, phase in self._phases.items():
                logging.debug("Starting phase: %s", name)
                phase()
                logging.debug("Completed phase: %s", name)
        except Exception as e:
            raise ChunkPromoterError("Chunk promotion failed") from e
        finally:
            # Always execute the cleanup, even if there were errors.
            try:
                self._cleanup()
            except Exception:
                # Cleanup failure must not mask the original promotion error.
                logging.exception("Cleanup of chunk promotion failed")

        logging.info("Completed promotion of %d chunk(s)", len(chunks))
Loading
Loading