From 5cca58447c17413d2523b1b5fc2a3a204e3a03cd Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 16:05:49 +0200 Subject: [PATCH 1/9] fix: Resolve AsyncThread.run_coro startup race --- src/apify/scrapy/_async_thread.py | 6 +-- tests/unit/scrapy/test_async_thread.py | 72 ++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 tests/unit/scrapy/test_async_thread.py diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index 0333531b..f9b2e365 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -52,15 +52,15 @@ def run_coro( The result returned by the coroutine. Raises: - RuntimeError: If the event loop is not running. + RuntimeError: If the event loop has been closed. TimeoutError: If the coroutine does not complete within the timeout. Exception: Any exception raised during coroutine execution. """ if timeout == 'default': timeout = self._default_timeout - if not self._eventloop.is_running(): - raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is not running.') + if self._eventloop.is_closed(): + raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is closed.') # Submit the coroutine to the event loop running in the other thread. future = asyncio.run_coroutine_threadsafe(coro, self._eventloop) diff --git a/tests/unit/scrapy/test_async_thread.py b/tests/unit/scrapy/test_async_thread.py new file mode 100644 index 00000000..67a15088 --- /dev/null +++ b/tests/unit/scrapy/test_async_thread.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import threading +import time + +import pytest + +from apify.scrapy._async_thread import AsyncThread + + +async def _return(value: int) -> int: + return value + + +# Normal operation + + +def test_run_coro_returns_coroutine_result() -> None: + """`run_coro` runs a coroutine on the background loop and returns its result.""" + async_thread = AsyncThread() + try: + assert async_thread.run_coro(_return(42)) == 42 + finally: + async_thread.close() + + +# Startup race regression + + +def test_run_coro_succeeds_when_called_before_loop_starts(monkeypatch: pytest.MonkeyPatch) -> None: + """`run_coro` must not fail when invoked before the background thread reaches `run_forever`.""" + gate = threading.Event() + original_start = AsyncThread._start_event_loop + + def gated_start(self: AsyncThread) -> None: + gate.wait() # hold the loop just shy of run_forever() to force the startup race + original_start(self) + + monkeypatch.setattr(AsyncThread, '_start_event_loop', gated_start) + + async_thread = AsyncThread() + try: + # The loop start is gated, so the loop is provably not running yet. + assert not async_thread._eventloop.is_running() + + # Let the loop reach run_forever() shortly after run_coro starts waiting. + def release_gate() -> None: + time.sleep(0.1) + gate.set() + + releaser = threading.Thread(target=release_gate) + releaser.start() + + assert async_thread.run_coro(_return(42)) == 42 + releaser.join() + finally: + gate.set() + async_thread.close() + + +# Closed loop guard + + +def test_run_coro_raises_after_close() -> None: + """`run_coro` raises `RuntimeError` once the loop has been closed.""" + async_thread = AsyncThread() + async_thread.close() + + coro = _return(42) + with pytest.raises(RuntimeError): + async_thread.run_coro(coro) + coro.close() From c0257453d0e676646d3b7f117cd3c06810556667 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 17:12:22 +0200 Subject: [PATCH 2/9] fix(scrapy): async-thread shutdown, duplicate error logs, and timeout setting --- src/apify/scrapy/_async_thread.py | 20 ++-- src/apify/scrapy/extensions/_httpcache.py | 78 ++++++++------- src/apify/scrapy/scheduler.py | 45 ++++----- .../unit/scrapy/extensions/test_httpcache.py | 50 ++++++++++ tests/unit/scrapy/test_async_thread.py | 98 +++++++++++++++++++ tests/unit/scrapy/test_scheduler.py | 36 ++++++- 6 files changed, 256 insertions(+), 71 deletions(-) create mode 100644 tests/unit/scrapy/test_async_thread.py diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index 0333531b..a6d93d66 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -67,11 +67,11 @@ def run_coro( try: # Wait for the coroutine's result until the specified timeout. return future.result(timeout=timeout.total_seconds()) - except futures.TimeoutError as exc: - logger.exception('Coroutine execution timed out.', exc_info=exc) - raise - except Exception as exc: - logger.exception('Coroutine execution raised an exception.', exc_info=exc) + except futures.TimeoutError: + # `future.result` gave up, but the coroutine keeps running on the loop; cancel it so it does + # not outlive the timeout. The propagated error is logged once by the caller (or Scrapy), so + # this method does not log it itself. + future.cancel() raise def close(self, timeout: timedelta = timedelta(seconds=60)) -> None: @@ -83,9 +83,15 @@ def close(self, timeout: timedelta = timedelta(seconds=60)) -> None: Args: timeout: The maximum number of seconds to wait for the event loop thread to exit. """ + # A repeated close (e.g. a retried shutdown) would call into the already-closed loop and raise + # `RuntimeError: Event loop is closed`. The loop closes itself once it stops, so a second close + # is a no-op. + if self._eventloop.is_closed(): + return + if self._eventloop.is_running(): - # Cancel all pending tasks in the event loop. - self.run_coro(self._shutdown_tasks()) + # Cancel all pending tasks in the event loop, honouring the caller's timeout. + self.run_coro(self._shutdown_tasks(), timeout=timeout) # Schedule the event loop to stop. self._eventloop.call_soon_threadsafe(self._eventloop.stop) diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 0909c583..296216c5 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -4,6 +4,7 @@ import io import re import struct +from datetime import timedelta from logging import getLogger from time import time from typing import TYPE_CHECKING @@ -38,6 +39,8 @@ def __init__(self, settings: BaseSettings) -> None: # Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`). self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100) self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') + # Caps how long each coroutine run on the background event loop may take; defaults to 60 seconds. + self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60)) self._spider: Spider | None = None self._kvs: KeyValueStore | None = None self._fingerprinter: RequestFingerprinterProtocol | None = None @@ -62,7 +65,7 @@ async def open_kvs() -> KeyValueStore: return await KeyValueStore.open(name=kvs_name) logger.debug("Starting background thread for cache storage's event loop") - self._async_thread = AsyncThread() + self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout) logger.debug(f"Opening cache storage's {kvs_name!r} key value store") self._kvs = self._async_thread.run_coro(open_kvs()) @@ -72,45 +75,48 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None: raise ValueError('Async thread not initialized') logger.info(f'Cleaning up cache items (max {self._expiration_max_items})') - if self._expiration_secs > 0: - if current_time is None: - current_time = int(time()) - - async def expire_kvs() -> None: - if self._kvs is None: - raise ValueError('Key value store not initialized') - # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, - # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats - # an expired entry as a cache miss. - processed = 0 - async for item in self._kvs.iterate_keys(): - if processed >= self._expiration_max_items: - break - processed += 1 - value = await self._kvs.get_value(item.key) - try: - gzip_time = read_gzip_time(value) - except Exception as e: - logger.warning(f'Malformed cache item {item.key}: {e}') - await self._kvs.delete_value(item.key) - else: - if self._expiration_secs < current_time - gzip_time: - logger.debug(f'Expired cache item {item.key}') + # The cleanup sweep runs inside `try` so a failure there cannot skip closing the async thread + # (which would leak its event-loop thread); `close` always runs in the `finally`. + try: + if self._expiration_secs > 0: + if current_time is None: + current_time = int(time()) + + async def expire_kvs() -> None: + if self._kvs is None: + raise ValueError('Key value store not initialized') + # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, + # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats + # an expired entry as a cache miss. + processed = 0 + async for item in self._kvs.iterate_keys(): + if processed >= self._expiration_max_items: + break + processed += 1 + value = await self._kvs.get_value(item.key) + try: + gzip_time = read_gzip_time(value) + except Exception as e: + logger.warning(f'Malformed cache item {item.key}: {e}') await self._kvs.delete_value(item.key) else: - logger.debug(f'Valid cache item {item.key}') - - self._async_thread.run_coro(expire_kvs()) + if self._expiration_secs < current_time - gzip_time: + logger.debug(f'Expired cache item {item.key}') + await self._kvs.delete_value(item.key) + else: + logger.debug(f'Valid cache item {item.key}') - logger.debug('Closing cache storage') - try: - self._async_thread.close() - except KeyboardInterrupt: - logger.warning('Shutdown interrupted by KeyboardInterrupt!') - except Exception: - logger.exception('Exception occurred while shutting down cache storage') + self._async_thread.run_coro(expire_kvs()) finally: - logger.debug('Cache storage closed') + logger.debug('Closing cache storage') + try: + self._async_thread.close() + except KeyboardInterrupt: + logger.warning('Shutdown interrupted by KeyboardInterrupt!') + except Exception: + logger.exception('Exception occurred while shutting down cache storage') + finally: + logger.debug('Cache storage closed') def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None: """Retrieve a response from the cache storage.""" diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index 2b95d30c..d339c856 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -1,6 +1,6 @@ from __future__ import annotations -import traceback +from datetime import timedelta from logging import getLogger from typing import TYPE_CHECKING @@ -15,6 +15,7 @@ from apify.storages import RequestQueue if TYPE_CHECKING: + from scrapy.crawler import Crawler from scrapy.http.request import Request from twisted.internet.defer import Deferred @@ -27,7 +28,7 @@ class ApifyScheduler(BaseScheduler): This scheduler requires the asyncio Twisted reactor to be installed. """ - def __init__(self) -> None: + def __init__(self, async_thread_timeout: timedelta = timedelta(seconds=60)) -> None: if not is_asyncio_reactor_installed(): raise ValueError( f'{ApifyScheduler.__qualname__} requires the asyncio Twisted reactor. ' @@ -38,7 +39,17 @@ def __init__(self) -> None: self.spider: Spider | None = None # A thread with the asyncio event loop to run coroutines on. - self._async_thread = AsyncThread() + self._async_thread = AsyncThread(default_timeout=async_thread_timeout) + + @classmethod + def from_crawler(cls, crawler: Crawler) -> ApifyScheduler: + """Create the scheduler, reading the async-thread timeout from the Scrapy settings. + + The `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting (in seconds) caps how long each coroutine run on the + background event loop may take before timing out; it defaults to 60 seconds. + """ + timeout_secs = crawler.settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60) + return cls(async_thread_timeout=timedelta(seconds=timeout_secs)) def open(self, spider: Spider) -> Deferred[None] | None: """Open the scheduler. @@ -62,7 +73,6 @@ async def open_rq() -> RequestQueue: self._rq = self._async_thread.run_coro(open_rq()) except Exception: self._async_thread.close() - traceback.print_exc() raise return None @@ -97,12 +107,7 @@ def has_pending_requests(self) -> bool: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - try: - is_finished = self._async_thread.run_coro(self._rq.is_finished()) - except Exception: - traceback.print_exc() - raise - + is_finished = self._async_thread.run_coro(self._rq.is_finished()) return not is_finished def enqueue_request(self, request: Request) -> bool: @@ -130,12 +135,7 @@ def enqueue_request(self, request: Request) -> bool: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - try: - result = self._async_thread.run_coro(self._rq.add_request(apify_request)) - except Exception: - traceback.print_exc() - raise - + result = self._async_thread.run_coro(self._rq.add_request(apify_request)) logger.debug(f'rq.add_request result: {result}') return not bool(result.was_already_present) @@ -149,12 +149,7 @@ def next_request(self) -> Request | None: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - try: - apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) - except Exception: - traceback.print_exc() - raise - + apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) logger.debug(f'Fetched apify_request: {apify_request}') if apify_request is None: return None @@ -173,11 +168,7 @@ def next_request(self) -> Request | None: # Mark the request as handled. This runs even when reconstruction failed above: an unrecoverable entry # (a corrupt or legacy payload) must still be consumed, otherwise the queue would keep handing it back # forever. Retrying genuine failures is the RetryMiddleware's job. - try: - self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) - except Exception: - traceback.print_exc() - raise + self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) if scrapy_request is None: return None diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py index 6f34853b..b7db7910 100644 --- a/tests/unit/scrapy/extensions/test_httpcache.py +++ b/tests/unit/scrapy/extensions/test_httpcache.py @@ -5,6 +5,7 @@ import io import json import pickle +from datetime import timedelta from time import time from types import SimpleNamespace from typing import TYPE_CHECKING, Any, cast @@ -274,6 +275,55 @@ def test_close_spider_respects_max_items() -> None: assert len(kvs.deleted) == 2 +def test_close_spider_closes_thread_even_when_cleanup_fails() -> None: + """If the expiration sweep raises, the async thread is still closed rather than leaked.""" + closed: list[bool] = [] + + class _FailingAsyncThread: + def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: + coro.close() # we never run it; just avoid an un-awaited coroutine warning + raise RuntimeError('cleanup boom') + + def close(self, *_: Any, **__: Any) -> None: + closed.append(True) + + storage = ApifyCacheStorage(Settings({'HTTPCACHE_EXPIRATION_SECS': 100})) + storage._async_thread = _FailingAsyncThread() # ty: ignore[invalid-assignment] + storage._kvs = _FakeKvs(None) # ty: ignore[invalid-assignment] + + with pytest.raises(RuntimeError, match='cleanup boom'): + storage.close_spider(None, current_time=1000) # ty: ignore[invalid-argument-type] + + assert closed == [True] + + +def test_cache_storage_reads_async_thread_timeout_setting() -> None: + """`APIFY_ASYNC_THREAD_TIMEOUT_SECS` is read into the storage's async-thread timeout.""" + storage = ApifyCacheStorage(Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77})) + assert storage._async_thread_timeout == timedelta(seconds=77) + + +def test_open_spider_passes_timeout_to_async_thread(monkeypatch: pytest.MonkeyPatch) -> None: + """`open_spider` constructs the async thread with the configured timeout.""" + captured: dict[str, Any] = {} + + class _RecordingAsyncThread: + def __init__(self, default_timeout: timedelta | None = None) -> None: + captured['default_timeout'] = default_timeout + + def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: + coro.close() # we never run it; just avoid an un-awaited coroutine warning + return _FakeKvs(None) + + monkeypatch.setattr('apify.scrapy.extensions._httpcache.AsyncThread', _RecordingAsyncThread) + + storage = ApifyCacheStorage(Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77})) + spider = SimpleNamespace(name='myspider', crawler=SimpleNamespace(request_fingerprinter=_FakeFingerprinter())) + storage.open_spider(cast('Any', spider)) + + assert captured['default_timeout'] == timedelta(seconds=77) + + @pytest.mark.parametrize( ('spider_name', 'expected'), [ diff --git a/tests/unit/scrapy/test_async_thread.py b/tests/unit/scrapy/test_async_thread.py new file mode 100644 index 00000000..d624f465 --- /dev/null +++ b/tests/unit/scrapy/test_async_thread.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import asyncio +import logging +import threading +import time +from concurrent import futures +from datetime import timedelta +from typing import Any, Literal + +import pytest + +from apify.scrapy._async_thread import AsyncThread + + +def _wait_until_running(thread: AsyncThread, timeout: float = 2.0) -> None: + """Block until the background event loop is running, so `run_coro` does not race the thread startup.""" + deadline = time.monotonic() + timeout + while not thread._eventloop.is_running(): + if time.monotonic() > deadline: + raise AssertionError('The event loop did not start in time.') + time.sleep(0.01) + + +# Coroutine execution + + +def test_run_coro_cancels_the_coroutine_on_timeout() -> None: + """A timed-out coroutine is cancelled, not left running on the background loop.""" + thread = AsyncThread() + _wait_until_running(thread) + + started = threading.Event() + cancelled = threading.Event() + + async def slow() -> None: + started.set() + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + cancelled.set() + raise + + with pytest.raises(futures.TimeoutError): + thread.run_coro(slow(), timeout=timedelta(seconds=0.1)) + + assert started.wait(timeout=2) + assert cancelled.wait(timeout=2), 'the timed-out coroutine was left running instead of being cancelled' + + thread.close() + + +def test_run_coro_does_not_log_on_exception(caplog: pytest.LogCaptureFixture) -> None: + """`run_coro` propagates a failing coroutine without logging it itself (the caller/Scrapy reports it once).""" + thread = AsyncThread() + _wait_until_running(thread) + + async def boom() -> None: + raise RuntimeError('boom') + + with caplog.at_level(logging.DEBUG, logger='apify.scrapy._async_thread'), pytest.raises(RuntimeError, match='boom'): + thread.run_coro(boom()) + + thread.close() + + assert [record for record in caplog.records if record.levelno >= logging.ERROR] == [] + + +# Shutdown + + +def test_close_is_idempotent() -> None: + """Calling `close` twice is a no-op the second time, not a `RuntimeError` on the closed loop.""" + thread = AsyncThread() + _wait_until_running(thread) + thread.run_coro(asyncio.sleep(0)) + + thread.close() + thread.close() # must not raise + + +def test_close_passes_its_timeout_to_the_shutdown_step(monkeypatch: pytest.MonkeyPatch) -> None: + """`close(timeout=...)` honours that timeout for the task-cancellation step, not only the thread join.""" + thread = AsyncThread() + _wait_until_running(thread) + thread.run_coro(asyncio.sleep(0)) + + recorded: list[timedelta | str] = [] + original = thread.run_coro + + def spy(coro: Any, timeout: timedelta | Literal['default'] = 'default') -> Any: + recorded.append(timeout) + return original(coro, timeout=timeout) + + monkeypatch.setattr(thread, 'run_coro', spy) + thread.close(timeout=timedelta(seconds=42)) + + assert recorded == [timedelta(seconds=42)] diff --git a/tests/unit/scrapy/test_scheduler.py b/tests/unit/scrapy/test_scheduler.py index c4a87622..7a6926e1 100644 --- a/tests/unit/scrapy/test_scheduler.py +++ b/tests/unit/scrapy/test_scheduler.py @@ -1,12 +1,14 @@ from __future__ import annotations import logging +from datetime import timedelta from types import SimpleNamespace -from typing import cast +from typing import Any, cast from unittest import mock import pytest from scrapy import Request, Spider +from scrapy.settings import Settings from apify import Request as ApifyRequest from apify.scrapy.scheduler import ApifyScheduler @@ -151,3 +153,35 @@ def test_next_request_returns_none_when_queue_empty(scheduler: ApifyScheduler) - assert result is None rq.mark_request_as_handled.assert_not_called() + + +def test_next_request_does_not_print_traceback_to_stderr( + scheduler: ApifyScheduler, + capsys: pytest.CaptureFixture[str], +) -> None: + """A failure propagates as-is, without `traceback.print_exc()` printing a second copy past the log formatter.""" + async_thread = cast('mock.MagicMock', scheduler._async_thread) + async_thread.run_coro.side_effect = RuntimeError('boom') + + with pytest.raises(RuntimeError, match='boom'): + scheduler.next_request() + + assert capsys.readouterr().err == '' + + +def test_from_crawler_reads_async_thread_timeout_setting(monkeypatch: pytest.MonkeyPatch) -> None: + """`from_crawler` wires the `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting into the async thread's timeout.""" + monkeypatch.setattr('apify.scrapy.scheduler.is_asyncio_reactor_installed', lambda: True) + + captured: dict[str, Any] = {} + + class _RecordingAsyncThread: + def __init__(self, default_timeout: timedelta | None = None) -> None: + captured['default_timeout'] = default_timeout + + monkeypatch.setattr('apify.scrapy.scheduler.AsyncThread', _RecordingAsyncThread) + + crawler = SimpleNamespace(settings=Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 123})) + ApifyScheduler.from_crawler(cast('Any', crawler)) + + assert captured['default_timeout'] == timedelta(seconds=123) From 85404709ce993a625d0f9f8e6811dfcd5164ee99 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 19:07:24 +0200 Subject: [PATCH 3/9] test: remove redundant section-header comments --- tests/unit/scrapy/test_async_thread.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/unit/scrapy/test_async_thread.py b/tests/unit/scrapy/test_async_thread.py index 67a15088..c8975b9f 100644 --- a/tests/unit/scrapy/test_async_thread.py +++ b/tests/unit/scrapy/test_async_thread.py @@ -12,9 +12,6 @@ async def _return(value: int) -> int: return value -# Normal operation - - def test_run_coro_returns_coroutine_result() -> None: """`run_coro` runs a coroutine on the background loop and returns its result.""" async_thread = AsyncThread() @@ -24,9 +21,6 @@ def test_run_coro_returns_coroutine_result() -> None: async_thread.close() -# Startup race regression - - def test_run_coro_succeeds_when_called_before_loop_starts(monkeypatch: pytest.MonkeyPatch) -> None: """`run_coro` must not fail when invoked before the background thread reaches `run_forever`.""" gate = threading.Event() @@ -58,9 +52,6 @@ def release_gate() -> None: async_thread.close() -# Closed loop guard - - def test_run_coro_raises_after_close() -> None: """`run_coro` raises `RuntimeError` once the loop has been closed.""" async_thread = AsyncThread() From 50aa5396e872304a8a73dbe40261ceae50f762d5 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 19:10:08 +0200 Subject: [PATCH 4/9] fix(scrapy): keep traceback.print_exc() on background-loop coroutine errors --- src/apify/scrapy/extensions/_httpcache.py | 27 +++++++++++++++++---- src/apify/scrapy/scheduler.py | 29 +++++++++++++++++++---- tests/unit/scrapy/test_async_thread.py | 15 +++--------- tests/unit/scrapy/test_scheduler.py | 8 ++++--- 4 files changed, 55 insertions(+), 24 deletions(-) diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 296216c5..1c109867 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -4,6 +4,7 @@ import io import re import struct +import traceback from datetime import timedelta from logging import getLogger from time import time @@ -39,8 +40,8 @@ def __init__(self, settings: BaseSettings) -> None: # Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`). self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100) self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') - # Caps how long each coroutine run on the background event loop may take; defaults to 60 seconds. self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60)) + """Caps how long each coroutine run on the background event loop may take; defaults to 60 seconds.""" self._spider: Spider | None = None self._kvs: KeyValueStore | None = None self._fingerprinter: RequestFingerprinterProtocol | None = None @@ -67,7 +68,11 @@ async def open_kvs() -> KeyValueStore: logger.debug("Starting background thread for cache storage's event loop") self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout) logger.debug(f"Opening cache storage's {kvs_name!r} key value store") - self._kvs = self._async_thread.run_coro(open_kvs()) + try: + self._kvs = self._async_thread.run_coro(open_kvs()) + except Exception: + traceback.print_exc() + raise def close_spider(self, _: Spider, current_time: int | None = None) -> None: """Close the cache storage for a spider.""" @@ -106,7 +111,11 @@ async def expire_kvs() -> None: else: logger.debug(f'Valid cache item {item.key}') - self._async_thread.run_coro(expire_kvs()) + try: + self._async_thread.run_coro(expire_kvs()) + except Exception: + traceback.print_exc() + raise finally: logger.debug('Closing cache storage') try: @@ -128,7 +137,11 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non raise ValueError('Request fingerprinter not initialized') key = self._fingerprinter.fingerprint(request).hex() - value = self._async_thread.run_coro(self._kvs.get_value(key)) + try: + value = self._async_thread.run_coro(self._kvs.get_value(key)) + except Exception: + traceback.print_exc() + raise if value is None: logger.debug('Cache miss', extra={'request': request}) @@ -175,7 +188,11 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non 'body': response.body, } value = to_gzip(data) - self._async_thread.run_coro(self._kvs.set_value(key, value)) + try: + self._async_thread.run_coro(self._kvs.set_value(key, value)) + except Exception: + traceback.print_exc() + raise def to_gzip(data: dict, mtime: int | None = None) -> bytes: diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index d339c856..69b9b8cf 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -1,5 +1,6 @@ from __future__ import annotations +import traceback from datetime import timedelta from logging import getLogger from typing import TYPE_CHECKING @@ -73,6 +74,7 @@ async def open_rq() -> RequestQueue: self._rq = self._async_thread.run_coro(open_rq()) except Exception: self._async_thread.close() + traceback.print_exc() raise return None @@ -107,7 +109,12 @@ def has_pending_requests(self) -> bool: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - is_finished = self._async_thread.run_coro(self._rq.is_finished()) + try: + is_finished = self._async_thread.run_coro(self._rq.is_finished()) + except Exception: + traceback.print_exc() + raise + return not is_finished def enqueue_request(self, request: Request) -> bool: @@ -135,7 +142,12 @@ def enqueue_request(self, request: Request) -> bool: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - result = self._async_thread.run_coro(self._rq.add_request(apify_request)) + try: + result = self._async_thread.run_coro(self._rq.add_request(apify_request)) + except Exception: + traceback.print_exc() + raise + logger.debug(f'rq.add_request result: {result}') return not bool(result.was_already_present) @@ -149,7 +161,12 @@ def next_request(self) -> Request | None: if not isinstance(self._rq, RequestQueue): raise TypeError('self._rq must be an instance of the RequestQueue class') - apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) + try: + apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) + except Exception: + traceback.print_exc() + raise + logger.debug(f'Fetched apify_request: {apify_request}') if apify_request is None: return None @@ -168,7 +185,11 @@ def next_request(self) -> Request | None: # Mark the request as handled. This runs even when reconstruction failed above: an unrecoverable entry # (a corrupt or legacy payload) must still be consumed, otherwise the queue would keep handing it back # forever. Retrying genuine failures is the RetryMiddleware's job. - self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) + try: + self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) + except Exception: + traceback.print_exc() + raise if scrapy_request is None: return None diff --git a/tests/unit/scrapy/test_async_thread.py b/tests/unit/scrapy/test_async_thread.py index d624f465..409a8e42 100644 --- a/tests/unit/scrapy/test_async_thread.py +++ b/tests/unit/scrapy/test_async_thread.py @@ -3,26 +3,20 @@ import asyncio import logging import threading -import time from concurrent import futures from datetime import timedelta from typing import Any, Literal import pytest +from ..._utils import poll_until_condition from apify.scrapy._async_thread import AsyncThread def _wait_until_running(thread: AsyncThread, timeout: float = 2.0) -> None: """Block until the background event loop is running, so `run_coro` does not race the thread startup.""" - deadline = time.monotonic() + timeout - while not thread._eventloop.is_running(): - if time.monotonic() > deadline: - raise AssertionError('The event loop did not start in time.') - time.sleep(0.01) - - -# Coroutine execution + if not asyncio.run(poll_until_condition(thread._eventloop.is_running, timeout=timeout, poll_interval=0.01)): + raise AssertionError('The event loop did not start in time.') def test_run_coro_cancels_the_coroutine_on_timeout() -> None: @@ -66,9 +60,6 @@ async def boom() -> None: assert [record for record in caplog.records if record.levelno >= logging.ERROR] == [] -# Shutdown - - def test_close_is_idempotent() -> None: """Calling `close` twice is a no-op the second time, not a `RuntimeError` on the closed loop.""" thread = AsyncThread() diff --git a/tests/unit/scrapy/test_scheduler.py b/tests/unit/scrapy/test_scheduler.py index 7a6926e1..d85dd76a 100644 --- a/tests/unit/scrapy/test_scheduler.py +++ b/tests/unit/scrapy/test_scheduler.py @@ -155,18 +155,20 @@ def test_next_request_returns_none_when_queue_empty(scheduler: ApifyScheduler) - rq.mark_request_as_handled.assert_not_called() -def test_next_request_does_not_print_traceback_to_stderr( +def test_next_request_prints_traceback_to_stderr( scheduler: ApifyScheduler, capsys: pytest.CaptureFixture[str], ) -> None: - """A failure propagates as-is, without `traceback.print_exc()` printing a second copy past the log formatter.""" + """A failure in the coroutine run prints a traceback to stderr via `traceback.print_exc()` before propagating.""" async_thread = cast('mock.MagicMock', scheduler._async_thread) async_thread.run_coro.side_effect = RuntimeError('boom') with pytest.raises(RuntimeError, match='boom'): scheduler.next_request() - assert capsys.readouterr().err == '' + captured = capsys.readouterr() + assert 'Traceback (most recent call last)' in captured.err + assert 'RuntimeError: boom' in captured.err def test_from_crawler_reads_async_thread_timeout_setting(monkeypatch: pytest.MonkeyPatch) -> None: From 98edce20e08ea9112cb9db1761d6042c34c346f3 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Sat, 13 Jun 2026 09:55:12 +0200 Subject: [PATCH 5/9] fix(scrapy): don't leak the async-thread event loop on cache-open failure or shutdown error --- src/apify/scrapy/_async_thread.py | 26 ++++++++++--------- src/apify/scrapy/extensions/_httpcache.py | 3 +++ .../unit/scrapy/extensions/test_httpcache.py | 26 +++++++++++++++++++ tests/unit/scrapy/test_async_thread.py | 18 +++++++++++++ 4 files changed, 61 insertions(+), 12 deletions(-) diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index a6d93d66..22a9f5cd 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -89,20 +89,22 @@ def close(self, timeout: timedelta = timedelta(seconds=60)) -> None: if self._eventloop.is_closed(): return - if self._eventloop.is_running(): - # Cancel all pending tasks in the event loop, honouring the caller's timeout. - self.run_coro(self._shutdown_tasks(), timeout=timeout) - - # Schedule the event loop to stop. - self._eventloop.call_soon_threadsafe(self._eventloop.stop) + try: + if self._eventloop.is_running(): + # Cancel all pending tasks in the event loop, honouring the caller's timeout. + self.run_coro(self._shutdown_tasks(), timeout=timeout) + finally: + # Stop the loop and join its thread even if cancelling the pending tasks above raised or timed + # out. Skipping this would leave the loop running and leak its thread. + self._eventloop.call_soon_threadsafe(self._eventloop.stop) - # Wait for the event loop thread to finish execution. - self._thread.join(timeout=timeout.total_seconds()) + # Wait for the event loop thread to finish execution. + self._thread.join(timeout=timeout.total_seconds()) - # If the thread is still running after the timeout, force a shutdown. - if self._thread.is_alive(): - logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...') - self._force_exit_event_loop() + # If the thread is still running after the timeout, force a shutdown. + if self._thread.is_alive(): + logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...') + self._force_exit_event_loop() def _start_event_loop(self) -> None: """Set up and run the asyncio event loop in the dedicated thread.""" diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 1c109867..dd644b8a 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -71,6 +71,9 @@ async def open_kvs() -> KeyValueStore: try: self._kvs = self._async_thread.run_coro(open_kvs()) except Exception: + # Opening the key-value store failed, so close the freshly started async thread instead of + # leaking its event-loop thread (`close_spider` may never run if `open_spider` fails). + self._async_thread.close() traceback.print_exc() raise diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py index b7db7910..04a73eba 100644 --- a/tests/unit/scrapy/extensions/test_httpcache.py +++ b/tests/unit/scrapy/extensions/test_httpcache.py @@ -324,6 +324,32 @@ def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: assert captured['default_timeout'] == timedelta(seconds=77) +def test_open_spider_closes_async_thread_when_open_kvs_fails(monkeypatch: pytest.MonkeyPatch) -> None: + """If opening the key-value store fails, `open_spider` closes the async thread rather than leaking it.""" + closed: list[bool] = [] + + class _FailingAsyncThread: + def __init__(self, default_timeout: timedelta | None = None) -> None: + pass + + def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: + coro.close() # we never run it; just avoid an un-awaited coroutine warning + raise RuntimeError('open boom') + + def close(self, *_: Any, **__: Any) -> None: + closed.append(True) + + monkeypatch.setattr('apify.scrapy.extensions._httpcache.AsyncThread', _FailingAsyncThread) + + storage = ApifyCacheStorage(Settings()) + spider = SimpleNamespace(name='myspider', crawler=SimpleNamespace(request_fingerprinter=_FakeFingerprinter())) + + with pytest.raises(RuntimeError, match='open boom'): + storage.open_spider(cast('Any', spider)) + + assert closed == [True] + + @pytest.mark.parametrize( ('spider_name', 'expected'), [ diff --git a/tests/unit/scrapy/test_async_thread.py b/tests/unit/scrapy/test_async_thread.py index 409a8e42..34297834 100644 --- a/tests/unit/scrapy/test_async_thread.py +++ b/tests/unit/scrapy/test_async_thread.py @@ -87,3 +87,21 @@ def spy(coro: Any, timeout: timedelta | Literal['default'] = 'default') -> Any: thread.close(timeout=timedelta(seconds=42)) assert recorded == [timedelta(seconds=42)] + + +def test_close_stops_and_joins_thread_even_when_task_cancellation_fails(monkeypatch: pytest.MonkeyPatch) -> None: + """If cancelling the pending tasks fails, `close` still stops the loop and joins the thread, not leaks it.""" + thread = AsyncThread() + _wait_until_running(thread) + + async def boom() -> None: + raise RuntimeError('shutdown boom') + + monkeypatch.setattr(thread, '_shutdown_tasks', boom) + + with pytest.raises(RuntimeError, match='shutdown boom'): + thread.close(timeout=timedelta(seconds=5)) + + # The loop was stopped and its thread joined despite the failing cancellation, so nothing is left running. + assert not thread._thread.is_alive() + assert thread._eventloop.is_closed() From 29ec5ff8082a28126451763f5a2072eee63e9460 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 17 Jun 2026 14:24:03 +0200 Subject: [PATCH 6/9] fix(scrapy): apply timeout to async-thread close, log errors via logger.exception --- src/apify/scrapy/_async_thread.py | 8 ++++++-- src/apify/scrapy/extensions/_httpcache.py | 17 ++++++++++------- src/apify/scrapy/scheduler.py | 18 +++++++++++------- tests/unit/scrapy/test_scheduler.py | 17 ++++++++++------- 4 files changed, 37 insertions(+), 23 deletions(-) diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index 4265fff0..dcd7946e 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -74,15 +74,19 @@ def run_coro( future.cancel() raise - def close(self, timeout: timedelta = timedelta(seconds=60)) -> None: + def close(self, timeout: timedelta | None = None) -> None: """Close the event loop and its thread gracefully. This method cancels all pending tasks, stops the event loop, and waits for the thread to exit. If the thread does not exit within the given timeout, a forced shutdown is attempted. Args: - timeout: The maximum number of seconds to wait for the event loop thread to exit. + timeout: The maximum time to wait for the event loop thread to exit. Pass `None` to use the + `default_timeout` passed to the constructor. """ + if timeout is None: + timeout = self._default_timeout + # A repeated close (e.g. a retried shutdown) would call into the already-closed loop and raise # `RuntimeError: Event loop is closed`. The loop closes itself once it stops, so a second close # is a no-op. diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index dd644b8a..a85995d2 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -4,7 +4,6 @@ import io import re import struct -import traceback from datetime import timedelta from logging import getLogger from time import time @@ -71,10 +70,14 @@ async def open_kvs() -> KeyValueStore: try: self._kvs = self._async_thread.run_coro(open_kvs()) except Exception: + logger.exception('Failed to open the cache key-value store.') # Opening the key-value store failed, so close the freshly started async thread instead of - # leaking its event-loop thread (`close_spider` may never run if `open_spider` fails). - self._async_thread.close() - traceback.print_exc() + # leaking its event-loop thread (`close_spider` may never run if `open_spider` fails). Guard + # the close so a secondary failure here cannot mask the original error. + try: + self._async_thread.close() + except Exception: + logger.exception('Failed to close the async thread after a failed cache storage open.') raise def close_spider(self, _: Spider, current_time: int | None = None) -> None: @@ -117,7 +120,7 @@ async def expire_kvs() -> None: try: self._async_thread.run_coro(expire_kvs()) except Exception: - traceback.print_exc() + logger.exception('Failed to clean up expired cache items.') raise finally: logger.debug('Closing cache storage') @@ -143,7 +146,7 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non try: value = self._async_thread.run_coro(self._kvs.get_value(key)) except Exception: - traceback.print_exc() + logger.exception('Failed to retrieve a response from the cache.') raise if value is None: @@ -194,7 +197,7 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non try: self._async_thread.run_coro(self._kvs.set_value(key, value)) except Exception: - traceback.print_exc() + logger.exception('Failed to store a response in the cache.') raise diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index 69b9b8cf..aed566e4 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -1,6 +1,5 @@ from __future__ import annotations -import traceback from datetime import timedelta from logging import getLogger from typing import TYPE_CHECKING @@ -73,8 +72,13 @@ async def open_rq() -> RequestQueue: try: self._rq = self._async_thread.run_coro(open_rq()) except Exception: - self._async_thread.close() - traceback.print_exc() + logger.exception('Failed to open the request queue.') + # Close the freshly started async thread so a failed open does not leak its event-loop thread. + # Guard the close so a secondary failure here cannot mask the original error. + try: + self._async_thread.close() + except Exception: + logger.exception('Failed to close the async thread after a failed scheduler open.') raise return None @@ -112,7 +116,7 @@ def has_pending_requests(self) -> bool: try: is_finished = self._async_thread.run_coro(self._rq.is_finished()) except Exception: - traceback.print_exc() + logger.exception('Failed to check whether the request queue is finished.') raise return not is_finished @@ -145,7 +149,7 @@ def enqueue_request(self, request: Request) -> bool: try: result = self._async_thread.run_coro(self._rq.add_request(apify_request)) except Exception: - traceback.print_exc() + logger.exception('Failed to enqueue the request to the request queue.') raise logger.debug(f'rq.add_request result: {result}') @@ -164,7 +168,7 @@ def next_request(self) -> Request | None: try: apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) except Exception: - traceback.print_exc() + logger.exception('Failed to fetch the next request from the request queue.') raise logger.debug(f'Fetched apify_request: {apify_request}') @@ -188,7 +192,7 @@ def next_request(self) -> Request | None: try: self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) except Exception: - traceback.print_exc() + logger.exception('Failed to mark the request as handled in the request queue.') raise if scrapy_request is None: diff --git a/tests/unit/scrapy/test_scheduler.py b/tests/unit/scrapy/test_scheduler.py index d85dd76a..050e9e52 100644 --- a/tests/unit/scrapy/test_scheduler.py +++ b/tests/unit/scrapy/test_scheduler.py @@ -155,20 +155,23 @@ def test_next_request_returns_none_when_queue_empty(scheduler: ApifyScheduler) - rq.mark_request_as_handled.assert_not_called() -def test_next_request_prints_traceback_to_stderr( +def test_next_request_logs_exception_before_propagating( scheduler: ApifyScheduler, - capsys: pytest.CaptureFixture[str], + caplog: pytest.LogCaptureFixture, ) -> None: - """A failure in the coroutine run prints a traceback to stderr via `traceback.print_exc()` before propagating.""" + """A failure in the coroutine run is logged with its traceback via `logger.exception` before propagating.""" async_thread = cast('mock.MagicMock', scheduler._async_thread) async_thread.run_coro.side_effect = RuntimeError('boom') - with pytest.raises(RuntimeError, match='boom'): + with caplog.at_level(logging.ERROR, logger='apify.scrapy.scheduler'), pytest.raises(RuntimeError, match='boom'): scheduler.next_request() - captured = capsys.readouterr() - assert 'Traceback (most recent call last)' in captured.err - assert 'RuntimeError: boom' in captured.err + errors = [record for record in caplog.records if record.levelno >= logging.ERROR] + assert len(errors) == 1 + (error,) = errors + assert error.exc_info is not None + assert isinstance(error.exc_info[1], RuntimeError) + assert str(error.exc_info[1]) == 'boom' def test_from_crawler_reads_async_thread_timeout_setting(monkeypatch: pytest.MonkeyPatch) -> None: From 8b7b8c41c23f41c6e22bce212d8c8d555f889f59 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 17 Jun 2026 14:36:33 +0200 Subject: [PATCH 7/9] refactor(scrapy): drop redundant exception object from logger.exception (TRY401) --- src/apify/scrapy/_async_thread.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index dcd7946e..90f6f4cb 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -137,5 +137,5 @@ def _force_exit_event_loop(self) -> None: logger.info('Forced shutdown of the event loop and its thread...') self._eventloop.call_soon_threadsafe(self._eventloop.stop) self._thread.join(timeout=5) - except Exception as exc: - logger.exception('Exception occurred during forced event loop shutdown.', exc_info=exc) + except Exception: + logger.exception('Exception occurred during forced event loop shutdown.') From 5e564242a6a8a263c8efb6dd24a5aaea65187492 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 17 Jun 2026 14:39:16 +0200 Subject: [PATCH 8/9] update --- src/apify/scrapy/extensions/_httpcache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index a85995d2..d65e48a6 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -107,8 +107,8 @@ async def expire_kvs() -> None: value = await self._kvs.get_value(item.key) try: gzip_time = read_gzip_time(value) - except Exception as e: - logger.warning(f'Malformed cache item {item.key}: {e}') + except Exception as exc: + logger.warning(f'Malformed cache item {item.key}: {exc}') await self._kvs.delete_value(item.key) else: if self._expiration_secs < current_time - gzip_time: From 4a5a77e893aa7c372f67236b06a8d4f7bb3905a4 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 17 Jun 2026 14:52:20 +0200 Subject: [PATCH 9/9] refactor(scrapy): align warning/exception logging in skip and cleanup paths --- src/apify/scrapy/extensions/_httpcache.py | 7 ++++--- src/apify/scrapy/requests.py | 4 ++-- src/apify/scrapy/scheduler.py | 4 ++-- tests/unit/scrapy/extensions/test_httpcache.py | 8 +++++--- tests/unit/scrapy/test_scheduler.py | 2 +- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index d65e48a6..c32647f0 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -86,8 +86,8 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None: raise ValueError('Async thread not initialized') logger.info(f'Cleaning up cache items (max {self._expiration_max_items})') - # The cleanup sweep runs inside `try` so a failure there cannot skip closing the async thread - # (which would leak its event-loop thread); `close` always runs in the `finally`. + # `close` always runs in the `finally`, so neither a cleanup failure below nor an early return can leak + # the event-loop thread. try: if self._expiration_secs > 0: if current_time is None: @@ -117,11 +117,12 @@ async def expire_kvs() -> None: else: logger.debug(f'Valid cache item {item.key}') + # Best-effort: log and swallow a cleanup failure rather than raise. The sweep only reclaims + # storage, so failing it must not turn a normal spider close into an error. try: self._async_thread.run_coro(expire_kvs()) except Exception: logger.exception('Failed to clean up expired cache items.') - raise finally: logger.debug('Closing cache storage') try: diff --git a/src/apify/scrapy/requests.py b/src/apify/scrapy/requests.py index 1524d7aa..610b1d9d 100644 --- a/src/apify/scrapy/requests.py +++ b/src/apify/scrapy/requests.py @@ -119,8 +119,8 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ apify_request = ApifyRequest.from_url(**request_kwargs) scrapy_request_dict = scrapy_request.to_dict(spider=spider) - except Exception as exc: - logger.warning(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; {exc}') + except Exception: + logger.exception(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; skipping it.') return None # Serialize the Scrapy request as JSON under 'scrapy_request'. Kept outside the broad except above so diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index aed566e4..77e2fdd9 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -182,8 +182,8 @@ def next_request(self) -> Request | None: # the whole run, so on failure it is logged and skipped (None) rather than propagating. try: scrapy_request = to_scrapy_request(apify_request, spider=self.spider) - except Exception: - logger.exception(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it.') + except Exception as exc: + logger.warning(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it: {exc}') scrapy_request = None # Mark the request as handled. This runs even when reconstruction failed above: an unrecoverable entry diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py index 04a73eba..f178a13c 100644 --- a/tests/unit/scrapy/extensions/test_httpcache.py +++ b/tests/unit/scrapy/extensions/test_httpcache.py @@ -4,6 +4,7 @@ import gzip import io import json +import logging import pickle from datetime import timedelta from time import time @@ -275,8 +276,8 @@ def test_close_spider_respects_max_items() -> None: assert len(kvs.deleted) == 2 -def test_close_spider_closes_thread_even_when_cleanup_fails() -> None: - """If the expiration sweep raises, the async thread is still closed rather than leaked.""" +def test_close_spider_closes_thread_even_when_cleanup_fails(caplog: pytest.LogCaptureFixture) -> None: + """A best-effort cleanup failure is logged and swallowed; the async thread is still closed, not leaked.""" closed: list[bool] = [] class _FailingAsyncThread: @@ -291,10 +292,11 @@ def close(self, *_: Any, **__: Any) -> None: storage._async_thread = _FailingAsyncThread() # ty: ignore[invalid-assignment] storage._kvs = _FakeKvs(None) # ty: ignore[invalid-assignment] - with pytest.raises(RuntimeError, match='cleanup boom'): + with caplog.at_level(logging.ERROR, logger='apify.scrapy.extensions._httpcache'): storage.close_spider(None, current_time=1000) # ty: ignore[invalid-argument-type] assert closed == [True] + assert 'Failed to clean up expired cache items' in caplog.text def test_cache_storage_reads_async_thread_timeout_setting() -> None: diff --git a/tests/unit/scrapy/test_scheduler.py b/tests/unit/scrapy/test_scheduler.py index 050e9e52..cfe5b48a 100644 --- a/tests/unit/scrapy/test_scheduler.py +++ b/tests/unit/scrapy/test_scheduler.py @@ -111,7 +111,7 @@ def test_next_request_skips_request_that_fails_to_convert( # `run_coro` is called for `fetch_next_request`, then for `mark_request_as_handled`. async_thread.run_coro.side_effect = [malformed_request, None] - with caplog.at_level(logging.ERROR, logger='apify.scrapy.scheduler'): + with caplog.at_level(logging.WARNING, logger='apify.scrapy.scheduler'): result = scheduler.next_request() # The malformed request is skipped instead of crashing the whole run.