From 45b1ae7dd8fbb0803721e86a9429042fe6de07e0 Mon Sep 17 00:00:00 2001 From: Max Chis Date: Tue, 12 Aug 2025 18:10:22 -0400 Subject: [PATCH] Break up Huggingface Upload --- .../scheduled/impl/huggingface/operator.py | 8 ++++-- .../impl/huggingface/queries/get/core.py | 6 +++++ src/db/client/async_.py | 4 +-- src/external/huggingface/hub/client.py | 26 ++++++++++++++++--- 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/src/core/tasks/scheduled/impl/huggingface/operator.py b/src/core/tasks/scheduled/impl/huggingface/operator.py index 45e35e17..7d5324f5 100644 --- a/src/core/tasks/scheduled/impl/huggingface/operator.py +++ b/src/core/tasks/scheduled/impl/huggingface/operator.py @@ -1,3 +1,4 @@ +from itertools import count from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase from src.db.client.async_ import AsyncDatabaseClient @@ -30,7 +31,10 @@ async def inner_task_logic(self): # Otherwise, push to huggingface run_dt = await self.adb_client.get_current_database_time() - outputs = await self.adb_client.get_data_sources_raw_for_huggingface() - self.hf_client.push_data_sources_raw_to_hub(outputs) + for idx in count(start=1): + outputs = await self.adb_client.get_data_sources_raw_for_huggingface(page=idx) + if len(outputs) == 0: + break + self.hf_client.push_data_sources_raw_to_hub(outputs, idx=idx) await self.adb_client.set_hugging_face_upload_state(run_dt.replace(tzinfo=None)) diff --git a/src/core/tasks/scheduled/impl/huggingface/queries/get/core.py b/src/core/tasks/scheduled/impl/huggingface/queries/get/core.py index 30cfa234..662f7fbb 100644 --- a/src/core/tasks/scheduled/impl/huggingface/queries/get/core.py +++ b/src/core/tasks/scheduled/impl/huggingface/queries/get/core.py @@ -5,6 +5,7 @@ from src.core.tasks.scheduled.impl.huggingface.queries.get.convert import convert_url_status_to_relevant, \ convert_fine_to_coarse_record_type from src.core.tasks.scheduled.impl.huggingface.queries.get.model import GetForLoadingToHuggingFaceOutput +from src.db.client.helpers import add_standard_limit_and_offset from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML from src.db.models.impl.url.core.sqlalchemy import URL from src.db.queries.base.builder import QueryBuilderBase @@ -13,6 +14,10 @@ class GetForLoadingToHuggingFaceQueryBuilder(QueryBuilderBase): + def __init__(self, page: int): + super().__init__() + self.page = page + async def run(self, session: AsyncSession) -> list[GetForLoadingToHuggingFaceOutput]: label_url_id = 'url_id' @@ -42,6 +47,7 @@ async def run(self, session: AsyncSession) -> list[GetForLoadingToHuggingFaceOut ]) ) ) + query = add_standard_limit_and_offset(page=self.page, statement=query) db_results = await sh.mappings( session=session, query=query diff --git a/src/db/client/async_.py b/src/db/client/async_.py index ebe1b772..cd2f7c02 100644 --- a/src/db/client/async_.py +++ b/src/db/client/async_.py @@ -1463,9 +1463,9 @@ async def add_raw_html( ) session.add(compressed_html) - async def get_data_sources_raw_for_huggingface(self) -> list[GetForLoadingToHuggingFaceOutput]: + async def get_data_sources_raw_for_huggingface(self, page: int) -> list[GetForLoadingToHuggingFaceOutput]: return await self.run_query_builder( - GetForLoadingToHuggingFaceQueryBuilder() + GetForLoadingToHuggingFaceQueryBuilder(page) ) async def set_hugging_face_upload_state(self, dt: datetime) -> None: diff --git a/src/external/huggingface/hub/client.py b/src/external/huggingface/hub/client.py index ef9d1cc7..3ca53ceb 100644 --- a/src/external/huggingface/hub/client.py +++ b/src/external/huggingface/hub/client.py @@ -1,5 +1,6 @@ from datasets import Dataset +from huggingface_hub import HfApi from src.external.huggingface.hub.constants import DATA_SOURCES_RAW_REPO_ID from src.external.huggingface.hub.format import format_as_huggingface_dataset @@ -10,17 +11,30 @@ class HuggingFaceHubClient: def __init__(self, token: str): self.token = token + self.api = HfApi(token=token) - def _push_dataset_to_hub(self, repo_id: str, dataset: Dataset) -> None: + def _push_dataset_to_hub( + self, + repo_id: str, + dataset: Dataset, + idx: int + ) -> None: """ Modifies: - repository on Hugging Face, identified by `repo_id` """ - dataset.push_to_hub(repo_id=repo_id, token=self.token) + dataset.to_parquet(f"part_{idx}.parquet") + self.api.upload_file( + path_or_fileobj=f"part_{idx}.parquet", + path_in_repo=f"data/part_{idx}.parquet", + repo_id=repo_id, + repo_type="dataset", + ) def push_data_sources_raw_to_hub( self, - outputs: list[GetForLoadingToHuggingFaceOutput] + outputs: list[GetForLoadingToHuggingFaceOutput], + idx: int ) -> None: """ Modifies: @@ -28,4 +42,8 @@ def push_data_sources_raw_to_hub( """ dataset = format_as_huggingface_dataset(outputs) print(dataset) - self._push_dataset_to_hub(repo_id=DATA_SOURCES_RAW_REPO_ID, dataset=dataset) \ No newline at end of file + self._push_dataset_to_hub( + repo_id=DATA_SOURCES_RAW_REPO_ID, + dataset=dataset, + idx=idx + ) \ No newline at end of file