diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index 0333531b..90f6f4cb 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -52,51 +52,63 @@ def run_coro( The result returned by the coroutine. Raises: - RuntimeError: If the event loop is not running. + RuntimeError: If the event loop has been closed. TimeoutError: If the coroutine does not complete within the timeout. Exception: Any exception raised during coroutine execution. """ if timeout == 'default': timeout = self._default_timeout - if not self._eventloop.is_running(): - raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is not running.') + if self._eventloop.is_closed(): + raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is closed.') # Submit the coroutine to the event loop running in the other thread. future = asyncio.run_coroutine_threadsafe(coro, self._eventloop) try: # Wait for the coroutine's result until the specified timeout. return future.result(timeout=timeout.total_seconds()) - except futures.TimeoutError as exc: - logger.exception('Coroutine execution timed out.', exc_info=exc) - raise - except Exception as exc: - logger.exception('Coroutine execution raised an exception.', exc_info=exc) + except futures.TimeoutError: + # `future.result` gave up, but the coroutine keeps running on the loop; cancel it so it does + # not outlive the timeout. The propagated error is logged once by the caller (or Scrapy), so + # this method does not log it itself. + future.cancel() raise - def close(self, timeout: timedelta = timedelta(seconds=60)) -> None: + def close(self, timeout: timedelta | None = None) -> None: """Close the event loop and its thread gracefully. This method cancels all pending tasks, stops the event loop, and waits for the thread to exit. If the thread does not exit within the given timeout, a forced shutdown is attempted. Args: - timeout: The maximum number of seconds to wait for the event loop thread to exit. + timeout: The maximum time to wait for the event loop thread to exit. Pass `None` to use the + `default_timeout` passed to the constructor. """ - if self._eventloop.is_running(): - # Cancel all pending tasks in the event loop. - self.run_coro(self._shutdown_tasks()) + if timeout is None: + timeout = self._default_timeout + + # A repeated close (e.g. a retried shutdown) would call into the already-closed loop and raise + # `RuntimeError: Event loop is closed`. The loop closes itself once it stops, so a second close + # is a no-op. + if self._eventloop.is_closed(): + return - # Schedule the event loop to stop. - self._eventloop.call_soon_threadsafe(self._eventloop.stop) + try: + if self._eventloop.is_running(): + # Cancel all pending tasks in the event loop, honouring the caller's timeout. + self.run_coro(self._shutdown_tasks(), timeout=timeout) + finally: + # Stop the loop and join its thread even if cancelling the pending tasks above raised or timed + # out. Skipping this would leave the loop running and leak its thread. + self._eventloop.call_soon_threadsafe(self._eventloop.stop) - # Wait for the event loop thread to finish execution. - self._thread.join(timeout=timeout.total_seconds()) + # Wait for the event loop thread to finish execution. + self._thread.join(timeout=timeout.total_seconds()) - # If the thread is still running after the timeout, force a shutdown. - if self._thread.is_alive(): - logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...') - self._force_exit_event_loop() + # If the thread is still running after the timeout, force a shutdown. + if self._thread.is_alive(): + logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...') + self._force_exit_event_loop() def _start_event_loop(self) -> None: """Set up and run the asyncio event loop in the dedicated thread.""" @@ -125,5 +137,5 @@ def _force_exit_event_loop(self) -> None: logger.info('Forced shutdown of the event loop and its thread...') self._eventloop.call_soon_threadsafe(self._eventloop.stop) self._thread.join(timeout=5) - except Exception as exc: - logger.exception('Exception occurred during forced event loop shutdown.', exc_info=exc) + except Exception: + logger.exception('Exception occurred during forced event loop shutdown.') diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 0909c583..c32647f0 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -4,6 +4,7 @@ import io import re import struct +from datetime import timedelta from logging import getLogger from time import time from typing import TYPE_CHECKING @@ -38,6 +39,8 @@ def __init__(self, settings: BaseSettings) -> None: # Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`). self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100) self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') + self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60)) + """Caps how long each coroutine run on the background event loop may take; defaults to 60 seconds.""" self._spider: Spider | None = None self._kvs: KeyValueStore | None = None self._fingerprinter: RequestFingerprinterProtocol | None = None @@ -62,9 +65,20 @@ async def open_kvs() -> KeyValueStore: return await KeyValueStore.open(name=kvs_name) logger.debug("Starting background thread for cache storage's event loop") - self._async_thread = AsyncThread() + self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout) logger.debug(f"Opening cache storage's {kvs_name!r} key value store") - self._kvs = self._async_thread.run_coro(open_kvs()) + try: + self._kvs = self._async_thread.run_coro(open_kvs()) + except Exception: + logger.exception('Failed to open the cache key-value store.') + # Opening the key-value store failed, so close the freshly started async thread instead of + # leaking its event-loop thread (`close_spider` may never run if `open_spider` fails). Guard + # the close so a secondary failure here cannot mask the original error. + try: + self._async_thread.close() + except Exception: + logger.exception('Failed to close the async thread after a failed cache storage open.') + raise def close_spider(self, _: Spider, current_time: int | None = None) -> None: """Close the cache storage for a spider.""" @@ -72,45 +86,53 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None: raise ValueError('Async thread not initialized') logger.info(f'Cleaning up cache items (max {self._expiration_max_items})') - if self._expiration_secs > 0: - if current_time is None: - current_time = int(time()) - - async def expire_kvs() -> None: - if self._kvs is None: - raise ValueError('Key value store not initialized') - # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, - # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats - # an expired entry as a cache miss. - processed = 0 - async for item in self._kvs.iterate_keys(): - if processed >= self._expiration_max_items: - break - processed += 1 - value = await self._kvs.get_value(item.key) - try: - gzip_time = read_gzip_time(value) - except Exception as e: - logger.warning(f'Malformed cache item {item.key}: {e}') - await self._kvs.delete_value(item.key) - else: - if self._expiration_secs < current_time - gzip_time: - logger.debug(f'Expired cache item {item.key}') + # `close` always runs in the `finally`, so neither a cleanup failure below nor an early return can leak + # the event-loop thread. + try: + if self._expiration_secs > 0: + if current_time is None: + current_time = int(time()) + + async def expire_kvs() -> None: + if self._kvs is None: + raise ValueError('Key value store not initialized') + # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, + # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats + # an expired entry as a cache miss. + processed = 0 + async for item in self._kvs.iterate_keys(): + if processed >= self._expiration_max_items: + break + processed += 1 + value = await self._kvs.get_value(item.key) + try: + gzip_time = read_gzip_time(value) + except Exception as exc: + logger.warning(f'Malformed cache item {item.key}: {exc}') await self._kvs.delete_value(item.key) else: - logger.debug(f'Valid cache item {item.key}') - - self._async_thread.run_coro(expire_kvs()) - - logger.debug('Closing cache storage') - try: - self._async_thread.close() - except KeyboardInterrupt: - logger.warning('Shutdown interrupted by KeyboardInterrupt!') - except Exception: - logger.exception('Exception occurred while shutting down cache storage') + if self._expiration_secs < current_time - gzip_time: + logger.debug(f'Expired cache item {item.key}') + await self._kvs.delete_value(item.key) + else: + logger.debug(f'Valid cache item {item.key}') + + # Best-effort: log and swallow a cleanup failure rather than raise. The sweep only reclaims + # storage, so failing it must not turn a normal spider close into an error. + try: + self._async_thread.run_coro(expire_kvs()) + except Exception: + logger.exception('Failed to clean up expired cache items.') finally: - logger.debug('Cache storage closed') + logger.debug('Closing cache storage') + try: + self._async_thread.close() + except KeyboardInterrupt: + logger.warning('Shutdown interrupted by KeyboardInterrupt!') + except Exception: + logger.exception('Exception occurred while shutting down cache storage') + finally: + logger.debug('Cache storage closed') def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None: """Retrieve a response from the cache storage.""" @@ -122,7 +144,11 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non raise ValueError('Request fingerprinter not initialized') key = self._fingerprinter.fingerprint(request).hex() - value = self._async_thread.run_coro(self._kvs.get_value(key)) + try: + value = self._async_thread.run_coro(self._kvs.get_value(key)) + except Exception: + logger.exception('Failed to retrieve a response from the cache.') + raise if value is None: logger.debug('Cache miss', extra={'request': request}) @@ -169,7 +195,11 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non 'body': response.body, } value = to_gzip(data) - self._async_thread.run_coro(self._kvs.set_value(key, value)) + try: + self._async_thread.run_coro(self._kvs.set_value(key, value)) + except Exception: + logger.exception('Failed to store a response in the cache.') + raise def to_gzip(data: dict, mtime: int | None = None) -> bytes: diff --git a/src/apify/scrapy/requests.py b/src/apify/scrapy/requests.py index 1524d7aa..610b1d9d 100644 --- a/src/apify/scrapy/requests.py +++ b/src/apify/scrapy/requests.py @@ -119,8 +119,8 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ apify_request = ApifyRequest.from_url(**request_kwargs) scrapy_request_dict = scrapy_request.to_dict(spider=spider) - except Exception as exc: - logger.warning(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; {exc}') + except Exception: + logger.exception(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; skipping it.') return None # Serialize the Scrapy request as JSON under 'scrapy_request'. Kept outside the broad except above so diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index 2b95d30c..77e2fdd9 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -1,6 +1,6 @@ from __future__ import annotations -import traceback +from datetime import timedelta from logging import getLogger from typing import TYPE_CHECKING @@ -15,6 +15,7 @@ from apify.storages import RequestQueue if TYPE_CHECKING: + from scrapy.crawler import Crawler from scrapy.http.request import Request from twisted.internet.defer import Deferred @@ -27,7 +28,7 @@ class ApifyScheduler(BaseScheduler): This scheduler requires the asyncio Twisted reactor to be installed. """ - def __init__(self) -> None: + def __init__(self, async_thread_timeout: timedelta = timedelta(seconds=60)) -> None: if not is_asyncio_reactor_installed(): raise ValueError( f'{ApifyScheduler.__qualname__} requires the asyncio Twisted reactor. ' @@ -38,7 +39,17 @@ def __init__(self) -> None: self.spider: Spider | None = None # A thread with the asyncio event loop to run coroutines on. - self._async_thread = AsyncThread() + self._async_thread = AsyncThread(default_timeout=async_thread_timeout) + + @classmethod + def from_crawler(cls, crawler: Crawler) -> ApifyScheduler: + """Create the scheduler, reading the async-thread timeout from the Scrapy settings. + + The `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting (in seconds) caps how long each coroutine run on the + background event loop may take before timing out; it defaults to 60 seconds. + """ + timeout_secs = crawler.settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60) + return cls(async_thread_timeout=timedelta(seconds=timeout_secs)) def open(self, spider: Spider) -> Deferred[None] | None: """Open the scheduler. @@ -61,8 +72,13 @@ async def open_rq() -> RequestQueue: try: self._rq = self._async_thread.run_coro(open_rq()) except Exception: - self._async_thread.close() - traceback.print_exc() + logger.exception('Failed to open the request queue.') + # Close the freshly started async thread so a failed open does not leak its event-loop thread. + # Guard the close so a secondary failure here cannot mask the original error. + try: + self._async_thread.close() + except Exception: + logger.exception('Failed to close the async thread after a failed scheduler open.') raise return None @@ -100,7 +116,7 @@ def has_pending_requests(self) -> bool: try: is_finished = self._async_thread.run_coro(self._rq.is_finished()) except Exception: - traceback.print_exc() + logger.exception('Failed to check whether the request queue is finished.') raise return not is_finished @@ -133,7 +149,7 @@ def enqueue_request(self, request: Request) -> bool: try: result = self._async_thread.run_coro(self._rq.add_request(apify_request)) except Exception: - traceback.print_exc() + logger.exception('Failed to enqueue the request to the request queue.') raise logger.debug(f'rq.add_request result: {result}') @@ -152,7 +168,7 @@ def next_request(self) -> Request | None: try: apify_request = self._async_thread.run_coro(self._rq.fetch_next_request()) except Exception: - traceback.print_exc() + logger.exception('Failed to fetch the next request from the request queue.') raise logger.debug(f'Fetched apify_request: {apify_request}') @@ -166,8 +182,8 @@ def next_request(self) -> Request | None: # the whole run, so on failure it is logged and skipped (None) rather than propagating. try: scrapy_request = to_scrapy_request(apify_request, spider=self.spider) - except Exception: - logger.exception(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it.') + except Exception as exc: + logger.warning(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it: {exc}') scrapy_request = None # Mark the request as handled. This runs even when reconstruction failed above: an unrecoverable entry @@ -176,7 +192,7 @@ def next_request(self) -> Request | None: try: self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request)) except Exception: - traceback.print_exc() + logger.exception('Failed to mark the request as handled in the request queue.') raise if scrapy_request is None: diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py index 6f34853b..f178a13c 100644 --- a/tests/unit/scrapy/extensions/test_httpcache.py +++ b/tests/unit/scrapy/extensions/test_httpcache.py @@ -4,7 +4,9 @@ import gzip import io import json +import logging import pickle +from datetime import timedelta from time import time from types import SimpleNamespace from typing import TYPE_CHECKING, Any, cast @@ -274,6 +276,82 @@ def test_close_spider_respects_max_items() -> None: assert len(kvs.deleted) == 2 +def test_close_spider_closes_thread_even_when_cleanup_fails(caplog: pytest.LogCaptureFixture) -> None: + """A best-effort cleanup failure is logged and swallowed; the async thread is still closed, not leaked.""" + closed: list[bool] = [] + + class _FailingAsyncThread: + def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: + coro.close() # we never run it; just avoid an un-awaited coroutine warning + raise RuntimeError('cleanup boom') + + def close(self, *_: Any, **__: Any) -> None: + closed.append(True) + + storage = ApifyCacheStorage(Settings({'HTTPCACHE_EXPIRATION_SECS': 100})) + storage._async_thread = _FailingAsyncThread() # ty: ignore[invalid-assignment] + storage._kvs = _FakeKvs(None) # ty: ignore[invalid-assignment] + + with caplog.at_level(logging.ERROR, logger='apify.scrapy.extensions._httpcache'): + storage.close_spider(None, current_time=1000) # ty: ignore[invalid-argument-type] + + assert closed == [True] + assert 'Failed to clean up expired cache items' in caplog.text + + +def test_cache_storage_reads_async_thread_timeout_setting() -> None: + """`APIFY_ASYNC_THREAD_TIMEOUT_SECS` is read into the storage's async-thread timeout.""" + storage = ApifyCacheStorage(Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77})) + assert storage._async_thread_timeout == timedelta(seconds=77) + + +def test_open_spider_passes_timeout_to_async_thread(monkeypatch: pytest.MonkeyPatch) -> None: + """`open_spider` constructs the async thread with the configured timeout.""" + captured: dict[str, Any] = {} + + class _RecordingAsyncThread: + def __init__(self, default_timeout: timedelta | None = None) -> None: + captured['default_timeout'] = default_timeout + + def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: + coro.close() # we never run it; just avoid an un-awaited coroutine warning + return _FakeKvs(None) + + monkeypatch.setattr('apify.scrapy.extensions._httpcache.AsyncThread', _RecordingAsyncThread) + + storage = ApifyCacheStorage(Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77})) + spider = SimpleNamespace(name='myspider', crawler=SimpleNamespace(request_fingerprinter=_FakeFingerprinter())) + storage.open_spider(cast('Any', spider)) + + assert captured['default_timeout'] == timedelta(seconds=77) + + +def test_open_spider_closes_async_thread_when_open_kvs_fails(monkeypatch: pytest.MonkeyPatch) -> None: + """If opening the key-value store fails, `open_spider` closes the async thread rather than leaking it.""" + closed: list[bool] = [] + + class _FailingAsyncThread: + def __init__(self, default_timeout: timedelta | None = None) -> None: + pass + + def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any: + coro.close() # we never run it; just avoid an un-awaited coroutine warning + raise RuntimeError('open boom') + + def close(self, *_: Any, **__: Any) -> None: + closed.append(True) + + monkeypatch.setattr('apify.scrapy.extensions._httpcache.AsyncThread', _FailingAsyncThread) + + storage = ApifyCacheStorage(Settings()) + spider = SimpleNamespace(name='myspider', crawler=SimpleNamespace(request_fingerprinter=_FakeFingerprinter())) + + with pytest.raises(RuntimeError, match='open boom'): + storage.open_spider(cast('Any', spider)) + + assert closed == [True] + + @pytest.mark.parametrize( ('spider_name', 'expected'), [ diff --git a/tests/unit/scrapy/test_async_thread.py b/tests/unit/scrapy/test_async_thread.py new file mode 100644 index 00000000..3cf51b62 --- /dev/null +++ b/tests/unit/scrapy/test_async_thread.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +import asyncio +import logging +import threading +import time +from concurrent import futures +from datetime import timedelta +from typing import Any, Literal + +import pytest + +from ..._utils import poll_until_condition +from apify.scrapy._async_thread import AsyncThread + + +async def _return(value: int) -> int: + return value + + +def _wait_until_running(thread: AsyncThread, timeout: float = 2.0) -> None: + """Block until the background event loop is running, so `run_coro` does not race the thread startup.""" + if not asyncio.run(poll_until_condition(thread._eventloop.is_running, timeout=timeout, poll_interval=0.01)): + raise AssertionError('The event loop did not start in time.') + + +def test_run_coro_returns_coroutine_result() -> None: + """`run_coro` runs a coroutine on the background loop and returns its result.""" + async_thread = AsyncThread() + try: + assert async_thread.run_coro(_return(42)) == 42 + finally: + async_thread.close() + + +def test_run_coro_succeeds_when_called_before_loop_starts(monkeypatch: pytest.MonkeyPatch) -> None: + """`run_coro` must not fail when invoked before the background thread reaches `run_forever`.""" + gate = threading.Event() + original_start = AsyncThread._start_event_loop + + def gated_start(self: AsyncThread) -> None: + gate.wait() # hold the loop just shy of run_forever() to force the startup race + original_start(self) + + monkeypatch.setattr(AsyncThread, '_start_event_loop', gated_start) + + async_thread = AsyncThread() + try: + # The loop start is gated, so the loop is provably not running yet. + assert not async_thread._eventloop.is_running() + + # Let the loop reach run_forever() shortly after run_coro starts waiting. + def release_gate() -> None: + time.sleep(0.1) + gate.set() + + releaser = threading.Thread(target=release_gate) + releaser.start() + + assert async_thread.run_coro(_return(42)) == 42 + releaser.join() + finally: + gate.set() + async_thread.close() + + +def test_run_coro_raises_after_close() -> None: + """`run_coro` raises `RuntimeError` once the loop has been closed.""" + async_thread = AsyncThread() + async_thread.close() + + coro = _return(42) + with pytest.raises(RuntimeError): + async_thread.run_coro(coro) + coro.close() + + +def test_run_coro_cancels_the_coroutine_on_timeout() -> None: + """A timed-out coroutine is cancelled, not left running on the background loop.""" + thread = AsyncThread() + _wait_until_running(thread) + + started = threading.Event() + cancelled = threading.Event() + + async def slow() -> None: + started.set() + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + cancelled.set() + raise + + with pytest.raises(futures.TimeoutError): + thread.run_coro(slow(), timeout=timedelta(seconds=0.1)) + + assert started.wait(timeout=2) + assert cancelled.wait(timeout=2), 'the timed-out coroutine was left running instead of being cancelled' + + thread.close() + + +def test_run_coro_does_not_log_on_exception(caplog: pytest.LogCaptureFixture) -> None: + """`run_coro` propagates a failing coroutine without logging it itself (the caller/Scrapy reports it once).""" + thread = AsyncThread() + _wait_until_running(thread) + + async def boom() -> None: + raise RuntimeError('boom') + + with caplog.at_level(logging.DEBUG, logger='apify.scrapy._async_thread'), pytest.raises(RuntimeError, match='boom'): + thread.run_coro(boom()) + + thread.close() + + assert [record for record in caplog.records if record.levelno >= logging.ERROR] == [] + + +def test_close_is_idempotent() -> None: + """Calling `close` twice is a no-op the second time, not a `RuntimeError` on the closed loop.""" + thread = AsyncThread() + _wait_until_running(thread) + thread.run_coro(asyncio.sleep(0)) + + thread.close() + thread.close() # must not raise + + +def test_close_passes_its_timeout_to_the_shutdown_step(monkeypatch: pytest.MonkeyPatch) -> None: + """`close(timeout=...)` honours that timeout for the task-cancellation step, not only the thread join.""" + thread = AsyncThread() + _wait_until_running(thread) + thread.run_coro(asyncio.sleep(0)) + + recorded: list[timedelta | str] = [] + original = thread.run_coro + + def spy(coro: Any, timeout: timedelta | Literal['default'] = 'default') -> Any: + recorded.append(timeout) + return original(coro, timeout=timeout) + + monkeypatch.setattr(thread, 'run_coro', spy) + thread.close(timeout=timedelta(seconds=42)) + + assert recorded == [timedelta(seconds=42)] + + +def test_close_stops_and_joins_thread_even_when_task_cancellation_fails(monkeypatch: pytest.MonkeyPatch) -> None: + """If cancelling the pending tasks fails, `close` still stops the loop and joins the thread, not leaks it.""" + thread = AsyncThread() + _wait_until_running(thread) + + async def boom() -> None: + raise RuntimeError('shutdown boom') + + monkeypatch.setattr(thread, '_shutdown_tasks', boom) + + with pytest.raises(RuntimeError, match='shutdown boom'): + thread.close(timeout=timedelta(seconds=5)) + + # The loop was stopped and its thread joined despite the failing cancellation, so nothing is left running. + assert not thread._thread.is_alive() + assert thread._eventloop.is_closed() diff --git a/tests/unit/scrapy/test_scheduler.py b/tests/unit/scrapy/test_scheduler.py index c4a87622..cfe5b48a 100644 --- a/tests/unit/scrapy/test_scheduler.py +++ b/tests/unit/scrapy/test_scheduler.py @@ -1,12 +1,14 @@ from __future__ import annotations import logging +from datetime import timedelta from types import SimpleNamespace -from typing import cast +from typing import Any, cast from unittest import mock import pytest from scrapy import Request, Spider +from scrapy.settings import Settings from apify import Request as ApifyRequest from apify.scrapy.scheduler import ApifyScheduler @@ -109,7 +111,7 @@ def test_next_request_skips_request_that_fails_to_convert( # `run_coro` is called for `fetch_next_request`, then for `mark_request_as_handled`. async_thread.run_coro.side_effect = [malformed_request, None] - with caplog.at_level(logging.ERROR, logger='apify.scrapy.scheduler'): + with caplog.at_level(logging.WARNING, logger='apify.scrapy.scheduler'): result = scheduler.next_request() # The malformed request is skipped instead of crashing the whole run. @@ -151,3 +153,40 @@ def test_next_request_returns_none_when_queue_empty(scheduler: ApifyScheduler) - assert result is None rq.mark_request_as_handled.assert_not_called() + + +def test_next_request_logs_exception_before_propagating( + scheduler: ApifyScheduler, + caplog: pytest.LogCaptureFixture, +) -> None: + """A failure in the coroutine run is logged with its traceback via `logger.exception` before propagating.""" + async_thread = cast('mock.MagicMock', scheduler._async_thread) + async_thread.run_coro.side_effect = RuntimeError('boom') + + with caplog.at_level(logging.ERROR, logger='apify.scrapy.scheduler'), pytest.raises(RuntimeError, match='boom'): + scheduler.next_request() + + errors = [record for record in caplog.records if record.levelno >= logging.ERROR] + assert len(errors) == 1 + (error,) = errors + assert error.exc_info is not None + assert isinstance(error.exc_info[1], RuntimeError) + assert str(error.exc_info[1]) == 'boom' + + +def test_from_crawler_reads_async_thread_timeout_setting(monkeypatch: pytest.MonkeyPatch) -> None: + """`from_crawler` wires the `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting into the async thread's timeout.""" + monkeypatch.setattr('apify.scrapy.scheduler.is_asyncio_reactor_installed', lambda: True) + + captured: dict[str, Any] = {} + + class _RecordingAsyncThread: + def __init__(self, default_timeout: timedelta | None = None) -> None: + captured['default_timeout'] = default_timeout + + monkeypatch.setattr('apify.scrapy.scheduler.AsyncThread', _RecordingAsyncThread) + + crawler = SimpleNamespace(settings=Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 123})) + ApifyScheduler.from_crawler(cast('Any', crawler)) + + assert captured['default_timeout'] == timedelta(seconds=123)