Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
7572a26
added a nice shiny cloudbuild file to deploy the loady
jamesbryer Apr 23, 2026
b97ad7a
tweak
jamesbryer Apr 24, 2026
4ad12e8
tweak
jamesbryer Apr 24, 2026
05699d8
tweak
jamesbryer Apr 24, 2026
4304a57
tweak
jamesbryer Apr 24, 2026
2ac238a
tweak
jamesbryer Apr 24, 2026
bee54f0
tweak
jamesbryer Apr 28, 2026
2d4d94c
tweak
jamesbryer Apr 28, 2026
c2c11f2
tweak
jamesbryer Apr 28, 2026
f289f66
tweak
jamesbryer Apr 28, 2026
2f7d14e
Merge branch 'refs/heads/main' into DFTS-883-performance-testing-loader
jamesbryer Apr 28, 2026
baf6561
Merge branch 'main' into DFTS-883-performance-testing-loader
angusgoody Apr 28, 2026
16c6f98
update cloudbuild.yaml
jamesbryer Apr 29, 2026
bda92dc
fix the bad error
jamesbryer Apr 29, 2026
164d854
set timeout to 1 hour
jamesbryer Apr 29, 2026
4091f68
add script
jamesbryer Apr 29, 2026
0e70498
add script
jamesbryer Apr 29, 2026
3029e51
add script
jamesbryer Apr 29, 2026
28110ae
scale down resources
jamesbryer Apr 29, 2026
54b2057
Merge branch 'main' into DFTS-883-performance-testing-loader
jamesbryer Apr 29, 2026
5fe30ca
docker-tweaks
angusgoody Apr 29, 2026
2cb6cb4
enum tweaks
angusgoody Apr 29, 2026
cab2978
tweak
angusgoody Apr 29, 2026
9db2e04
scale down resources
jamesbryer Apr 29, 2026
8528e3d
attempting to use BulkWriter
angusgoody Apr 29, 2026
caf9eb8
tweaks
angusgoody Apr 29, 2026
9b65632
another tweak
angusgoody Apr 29, 2026
4e31f21
tweaks
angusgoody Apr 30, 2026
9bf29b1
tweak
angusgoody Apr 30, 2026
dc9915a
tweaks
angusgoody Apr 30, 2026
31ec6f8
add env var
jamesbryer Apr 30, 2026
de92bc4
new common variant
jamesbryer May 5, 2026
98c4fec
tweAk
jamesbryer May 5, 2026
560fb15
tweAky2.0
jamesbryer May 5, 2026
4ea80be
tweak
angusgoody May 5, 2026
e2181de
Merge remote-tracking branch 'origin/DFTS-883-performance-testing-loa…
angusgoody May 5, 2026
2e67454
tweAky3.0
jamesbryer May 6, 2026
de5ac2e
try with new permission
jamesbryer May 6, 2026
73ba200
try with new permission 2.0
jamesbryer May 6, 2026
3ed533d
try with new permission 3.0
jamesbryer May 6, 2026
95f9cf1
fix (tests broken)
angusgoody May 7, 2026
ccd46e0
tweak
angusgoody May 7, 2026
dcfe74b
fixed tests
angusgoody May 7, 2026
e7118ad
tweaks
angusgoody May 7, 2026
ab0eff2
tweaks
angusgoody May 12, 2026
2cf6e60
tweak
angusgoody May 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.13-alpine
FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim

# Install the project into `/app`
WORKDIR /app
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# sds-loader

![Version](https://ons-badges-752336435892.europe-west2.run.app/api/badge/custom?left=Python&right=3.13)

A microservice for loading and modifying data into SDS


Expand Down Expand Up @@ -108,4 +110,7 @@ docker run \
spine3/firebase-emulator &
```

## Performance testing

The `performance_tests` directory contains a script for generating large datasets that can be used to performance-test the application.

2 changes: 0 additions & 2 deletions TODO.md

This file was deleted.

4 changes: 2 additions & 2 deletions app/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def build_pubsub_broadcaster() -> PubsubBroadcaster:
container[DatasetDeletionRepositoryInterface] = FirestoreDatasetDeletionRepository
container[DatasetBroadcastInterface] = PubsubBroadcaster
container[SchemaService] = SchemaService(
bucket_publisher=GcsSchemaPublisher,
repository_publisher=GithubSchemaPublisher,
bucket_publisher=GcsSchemaPublisher(),
repository_publisher=GithubSchemaPublisher(),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def mark_record_status(self, guid: Guid, status: DeleteStatus) -> None:
document_reference.update({"status": status.value})

# If the status is deleted, then mark a timestamp
if status == DeleteStatus.DELETED:
if status == "deleted":
utc_dt = datetime.now(timezone.utc) # UTC time
dt = utc_dt.astimezone() # local time
timestamp = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
Expand All @@ -74,7 +74,7 @@ def mark_record_status(self, guid: Guid, status: DeleteStatus) -> None:

def get_dataset_to_delete(self) -> Guid | None:
# First, try to fetch a PROCESSING dataset
processing = self.mark_deletion_collection.where("status", "==", DeleteStatus.PROCESSING.value).limit(1).stream()
processing = self.mark_deletion_collection.where("status", "==", "processing").limit(1).stream()

processing_list = list(processing)

Expand All @@ -98,7 +98,7 @@ def get_dataset_to_delete(self) -> Guid | None:
return guid

# If no "Processing" results found, fetch a PENDING dataset
pending = self.mark_deletion_collection.where("status", "==", DeleteStatus.PENDING.value).limit(1).stream()
pending = self.mark_deletion_collection.where("status", "==", "pending").limit(1).stream()

pending_list = list(pending)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Protocol

from google.cloud import firestore
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions

from app import get_logger
from app.interfaces.dataset_storage_repository_interface import DatasetStorageRepositoryInterface
Expand All @@ -13,7 +14,6 @@
class FirestoreSettings(Protocol):
project_id: str
firestore_database: str
should_batch: bool


class FirestoreDatasetStorageRepository(DatasetStorageRepositoryInterface):
Expand All @@ -35,10 +35,6 @@ def __init__(self, settings: FirestoreSettings):
f"Connected to Firestore with project_id: {settings.project_id} and database: {settings.firestore_database}"
)

# Max size in bytes we can upload in one batch
self.MAX_BATCH_SIZE_BYTES = 9 * 1024 * 1024
self.MAX_NUMBER_OF_WRITES_PER_BATCH = 500

# Initialize Firestore collections
self.dataset_collection = self.client.collection("datasets")

Expand Down Expand Up @@ -88,107 +84,53 @@ def store_dataset(
unit_data_collection_with_metadata: list[UnitDataset],
unit_data_identifiers: list[str],
):
# Write to firebase in batches or not depends on the settings

if self.settings.should_batch:
logger.info("Writing to Firestore in BATCH mode")
self._store_dataset_with_batching(
dataset_id=dataset_id,
dataset_metadata=dataset_metadata,
unit_data_collection_with_metadata=unit_data_collection_with_metadata,
unit_data_identifiers=unit_data_identifiers,
)
else:
logger.info("Writing to Firestore in NORMAL mode")
self._store_dataset_without_batching(
dataset_id=dataset_id,
dataset_metadata=dataset_metadata,
unit_data_collection_with_metadata=unit_data_collection_with_metadata,
unit_data_identifiers=unit_data_identifiers,
)

def _store_dataset_without_batching(
self,
dataset_id: Guid,
dataset_metadata: DatasetMetadataWithoutId,
unit_data_collection_with_metadata: list[UnitDataset],
unit_data_identifiers: list[str],
):
"""
Write the data to firestore without batching
"""
# Create a new document for this dataset
new_dataset_document = self.dataset_collection.document(dataset_id)

# Store the core data first
new_dataset_document.set(dataset_metadata.model_dump(), merge=True)

# Create a new collection for the units
units_collection = new_dataset_document.collection("units")

# Go through unit data
for unit_data, unit_identifier in zip(unit_data_collection_with_metadata, unit_data_identifiers):
# Create and save the unit data as a new sub document
units_collection.document(unit_identifier).set(unit_data.model_dump())
logger.info("Writing to Firestore in BATCH mode")
self._store_dataset_with_bulk_writer(
dataset_id=dataset_id,
dataset_metadata=dataset_metadata,
unit_data_collection_with_metadata=unit_data_collection_with_metadata,
unit_data_identifiers=unit_data_identifiers,
)

def _store_dataset_with_batching(
def _store_dataset_with_bulk_writer(
self,
dataset_id: Guid,
dataset_metadata: DatasetMetadataWithoutId,
unit_data_collection_with_metadata: list[UnitDataset],
unit_data_identifiers: list[str],
):
"""
Use batches to write the data to firestore
"""

# Create a new document for this dataset
new_dataset_document = self.dataset_collection.document(dataset_id)

# Store the core data first
new_dataset_document.set(dataset_metadata.model_dump(), merge=True)

# Create a new collection for the units
units_collection = new_dataset_document.collection("units")

# Initialise a batch
batch = self.client.batch()
batch_size_bytes = 0
batch_num_records = 0

# Go through unit data
for unit_data, unit_identifier in zip(unit_data_collection_with_metadata, unit_data_identifiers):
"""
Add this unit to the current batch if ...

1. adding it does not exceed the batch size limits
2. adding it does not exceed the batch record limits
"""
bulk_writer = self.client.bulk_writer(
options=BulkWriterOptions(
max_ops_per_second=2500
)
)

# Work out the size of this unit
unit_size = len(unit_data.model_dump_json().encode("utf-8"))
try:
# Include metadata in bulk
bulk_writer.set(
new_dataset_document,
dataset_metadata.model_dump(),
merge=False,
)

# Work out if the current batch is too big already
if (batch_size_bytes + unit_size >= self.MAX_BATCH_SIZE_BYTES) or (
batch_num_records + 1 >= self.MAX_NUMBER_OF_WRITES_PER_BATCH
for unit, unit_identifier in zip(
unit_data_collection_with_metadata,
unit_data_identifiers,
):
# Commit the current batch
batch.commit()

# Start a new batch
batch = self.client.batch()
batch_size_bytes = 0
batch_num_records = 0
bulk_writer.set(
units_collection.document(unit_identifier),
unit.model_dump(),
merge=False,
)

# Add the unit to the new batch
new_unit = units_collection.document(unit_identifier)
batch.set(new_unit, unit_data.model_dump(), merge=True)
batch_size_bytes += unit_size
batch_num_records += 1
bulk_writer.flush()

# If we never exceeded batch limit we still need to commit
if batch_size_bytes > 0:
batch.commit()
finally:
bulk_writer.close()

def delete_dataset_version(self, survey_id: str, period_id: str, version: int):
dataset_metadata = self._get_dataset_metadata(survey_id, period_id, version)
Expand All @@ -206,7 +148,7 @@ def delete_dataset_by_guid(self, guid: Guid):

# Delete each collection
for collection in collections:
collection.recursive_delete()
self.client.recursive_delete(collection)

# Delete the dataset itself
self.dataset_collection.document(guid).delete()
21 changes: 16 additions & 5 deletions app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from fastapi import APIRouter, Request
from fastapi.params import Query
from lagom.integrations.fast_api import FastApiIntegration
from sdx_base.models.pubsub import get_message, Message, get_data
from sdx_base.models.pubsub import get_message, Message
from starlette.responses import JSONResponse

from app import get_logger
Expand All @@ -12,6 +12,7 @@
from app.services.dataset_service import DatasetService
from app.services.schema_service import SchemaService
from app.settings import get_instance
from app.util.file_getters import get_file_path_from_bucket_notification, get_file_paths_from_github_notification

logger = get_logger()
router = APIRouter()
Expand Down Expand Up @@ -47,27 +48,35 @@ async def version():
async def publish_schemas(
request: Request,
source: Annotated[
Literal["github", "bucket"], Query(description="The source of the files specified in this request.")
Literal["github", "bucket"], Query(description="The source of the file specified in this request.")
] = "github",
schema_service: SchemaService = DEPS.depends(SchemaService),
):
"""
This endpoint handles a publishing schemas from a given
location.
This endpoint handles a publishing schemas from a given location

- If the source is "bucket", this will always be a single file (the one added to the bucket)
- The body of the message Must contain a "name" field in the json payload, this specifies the name of the file in the bucket
- If the source is "github", this could be multiple files (the new additions to the repo)
- The body of the message must contain a comma separated list of file names
"""

try:
# Fetch the message from pubsub
message: Message = await get_message(request)
except Exception as e:
logger.exception("Exception fetching message from request")
return JSONResponse(
status_code=500,
content={"success": False, "message": "Invalid message body received: " + str(e)},
)

try:
# Publish the new schemas
schema_service.publish_new_schemas(source=source, file_list=get_data(message).split("\n"))
schema_service.publish_new_schemas(
source=source,
file_list=[get_file_path_from_bucket_notification(message)] if source.lower() == "bucket" else get_file_paths_from_github_notification(message),
)
except NonCriticalException as e:
# Return a status 200 (non-critical exception)
return JSONResponse(
Expand All @@ -76,6 +85,8 @@ async def publish_schemas(
)

except (SchemaException, Exception) as e:
logger.exception("Exception publishing schemas")

return JSONResponse(
status_code=500,
content={"success": False, "message": "Exception publishing schema: " + str(e)},
Expand Down
4 changes: 2 additions & 2 deletions app/services/schema_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def _publish_single_file(self, file_name: str, publisher: PublisherProtocol): #
:param publisher: publisher - the publishing protocol to use to publish the file
"""
try:
publisher.publish_schema(file_name)
publisher.publish_schema(file_name=file_name)
logger.info(f"Successfully published schema: {file_name}")
except Exception as e:
logger.error(f"Failed to publish schema {file_name}: {e}")
logger.exception(f"Failed to publish schema {file_name}: {e}")

def _filter_github_files(self, files: list[str]) -> list[str]: # noqa
"""
Expand Down
4 changes: 1 addition & 3 deletions app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class Settings(AppSettings):
project_id: The GCP project ID
autodelete_dataset: Whether to automatically delete datasets from the source repo (bucket) after publishing
retain_old_dataset: Whether to retain old versions of an updated dataset in the target repo (firestore)
should_batch: Whether to batch write data to firestore to avoid memory limits
dataset_bucket_name: The bucket name to pick up datasets from
firestore_database: The Firestore database to publish datasets to
publish_dataset_topic_id: The Pub/Sub topic ID to publish dataset updates to
Expand All @@ -34,8 +33,7 @@ class Settings(AppSettings):

project_id: str = "ons-sds-sandbox"
autodelete_dataset: bool = True
retain_old_dataset: bool = True
should_batch: bool = True
retain_old_datasets: bool = True
dataset_bucket_name: str
firestore_database: str
publish_dataset_topic_id: str
Expand Down
Empty file added app/util/__init__.py
Empty file.
24 changes: 24 additions & 0 deletions app/util/file_getters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import json

from sdx_base.models.pubsub import get_data

from app import get_logger

logger = get_logger()


def get_file_path_from_bucket_notification(message) -> str:
    """
    Extract the file name from the bucket notification message.

    The message payload (as returned by ``get_data``) is parsed as JSON and the
    value of its ``"name"`` field is returned.

    :param message: the pubsub message carrying the bucket notification
    :return: the value stored under the ``"name"`` key of the JSON payload
    :raises json.JSONDecodeError: if the payload is not valid JSON
    :raises KeyError: if the payload has no ``"name"`` field
    """
    raw_data = get_data(message)
    raw_dict = json.loads(raw_data)

    # The payload is passed as a lazy %-style argument; without the "%s"
    # placeholder it would never be rendered into the log line.
    logger.debug("Bucket notification message: %s", raw_dict)

    return raw_dict["name"]


def get_file_paths_from_github_notification(message) -> list[str]:
    """
    Return the file names carried by a github notification message.

    The payload obtained via ``get_data`` is split on newline characters,
    giving one list entry per line of the payload.
    """
    payload = get_data(message)
    file_paths = payload.split("\n")
    return file_paths
Loading
Loading