diff --git a/config/base_config.py b/config/base_config.py index 961d2169a..0274b34a5 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -117,6 +117,10 @@ # 爬取间隔时间 CRAWLER_MAX_SLEEP_SEC = 2 +# 自动重试机制开关(仅针对小红书和抖音平台) +# 开启后,网络相关异常(超时、连接失败等)会自动重试3次,采用指数退避策略(1s、2s、4s) +ENABLE_AUTO_RETRY = True + from .bilibili_config import * from .xhs_config import * from .dy_config import * diff --git a/media_platform/douyin/client.py b/media_platform/douyin/client.py index b080f836b..84cac7443 100644 --- a/media_platform/douyin/client.py +++ b/media_platform/douyin/client.py @@ -29,6 +29,7 @@ from base.base_crawler import AbstractApiClient from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +from tools.retry_decorator import auto_retry from var import request_keyword_var if TYPE_CHECKING: @@ -112,6 +113,7 @@ async def __process_req_params( a_bogus = await get_a_bogus(uri, query_string, post_data, headers["User-Agent"], self.playwright_page) params["a_bogus"] = a_bogus + @auto_retry(max_retries=3, base_delay=1.0) async def request(self, method, url, **kwargs): # 每次请求前检测代理是否过期 await self._refresh_proxy_if_expired() @@ -332,6 +334,7 @@ async def get_all_user_aweme_posts(self, sec_user_id: str, callback: Optional[Ca result.extend(aweme_list) return result + @auto_retry(max_retries=3, base_delay=1.0) async def get_aweme_media(self, url: str) -> Union[bytes, None]: async with httpx.AsyncClient(proxy=self.proxy) as client: try: diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index f1df0de95..00d025d22 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -24,12 +24,12 @@ import httpx from playwright.async_api import BrowserContext, Page -from tenacity import retry, stop_after_attempt, wait_fixed import config from base.base_crawler import AbstractApiClient from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +from tools.retry_decorator import auto_retry if TYPE_CHECKING: from proxy.proxy_ip_pool import ProxyIpPool @@ -109,7 +109,7 @@ async def _pre_headers(self, url: str, params: Optional[Dict] = None, payload: O self.headers.update(headers) return self.headers - @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) + @auto_retry(max_retries=3, base_delay=1.0) async def request(self, method, url, **kwargs) -> Union[str, Any]: """ Wrapper for httpx common request method, processes request response @@ -613,7 +613,7 @@ async def get_note_short_url(self, note_id: str) -> Dict: data = {"original_url": f"{self._domain}/discovery/item/{note_id}"} return await self.post(uri, data=data, return_response=True) - @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) + @auto_retry(max_retries=3, base_delay=1.0) async def get_note_by_id_from_html( self, note_id: str, diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index 704746846..807fbafba 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -30,8 +30,6 @@ Playwright, async_playwright, ) -from tenacity import RetryError - import config from base.base_crawler import AbstractCrawler from model.m_xiaohongshu import NoteUrlInfo, CreatorUrlInfo @@ -39,6 +37,7 @@ from store import xhs as xhs_store from tools import utils from tools.cdp_browser import CDPBrowserManager +from tools.retry_decorator import RetryExhaustedError from var import crawler_type_var, source_keyword_var from .client import XiaoHongShuClient @@ -291,7 +290,7 @@ async def get_note_detail_async_task( try: try: note_detail = await self.xhs_client.get_note_by_id(note_id, xsec_source, xsec_token) - except RetryError: + except RetryExhaustedError: pass if not note_detail: diff --git a/tools/retry_decorator.py b/tools/retry_decorator.py new file mode 100644 index 000000000..321688446 --- /dev/null +++ b/tools/retry_decorator.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 relakkes@gmail.com +# +# This file is part of MediaCrawler project. +# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/tools/retry_decorator.py +# GitHub: https://github.com/NanmiCoder +# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1 +# +# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: +# 1. 不得用于任何商业用途。 +# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 +# 3. 不得进行大规模爬取或对平台造成运营干扰。 +# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 +# 5. 不得用于任何非法或不当的用途。 +# +# 详细许可条款请参阅项目根目录下的LICENSE文件。 +# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 + +import asyncio +from functools import wraps +from typing import Callable, Type, Tuple + +import httpx + +import config +from tools import utils + + +class RetryExhaustedError(Exception): + pass + + +NETWORK_RELATED_EXCEPTIONS: Tuple[Type[Exception], ...] = ( + httpx.ConnectError, + httpx.ReadTimeout, + httpx.ConnectTimeout, + httpx.WriteTimeout, + httpx.PoolTimeout, + httpx.TimeoutException, + httpx.NetworkError, + httpx.TransportError, + httpx.ProtocolError, + ConnectionError, + TimeoutError, +) + + +def is_network_exception(exception: Exception) -> bool: + return isinstance(exception, NETWORK_RELATED_EXCEPTIONS) + + +def auto_retry( + max_retries: int = 3, + base_delay: float = 1.0, + enable_config: bool = True, +): + def decorator(func: Callable): + @wraps(func) + async def async_wrapper(*args, **kwargs): + if enable_config and not config.ENABLE_AUTO_RETRY: + return await func(*args, **kwargs) + + last_exception = None + for attempt in range(max_retries + 1): + try: + return await func(*args, **kwargs) + except Exception as e: + last_exception = e + if not is_network_exception(e): + utils.logger.warning( + f"[auto_retry] Non-network exception occurred, not retrying: " + f"{e.__class__.__name__}: {str(e)}" + ) + raise + + if attempt >= max_retries: + utils.logger.error( + f"[auto_retry] All {max_retries} retry attempts failed. " + f"Last exception: {e.__class__.__name__}: {str(e)}" + ) + raise RetryExhaustedError( + f"All {max_retries} retry attempts failed. " + f"Last exception: {e.__class__.__name__}: {str(e)}" + ) from e + + delay = base_delay * (2 ** attempt) + utils.logger.warning( + f"[auto_retry] Network exception occurred: {e.__class__.__name__}: {str(e)}. " + f"Retry {attempt + 1}/{max_retries} in {delay}s..." + ) + await asyncio.sleep(delay) + + if last_exception: + raise last_exception + + @wraps(func) + def sync_wrapper(*args, **kwargs): + if enable_config and not config.ENABLE_AUTO_RETRY: + return func(*args, **kwargs) + + last_exception = None + import time + for attempt in range(max_retries + 1): + try: + return func(*args, **kwargs) + except Exception as e: + last_exception = e + if not is_network_exception(e): + utils.logger.warning( + f"[auto_retry] Non-network exception occurred, not retrying: " + f"{e.__class__.__name__}: {str(e)}" + ) + raise + + if attempt >= max_retries: + utils.logger.error( + f"[auto_retry] All {max_retries} retry attempts failed. " + f"Last exception: {e.__class__.__name__}: {str(e)}" + ) + raise RetryExhaustedError( + f"All {max_retries} retry attempts failed. " + f"Last exception: {e.__class__.__name__}: {str(e)}" + ) from e + + delay = base_delay * (2 ** attempt) + utils.logger.warning( + f"[auto_retry] Network exception occurred: {e.__class__.__name__}: {str(e)}. " + f"Retry {attempt + 1}/{max_retries} in {delay}s..." + ) + time.sleep(delay) + + if last_exception: + raise last_exception + + if asyncio.iscoroutinefunction(func): + return async_wrapper + else: + return sync_wrapper + + return decorator