27 changes: 27 additions & 0 deletions README.md
@@ -58,6 +58,33 @@ ps-helper create-report scrapy_stats.json
```
This will automatically create a report named `scrapy_stats-report.html` in the same directory as your metrics file.

### Track curl_cffi Downloaded Bytes

Use the reusable helper to record transfer bytes in Scrapy stats (including `downloader/response_bytes`):

```python
from ps_helper.extensions import record_curl_transfer_bytes

record_curl_transfer_bytes(
    stats=self.crawler.stats,
    curl_response=curl_resp,
    add_to_downloader_response_bytes=True,
)
```

With `MetricsExtension`, this is also reflected in the final JSON report under `resources`.
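
A trimmed excerpt of that `resources` section (byte values illustrative):

```json
"resources": {
    "downloaded_bytes": 1048576,
    "curl_cffi": {
        "response_count": 12,
        "bytes_down": 734003,
        "bytes_up": 5120,
        "bytes_total": 739123
    }
}
```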

For automatic tracking on every curl request, use `TrackedCurlSession`:

```python
from ps_helper.extensions import TrackedCurlSession

self.curl_session = TrackedCurlSession(stats=self.crawler.stats)

# keep using get/post as usual
curl_resp = self.curl_session.get(url, impersonate="chrome120")
```
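
The wrapper returns the underlying `curl_cffi` response unchanged, so existing response-handling code keeps working.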

---

## 🕷️ Scrapy URL Blocker Middleware
56 changes: 55 additions & 1 deletion src/ps_helper/extensions/README.md
@@ -13,6 +13,7 @@
- **Retry & Timeout Analysis**: Record retry attempts and timeout errors.
- **Resource Monitoring**: Capture memory usage and response bytes.
- **JSON Reports**: Save structured metrics reports to `metrics/YYYY-MM-DD/` folders.
- **curl_cffi Transfer Metrics**: Reusable helper for downloaded/uploaded bytes and optional integration with `downloader/response_bytes`.

------------------------------------------------------------------------

@@ -33,6 +34,9 @@ Optionally configure the number of timeline buckets:

```python
METRICS_TIMELINE_BUCKETS = 30

# If True, curl_cffi downloaded bytes are also added to downloader/response_bytes
PS_HELPER_CURL_ADD_TO_DOWNLOADER_RESPONSE_BYTES = True
```

## Example Usage
@@ -57,6 +61,57 @@ The extension will validate items and detect duplicates automatically.

------------------------------------------------------------------------

## Reusable curl_cffi Byte Tracker

You can use this helper from any project to record transfer bytes from `curl_cffi` responses:

```python
from ps_helper.extensions import record_curl_transfer_bytes

# inside a spider or custom download handler
record_curl_transfer_bytes(
    stats=self.crawler.stats,
    curl_response=curl_resp,
    add_to_downloader_response_bytes=True,
)
```

What it records:

- `curl_cffi/bytes_down`
- `curl_cffi/bytes_up`
- `curl_cffi/bytes_total`
- `curl_cffi/response_count`
- Optional: `downloader/response_bytes` (downloaded bytes)

The helper prefers curl/libcurl transfer sizes when available and falls back to `len(response.content)`.
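
The call also returns the recorded counts, which can be handy for per-request logging. A minimal sketch inside a spider (the log line is illustrative):

```python
from ps_helper.extensions import record_curl_transfer_bytes

result = record_curl_transfer_bytes(
    stats=self.crawler.stats,
    curl_response=curl_resp,
)
# result is a dict: {"down_bytes": ..., "up_bytes": ..., "total_bytes": ...}
self.logger.info("curl transfer: %s bytes", result["total_bytes"])
```

By default (`dedupe_on_response=True`) the response is marked after recording, so calling the helper twice on the same response will not double-count.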

## TrackedCurlSession (Recommended)

For automatic tracking on every request, use `TrackedCurlSession` as a drop-in wrapper:

```python
import scrapy

from ps_helper.extensions import TrackedCurlSession


class MySpider(scrapy.Spider):
    name = "my_spider"

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        spider.curl_session = TrackedCurlSession(
            stats=crawler.stats,
            add_to_downloader_response_bytes=crawler.settings.getbool(
                "PS_HELPER_CURL_ADD_TO_DOWNLOADER_RESPONSE_BYTES", True
            ),
        )
        return spider
```

Then keep using `self.curl_session.get(...)` / `post(...)` normally.
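
`TrackedCurlSession` forwards unknown attributes to the wrapped session, so cookies, headers, and other session state behave as usual. It also works as a context manager, closing the wrapped session on exit:

```python
# URL is illustrative; impersonate is passed through to curl_cffi
with TrackedCurlSession(stats=crawler.stats) as session:
    resp = session.get("https://example.com", impersonate="chrome120")
```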

------------------------------------------------------------------------


## Metrics Report

@@ -123,4 +178,3 @@ Example structure:
}
```
------------------------------------------------------------------------

3 changes: 3 additions & 0 deletions src/ps_helper/extensions/__init__.py
@@ -0,0 +1,3 @@
from .curl_metrics import TrackedCurlSession, record_curl_transfer_bytes

__all__ = ["record_curl_transfer_bytes", "TrackedCurlSession"]
158 changes: 158 additions & 0 deletions src/ps_helper/extensions/curl_metrics.py
@@ -0,0 +1,158 @@
"""Helpers to record curl_cffi transfer metrics in Scrapy stats."""

from __future__ import annotations

from curl_cffi import requests as curl_requests
from curl_cffi.const import CurlInfo


class TrackedCurlSession:
"""Wrapper around curl_cffi Session that auto-records transfer bytes."""

def __init__(
self,
*,
stats,
session=None,
add_to_downloader_response_bytes=True,
dedupe_on_response=True,
):
self.stats = stats
self.add_to_downloader_response_bytes = add_to_downloader_response_bytes
self.dedupe_on_response = dedupe_on_response
self._session = session if session is not None else self._build_default_session()

@staticmethod
def _build_default_session():
return curl_requests.Session()

def _track(self, response):
record_curl_transfer_bytes(
self.stats,
response,
add_to_downloader_response_bytes=self.add_to_downloader_response_bytes,
dedupe_on_response=self.dedupe_on_response,
)
return response

def request(self, method, url, *args, **kwargs):
response = self._session.request(method, url, *args, **kwargs)
return self._track(response)

def get(self, url, *args, **kwargs):
return self.request("GET", url, *args, **kwargs)

def post(self, url, *args, **kwargs):
return self.request("POST", url, *args, **kwargs)

def put(self, url, *args, **kwargs):
return self.request("PUT", url, *args, **kwargs)

def patch(self, url, *args, **kwargs):
return self.request("PATCH", url, *args, **kwargs)

def delete(self, url, *args, **kwargs):
return self.request("DELETE", url, *args, **kwargs)

def head(self, url, *args, **kwargs):
return self.request("HEAD", url, *args, **kwargs)

def options(self, url, *args, **kwargs):
return self.request("OPTIONS", url, *args, **kwargs)

def close(self):
close_fn = getattr(self._session, "close", None)
if callable(close_fn):
close_fn()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False

def __getattr__(self, item):
return getattr(self._session, item)


def _safe_int(value):
try:
return int(value or 0)
except Exception:
return 0


def _extract_transfer_sizes(curl_response):
"""Return tuple: (down_bytes, up_bytes)."""
download_size = 0
upload_size = 0
header_size = 0
request_size = 0

curl_handle = getattr(curl_response, "curl", None)
if curl_handle is not None:
try:
download_size = _safe_int(curl_handle.getinfo(CurlInfo.SIZE_DOWNLOAD_T))
upload_size = _safe_int(curl_handle.getinfo(CurlInfo.SIZE_UPLOAD_T))
header_size = _safe_int(curl_handle.getinfo(CurlInfo.HEADER_SIZE))
request_size = _safe_int(curl_handle.getinfo(CurlInfo.REQUEST_SIZE))
except Exception:
download_size = 0
upload_size = 0
header_size = 0
request_size = 0

if (download_size + header_size) == 0:
download_size = len(getattr(curl_response, "content", b"") or b"")

down_bytes = download_size + header_size
up_bytes = request_size + upload_size
return down_bytes, up_bytes


def record_curl_transfer_bytes(
stats,
curl_response,
*,
add_to_downloader_response_bytes=True,
dedupe_on_response=True,
):
"""Record curl_cffi transfer metrics in Scrapy stats.

Args:
stats: Scrapy stats collector.
curl_response: response object returned by curl_cffi.
add_to_downloader_response_bytes: if True, increment
``downloader/response_bytes`` with downloaded bytes.
dedupe_on_response: if True, skip if metrics were already recorded for
this response instance.
"""
if stats is None or curl_response is None:
return {"down_bytes": 0, "up_bytes": 0, "total_bytes": 0}

if dedupe_on_response and getattr(curl_response, "_ps_helper_curl_metrics_recorded", False):
return {"down_bytes": 0, "up_bytes": 0, "total_bytes": 0}

down_bytes, up_bytes = _extract_transfer_sizes(curl_response)
total_bytes = down_bytes + up_bytes

stats.inc_value("curl_cffi/bytes_down", down_bytes)
stats.inc_value("curl_cffi/bytes_up", up_bytes)
stats.inc_value("curl_cffi/bytes_total", total_bytes)
stats.inc_value("curl_cffi/response_count", 1)

if add_to_downloader_response_bytes:
stats.inc_value("downloader/response_bytes", down_bytes)

if dedupe_on_response:
try:
setattr(curl_response, "_ps_helper_curl_metrics_recorded", True)
except Exception:
pass

return {
"down_bytes": down_bytes,
"up_bytes": up_bytes,
"total_bytes": total_bytes,
}
49 changes: 42 additions & 7 deletions src/ps_helper/extensions/metrics_extension.py
@@ -1,14 +1,16 @@
import os
import time
import datetime
import json
import math
import datetime
import os
import time
from collections import defaultdict
from ..scripts.generate_report import generate_html_report
from ..scripts.utils import upload_html_to_s3

from scrapy import signals
from pydantic import ValidationError
from scrapy import signals

from ..scripts.generate_report import generate_html_report
from ..scripts.utils import upload_html_to_s3
from .curl_metrics import record_curl_transfer_bytes


class MetricsExtension:
@@ -40,6 +42,7 @@ def __init__(self, stats, schema=None, unique_field=None, max_buckets=30, items_
        self.unique_field = unique_field

        self.items_expected = items_expected
        self.curl_add_to_downloader_response_bytes = True

    @classmethod
    def from_crawler(cls, crawler):
@@ -48,6 +51,9 @@ def from_crawler(cls, crawler):

        max_buckets = crawler.settings.getint("METRICS_TIMELINE_BUCKETS", 30)
        items_expected = getattr(crawler.spidercls, "ITEMS_EXPECTED", None)
        curl_add_to_downloader_response_bytes = crawler.settings.getbool(
            "PS_HELPER_CURL_ADD_TO_DOWNLOADER_RESPONSE_BYTES", True
        )

        ext = cls(
            crawler.stats,
@@ -56,6 +62,7 @@ def from_crawler(cls, crawler):
            max_buckets=max_buckets,
            items_expected=items_expected
        )
        ext.curl_add_to_downloader_response_bytes = curl_add_to_downloader_response_bytes

        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
@@ -95,7 +102,7 @@ def item_scraped(self, item, spider):
                self.field_coverage[field]["complete"] += 1

        # Temporal timeline: save timestamp in seconds
        elapsed_seconds = int(time.time() - self.start_time)
        elapsed_seconds = int(time.time() - (self.start_time or time.time()))  # guard unset start_time
        self.timeline[elapsed_seconds] += 1

        # Check duplicates if unique_field is defined
@@ -109,7 +116,18 @@ def item_scraped(self, item, spider):
    def spider_error(self, failure, response, spider):
        self.stats.inc_value("custom/errors")

    def record_curl_response(self, curl_response):
        """Public helper to register curl_cffi transfer bytes from spiders/handlers."""
        return record_curl_transfer_bytes(
            self.stats,
            curl_response,
            add_to_downloader_response_bytes=self.curl_add_to_downloader_response_bytes,
        )

    def spider_closed(self, spider, reason):
        if self.start_time is None:
            self.start_time = time.time()

        elapsed = time.time() - self.start_time
        total_minutes = elapsed / 60

@@ -140,6 +158,7 @@ def spider_closed(self, spider, reason):
        else:
            efficiency_factor = 0.65  # 35% penalty (very inefficient)

        goal_achievement = None  # default when ITEMS_EXPECTED is not set
        if self.items_expected:
            goal_achievement = (items / self.items_expected * 100) if self.items_expected > 0 else 0

@@ -194,6 +213,10 @@ def spider_closed(self, spider, reason):
        # Memory and bytes
        peak_mem = self.stats.get_value("memusage/max", 0)  # bytes
        total_bytes = self.stats.get_value("downloader/response_bytes", 0)
        curl_bytes_down = self.stats.get_value("curl_cffi/bytes_down", 0)
        curl_bytes_up = self.stats.get_value("curl_cffi/bytes_up", 0)
        curl_bytes_total = self.stats.get_value("curl_cffi/bytes_total", 0)
        curl_response_count = self.stats.get_value("curl_cffi/response_count", 0)

        metrics = {
            "spider_name": spider.name,
@@ -223,6 +246,16 @@
"resources": {
"peak_memory_bytes": peak_mem,
"downloaded_bytes": total_bytes,
"downloaded_kb": round(total_bytes / 1024, 2),
"downloaded_mb": round(total_bytes / (1024 * 1024), 2),
"curl_cffi": {
"response_count": curl_response_count,
"bytes_down": curl_bytes_down,
"bytes_up": curl_bytes_up,
"bytes_total": curl_bytes_total,
"bytes_total_kb": round(curl_bytes_total / 1024, 2),
"bytes_total_mb": round(curl_bytes_total / (1024 * 1024), 2),
},
},
"timeline": timeline_sorted,
"timeline_interval_minutes": interval_size,
@@ -257,6 +290,8 @@ def _upload_report_to_s3(self, html_content, spider):
"""Upload HTML report to S3 from memory"""

bucket_name = os.getenv('S3_BUCKET_NAME')
if not bucket_name:
raise ValueError("S3_BUCKET_NAME is required to upload report")

expiration_days = int(os.getenv('REPORT_EXPIRATION_DAYS', '3'))
expiration_seconds = expiration_days * 24 * 3600