diff --git a/README.md b/README.md index a1eab0b..1824386 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,10 @@ make format make test ``` +## OpenApi spec + +[http://0.0.0.0:5000/docs](http://0.0.0.0:5000/docs) + ## Dockerize ``` diff --git a/TODO.md b/TODO.md index 70dd2e9..06382ef 100644 --- a/TODO.md +++ b/TODO.md @@ -1,9 +1,2 @@ -## Now - -- Add better tx_id getter - -## Future improvements - -- Add async to all firestore repos -- Multi files at once +- Integration tests diff --git a/app/repositories/dataset_storage/firestore_dataset_storage_repository.py b/app/repositories/dataset_storage/firestore_dataset_storage_repository.py index 5b4393b..8e75cb1 100644 --- a/app/repositories/dataset_storage/firestore_dataset_storage_repository.py +++ b/app/repositories/dataset_storage/firestore_dataset_storage_repository.py @@ -13,6 +13,7 @@ class FirestoreSettings(Protocol): project_id: str firestore_database: str + should_batch: bool class FirestoreDatasetStorageRepository(DatasetStorageRepositoryInterface): @@ -33,7 +34,9 @@ def __init__( database=self.settings.firestore_database ) + # Max size in bytes we can upload in one batch self.MAX_BATCH_SIZE_BYTES = 9 * 1024 * 1024 + self.MAX_NUMBER_OF_WRITES_PER_BATCH = 500 # Initialize Firestore collections self.dataset_collection = self.client.collection("datasets") @@ -93,18 +96,104 @@ def store_dataset( unit_data_identifiers: list[str] ): + # Write to firebase in batches or not depends on the settings + + if self.settings.should_batch: + self._store_dataset_with_batching( + dataset_id=dataset_id, + dataset_metadata=dataset_metadata, + unit_data_collection_with_metadata=unit_data_collection_with_metadata, + unit_data_identifiers=unit_data_identifiers + ) + else: + self._store_dataset_without_batching( + dataset_id=dataset_id, + dataset_metadata=dataset_metadata, + unit_data_collection_with_metadata=unit_data_collection_with_metadata, + unit_data_identifiers=unit_data_identifiers + ) + + def _store_dataset_without_batching( + self, + dataset_id: Guid, + dataset_metadata: DatasetMetadataWithoutId, + unit_data_collection_with_metadata: list[UnitDataset], + unit_data_identifiers: list[str] + ): + """ + Write the data to firestore without batching + """ # Create a new document for this dataset new_dataset_document = self.dataset_collection.document(dataset_id) + # Store the core data first + new_dataset_document.set(dataset_metadata.model_dump(), merge=True) + # Create a new collection for the units units_collection = new_dataset_document.collection("units") # Go through unit data for (unit_data, unit_identifier) in zip(unit_data_collection_with_metadata, unit_data_identifiers): - # Create and save the unit data as a new sub document units_collection.document(unit_identifier).set(unit_data.model_dump()) + def _store_dataset_with_batching( + self, + dataset_id: Guid, + dataset_metadata: DatasetMetadataWithoutId, + unit_data_collection_with_metadata: list[UnitDataset], + unit_data_identifiers: list[str] + ): + """ + Use batches to write the data to firestore + """ + + # Create a new document for this dataset + new_dataset_document = self.dataset_collection.document(dataset_id) + + # Store the core data first + new_dataset_document.set(dataset_metadata.model_dump(), merge=True) + + # Create a new collection for the units + units_collection = new_dataset_document.collection("units") + + # Initialise a batch + batch = self.client.batch() + batch_size_bytes = 0 + batch_num_records = 0 + + # Go through unit data + for (unit_data, unit_identifier) in zip(unit_data_collection_with_metadata, unit_data_identifiers): + """ + Add this unit to the current batch if ... + + 1. adding it does not exceed the batch size limits + 2. adding it does not exceed the batch record limits + """ + + # Work out the size of this unit + unit_size = len(unit_data.model_dump().encode('utf-8')) + + # Work out if the current batch is too big already + if (batch_size_bytes + unit_size >= self.MAX_BATCH_SIZE_BYTES) or (batch_num_records + 1 >= self.MAX_NUMBER_OF_WRITES_PER_BATCH): + # Commit the current batch + batch.commit() + + # Start a new batch + batch = self.client.batch() + batch_size_bytes = 0 + batch_num_records = 0 + + # Add the unit to the new batch + new_unit = units_collection.document(unit_identifier) + batch.set(new_unit, unit_data.model_dump(), merge=True) + batch_size_bytes += unit_size + batch_num_records += 1 + + # If we never exceeded batch limit we still need to commit + if batch_size_bytes > 0: + batch.commit() + def delete_dataset_version( self, survey_id: str, diff --git a/app/services/dataset_service.py b/app/services/dataset_service.py index 5e287df..141bd73 100644 --- a/app/services/dataset_service.py +++ b/app/services/dataset_service.py @@ -27,7 +27,7 @@ class DatasetSettings(Protocol): """ autodelete_dataset: bool - retain_old_datasets: bool = True + retain_old_datasets: bool class DatasetService: diff --git a/app/services/schema_service.py b/app/services/schema_service.py index 5137b90..b92d1f3 100644 --- a/app/services/schema_service.py +++ b/app/services/schema_service.py @@ -30,7 +30,7 @@ def __init__( self.bucket_publisher = bucket_publisher self.repository_publisher = repository_publisher - def _publish_single_file(self, file_name: str, publisher: PublisherProtocol): + def _publish_single_file(self, file_name: str, publisher: PublisherProtocol): # noqa """ Publish a single file using a given publisher :param file_name: name of the file to be published @@ -42,7 +42,7 @@ def _publish_single_file(self, file_name: str, publisher: PublisherProtocol): except Exception as e: logger.error(f"Failed to publish schema {file_name}: {e}") - def _filter_github_files(self, files: list[str]) -> list[str]: + def _filter_github_files(self, files: list[str]) -> list[str]: # noqa """ Filter a list of file names to publish by the regex """ @@ -59,7 +59,7 @@ def publish_new_schemas( If any schema fails to publish, it will print an error message but continue processing the remaining files. :param source: string indicating the source of the schema files to be published - Options are "github", "bucket" + Options are "GitHub", "bucket" :param file_list: List of file names to publish """ diff --git a/app/settings.py b/app/settings.py index 7e029b4..61eac3e 100644 --- a/app/settings.py +++ b/app/settings.py @@ -23,6 +23,7 @@ class Settings(AppSettings): project_id: The GCP project ID autodelete_dataset: Whether to automatically delete datasets from the source repo (bucket) after publishing retain_old_dataset: Whether to retain old versions of an updated dataset in the target repo (firestore) + should_batch: Whether to batch write data to firestore to avoid memory limits dataset_bucket_name: The bucket name to pick up datasets from firestore_database: The Firestore database to publish datasets to publish_dataset_topic_id: The Pub/Sub topic ID to publish dataset updates to @@ -30,6 +31,7 @@ class Settings(AppSettings): project_id: str autodelete_dataset: bool = True retain_old_dataset: bool = True + should_batch: bool = True dataset_bucket_name: str firestore_database: str publish_dataset_topic_id: str diff --git a/pyproject.toml b/pyproject.toml index f754fe2..02cfd0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sds-loader" -version = "0.1.0" +version = "0.1.1" description = "A service for loading new sds schemas and datasets" requires-python = "==3.13.*" dependencies = [ @@ -11,6 +11,7 @@ dependencies = [ "pydantic-settings>=2.13.1", "fastapi>=0.119.0", "starlette>=0.48.0", + "requests>=2.32.3", ] [[tool.uv.index]] diff --git a/uv.lock b/uv.lock index 119a49f..5143665 100644 --- a/uv.lock +++ b/uv.lock @@ -1328,13 +1328,14 @@ wheels = [ [[package]] name = "sds-loader" -version = "0.1.0" +version = "0.1.1" source = { virtual = "." } dependencies = [ { name = "fastapi" }, { name = "lagom" }, { name = "polyfactory" }, { name = "pydantic-settings" }, + { name = "requests" }, { name = "sds-common" }, { name = "sdx-base" }, { name = "starlette" }, @@ -1360,6 +1361,7 @@ requires-dist = [ { name = "lagom", specifier = ">=2.7.7" }, { name = "polyfactory", specifier = ">=3.3.0" }, { name = "pydantic-settings", specifier = ">=2.13.1" }, + { name = "requests", specifier = ">=2.32.3" }, { name = "sds-common", specifier = "==1.0.17" }, { name = "sdx-base", specifier = ">=0.2.9", index = "https://europe-west2-python.pkg.dev/ons-sdx-ci/sdx-python-packages/simple/" }, { name = "starlette", specifier = ">=0.48.0" },