From 2ef053f5337bca65b0dcccc3dee93b9c77fccc64 Mon Sep 17 00:00:00 2001 From: Angus Date: Fri, 10 Apr 2026 12:45:26 +0100 Subject: [PATCH 1/5] tweak --- TODO.md | 9 +-------- pyproject.toml | 1 + uv.lock | 2 ++ 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/TODO.md b/TODO.md index 70dd2e9..06382ef 100644 --- a/TODO.md +++ b/TODO.md @@ -1,9 +1,2 @@ -## Now - -- Add better tx_id getter - -## Future improvements - -- Add async to all firestore repos -- Multi files at once +- Integration tests diff --git a/pyproject.toml b/pyproject.toml index f754fe2..07f514a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "pydantic-settings>=2.13.1", "fastapi>=0.119.0", "starlette>=0.48.0", + "requests>=2.32.3", ] [[tool.uv.index]] diff --git a/uv.lock b/uv.lock index 119a49f..7284b4d 100644 --- a/uv.lock +++ b/uv.lock @@ -1335,6 +1335,7 @@ dependencies = [ { name = "lagom" }, { name = "polyfactory" }, { name = "pydantic-settings" }, + { name = "requests" }, { name = "sds-common" }, { name = "sdx-base" }, { name = "starlette" }, @@ -1360,6 +1361,7 @@ requires-dist = [ { name = "lagom", specifier = ">=2.7.7" }, { name = "polyfactory", specifier = ">=3.3.0" }, { name = "pydantic-settings", specifier = ">=2.13.1" }, + { name = "requests", specifier = ">=2.32.3" }, { name = "sds-common", specifier = "==1.0.17" }, { name = "sdx-base", specifier = ">=0.2.9", index = "https://europe-west2-python.pkg.dev/ons-sdx-ci/sdx-python-packages/simple/" }, { name = "starlette", specifier = ">=0.48.0" }, From 6971a26a7f77c2acc748723d3af0d8c609a681e2 Mon Sep 17 00:00:00 2001 From: Angus Date: Fri, 10 Apr 2026 13:29:07 +0100 Subject: [PATCH 2/5] adding batching to write --- .../firestore_dataset_storage_repository.py | 96 ++++++++++++++++++- app/settings.py | 2 + 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/app/repositories/dataset_storage/firestore_dataset_storage_repository.py b/app/repositories/dataset_storage/firestore_dataset_storage_repository.py index 5b4393b..9612125 100644 --- a/app/repositories/dataset_storage/firestore_dataset_storage_repository.py +++ b/app/repositories/dataset_storage/firestore_dataset_storage_repository.py @@ -13,6 +13,7 @@ class FirestoreSettings(Protocol): project_id: str firestore_database: str + should_batch: bool class FirestoreDatasetStorageRepository(DatasetStorageRepositoryInterface): @@ -33,7 +34,9 @@ def __init__( database=self.settings.firestore_database ) + # Max size in bytes we can upload in one batch self.MAX_BATCH_SIZE_BYTES = 9 * 1024 * 1024 + self.MAX_NUMBER_OF_WRITES_PER_BATCH = 500 # Initialize Firestore collections self.dataset_collection = self.client.collection("datasets") @@ -93,18 +96,109 @@ def store_dataset( unit_data_identifiers: list[str] ): + # Write to firebase in batches or not depends on the settings + + if self.settings.should_batch: + self._store_dataset_with_batching( + dataset_id=dataset_id, + dataset_metadata=dataset_metadata, + unit_data_collection_with_metadata=unit_data_collection_with_metadata, + unit_data_identifiers=unit_data_identifiers + ) + else: + self._store_dataset_without_batching( + dataset_id=dataset_id, + dataset_metadata=dataset_metadata, + unit_data_collection_with_metadata=unit_data_collection_with_metadata, + unit_data_identifiers=unit_data_identifiers + ) + + def _store_dataset_without_batching( + self, + dataset_id: Guid, + dataset_metadata: DatasetMetadataWithoutId, + unit_data_collection_with_metadata: list[UnitDataset], + unit_data_identifiers: list[str] + ): + """ + Write the data to firestore without batching + """ # Create a new document for this dataset new_dataset_document = self.dataset_collection.document(dataset_id) + # Store the core data first + new_dataset_document.set(dataset_metadata.model_dump(), merge=True) + # Create a new collection for the units units_collection = new_dataset_document.collection("units") # Go through unit data for (unit_data, unit_identifier) in zip(unit_data_collection_with_metadata, unit_data_identifiers): - # Create and save the unit data as a new sub document units_collection.document(unit_identifier).set(unit_data.model_dump()) + def _store_dataset_with_batching( + self, + dataset_id: Guid, + dataset_metadata: DatasetMetadataWithoutId, + unit_data_collection_with_metadata: list[UnitDataset], + unit_data_identifiers: list[str] + ): + """ + Use batches to write the data to firestore + """ + + # Create a new document for this dataset + new_dataset_document = self.dataset_collection.document(dataset_id) + + # Store the core data first + new_dataset_document.set(dataset_metadata.model_dump(), merge=True) + + # Create a new collection for the units + units_collection = new_dataset_document.collection("units") + + # Initialise a batch + batch = self.client.batch() + batch_size_bytes = 0 + batch_num_records = 0 + + # Go through unit data + for (unit_data, unit_identifier) in zip(unit_data_collection_with_metadata, unit_data_identifiers): + """ + Add this unit to the current batch if ... + + 1. adding it does not exceed the batch size limits + 2. adding it does not exceed the batch record limits + """ + + # Work out the size of this unit + unit_size = len(unit_data.model_dump().encode('utf-8')) + + # Work out if the current batch is too big already + if (batch_size_bytes + unit_size >= self.MAX_BATCH_SIZE_BYTES) or (batch_num_records + 1 >= self.MAX_NUMBER_OF_WRITES_PER_BATCH): + # Commit the current batch + batch.commit() + + # Start a new batch + batch = self.client.batch() + batch_size_bytes = 0 + batch_num_records = 0 + + # Add the unit to the new batch + new_unit = units_collection.document(unit_identifier) + batch.set(new_unit, unit_data.model_dump(), merge=True) + batch_size_bytes += unit_size + batch_num_records += 1 + + # If we never exceeded batch limit we still need to commit + if batch_size_bytes > 0: + batch.commit() + + + + + + def delete_dataset_version( self, survey_id: str, diff --git a/app/settings.py b/app/settings.py index 7e029b4..61eac3e 100644 --- a/app/settings.py +++ b/app/settings.py @@ -23,6 +23,7 @@ class Settings(AppSettings): project_id: The GCP project ID autodelete_dataset: Whether to automatically delete datasets from the source repo (bucket) after publishing retain_old_dataset: Whether to retain old versions of an updated dataset in the target repo (firestore) + should_batch: Whether to batch write data to firestore to avoid memory limits dataset_bucket_name: The bucket name to pick up datasets from firestore_database: The Firestore database to publish datasets to publish_dataset_topic_id: The Pub/Sub topic ID to publish dataset updates to @@ -30,6 +31,7 @@ class Settings(AppSettings): project_id: str autodelete_dataset: bool = True retain_old_dataset: bool = True + should_batch: bool = True dataset_bucket_name: str firestore_database: str publish_dataset_topic_id: str From b4047d5a6c549b908e11cd6e0f6d50be91385649 Mon Sep 17 00:00:00 2001 From: Angus Date: Fri, 10 Apr 2026 13:29:53 +0100 Subject: [PATCH 3/5] lint --- .../dataset_storage/firestore_dataset_storage_repository.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/app/repositories/dataset_storage/firestore_dataset_storage_repository.py b/app/repositories/dataset_storage/firestore_dataset_storage_repository.py index 9612125..8e75cb1 100644 --- a/app/repositories/dataset_storage/firestore_dataset_storage_repository.py +++ b/app/repositories/dataset_storage/firestore_dataset_storage_repository.py @@ -194,11 +194,6 @@ def _store_dataset_with_batching( if batch_size_bytes > 0: batch.commit() - - - - - def delete_dataset_version( self, survey_id: str, From cf3dda2ee485b6f27fa1b164e102b748aa856f03 Mon Sep 17 00:00:00 2001 From: Angus Date: Fri, 10 Apr 2026 13:40:28 +0100 Subject: [PATCH 4/5] bump --- app/services/schema_service.py | 6 +++--- pyproject.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/app/services/schema_service.py b/app/services/schema_service.py index 5137b90..b92d1f3 100644 --- a/app/services/schema_service.py +++ b/app/services/schema_service.py @@ -30,7 +30,7 @@ def __init__( self.bucket_publisher = bucket_publisher self.repository_publisher = repository_publisher - def _publish_single_file(self, file_name: str, publisher: PublisherProtocol): + def _publish_single_file(self, file_name: str, publisher: PublisherProtocol): # noqa """ Publish a single file using a given publisher :param file_name: name of the file to be published @@ -42,7 +42,7 @@ def _publish_single_file(self, file_name: str, publisher: PublisherProtocol): except Exception as e: logger.error(f"Failed to publish schema {file_name}: {e}") - def _filter_github_files(self, files: list[str]) -> list[str]: + def _filter_github_files(self, files: list[str]) -> list[str]: # noqa """ Filter a list of file names to publish by the regex """ @@ -59,7 +59,7 @@ def publish_new_schemas( If any schema fails to publish, it will print an error message but continue processing the remaining files. :param source: string indicating the source of the schema files to be published - Options are "github", "bucket" + Options are "GitHub", "bucket" :param file_list: List of file names to publish """ diff --git a/pyproject.toml b/pyproject.toml index 07f514a..02cfd0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sds-loader" -version = "0.1.0" +version = "0.1.1" description = "A service for loading new sds schemas and datasets" requires-python = "==3.13.*" dependencies = [ From 033ae20124f8c0b3ad1030caaa33ef7ac7b0e919 Mon Sep 17 00:00:00 2001 From: Angus Date: Fri, 10 Apr 2026 14:13:47 +0100 Subject: [PATCH 5/5] tweaks --- README.md | 4 ++++ app/services/dataset_service.py | 2 +- uv.lock | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a1eab0b..1824386 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,10 @@ make format make test ``` +## OpenApi spec + +[http://0.0.0.0:5000/docs](http://0.0.0.0:5000/docs) + ## Dockerize ``` diff --git a/app/services/dataset_service.py b/app/services/dataset_service.py index 5e287df..141bd73 100644 --- a/app/services/dataset_service.py +++ b/app/services/dataset_service.py @@ -27,7 +27,7 @@ class DatasetSettings(Protocol): """ autodelete_dataset: bool - retain_old_datasets: bool = True + retain_old_datasets: bool class DatasetService: diff --git a/uv.lock b/uv.lock index 7284b4d..5143665 100644 --- a/uv.lock +++ b/uv.lock @@ -1328,7 +1328,7 @@ wheels = [ [[package]] name = "sds-loader" -version = "0.1.0" +version = "0.1.1" source = { virtual = "." } dependencies = [ { name = "fastapi" },