Skip to content
100 changes: 73 additions & 27 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

from ._models import ApifyRequestQueueMetadata, CachedRequest, RequestQueueHead
from ._utils import to_crawlee_request, unique_key_to_request_id
from ._utils import (
resolve_awaited_in_flight,
settle_pending_addition,
to_crawlee_request,
unique_key_to_request_id,
)

if TYPE_CHECKING:
from collections.abc import Callable, Coroutine, Sequence
Expand Down Expand Up @@ -71,6 +76,19 @@ def __init__(
self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=cache_size)
"""LRU cache storing request objects, keyed by request ID."""

self._requests_being_added: dict[str, asyncio.Future[bool]] = {}
"""In-flight `add_batch_of_requests` markers, keyed by request ID.

Coordinates only concurrent `add_batch_of_requests` calls sharing this one client instance (e.g. several
producer coroutines adding requests in the same process). It does not coordinate separate client instances
or processes, which each keep their own markers; deduplication across clients still relies on the platform.

Each future resolves once the platform call that is adding the request settles: `True` if the request was
committed, `False` otherwise. A concurrent call adding the same request awaits the future instead of
re-sending it, which avoids a duplicate platform write while still avoiding false success when the original
add fails.
"""

self._queue_has_locked_requests: bool | None = None
"""Whether the queue contains requests currently locked by other clients."""

Expand All @@ -87,9 +105,13 @@ async def add_batch_of_requests(
forefront: bool = False,
) -> AddRequestsResponse:
"""Specific implementation of this method for the RQ shared access mode."""
loop = asyncio.get_running_loop()
# Do not try to add previously added requests to avoid pointless expensive calls to API
new_requests: list[Request] = []
already_present_requests: list[ProcessedRequest] = []
# Requests a concurrent `add_batch_of_requests` call is already sending. We await its outcome instead of
# re-sending them, as (request, that call's in-flight future) pairs.
awaited_in_flight: list[tuple[Request, asyncio.Future[bool]]] = []

for request in requests:
request_id = unique_key_to_request_id(request.unique_key)
Expand All @@ -106,46 +128,70 @@ async def add_batch_of_requests(
)
)

elif request_id in self._requests_being_added:
# A concurrent call is already adding this request; await its outcome rather than re-sending it.
awaited_in_flight.append((request, self._requests_being_added[request_id]))

else:
# Add new request to the cache.
processed_request = ProcessedRequest(
id=request_id,
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.was_already_handled,
)
self._cache_request(
request_id,
processed_request,
)
# Register an in-flight marker so a concurrent call dedupes against it; caching is deferred
# until the platform confirms the request was accepted (see below).
new_requests.append(request)
self._requests_being_added[request_id] = loop.create_future()

if new_requests:
# Prepare requests for API by converting to dictionaries.
requests_dict = [request.model_dump(by_alias=True) for request in new_requests]

# Send requests to API.
batch_response = await self._api_client.batch_add_requests(
requests=requests_dict,
forefront=forefront,
)

batch_response_dict = batch_response.model_dump(by_alias=True)
api_response = AddRequestsResponse.model_validate(batch_response_dict)

# Add the locally known already present processed requests based on the local cache.
api_response.processed_requests.extend(already_present_requests)
committed_request_ids: set[str] = set()
try:
# Send requests to API.
batch_response = await self._api_client.batch_add_requests(
requests=requests_dict,
forefront=forefront,
)

# Remove unprocessed requests from the cache
for unprocessed_request in api_response.unprocessed_requests:
unprocessed_request_id = unique_key_to_request_id(unprocessed_request.unique_key)
self._requests_cache.pop(unprocessed_request_id, None)
batch_response_dict = batch_response.model_dump(by_alias=True)
api_response = AddRequestsResponse.model_validate(batch_response_dict)

# Commit only the requests the platform actually accepted to the local dedup cache. Caching after
# the call succeeds (not before) keeps a failed call from poisoning the cache and silently
# deduplicating a later retry of the same request.
unprocessed_unique_keys = {request.unique_key for request in api_response.unprocessed_requests}
for request in new_requests:
if request.unique_key in unprocessed_unique_keys:
continue
request_id = unique_key_to_request_id(request.unique_key)
self._cache_request(
request_id,
ProcessedRequest(
id=request_id,
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.was_already_handled,
),
)
committed_request_ids.add(request_id)

# Add the locally known already present processed requests based on the local cache.
api_response.processed_requests.extend(already_present_requests)
finally:
# Release the in-flight markers we registered. Committed requests tell concurrent callers the
# request reached the platform; everything else (unprocessed, API error, cancellation) tells them
# it did not, so they retry instead of reporting false success.
for request in new_requests:
request_id = unique_key_to_request_id(request.unique_key)
settle_pending_addition(
self._requests_being_added, request_id, committed=request_id in committed_request_ids
)

else:
api_response = AddRequestsResponse.model_validate(
{'unprocessedRequests': [], 'processedRequests': already_present_requests}
)

# Fold in requests a concurrent call was already adding.
await resolve_awaited_in_flight(awaited_in_flight, api_response)

logger.debug(
f'Tried to add new requests: {len(new_requests)}, '
f'succeeded to add new requests: {len(api_response.processed_requests) - len(already_present_requests)}, '
Expand Down
89 changes: 68 additions & 21 deletions src/apify/storage_clients/_apify/_request_queue_single_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from collections import deque
from datetime import UTC, datetime
from logging import getLogger
Expand All @@ -9,7 +10,12 @@

from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

from ._utils import to_crawlee_request, unique_key_to_request_id
from ._utils import (
resolve_awaited_in_flight,
settle_pending_addition,
to_crawlee_request,
unique_key_to_request_id,
)

if TYPE_CHECKING:
from collections.abc import Sequence
Expand Down Expand Up @@ -90,6 +96,19 @@ def __init__(
Tracked locally to accurately determine when the queue is empty for this single consumer.
"""

self._requests_being_added: dict[str, asyncio.Future[bool]] = {}
"""In-flight `add_batch_of_requests` markers, keyed by request ID.

Coordinates only concurrent `add_batch_of_requests` calls sharing this one client instance (e.g. several
producer coroutines adding requests in the same process). It does not coordinate separate client instances
or processes, which each keep their own markers; deduplication across clients still relies on the platform.

Each future resolves once the platform call that is adding the request settles: `True` if the request was
committed, `False` otherwise. A concurrent call adding the same request awaits the future instead of
re-sending it, which avoids a duplicate platform write while still avoiding false success when the original
add fails.
"""

self._initialized_caches = False
"""Flag indicating whether local caches have been populated from existing queue contents.

Expand All @@ -108,8 +127,12 @@ async def add_batch_of_requests(
await self._init_caches()
self._initialized_caches = True

loop = asyncio.get_running_loop()
new_requests: list[Request] = []
already_present_requests: list[ProcessedRequest] = []
# Requests a concurrent `add_batch_of_requests` call is already sending. We await its outcome instead of
# re-sending them, as (request, that call's in-flight future) pairs.
awaited_in_flight: list[tuple[Request, asyncio.Future[bool]]] = []

for request in requests:
# Calculate id for request
Expand All @@ -135,40 +158,64 @@ async def add_batch_of_requests(
was_already_handled=request.was_already_handled,
)
)
# Check if a concurrent call is already adding this request, and await its outcome rather than
# re-sending it.
elif request_id in self._requests_being_added:
awaited_in_flight.append((request, self._requests_being_added[request_id]))
else:
# Push the request to the platform. Probably not there, or we are not aware of it
# Push the request to the platform. Probably not there, or we are not aware of it. Register an
# in-flight marker so a concurrent call dedupes against it; caching is deferred until the
# platform confirms the request was accepted (see below).
new_requests.append(request)

# Update local caches
self._requests_cache[request_id] = request
if forefront:
self._head_requests.append(request_id)
else:
self._head_requests.appendleft(request_id)
self._requests_being_added[request_id] = loop.create_future()

if new_requests:
# Prepare requests for API by converting to dictionaries.
requests_dict = [request.model_dump(by_alias=True) for request in new_requests]

# Send requests to API.
batch_response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
batch_response_dict = batch_response.model_dump(by_alias=True)
api_response = AddRequestsResponse.model_validate(batch_response_dict)

# Add the locally known already present processed requests based on the local cache.
api_response.processed_requests.extend(already_present_requests)

# Remove unprocessed requests from the cache
for unprocessed_request in api_response.unprocessed_requests:
request_id = unique_key_to_request_id(unprocessed_request.unique_key)
self._requests_cache.pop(request_id, None)
committed_request_ids: set[str] = set()
try:
# Send requests to API.
batch_response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
batch_response_dict = batch_response.model_dump(by_alias=True)
api_response = AddRequestsResponse.model_validate(batch_response_dict)

# Commit only the requests the platform actually accepted to the local caches. Caching after the
# call succeeds (not before) keeps a failed call from poisoning the cache and silently
# deduplicating a later retry of the same request.
unprocessed_unique_keys = {request.unique_key for request in api_response.unprocessed_requests}
for request in new_requests:
if request.unique_key in unprocessed_unique_keys:
continue
request_id = unique_key_to_request_id(request.unique_key)
self._requests_cache[request_id] = request
if forefront:
self._head_requests.append(request_id)
else:
self._head_requests.appendleft(request_id)
committed_request_ids.add(request_id)

# Add the locally known already present processed requests based on the local cache.
api_response.processed_requests.extend(already_present_requests)
finally:
# Release the in-flight markers we registered. Committed requests tell concurrent callers the
# request reached the platform; everything else (unprocessed, API error, cancellation) tells them
# it did not, so they retry instead of reporting false success.
for request in new_requests:
request_id = unique_key_to_request_id(request.unique_key)
settle_pending_addition(
self._requests_being_added, request_id, committed=request_id in committed_request_ids
)

else:
api_response = AddRequestsResponse(
unprocessed_requests=[],
processed_requests=already_present_requests,
)

# Fold in requests a concurrent call was already adding.
await resolve_awaited_in_flight(awaited_in_flight, api_response)

# Update assumed total count for newly added requests.
new_request_count = 0
for processed_request in api_response.processed_requests:
Expand Down
48 changes: 48 additions & 0 deletions src/apify/storage_clients/_apify/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
from typing import TYPE_CHECKING

from crawlee._utils.crypto import compute_short_hash
from crawlee.storage_clients.models import ProcessedRequest, UnprocessedRequest

from apify import Request

if TYPE_CHECKING:
import asyncio
from collections.abc import Iterable

from apify_client._models import HeadRequest, LockedHeadRequest
from apify_client._models import Request as ClientRequest
from crawlee.storage_clients.models import AddRequestsResponse

from apify import Configuration

Expand Down Expand Up @@ -60,3 +65,46 @@ def to_crawlee_request(client_request: ClientRequest | HeadRequest | LockedHeadR

# Validate and construct Crawlee Request from the serialized dict
return Request.model_validate(request_dict)


def settle_pending_addition(
    requests_being_added: dict[str, asyncio.Future[bool]],
    request_id: str,
    *,
    committed: bool,
) -> None:
    """Release the in-flight add marker of a request and publish its outcome to concurrent waiters.

    The marker is always removed from the map. Its future is resolved only if it is still pending, so a marker
    that is missing or already done is left untouched.

    Args:
        requests_being_added: The client's map of in-flight `add_batch_of_requests` markers.
        request_id: ID of the request whose in-flight add has settled.
        committed: Whether the request was committed to the platform.
    """
    marker = requests_being_added.pop(request_id, None)

    # Nothing was registered under this ID, so there is nobody to notify.
    if marker is None:
        return

    # A future that is already done (resolved or cancelled) cannot take a result anymore.
    if marker.done():
        return

    marker.set_result(committed)


async def resolve_awaited_in_flight(
    awaited_in_flight: Iterable[tuple[Request, asyncio.Future[bool]]],
    api_response: AddRequestsResponse,
) -> None:
    """Await concurrent in-flight adds of these requests and fold the outcome into `api_response`.

    Requests the concurrent add committed are reported as already present; the rest are reported unprocessed
    so the caller retries them rather than receiving false success.

    Args:
        awaited_in_flight: Pairs of a request and the in-flight future of the concurrent call that is adding it.
        api_response: Response extended in place with one processed or unprocessed entry per awaited request.
    """
    # Imported locally because this module only imports `asyncio` for type checking.
    import asyncio  # noqa: PLC0415

    for request, future in awaited_in_flight:
        # The future is shared with the call that owns the add and with every other waiter. Awaiting it
        # directly would let cancellation of this caller cancel the future itself, which would raise
        # `CancelledError` in unrelated waiters and stop the owner from publishing the real outcome.
        # Shielding keeps this caller's cancellation local.
        committed = await asyncio.shield(future)

        if committed:
            api_response.processed_requests.append(
                ProcessedRequest(
                    id=unique_key_to_request_id(request.unique_key),
                    unique_key=request.unique_key,
                    was_already_present=True,
                    was_already_handled=request.was_already_handled,
                )
            )
        else:
            api_response.unprocessed_requests.append(
                UnprocessedRequest(unique_key=request.unique_key, url=request.url, method=request.method)
            )
Loading