From 43ea1115ba69cb340ecbc5449e3fd44560f2a44f Mon Sep 17 00:00:00 2001
From: Brenda Solari
Date: Tue, 21 Apr 2026 00:33:31 +0200
Subject: [PATCH] Add curl_cffi metrics

---
 README.md                                     |  27 +++
 src/ps_helper/extensions/README.md            |  56 ++++-
 src/ps_helper/extensions/__init__.py          |   3 +
 src/ps_helper/extensions/curl_metrics.py      | 158 +++++++++++++
 src/ps_helper/extensions/metrics_extension.py |  49 +++-
 tests/test_curl_metrics.py                    | 223 ++++++++++++++++++
 6 files changed, 508 insertions(+), 8 deletions(-)
 create mode 100644 src/ps_helper/extensions/curl_metrics.py
 create mode 100644 tests/test_curl_metrics.py

diff --git a/README.md b/README.md
index 1d006e9..8e9a608 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,33 @@ ps-helper create-report scrapy_stats.json
 ```
 This will automatically create a report named scrapy_stats-report.html in the same directory as your metrics file.
 
+### Track curl_cffi Downloaded Bytes
+
+Use the reusable helper to record transfer bytes in Scrapy stats (including `downloader/response_bytes`):
+
+```python
+from ps_helper.extensions import record_curl_transfer_bytes
+
+record_curl_transfer_bytes(
+    stats=self.crawler.stats,
+    curl_response=curl_resp,
+    add_to_downloader_response_bytes=True,
+)
+```
+
+With `MetricsExtension`, this is also reflected in the final JSON report under `resources`.
+
+For automatic tracking in every curl request, use `TrackedCurlSession`:
+
+```python
+from ps_helper.extensions import TrackedCurlSession
+
+self.curl_session = TrackedCurlSession(stats=self.crawler.stats)
+
+# keep using get/post as usual
+curl_resp = self.curl_session.get(url, impersonate="chrome120")
+```
+
 ---
 
 ## 🕷️ Scrapy URL Blocker Middleware

diff --git a/src/ps_helper/extensions/README.md b/src/ps_helper/extensions/README.md
index 9e57b28..73c085d 100644
--- a/src/ps_helper/extensions/README.md
+++ b/src/ps_helper/extensions/README.md
@@ -13,6 +13,7 @@
 - **Retry & Timeout Analysis**: Record retry attempts and timeout errors.
 - **Resource Monitoring**: Capture memory usage and response bytes.
 - **JSON Reports**: Save structured metrics reports to `metrics/YYYY-MM-DD/` folders.
+- **curl_cffi Transfer Metrics**: Reusable helper for downloaded/uploaded bytes and optional integration with `downloader/response_bytes`.
 
 ------------------------------------------------------------------------
 
@@ -33,6 +34,9 @@
 Optionally configure the number of timeline buckets:
 
 ```python
 METRICS_TIMELINE_BUCKETS = 30
+
+# If True, curl_cffi downloaded bytes are also added to downloader/response_bytes
+PS_HELPER_CURL_ADD_TO_DOWNLOADER_RESPONSE_BYTES = True
 ```
 
 ## Example Usage
@@ -57,6 +61,57 @@
 The extension will validate items and detect duplicates automatically.
 
 ------------------------------------------------------------------------
 
+## Reusable curl_cffi Byte Tracker
+
+You can use this helper from any project to record transfer bytes from `curl_cffi` responses:
+
+```python
+from ps_helper.extensions import record_curl_transfer_bytes
+
+# inside a spider or custom download handler
+record_curl_transfer_bytes(
+    stats=self.crawler.stats,
+    curl_response=curl_resp,
+    add_to_downloader_response_bytes=True,
+)
+```
+
+What it records:
+
+- `curl_cffi/bytes_down`
+- `curl_cffi/bytes_up`
+- `curl_cffi/bytes_total`
+- `curl_cffi/response_count`
+- Optional: `downloader/response_bytes` (downloaded bytes)
+
+The helper prefers curl/libcurl transfer sizes when available and falls back to `len(response.content)`.
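+
+As a minimal sketch of reading the counters back (illustrative only; it assumes a running crawler and uses the key names listed above):
+
+```python
+stats = self.crawler.stats
+
+bytes_down = stats.get_value("curl_cffi/bytes_down", 0)  # body + header bytes
+bytes_up = stats.get_value("curl_cffi/bytes_up", 0)      # request line/headers + uploaded body
+responses = stats.get_value("curl_cffi/response_count", 0)
+self.logger.info("curl_cffi: %d responses, %d bytes down", responses, bytes_down)
+```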
+
+## TrackedCurlSession (Recommended)
+
+For automatic tracking on every request, use `TrackedCurlSession` as a drop-in wrapper:
+
+```python
+import scrapy
+from ps_helper.extensions import TrackedCurlSession
+
+class MySpider(scrapy.Spider):
+    name = "my_spider"
+
+    @classmethod
+    def from_crawler(cls, crawler, *args, **kwargs):
+        spider = super().from_crawler(crawler, *args, **kwargs)
+        spider.curl_session = TrackedCurlSession(
+            stats=crawler.stats,
+            add_to_downloader_response_bytes=crawler.settings.getbool(
+                "PS_HELPER_CURL_ADD_TO_DOWNLOADER_RESPONSE_BYTES", True
+            ),
+        )
+        return spider
+```
+
+Then keep using `self.curl_session.get(...)` / `post(...)` normally.
+
+------------------------------------------------------------------------
+
 ## Metrics Report
 
@@ -123,4 +178,3 @@ Example structure:
 }
 ```
 ------------------------------------------------------------------------
-

diff --git a/src/ps_helper/extensions/__init__.py b/src/ps_helper/extensions/__init__.py
index e69de29..3633f57 100644
--- a/src/ps_helper/extensions/__init__.py
+++ b/src/ps_helper/extensions/__init__.py
@@ -0,0 +1,3 @@
+from .curl_metrics import TrackedCurlSession, record_curl_transfer_bytes
+
+__all__ = ["record_curl_transfer_bytes", "TrackedCurlSession"]

diff --git a/src/ps_helper/extensions/curl_metrics.py b/src/ps_helper/extensions/curl_metrics.py
new file mode 100644
index 0000000..6f4fc48
--- /dev/null
+++ b/src/ps_helper/extensions/curl_metrics.py
@@ -0,0 +1,158 @@
+"""Helpers to record curl_cffi transfer metrics in Scrapy stats."""
+
+from __future__ import annotations
+
+from curl_cffi import requests as curl_requests
+from curl_cffi.const import CurlInfo
+
+
+class TrackedCurlSession:
+    """Wrapper around a curl_cffi Session that auto-records transfer bytes."""
+
+    def __init__(
+        self,
+        *,
+        stats,
+        session=None,
+        add_to_downloader_response_bytes=True,
+        dedupe_on_response=True,
+    ):
+        self.stats = stats
+        self.add_to_downloader_response_bytes = add_to_downloader_response_bytes
+        self.dedupe_on_response = dedupe_on_response
+        self._session = session if session is not None else self._build_default_session()
+
+    @staticmethod
+    def _build_default_session():
+        return curl_requests.Session()
+
+    def _track(self, response):
+        record_curl_transfer_bytes(
+            self.stats,
+            response,
+            add_to_downloader_response_bytes=self.add_to_downloader_response_bytes,
+            dedupe_on_response=self.dedupe_on_response,
+        )
+        return response
+
+    def request(self, method, url, *args, **kwargs):
+        response = self._session.request(method, url, *args, **kwargs)
+        return self._track(response)
+
+    def get(self, url, *args, **kwargs):
+        return self.request("GET", url, *args, **kwargs)
+
+    def post(self, url, *args, **kwargs):
+        return self.request("POST", url, *args, **kwargs)
+
+    def put(self, url, *args, **kwargs):
+        return self.request("PUT", url, *args, **kwargs)
+
+    def patch(self, url, *args, **kwargs):
+        return self.request("PATCH", url, *args, **kwargs)
+
+    def delete(self, url, *args, **kwargs):
+        return self.request("DELETE", url, *args, **kwargs)
+
+    def head(self, url, *args, **kwargs):
+        return self.request("HEAD", url, *args, **kwargs)
+
+    def options(self, url, *args, **kwargs):
+        return self.request("OPTIONS", url, *args, **kwargs)
+
+    def close(self):
+        close_fn = getattr(self._session, "close", None)
+        if callable(close_fn):
+            close_fn()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+    def __getattr__(self, item):
+        return getattr(self._session, item)
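+
+
+# Usage sketch (editor's illustration, not part of the module API): because
+# __getattr__ delegates unknown attributes to the wrapped session, things like
+# cookies and headers keep working, and the wrapper acts as a context manager:
+#
+#     with TrackedCurlSession(stats=crawler.stats) as session:
+#         resp = session.get("https://example.com", impersonate="chrome120")
+#         # transfer bytes have already been recorded in crawler.stats here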
+
+
+def _safe_int(value):
+    try:
+        return int(value or 0)
+    except Exception:
+        return 0
+
+
+def _extract_transfer_sizes(curl_response):
+    """Return tuple: (down_bytes, up_bytes)."""
+    download_size = 0
+    upload_size = 0
+    header_size = 0
+    request_size = 0
+
+    curl_handle = getattr(curl_response, "curl", None)
+    if curl_handle is not None:
+        try:
+            download_size = _safe_int(curl_handle.getinfo(CurlInfo.SIZE_DOWNLOAD_T))
+            upload_size = _safe_int(curl_handle.getinfo(CurlInfo.SIZE_UPLOAD_T))
+            header_size = _safe_int(curl_handle.getinfo(CurlInfo.HEADER_SIZE))
+            request_size = _safe_int(curl_handle.getinfo(CurlInfo.REQUEST_SIZE))
+        except Exception:
+            download_size = 0
+            upload_size = 0
+            header_size = 0
+            request_size = 0
+
+    if (download_size + header_size) == 0:
+        download_size = len(getattr(curl_response, "content", b"") or b"")
+
+    down_bytes = download_size + header_size
+    up_bytes = request_size + upload_size
+    return down_bytes, up_bytes
+
+
+def record_curl_transfer_bytes(
+    stats,
+    curl_response,
+    *,
+    add_to_downloader_response_bytes=True,
+    dedupe_on_response=True,
+):
+    """Record curl_cffi transfer metrics in Scrapy stats.
+
+    Args:
+        stats: Scrapy stats collector.
+        curl_response: response object returned by curl_cffi.
+        add_to_downloader_response_bytes: if True, increment
+            ``downloader/response_bytes`` with downloaded bytes.
+        dedupe_on_response: if True, skip if metrics were already recorded for
+            this response instance.
+    """
+    if stats is None or curl_response is None:
+        return {"down_bytes": 0, "up_bytes": 0, "total_bytes": 0}
+
+    if dedupe_on_response and getattr(curl_response, "_ps_helper_curl_metrics_recorded", False):
+        return {"down_bytes": 0, "up_bytes": 0, "total_bytes": 0}
+
+    down_bytes, up_bytes = _extract_transfer_sizes(curl_response)
+    total_bytes = down_bytes + up_bytes
+
+    stats.inc_value("curl_cffi/bytes_down", down_bytes)
+    stats.inc_value("curl_cffi/bytes_up", up_bytes)
+    stats.inc_value("curl_cffi/bytes_total", total_bytes)
+    stats.inc_value("curl_cffi/response_count", 1)
+
+    if add_to_downloader_response_bytes:
+        stats.inc_value("downloader/response_bytes", down_bytes)
+
+    if dedupe_on_response:
+        try:
+            setattr(curl_response, "_ps_helper_curl_metrics_recorded", True)
+        except Exception:
+            pass
+
+    return {
+        "down_bytes": down_bytes,
+        "up_bytes": up_bytes,
+        "total_bytes": total_bytes,
+    }

diff --git a/src/ps_helper/extensions/metrics_extension.py b/src/ps_helper/extensions/metrics_extension.py
index 7235c00..cf99bb0 100644
--- a/src/ps_helper/extensions/metrics_extension.py
+++ b/src/ps_helper/extensions/metrics_extension.py
@@ -1,14 +1,16 @@
-import os
-import time
+import datetime
 import json
 import math
-import datetime
+import os
+import time
 from collections import defaultdict
 
-from ..scripts.generate_report import generate_html_report
-from ..scripts.utils import upload_html_to_s3
-from scrapy import signals
 from pydantic import ValidationError
+from scrapy import signals
+
+from ..scripts.generate_report import generate_html_report
+from ..scripts.utils import upload_html_to_s3
+from .curl_metrics import record_curl_transfer_bytes
 
 
 class MetricsExtension:
@@ -40,6 +42,7 @@ def __init__(self, stats, schema=None, unique_field=None, max_buckets=30, items_
         self.unique_field = unique_field
         self.items_expected = items_expected
 
+        self.curl_add_to_downloader_response_bytes = True
 
     @classmethod
     def from_crawler(cls, crawler):
@@ -48,6 +51,9 @@ def from_crawler(cls, crawler):
         max_buckets = crawler.settings.getint("METRICS_TIMELINE_BUCKETS", 30)
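+        # Editor's note (illustration, not in the original patch): the boolean
+        # read a few lines below is an ordinary Scrapy setting, so a spider can
+        # override it locally, e.g.
+        #     custom_settings = {"PS_HELPER_CURL_ADD_TO_DOWNLOADER_RESPONSE_BYTES": False}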
         items_expected = getattr(crawler.spidercls, "ITEMS_EXPECTED", None)
+        curl_add_to_downloader_response_bytes = crawler.settings.getbool(
+            "PS_HELPER_CURL_ADD_TO_DOWNLOADER_RESPONSE_BYTES", True
+        )
 
         ext = cls(
             crawler.stats,
@@ -56,6 +62,7 @@ def from_crawler(cls, crawler):
             max_buckets=max_buckets,
             items_expected=items_expected
         )
+        ext.curl_add_to_downloader_response_bytes = curl_add_to_downloader_response_bytes
 
         crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
         crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
@@ -95,7 +102,7 @@ def item_scraped(self, item, spider):
                     self.field_coverage[field]["complete"] += 1
 
         # Temporal timeline: save timestamp in seconds
-        elapsed_seconds = int(time.time() - self.start_time)
+        elapsed_seconds = int(time.time() - (self.start_time or time.time()))
         self.timeline[elapsed_seconds] += 1
 
         # Check duplicates if unique_field is defined
@@ -109,7 +116,18 @@ def item_scraped(self, item, spider):
     def spider_error(self, failure, response, spider):
         self.stats.inc_value("custom/errors")
 
+    def record_curl_response(self, curl_response):
+        """Public helper to register curl_cffi transfer bytes from spiders/handlers."""
+        return record_curl_transfer_bytes(
+            self.stats,
+            curl_response,
+            add_to_downloader_response_bytes=self.curl_add_to_downloader_response_bytes,
+        )
+
     def spider_closed(self, spider, reason):
+        if self.start_time is None:
+            self.start_time = time.time()
+
         elapsed = time.time() - self.start_time
         total_minutes = elapsed / 60
 
@@ -140,6 +158,7 @@ def spider_closed(self, spider, reason):
         else:
             efficiency_factor = 0.65  # 35% penalty (very inefficient)
 
+        goal_achievement = None
         if self.items_expected:
             goal_achievement = (items / self.items_expected * 100) if self.items_expected > 0 else 0
 
@@ -194,6 +213,10 @@ def spider_closed(self, spider, reason):
         # Memory and bytes
         peak_mem = self.stats.get_value("memusage/max", 0)  # bytes
         total_bytes = self.stats.get_value("downloader/response_bytes", 0)
+        curl_bytes_down = self.stats.get_value("curl_cffi/bytes_down", 0)
+        curl_bytes_up = self.stats.get_value("curl_cffi/bytes_up", 0)
+        curl_bytes_total = self.stats.get_value("curl_cffi/bytes_total", 0)
+        curl_response_count = self.stats.get_value("curl_cffi/response_count", 0)
 
         metrics = {
             "spider_name": spider.name,
@@ -223,6 +246,16 @@ def spider_closed(self, spider, reason):
             "resources": {
                 "peak_memory_bytes": peak_mem,
                 "downloaded_bytes": total_bytes,
+                "downloaded_kb": round(total_bytes / 1024, 2),
+                "downloaded_mb": round(total_bytes / (1024 * 1024), 2),
+                "curl_cffi": {
+                    "response_count": curl_response_count,
+                    "bytes_down": curl_bytes_down,
+                    "bytes_up": curl_bytes_up,
+                    "bytes_total": curl_bytes_total,
+                    "bytes_total_kb": round(curl_bytes_total / 1024, 2),
+                    "bytes_total_mb": round(curl_bytes_total / (1024 * 1024), 2),
+                },
             },
             "timeline": timeline_sorted,
             "timeline_interval_minutes": interval_size,
@@ -257,6 +290,8 @@ def _upload_report_to_s3(self, html_content, spider):
         """Upload HTML report to S3 from memory"""
 
         bucket_name = os.getenv('S3_BUCKET_NAME')
+        if not bucket_name:
+            raise ValueError("S3_BUCKET_NAME is required to upload report")
         expiration_days = int(os.getenv('REPORT_EXPIRATION_DAYS', '3'))
         expiration_seconds = expiration_days * 24 * 3600

diff --git a/tests/test_curl_metrics.py b/tests/test_curl_metrics.py
new file mode 100644
index 0000000..c5873b2
--- /dev/null
+++ b/tests/test_curl_metrics.py
@@ -0,0 +1,223 @@
+import sys
+
+from ps_helper.extensions.curl_metrics import (TrackedCurlSession,
+                                               record_curl_transfer_bytes)
+
+
+class DummyStats:
+    def __init__(self):
+        self._values = {}
+
+    def inc_value(self, key, value=1):
+        self._values[key] = self._values.get(key, 0) + value
+
+    def get_value(self, key, default=0):
+        return self._values.get(key, default)
+
+
+class DummyCurl:
+    def __init__(self, mapping):
+        self.mapping = mapping
+
+    def getinfo(self, key):
+        return self.mapping.get(key, 0)
+
+
+class DummyResponse:
+    def __init__(self, content=b"", curl=None):
+        self.content = content
+        self.curl = curl
+
+
+class DummySession:
+    def __init__(self, response):
+        self.response = response
+        self.calls = []
+
+    def request(self, method, url, *args, **kwargs):
+        self.calls.append((method, url, args, kwargs))
+        return self.response
+
+
+def _install_fake_curlinfo(monkeypatch):
+    # curl_metrics binds CurlInfo at import time, so patch that binding
+    # directly; DummyCurl's mapping then matches the keys getinfo() receives.
+    class CurlInfo:
+        SIZE_DOWNLOAD_T = "SIZE_DOWNLOAD_T"
+        SIZE_UPLOAD_T = "SIZE_UPLOAD_T"
+        HEADER_SIZE = "HEADER_SIZE"
+        REQUEST_SIZE = "REQUEST_SIZE"
+
+    monkeypatch.setattr("ps_helper.extensions.curl_metrics.CurlInfo", CurlInfo)
+    return CurlInfo
+
+
+def test_records_bytes_from_curl_getinfo(monkeypatch):
+    curl_info = _install_fake_curlinfo(monkeypatch)
+    stats = DummyStats()
+    curl = DummyCurl(
+        {
+            curl_info.SIZE_DOWNLOAD_T: 1200,
+            curl_info.SIZE_UPLOAD_T: 100,
+            curl_info.HEADER_SIZE: 300,
+            curl_info.REQUEST_SIZE: 50,
+        }
+    )
+    response = DummyResponse(content=b"x" * 10, curl=curl)
+
+    result = record_curl_transfer_bytes(stats, response)
+
+    assert result["down_bytes"] == 1500
+    assert result["up_bytes"] == 150
+    assert result["total_bytes"] == 1650
+    assert stats.get_value("curl_cffi/bytes_down") == 1500
+    assert stats.get_value("curl_cffi/bytes_up") == 150
+    assert stats.get_value("curl_cffi/bytes_total") == 1650
+    assert stats.get_value("curl_cffi/response_count") == 1
+    assert stats.get_value("downloader/response_bytes") == 1500
+
+
+def test_falls_back_to_content_length_without_curl_info(monkeypatch):
+    monkeypatch.delitem(sys.modules, "curl_cffi", raising=False)
+    monkeypatch.delitem(sys.modules, "curl_cffi.const", raising=False)
+
+    stats = DummyStats()
+    response = DummyResponse(content=b"abcde", curl=None)
+
+    result = record_curl_transfer_bytes(stats, response)
+
+    assert result["down_bytes"] == 5
+    assert result["up_bytes"] == 0
+    assert result["total_bytes"] == 5
+    assert stats.get_value("downloader/response_bytes") == 5
+
+
+def test_deduplicates_same_response_instance(monkeypatch):
+    curl_info = _install_fake_curlinfo(monkeypatch)
+    stats = DummyStats()
+    curl = DummyCurl(
+        {
+            curl_info.SIZE_DOWNLOAD_T: 100,
+            curl_info.SIZE_UPLOAD_T: 0,
+            curl_info.HEADER_SIZE: 20,
+            curl_info.REQUEST_SIZE: 10,
+        }
+    )
+    response = DummyResponse(content=b"ignored", curl=curl)
+
+    first = record_curl_transfer_bytes(stats, response)
+    second = record_curl_transfer_bytes(stats, response)
+
+    assert first["total_bytes"] == 130
+    assert second["total_bytes"] == 0
+    assert stats.get_value("curl_cffi/response_count") == 1
+    assert stats.get_value("downloader/response_bytes") == 120
+
+
+def test_can_skip_downloader_response_bytes(monkeypatch):
+    curl_info = _install_fake_curlinfo(monkeypatch)
+    stats = DummyStats()
+    curl = DummyCurl(
+        {
+            curl_info.SIZE_DOWNLOAD_T: 200,
+            curl_info.SIZE_UPLOAD_T: 10,
+            curl_info.HEADER_SIZE: 40,
+            curl_info.REQUEST_SIZE: 20,
+        }
+    )
+    response = DummyResponse(content=b"", curl=curl)
+
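+    # Editor's note: with the flag off, only the curl_cffi/* counters should
+    # move; downloader/response_bytes must remain 0 (asserted below).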
+    record_curl_transfer_bytes(
+        stats,
+        response,
+        add_to_downloader_response_bytes=False,
+    )
+
+    assert stats.get_value("curl_cffi/bytes_total") == 270
+    assert stats.get_value("downloader/response_bytes") == 0
+
+
+def test_tracked_session_get_records_and_forwards_kwargs(monkeypatch):
+    curl_info = _install_fake_curlinfo(monkeypatch)
+    stats = DummyStats()
+    response = DummyResponse(
+        content=b"",
+        curl=DummyCurl(
+            {
+                curl_info.SIZE_DOWNLOAD_T: 500,
+                curl_info.SIZE_UPLOAD_T: 20,
+                curl_info.HEADER_SIZE: 100,
+                curl_info.REQUEST_SIZE: 30,
+            }
+        ),
+    )
+    session = DummySession(response)
+    tracked = TrackedCurlSession(stats=stats, session=session)
+
+    returned = tracked.get("https://example.com", impersonate="chrome120", timeout=10)
+
+    assert returned is response
+    assert session.calls[0][0] == "GET"
+    assert session.calls[0][1] == "https://example.com"
+    assert session.calls[0][3]["impersonate"] == "chrome120"
+    assert session.calls[0][3]["timeout"] == 10
+    assert stats.get_value("curl_cffi/bytes_down") == 600
+    assert stats.get_value("curl_cffi/bytes_up") == 50
+    assert stats.get_value("curl_cffi/bytes_total") == 650
+    assert stats.get_value("downloader/response_bytes") == 600
+
+
+def test_tracked_session_request_method_is_supported(monkeypatch):
+    curl_info = _install_fake_curlinfo(monkeypatch)
+    stats = DummyStats()
+    response = DummyResponse(
+        content=b"",
+        curl=DummyCurl(
+            {
+                curl_info.SIZE_DOWNLOAD_T: 100,
+                curl_info.SIZE_UPLOAD_T: 10,
+                curl_info.HEADER_SIZE: 20,
+                curl_info.REQUEST_SIZE: 5,
+            }
+        ),
+    )
+    session = DummySession(response)
+    tracked = TrackedCurlSession(stats=stats, session=session)
+
+    tracked.request("POST", "https://example.com/x", data={"k": "v"})
+
+    assert session.calls[0][0] == "POST"
+    assert session.calls[0][1] == "https://example.com/x"
+    assert session.calls[0][3]["data"] == {"k": "v"}
+    assert stats.get_value("curl_cffi/response_count") == 1
+
+
+def test_tracked_session_respects_toggle_for_response_bytes(monkeypatch):
+    curl_info = _install_fake_curlinfo(monkeypatch)
+    stats = DummyStats()
+    response = DummyResponse(
+        content=b"",
+        curl=DummyCurl(
+            {
+                curl_info.SIZE_DOWNLOAD_T: 300,
+                curl_info.SIZE_UPLOAD_T: 20,
+                curl_info.HEADER_SIZE: 80,
+                curl_info.REQUEST_SIZE: 40,
+            }
+        ),
+    )
+    session = DummySession(response)
+    tracked = TrackedCurlSession(
+        stats=stats,
+        session=session,
+        add_to_downloader_response_bytes=False,
+    )
+
+    tracked.post("https://example.com/y")
+
+    assert stats.get_value("curl_cffi/bytes_total") == 440
+    assert stats.get_value("downloader/response_bytes") == 0
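+
+
+# Editor's sketch (not part of the original suite): if the curl handle exists
+# but getinfo() raises, the helper should reset all sizes and fall back to
+# len(response.content).
+def test_falls_back_when_getinfo_raises(monkeypatch):
+    _install_fake_curlinfo(monkeypatch)
+    stats = DummyStats()
+
+    class BrokenCurl:
+        def getinfo(self, key):
+            raise RuntimeError("no transfer info available")
+
+    response = DummyResponse(content=b"abc", curl=BrokenCurl())
+
+    result = record_curl_transfer_bytes(stats, response)
+
+    assert result["down_bytes"] == 3
+    assert result["up_bytes"] == 0
+    assert stats.get_value("downloader/response_bytes") == 3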