diff --git a/src/bioetl/infrastructure/adapters/crossref/_doi_batch_processor.py b/src/bioetl/infrastructure/adapters/crossref/_doi_batch_processor.py index 735a30490f..a698c341c2 100644 --- a/src/bioetl/infrastructure/adapters/crossref/_doi_batch_processor.py +++ b/src/bioetl/infrastructure/adapters/crossref/_doi_batch_processor.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import time from typing import TYPE_CHECKING, cast @@ -100,17 +101,30 @@ async def _fallback_individual_fetch( self, dois: list[str] ) -> AsyncIterator[BronzeRecord]: """Fall back to individual DOI fetches.""" - for doi in dois: + + async def fetch_one(doi: str) -> BronzeRecord | None: try: - publication = await self.fetch_single(doi) - if publication: - yield publication + return await self.fetch_single(doi) except CROSSREF_FALLBACK_ERRORS as error: self._logger.debug( "crossref_individual_fetch_failed", doi=doi, error=str(error), ) + return None + + # CrossRef API allows 50 requests per second. Use a semaphore to limit concurrency + # in the fallback to avoid overwhelming the polite pool. + semaphore = asyncio.Semaphore(10) + + async def fetch_with_semaphore(doi: str) -> BronzeRecord | None: + async with semaphore: + return await fetch_one(doi) + + results = await asyncio.gather(*(fetch_with_semaphore(doi) for doi in dois)) + for publication in results: + if publication: + yield publication def _normalize_dois(self, dois: list[str]) -> list[str]: """Normalize and filter DOI list, removing invalid entries."""