Skip to content

Commit 247217e

Browse files
Add specialized exception class and error handling
1 parent 2c43559 commit 247217e

1 file changed

Lines changed: 62 additions & 36 deletions

File tree

python/lsst/dax/ppdb/bigquery/updates/updates_manager.py

Lines changed: 62 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
from __future__ import annotations
2323

24-
__all__ = ["UpdatesManager"]
24+
__all__ = ["UpdatesManager", "UpdatesManagerError"]
2525

2626
import logging
2727
import posixpath
@@ -52,6 +52,10 @@
5252
_LOG = logging.getLogger(__name__)
5353

5454

55+
class UpdatesManagerError(Exception):
    """Raised when any stage of the PPDB updates process fails."""
57+
58+
5559
class UpdatesManager:
5660
"""Class responsible for managing the process of applying updates to the
5761
PPDB database, including expanding them into a generic format from JSON and
@@ -62,17 +66,13 @@ class UpdatesManager:
6266
----------
6367
config : `PpdbBigQueryConfig`
6468
Configuration for the PPDB BigQuery interface.
65-
mergers : `Sequence` [ `UpdatesMerger` ], optional
66-
Sequence of `UpdatesMerger` instances to use for merging updates into
67-
target tables. If not provided, a default set of mergers will be used.
6869
table_name_format : `str`, optional
6970
Optional format string for the target table names used by the mergers.
7071
"""
7172

7273
def __init__(
7374
self,
7475
config: PpdbBigQueryConfig,
75-
mergers: Sequence[UpdatesMerger] | None = None,
7676
table_name_format: str | None = None,
7777
) -> None:
7878
# Get some necessary setup information from the config
@@ -81,12 +81,7 @@ def __init__(
8181
bucket_name = config.bucket_name
8282

8383
# Merger instances for handling each target table
84-
if mergers and table_name_format:
85-
raise ValueError("Cannot specify both 'mergers' and 'table_name_format'")
86-
if mergers:
87-
self._mergers = mergers
88-
else:
89-
self._mergers = tuple(cls(table_name_format=table_name_format) for cls in _DEFAULT_MERGER_CLASSES)
84+
self._mergers = tuple(cls(table_name_format=table_name_format) for cls in _DEFAULT_MERGER_CLASSES)
9085

9186
# Setup the updates table interface
9287
self._bq_client = bigquery.Client()
@@ -111,24 +106,44 @@ def apply_updates(self, replica_chunks: Sequence[PpdbReplicaChunkExtended]) -> N
111106
----------
112107
replica_chunks: `Sequence` [ `PpdbReplicaChunkExtended` ]
113108
The replica chunks with the update records.
109+
110+
Raises
111+
------
112+
UpdatesManagerError
113+
If any step of the updates process fails.
114114
"""
115115
# Create the updates table, first dropping if it already exists
116-
self._updates_table.recreate()
116+
try:
117+
self._updates_table.recreate()
118+
except Exception as e:
119+
raise UpdatesManagerError("Failed to recreate updates table") from e
117120

118121
# Process the replica chunks to build the expanded updates table
119-
self._process_chunks(replica_chunks)
122+
try:
123+
self._process_chunks(replica_chunks)
124+
except Exception as e:
125+
raise UpdatesManagerError("Failed to process replica chunks") from e
120126

121127
# Get a fresh reference to the updates table to check if there were
122128
# any update records generated from the processed replica chunks
123-
bq_updates_table = self._bq_client.get_table(self._updates_table.table_fqn)
129+
try:
130+
bq_updates_table = self._bq_client.get_table(self._updates_table.table_fqn)
131+
except Exception as e:
132+
raise UpdatesManagerError(f"Failed to get updates table '{self._updates_table.table_fqn}'") from e
124133

125134
# Check if there were any updates in this set of chunks
126135
if bq_updates_table.num_rows > 0:
127136
# Select only the latest update records to a new table
128-
self._updates_table.create_latest_only()
137+
try:
138+
self._updates_table.create_latest_only()
139+
except Exception as e:
140+
raise UpdatesManagerError("Failed to create latest-only updates table") from e
129141

130142
# Merge the latest-only updates into the target tables
131-
self._merge_updates(self._updates_table.latest_only_table_fqn)
143+
try:
144+
self._merge_updates(self._updates_table.latest_only_table_fqn)
145+
except Exception as e:
146+
raise UpdatesManagerError("Failed to merge updates into target tables") from e
132147
else:
133148
# No updates were present in the processed replica chunks
134149
_LOG.info("No update records found when processing replica chunks")
@@ -140,7 +155,7 @@ def apply_updates(self, replica_chunks: Sequence[PpdbReplicaChunkExtended]) -> N
140155
def _process_chunks(self, chunks: Sequence[PpdbReplicaChunkExtended]) -> None:
141156
for chunk in chunks:
142157
if chunk.gcs_uri is None:
143-
raise ValueError(f"Replica chunk {chunk.id} does not have a GCS URI")
158+
raise UpdatesManagerError(f"Replica chunk {chunk.id} does not have a GCS URI")
144159

145160
# Parse the GCS URI
146161
parsed_uri = urllib.parse.urlparse(chunk.gcs_uri)
@@ -153,28 +168,39 @@ def _process_chunks(self, chunks: Sequence[PpdbReplicaChunkExtended]) -> None:
153168
chunk_prefix = parsed_uri.path.lstrip("/")
154169

155170
# Load the manifest file for the chunk from GCS
156-
manifest_uri = posixpath.join(chunk_prefix, Manifest.FILE_NAME)
157-
manifest_blob = bucket.blob(manifest_uri)
158-
manifest_content = manifest_blob.download_as_text()
159-
manifest = Manifest.from_json_str(manifest_content)
171+
try:
172+
manifest_uri = posixpath.join(chunk_prefix, Manifest.FILE_NAME)
173+
manifest_blob = bucket.blob(manifest_uri)
174+
manifest_content = manifest_blob.download_as_text()
175+
manifest = Manifest.from_json_str(manifest_content)
176+
except Exception as e:
177+
raise UpdatesManagerError(f"Failed to load manifest for replica chunk {chunk.id}") from e
160178

161179
# Read the update records if the chunk was flagged as having them
162180
if manifest.includes_update_records:
163-
# Get the update records file contents from the bucket
164-
object_name = posixpath.join(parsed_uri.path.lstrip("/"), UpdateRecords.FILE_NAME)
165-
blob = bucket.blob(object_name)
166-
content = blob.download_as_text()
167-
168-
# Expand the update records into the appropriate format and
169-
# insert them into the updates table
170-
update_records = UpdateRecords.from_json_string(content)
171-
expanded_update_records = UpdateRecordExpander.expand_updates(update_records)
172-
self._updates_table.insert(expanded_update_records)
181+
try:
182+
# Get the update records file contents from the bucket
183+
object_name = posixpath.join(parsed_uri.path.lstrip("/"), UpdateRecords.FILE_NAME)
184+
blob = bucket.blob(object_name)
185+
content = blob.download_as_text()
186+
187+
# Expand the update records into the appropriate format and
188+
# insert them into the updates table
189+
update_records = UpdateRecords.from_json_string(content)
190+
expanded_update_records = UpdateRecordExpander.expand_updates(update_records)
191+
self._updates_table.insert(expanded_update_records)
192+
except Exception as e:
193+
raise UpdatesManagerError(
194+
f"Failed to process update records for replica chunk {chunk.id}"
195+
) from e
173196

174197
def _merge_updates(self, target_table_fqn: str) -> None:
    """Apply every configured merger to the given updates table.

    Parameters
    ----------
    target_table_fqn : `str`
        Fully-qualified name of the updates table that the mergers read
        records from.

    Raises
    ------
    UpdatesManagerError
        If any merger raises while merging into its target table; the
        original exception is chained as the cause.
    """
    # Hoist instance lookups; every merger receives the same client and
    # target dataset.
    bq_client = self._bq_client
    dataset_fqn = self._target_dataset_fqn
    for updates_merger in self._mergers:
        try:
            updates_merger.merge(
                client=bq_client,
                updates_table_fqn=target_table_fqn,
                target_dataset_fqn=dataset_fqn,
            )
        except Exception as exc:
            # Wrap in the package exception so callers only need to
            # catch UpdatesManagerError; chain the cause for debugging.
            raise UpdatesManagerError(
                f"Failed to merge updates using {type(updates_merger).__name__}"
            ) from exc

0 commit comments

Comments (0)