Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ make format
make test
```

## OpenApi spec

[http://0.0.0.0:5000/docs](http://0.0.0.0:5000/docs)

## Dockerize

```
Expand Down
9 changes: 1 addition & 8 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,2 @@

## Now

- Add better tx_id getter

## Future improvements

- Add async to all firestore repos
- Multi files at once
- Integration tests
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
class FirestoreSettings(Protocol):
project_id: str
firestore_database: str
should_batch: bool


class FirestoreDatasetStorageRepository(DatasetStorageRepositoryInterface):
Expand All @@ -33,7 +34,9 @@ def __init__(
database=self.settings.firestore_database
)

# Max size in bytes we can upload in one batch
self.MAX_BATCH_SIZE_BYTES = 9 * 1024 * 1024
self.MAX_NUMBER_OF_WRITES_PER_BATCH = 500

# Initialize Firestore collections
self.dataset_collection = self.client.collection("datasets")
Expand Down Expand Up @@ -93,18 +96,104 @@ def store_dataset(
unit_data_identifiers: list[str]
):

# Write to firebase in batches or not depends on the settings

if self.settings.should_batch:
self._store_dataset_with_batching(
dataset_id=dataset_id,
dataset_metadata=dataset_metadata,
unit_data_collection_with_metadata=unit_data_collection_with_metadata,
unit_data_identifiers=unit_data_identifiers
)
else:
self._store_dataset_without_batching(
dataset_id=dataset_id,
dataset_metadata=dataset_metadata,
unit_data_collection_with_metadata=unit_data_collection_with_metadata,
unit_data_identifiers=unit_data_identifiers
)

def _store_dataset_without_batching(
self,
dataset_id: Guid,
dataset_metadata: DatasetMetadataWithoutId,
unit_data_collection_with_metadata: list[UnitDataset],
unit_data_identifiers: list[str]
):
"""
Write the data to firestore without batching
"""
# Create a new document for this dataset
new_dataset_document = self.dataset_collection.document(dataset_id)

# Store the core data first
new_dataset_document.set(dataset_metadata.model_dump(), merge=True)

# Create a new collection for the units
units_collection = new_dataset_document.collection("units")

# Go through unit data
for (unit_data, unit_identifier) in zip(unit_data_collection_with_metadata, unit_data_identifiers):

# Create and save the unit data as a new sub document
units_collection.document(unit_identifier).set(unit_data.model_dump())

def _store_dataset_with_batching(
self,
dataset_id: Guid,
dataset_metadata: DatasetMetadataWithoutId,
unit_data_collection_with_metadata: list[UnitDataset],
unit_data_identifiers: list[str]
):
"""
Use batches to write the data to firestore
"""

# Create a new document for this dataset
new_dataset_document = self.dataset_collection.document(dataset_id)

# Store the core data first
new_dataset_document.set(dataset_metadata.model_dump(), merge=True)

# Create a new collection for the units
units_collection = new_dataset_document.collection("units")

# Initialise a batch
batch = self.client.batch()
batch_size_bytes = 0
batch_num_records = 0

# Go through unit data
for (unit_data, unit_identifier) in zip(unit_data_collection_with_metadata, unit_data_identifiers):
"""
Add this unit to the current batch if ...

1. adding it does not exceed the batch size limits
2. adding it does not exceed the batch record limits
"""

# Work out the size of this unit
unit_size = len(unit_data.model_dump().encode('utf-8'))

# Work out if the current batch is too big already
if (batch_size_bytes + unit_size >= self.MAX_BATCH_SIZE_BYTES) or (batch_num_records + 1 >= self.MAX_NUMBER_OF_WRITES_PER_BATCH):
# Commit the current batch
batch.commit()

# Start a new batch
batch = self.client.batch()
batch_size_bytes = 0
batch_num_records = 0

# Add the unit to the new batch
new_unit = units_collection.document(unit_identifier)
batch.set(new_unit, unit_data.model_dump(), merge=True)
batch_size_bytes += unit_size
batch_num_records += 1

# If we never exceeded batch limit we still need to commit
if batch_size_bytes > 0:
batch.commit()

def delete_dataset_version(
self,
survey_id: str,
Expand Down
2 changes: 1 addition & 1 deletion app/services/dataset_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class DatasetSettings(Protocol):
"""

autodelete_dataset: bool
retain_old_datasets: bool = True
retain_old_datasets: bool


class DatasetService:
Expand Down
6 changes: 3 additions & 3 deletions app/services/schema_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(
self.bucket_publisher = bucket_publisher
self.repository_publisher = repository_publisher

def _publish_single_file(self, file_name: str, publisher: PublisherProtocol):
def _publish_single_file(self, file_name: str, publisher: PublisherProtocol): # noqa
"""
Publish a single file using a given publisher
:param file_name: name of the file to be published
Expand All @@ -42,7 +42,7 @@ def _publish_single_file(self, file_name: str, publisher: PublisherProtocol):
except Exception as e:
logger.error(f"Failed to publish schema {file_name}: {e}")

def _filter_github_files(self, files: list[str]) -> list[str]:
def _filter_github_files(self, files: list[str]) -> list[str]: # noqa
"""
Filter a list of file names to publish by the regex
"""
Expand All @@ -59,7 +59,7 @@ def publish_new_schemas(
If any schema fails to publish, it will print an error message but continue processing the remaining files.

:param source: string indicating the source of the schema files to be published
Options are "github", "bucket"
Options are "GitHub", "bucket"
:param file_list: List of file names to publish
"""

Expand Down
2 changes: 2 additions & 0 deletions app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ class Settings(AppSettings):
project_id: The GCP project ID
autodelete_dataset: Whether to automatically delete datasets from the source repo (bucket) after publishing
retain_old_dataset: Whether to retain old versions of an updated dataset in the target repo (firestore)
should_batch: Whether to batch write data to firestore to avoid memory limits
dataset_bucket_name: The bucket name to pick up datasets from
firestore_database: The Firestore database to publish datasets to
publish_dataset_topic_id: The Pub/Sub topic ID to publish dataset updates to
"""
project_id: str
autodelete_dataset: bool = True
retain_old_dataset: bool = True
should_batch: bool = True
dataset_bucket_name: str
firestore_database: str
publish_dataset_topic_id: str
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "sds-loader"
version = "0.1.0"
version = "0.1.1"
description = "A service for loading new sds schemas and datasets"
requires-python = "==3.13.*"
dependencies = [
Expand All @@ -11,6 +11,7 @@ dependencies = [
"pydantic-settings>=2.13.1",
"fastapi>=0.119.0",
"starlette>=0.48.0",
"requests>=2.32.3",
]

[[tool.uv.index]]
Expand Down
4 changes: 3 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading