diff --git a/NEWS.md b/NEWS.md index 7761e29b..3d9a5814 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,5 @@ +**05/17/2026:** The OGC `waterdata` getters (`get_daily`, `get_continuous`, `get_field_measurements`, and the rest of the multi-value-capable functions) now transparently chunk requests whose URLs would otherwise exceed the server's ~8 KB byte limit. A common chained-query pattern — pull a long site list from `get_monitoring_locations`, then feed it into `get_daily` — previously failed with HTTP 414 once the resulting URL grew past the limit; it now fans out across multiple sub-requests under the hood and returns one combined DataFrame. The chunker coordinates with the existing CQL `filter` chunker (long top-level-`OR` filters still split correctly when used alongside long multi-value lists), caps cartesian-product plans at 1000 sub-requests (the default USGS hourly quota), and aborts mid-call with a structured `QuotaExhausted` exception — carrying the partial result and a resume offset — if `x-ratelimit-remaining` drops below a safety floor. Mirrors R `dataRetrieval`'s [#870](https://github.com/DOI-USGS/dataRetrieval/pull/870), generalized to N dimensions. Note one metadata-behavior change for paginated/chunked calls: `BaseMetadata.url` still reflects the user's original query (unchanged), but `BaseMetadata.header` now carries the *last* page's / sub-request's headers (so `x-ratelimit-remaining` is current) rather than the first, and `BaseMetadata.query_time` is now the cumulative wall-clock across pages instead of the first page's elapsed. + **05/16/2026:** Fixed silent truncation in the paginated `waterdata` request loops (`_walk_pages` and `get_stats_data`). Mid-pagination failures (HTTP 429, 5xx, network error) were previously swallowed — pagination would quietly stop and the function would return whatever rows it had collected, leaving callers with truncated DataFrames they had no way to detect. The loops now status-check every page like the initial request and raise `RuntimeError` on any failure, with the upstream exception chained as `__cause__` and a short menu of recovery actions (wait and retry, reduce the request, or obtain an API token) in the message. **Behavior change**: callers that previously consumed partial DataFrames on transient upstream blips will now see an exception; retry the call (possibly with a smaller `limit` or narrower query). **05/07/2026:** Bumped the declared minimum Python version from **3.8** to **3.9** (`pyproject.toml`'s `requires-python` and the ruff target). This brings the manifest in line with what was already being tested — CI's matrix has long covered only 3.9, 3.13, and 3.14, the `waterdata` test module already skipped itself on Python < 3.10, and several modules already use 3.9-only stdlib (e.g. `zoneinfo`). Users on 3.8 will no longer be able to install the package; please upgrade. diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index ad268194..025aafcd 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -230,6 +230,21 @@ def get_daily( ... parameter_code="00060", ... last_modified="P7D", ... ) + + >>> # Chain queries: pull all stream sites in a state, then their + >>> # daily discharge for the last week. The site list can be hundreds + >>> # of values long — the request is transparently chunked across + >>> # multiple sub-requests so the URL stays under the server's byte + >>> # limit. Combined output looks like a single query. + >>> sites_df, _ = dataretrieval.waterdata.get_monitoring_locations( + ... state_name="Ohio", + ... site_type="Stream", + ... ) + >>> df, md = dataretrieval.waterdata.get_daily( + ... monitoring_location_id=sites_df["monitoring_location_id"].tolist(), + ... parameter_code="00060", + ... time="P7D", + ... ) """ service = "daily" output_id = "daily_id" diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py new file mode 100644 index 00000000..b33a95ed --- /dev/null +++ b/dataretrieval/waterdata/chunking.py @@ -0,0 +1,692 @@ +"""Joint URL-byte chunking for the Water Data OGC getters. + +Long multi-value list params (sites, parameter codes, ...) and long +top-level-``OR`` CQL filters independently risk overflowing the +server's ~8 KB URL byte limit. ``multi_value_chunked`` builds a +``ChunkPlan`` that plans both chunking dimensions together, picks the +allocation that minimizes total sub-requests, and iterates the joint +cartesian product so every sub-request URL fits. Requests that +already fit get a trivial single-step plan — the wrapper has one code +path either way. + +Planning: for a filter with ``n_clauses`` top-level OR clauses, try +candidate filter chunk counts ``k = 1, 2, 4, ..., n_clauses``. For +each, partition clauses into ``k`` count-balanced groups joined by +``OR``, take the longest (URL-encoded) group as the worst-case filter, +then plan list-dim chunking by greedy halving against the remaining +budget. Keep the candidate with the smallest ``list_count × k``. + +Quota: after the first sub-request the execution reads +``x-ratelimit-remaining``; if the rest of the plan won't fit, it +raises ``RequestExceedsQuota`` before burning more budget. A 429 +on any sub-request surfaces as ``QuotaExhausted`` carrying whatever +chunks completed first, so callers can resume after the hourly window +resets. + +Dedup: list sub-chunks don't overlap; filter sub-chunks can, so the +combiner dedupes by feature ``id``. ``properties``, ``bbox``, date +intervals, ``limit``, ``skip_geometry``, and ``filter``/``filter_lang`` +themselves are never sliced as list dims (the filter is partitioned +along its top-level OR axis instead). +""" + +from __future__ import annotations + +import functools +import itertools +import math +from collections.abc import Callable, Iterator +from dataclasses import dataclass +from typing import Any +from urllib.parse import quote_plus + +import pandas as pd +import requests + +from .filters import ( + _check_numeric_filter_pitfall, + _is_chunkable, + _split_top_level_or, +) + +# Empirically the API replies HTTP 414 above ~8200 bytes of full URL — +# matches nginx's default ``large_client_header_buffers`` of 8 KB. 8000 +# leaves ~200 bytes for request-line framing and proxy variance. +_WATERDATA_URL_BYTE_LIMIT = 8000 + +# Default rule: any list-shaped kwarg with >1 element is chunked across +# sub-requests — each chunk becomes a comma-joined sub-list in the URL. +# The OGC getters expose ~90 such list-shaped params (IDs, codes, +# statuses, ...), all chunkable, so it's shorter to enumerate the +# exceptions than to maintain an allowlist that grows with the API. +# Exceptions, by reason: +# - response shape: ``properties`` defines the columns; sharding +# would yield different schemas per chunk. +# - structured: ``bbox`` is a fixed 4-element coord tuple. +# - intervals: date/time ranges are not enumerable sets. +# - handled elsewhere: ``filter`` gets OR-clause partitioning in +# ``ChunkPlan.from_args``; comma-joining CQL +# clauses would emit malformed expressions. +# - scalar by contract: ``limit``, ``skip_geometry``, ``filter_lang`` +# — a list value would be a type-erasure smuggle. +_NEVER_CHUNK = frozenset( + { + "properties", + "bbox", + "datetime", + "last_modified", + "begin", + "begin_utc", + "end", + "end_utc", + "time", + "filter", + "filter_lang", + "limit", + "skip_geometry", + } +) + +# Response header USGS uses to advertise remaining hourly quota. +_QUOTA_HEADER = "x-ratelimit-remaining" + +# Separators the two chunking dimensions use to compose their atoms +# into URL-encoded blobs. List dims comma-join values +# (``site=USGS-A,USGS-B``); filter dims OR-join clauses +# (``filter=a='1' OR a='2'``). Pinned as constants so the URL-byte +# sizing helper and the partition logic agree on the join shape. +_LIST_SEP = "," +_OR_SEP = " OR " + +# Args-dict key for the CQL filter. Hoisted so the planner and the +# wrapper substitute on the same key. +_FILTER_KEY = "filter" + +_FetchOnce = Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]] + + +class RequestTooLarge(ValueError): + """Raised when the URL exceeds the byte limit even at the smallest + reducible plan (every list dim at singleton chunks and the filter + at one clause per sub-request). Shrink the input lists, simplify + the filter, or split the call manually.""" + + +class RequestExceedsQuota(ValueError): + """Raised after the first sub-request when ``x-ratelimit-remaining`` + in the response shows the rest of the plan can't fit in the + current per-key rate-limit window. The first chunk has already + been issued; the wrapper stops here rather than burn quota on a + call that will fail mid-way. + + Attributes + ---------- + planned_chunks : int + Total sub-requests the joint plan would issue. + available : int + Sub-requests this caller can still issue in the current window + (``x-ratelimit-remaining`` + 1, since the first sub-request + already cost one slot). + deficit : int + ``planned_chunks - available`` — how far over budget the call + would run if it continued. + """ + + def __init__(self, *, planned_chunks: int, available: int, deficit: int) -> None: + super().__init__( + f"Request would issue {planned_chunks} sub-requests but only " + f"{available} fit in the current rate-limit window (short by " + f"{deficit}). Wait for the window to reset, request a higher " + f"per-key quota, or narrow the query." + ) + self.planned_chunks = planned_chunks + self.available = available + self.deficit = deficit + + +class QuotaExhausted(RuntimeError): + """Raised when a sub-request returns HTTP 429. + + For a chunked call (``total_chunks > 1``) reached past chunk 0, + the post-first-chunk ``RequestExceedsQuota`` check normally + short-circuits before burning quota on a plan that won't fit; + arrival here typically means a concurrent caller drained the + bucket faster than predicted, and ``partial_frame`` holds what + completed first. + + For a single-shot call (``total_chunks == 1``) or a 429 on the very + first chunk, ``partial_frame`` is empty and ``partial_response`` is + ``None``; the original ``RateLimited`` is on ``__cause__``. + + Attributes + ---------- + partial_frame : pd.DataFrame + Concatenated, deduplicated result of every sub-request that + completed before the 429. Empty when ``completed_chunks == 0``. + partial_response : requests.Response | None + Aggregated response with the canonical URL restored to the + user's full original query. ``None`` when ``completed_chunks == 0``. + completed_chunks : int + Number of sub-requests successfully completed. + total_chunks : int + Total sub-requests in the plan. + """ + + def __init__( + self, + *, + partial_frame: pd.DataFrame, + partial_response: requests.Response | None, + completed_chunks: int, + total_chunks: int, + ) -> None: + super().__init__( + f"HTTP 429 after {completed_chunks}/{total_chunks} " + f"sub-requests; catch QuotaExhausted to access .partial_frame " + f"and resume after the rate-limit window resets." + ) + self.partial_frame = partial_frame + self.partial_response = partial_response + self.completed_chunks = completed_chunks + self.total_chunks = total_chunks + + +def _chunkable_params(args: dict[str, Any]) -> dict[str, list[Any]]: + """Return ``{name: list(values)}`` for every list/tuple kwarg with + >1 element that is allowed to chunk.""" + return { + k: list(v) + for k, v in args.items() + if k not in _NEVER_CHUNK and isinstance(v, (list, tuple)) and len(v) > 1 + } + + +def _joined_url_bytes(atoms: list[Any], separator: str) -> int: + """URL-encoded byte length of ``atoms`` joined by ``separator``. + + The shared sizing primitive for both list dims (``separator=","``) + and filter clauses (``separator=" OR "``). ``quote_plus`` is faithful + to what the real URL builder produces, so values containing + characters that expand under URL encoding (``%``, ``+``, ``/``, + ``&``, …) can't be mis-ranked.""" + return len(quote_plus(separator.join(map(str, atoms)))) + + +def _request_bytes(req: requests.PreparedRequest) -> int: + """Total bytes of a prepared request: URL + body. + + GET routes have ``body=None`` and reduce to URL length. POST routes + (CQL2 JSON body) need body bytes — the URL stays short regardless + of payload, so URL-only sizing would underestimate the request and + skip chunking when it's needed. + + Raises ``TypeError`` on non-sizable bodies (generators, file-like + streams). Size-based planning needs a deterministic byte count. + """ + url_len = len(req.url) + body = req.body + if body is None: + return url_len + if isinstance(body, (bytes, bytearray)): + return url_len + len(body) + if isinstance(body, str): + return url_len + len(body.encode("utf-8")) + raise TypeError( + f"multi_value_chunked cannot size a request body of type " + f"{type(body).__name__!r}; pass str, bytes, or None." + ) + + +def _worst_case_args( + base_args: dict[str, Any], list_plan: dict[str, list[list[Any]]] +) -> dict[str, Any]: + """Args representing the largest sub-request the list plan will issue: + each dim's longest chunk by URL-encoded bytes, layered onto + ``base_args`` (which already has the candidate filter substituted + in by the caller).""" + out = dict(base_args) + for k, chunks in list_plan.items(): + out[k] = max(chunks, key=lambda c: _joined_url_bytes(c, _LIST_SEP)) + return out + + +def _partition_clauses(clauses: list[str], k: int) -> list[list[str]]: + """Split ``clauses`` into ``k`` roughly-balanced groups. Distributes + the remainder across the first groups so no group differs from + another by more than one clause. Returns groups as raw atom lists; + callers join with ``" OR "`` only for groups they actually intend + to issue, so the planner can size candidates without joining + discarded partitions.""" + if k <= 0: + raise ValueError(f"k must be >= 1; got {k}") + if k >= len(clauses): + return [[c] for c in clauses] + base, extra = divmod(len(clauses), k) + out: list[list[str]] = [] + i = 0 + for g in range(k): + size = base + (1 if g < extra else 0) + out.append(clauses[i : i + size]) + i += size + return out + + +def _plan_list_chunks( + args: dict[str, Any], + build_request: Callable[..., Any], + url_limit: int, +) -> dict[str, list[list[Any]]] | None: + """Greedy halving of multi-value list dims until the worst-case + sub-request URL fits ``url_limit``. The filter (if any) in ``args`` + is treated as fixed — the caller must have already substituted the + candidate's worst filter chunk so the URL probe accounts for it. + + Returns ``None`` when no list chunking is needed (request as-is + fits or no chunkable list dims present). Raises ``RequestTooLarge`` + when the smallest reducible plan still doesn't fit. + """ + chunkable = _chunkable_params(args) + if not chunkable: + return None + if _request_bytes(build_request(**args)) <= url_limit: + return None + + plan: dict[str, list[list[Any]]] = {k: [v] for k, v in chunkable.items()} + + while True: + worst = _worst_case_args(args, plan) + if _request_bytes(build_request(**worst)) <= url_limit: + return plan + + splittable = ( + (dim, idx, chunk) + for dim, dim_chunks in plan.items() + for idx, chunk in enumerate(dim_chunks) + if len(chunk) > 1 + ) + biggest = max( + splittable, + key=lambda t: _joined_url_bytes(t[2], _LIST_SEP), + default=None, + ) + if biggest is None: + raise RequestTooLarge( + f"Request exceeds {url_limit} bytes (URL + body) at the " + f"smallest reducible list plan. Reduce list sizes, shorten " + f"or simplify the filter, or split the call manually." + ) + dim, idx, chunk = biggest + mid = len(chunk) // 2 + plan[dim] = plan[dim][:idx] + [chunk[:mid], chunk[mid:]] + plan[dim][idx + 1 :] + + +def _filter_chunk_counts(n_clauses: int) -> list[int]: + """Candidate filter chunk counts to evaluate during joint planning. + + Powers of two from 1 up to (and including) ``n_clauses``. Covers + the trade-off curve coarsely enough to find a good plan without + evaluating every possible ``k``. Each doubling halves the per-chunk + filter byte cost, which is what matters for the budget trade-off.""" + if n_clauses < 1: + return [1] + counts = [] + k = 1 + while k < n_clauses: + counts.append(k) + k *= 2 + counts.append(n_clauses) + return counts + + +def _filter_candidates( + clauses: list[str], original_filter: str | None +) -> Iterator[tuple[list[str | None], str | None]]: + """Yield ``(filter_chunks, worst_filter)`` for each candidate filter + chunk count. ``filter_chunks`` is the list of OR-joined sub-filters + the wrapper will substitute one at a time; ``worst_filter`` is the + longest URL-encoded chunk, used by the planner to size the list dims + against the most demanding sub-request. + + Falls through to a single ``(filter_chunks=[original_filter], None)`` + candidate when the filter has no top-level OR splits (single clause, + cql-json, missing filter): the wrapper still iterates that one + "chunk" but the planner can skip substituting a filter into the + URL probe.""" + if len(clauses) < 2: + chunks: list[str | None] = ( + [original_filter] if original_filter is not None else [None] + ) + yield chunks, None + return + for k in _filter_chunk_counts(len(clauses)): + groups = _partition_clauses(clauses, k) + worst = max(groups, key=lambda g: _joined_url_bytes(g, _OR_SEP)) + yield [_OR_SEP.join(g) for g in groups], _OR_SEP.join(worst) + + +@dataclass(frozen=True) +class ChunkPlan: + """A precomputed strategy for issuing one user-level request as a + sequence of sub-requests whose URLs each fit ``url_limit``. + + ``ChunkPlan.from_args`` always returns a plan, even when no + chunking is needed: the passthrough case is represented by empty + ``list_chunks`` and a single-element ``filter_chunks=[None]`` so + ``total == 1`` and ``iter_sub_args`` yields the original args + unchanged. The wrapper's loop is therefore the same shape whether + chunking was needed or not. + + Attributes + ---------- + args : dict + The original user-level args this plan was built for. Bound to + the plan so ``iter_sub_args`` is self-contained. + list_chunks : dict[str, list[list]] + Per-param chunkings of multi-value list inputs. Empty in the + passthrough case. + filter_chunks : list[str | None] + Filter sub-expressions to substitute one per sub-request. + ``[None]`` means "leave ``args['filter']`` as-is" (passthrough + and single-clause cases). + canonical_url : str | None + URL of the full original request, used to overwrite the first + chunk's ``response.url`` so ``BaseMetadata`` reflects the + user's full query. ``None`` on the nothing-to-chunk passthrough + path: ``fetch_once``'s response already carries the canonical + URL, so the override is skipped to avoid an extra + ``build_request`` call on the hot path. + """ + + args: dict[str, Any] + list_chunks: dict[str, list[list[Any]]] + filter_chunks: list[str | None] + canonical_url: str | None + + @property + def total(self) -> int: + """Sub-request count: product of list-dim chunk counts times + ``len(filter_chunks)``.""" + list_count = math.prod((len(c) for c in self.list_chunks.values()), start=1) + return list_count * len(self.filter_chunks) + + def iter_sub_args(self) -> Iterator[dict[str, Any]]: + """Yield substituted args for each sub-request, in deterministic + order: list-dim cartesian product (dict insertion order) crossed + with filter chunks. Same plan → same sequence — resume is + well-defined.""" + # Trivial-passthrough fast path: nothing to substitute, just + # yield the original args. Skips a wasted dict copy on the + # most common Water Data call shape. + if not self.list_chunks and self.filter_chunks == [None]: + yield self.args + return + list_combos = ( + itertools.product(*self.list_chunks.values()) if self.list_chunks else [()] + ) + for combo in list_combos: + base = {**self.args, **dict(zip(self.list_chunks, combo))} + for filter_chunk in self.filter_chunks: + if filter_chunk is None: + yield base + else: + yield {**base, _FILTER_KEY: filter_chunk} + + def execute(self, fetch_once: _FetchOnce) -> tuple[pd.DataFrame, requests.Response]: + """Run the plan and return the combined result. See + ``_ChunkExecution`` for the per-sub-request semantics.""" + return _ChunkExecution(self, fetch_once).run() + + @classmethod + def from_args( + cls, + args: dict[str, Any], + build_request: Callable[..., Any], + url_limit: int, + ) -> ChunkPlan: + """Compute the cheapest joint plan for ``args``. Returns a + passthrough plan when the request already fits or nothing's + chunkable; raises ``RequestTooLarge`` only when chunking *is* + needed but no candidate plan fits ``url_limit``. + + Algorithm: enumerate filter chunk counts ``k = 1, 2, 4, ..., + n_clauses``; for each, partition clauses into ``k`` + count-balanced groups joined by ``OR`` and pick the worst + (longest URL-encoded) group; substitute that as the filter + and plan list chunking with greedy halving. Keep the candidate + whose ``list_count × k`` is smallest. + """ + filter_expr = args.get(_FILTER_KEY) + clauses: list[str] = [] + if _is_chunkable(filter_expr, args.get("filter_lang")): + _check_numeric_filter_pitfall(filter_expr) + clauses = _split_top_level_or(filter_expr) + + # Trivial passthrough: no multi-value lists and no top-level-OR + # filter to split, so chunking has no leverage. Skip the + # ``build_request`` call entirely — ``fetch_once``'s response + # will carry the canonical URL already (set by + # ``_finalize_paginated_response``), so the wrapper can elide + # the override. This is the common Water Data call shape, so + # the saved request prep is worth a small branch here. + if not _chunkable_params(args) and len(clauses) < 2: + return cls( + args=args, list_chunks={}, filter_chunks=[None], canonical_url=None + ) + + initial_request = build_request(**args) + canonical_url = initial_request.url + + # Already-fits passthrough: chunking is possible but unnecessary. + if _request_bytes(initial_request) <= url_limit: + return cls( + args=args, + list_chunks={}, + filter_chunks=[None], + canonical_url=canonical_url, + ) + + best: tuple[int, dict[str, list[list[Any]]], list[str | None]] | None = None + last_error: RequestTooLarge | None = None + + for filter_chunks, worst_filter in _filter_candidates(clauses, filter_expr): + plan_args = ( + args if worst_filter is None else {**args, _FILTER_KEY: worst_filter} + ) + try: + list_chunks = _plan_list_chunks(plan_args, build_request, url_limit) + except RequestTooLarge as exc: + last_error = exc + continue + if list_chunks is None: + list_chunks = {} + # ``_plan_list_chunks`` returns ``None`` both when no list + # dims are chunkable AND when the request fits. Filter + # chunking alone has to close the gap — verify before + # committing to a list-empty candidate. + if ( + not list_chunks + and _request_bytes(build_request(**plan_args)) > url_limit + ): + continue + list_count = math.prod((len(c) for c in list_chunks.values()), start=1) + total = list_count * len(filter_chunks) + if best is None or total < best[0]: + best = (total, list_chunks, filter_chunks) + + if best is None: + raise last_error or RequestTooLarge( + "No filter-chunking candidate produces a fitting plan. " + "Reduce list sizes or simplify the filter." + ) + + return cls( + args=args, + list_chunks=best[1], + filter_chunks=best[2], + canonical_url=canonical_url, + ) + + +def _read_remaining(response: requests.Response) -> int | None: + """Parse ``x-ratelimit-remaining`` from a response. Returns ``None`` + when the header is missing or unparseable; the wrapper treats that + as "no quota signal" and skips the post-first-chunk plan check.""" + raw = response.headers.get(_QUOTA_HEADER) + if raw is None: + return None + try: + return int(raw) + except (TypeError, ValueError): + return None + + +def _is_429(exc: BaseException) -> bool: + """True iff ``exc`` or anywhere along its ``__cause__`` chain is a + ``utils.RateLimited``. ``_walk_pages`` re-wraps mid-pagination + failures as ``RuntimeError`` with the original ``RateLimited`` + linked as ``__cause__``, so the chunker has to follow the chain + rather than just ``isinstance(exc, RateLimited)``. + + Lazy import: ``utils`` imports this module to decorate + ``_fetch_once``, so a top-level import would be circular. + """ + from .utils import RateLimited + + cur: BaseException | None = exc + while cur is not None: + if isinstance(cur, RateLimited): + return True + cur = cur.__cause__ + return False + + +def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: + """Concatenate per-chunk frames, dropping empties and deduping by ``id``. + + ``_get_resp_data`` returns a plain ``pd.DataFrame()`` on empty responses; + concat'ing it with real GeoDataFrames downgrades the result to plain + DataFrame and strips geometry/CRS, so empties are dropped first. Dedup + on the pre-rename feature ``id`` keeps overlapping user OR-clauses from + producing duplicate rows across chunks. + """ + non_empty = [f for f in frames if not f.empty] + if not non_empty: + return pd.DataFrame() + if len(non_empty) == 1: + return non_empty[0] + combined = pd.concat(non_empty, ignore_index=True) + if "id" in combined.columns: + combined = combined.drop_duplicates(subset="id", ignore_index=True) + return combined + + +def _combine_chunk_responses( + responses: list[requests.Response], canonical_url: str | None +) -> requests.Response: + """Fold per-sub-request responses into one. The first response is + mutated in place: ``.headers`` becomes the last response's (so + ``x-ratelimit-remaining`` reflects current state), ``.elapsed`` + accumulates total wall-clock, and ``.url`` is set to the canonical + original-query URL so ``BaseMetadata`` reflects the user's full + request rather than the first sub-chunk. + + ``canonical_url=None`` skips the URL override — used by the + trivial-passthrough path where ``fetch_once`` already returns a + response whose ``.url`` is the original-query URL.""" + head = responses[0] + if len(responses) > 1: + head.headers = responses[-1].headers + head.elapsed = sum((r.elapsed for r in responses[1:]), start=head.elapsed) + if canonical_url is not None: + head.url = canonical_url + return head + + +class _ChunkExecution: + """In-flight execution of a ``ChunkPlan``. Issues each sub-request, + accumulates frames and responses, translates 429s into + ``QuotaExhausted`` with the partial state captured so far, and + raises ``RequestExceedsQuota`` after the first sub-request if the + rest of the plan won't fit the current rate-limit window.""" + + def __init__(self, plan: ChunkPlan, fetch_once: _FetchOnce) -> None: + self.plan = plan + self.fetch_once = fetch_once + self.frames: list[pd.DataFrame] = [] + self.responses: list[requests.Response] = [] + + def run(self) -> tuple[pd.DataFrame, requests.Response]: + for sub_args in self.plan.iter_sub_args(): + self.issue(sub_args) + return self.finalize() + + def issue(self, sub_args: dict[str, Any]) -> None: + try: + frame, response = self.fetch_once(sub_args) + except RuntimeError as exc: + if not _is_429(exc): + raise + raise self._quota_exhausted() from exc + self.frames.append(frame) + self.responses.append(response) + if len(self.responses) == 1 and self.plan.total > 1: + self._check_quota_after_first() + + def finalize(self) -> tuple[pd.DataFrame, requests.Response]: + return ( + _combine_chunk_frames(self.frames), + _combine_chunk_responses(self.responses, self.plan.canonical_url), + ) + + def _check_quota_after_first(self) -> None: + remaining = _read_remaining(self.responses[0]) + if remaining is None or remaining >= self.plan.total - 1: + return + raise RequestExceedsQuota( + planned_chunks=self.plan.total, + available=remaining + 1, + deficit=self.plan.total - remaining - 1, + ) + + def _quota_exhausted(self) -> QuotaExhausted: + return QuotaExhausted( + partial_frame=_combine_chunk_frames(self.frames), + partial_response=( + _combine_chunk_responses(self.responses, self.plan.canonical_url) + if self.responses + else None + ), + completed_chunks=len(self.responses), + total_chunks=self.plan.total, + ) + + +def multi_value_chunked( + *, + build_request: Callable[..., Any], + url_limit: int | None = None, +) -> Callable[[_FetchOnce], _FetchOnce]: + """Decorator that splits multi-value list params and cql-text filters + across sub-requests so each fits ``url_limit`` bytes (default + ``_WATERDATA_URL_BYTE_LIMIT``, resolved at call time so test patches + take effect). Builds a ``ChunkPlan`` and runs it: passthrough + requests are a trivial single-step plan, so there's one code path + either way. + + See ``ChunkPlan`` and ``_ChunkExecution`` for planning and + rate-limit semantics. Exceptions: ``RequestTooLarge`` if no plan + fits, ``RequestExceedsQuota`` if the remaining plan can't fit the + current rate-limit window, ``QuotaExhausted`` on a 429 mid-execution. + """ + + def decorator(fetch_once: _FetchOnce) -> _FetchOnce: + @functools.wraps(fetch_once) + def wrapper( + args: dict[str, Any], + ) -> tuple[pd.DataFrame, requests.Response]: + limit = _WATERDATA_URL_BYTE_LIMIT if url_limit is None else url_limit + return ChunkPlan.from_args(args, build_request, limit).execute(fetch_once) + + return wrapper + + return decorator diff --git a/dataretrieval/waterdata/filters.py b/dataretrieval/waterdata/filters.py index 4c136b82..5e1c0a67 100644 --- a/dataretrieval/waterdata/filters.py +++ b/dataretrieval/waterdata/filters.py @@ -1,47 +1,27 @@ """CQL ``filter`` support for the Water Data OGC getters. -Two names are public to the rest of the package: +Public: - ``FILTER_LANG``: the type alias used for the ``filter_lang`` kwarg. -- ``chunked``: the decorator ``utils.py`` applies to its single-request - fetch function. It runs the lexicographic-comparison pitfall guard, - splits long cql-text filters at top-level ``OR`` so each sub-request - fits under the server's URL byte limit, and concatenates the results. -Other CQL shapes (``AND``, ``NOT``, ``LIKE``, spatial/temporal predicates, -function calls) are forwarded verbatim — only top-level ``OR`` chunks -losslessly into independent sub-queries whose result sets can be union'd. +Internal helpers used by ``chunking.multi_value_chunked``'s joint +planner: ``_split_top_level_or`` (clause partitioning), +``_is_chunkable`` (filter-language gate), and +``_check_numeric_filter_pitfall`` (the lexicographic-comparison guard). + +Other CQL shapes (``AND``, ``NOT``, ``LIKE``, spatial/temporal +predicates, function calls) are forwarded verbatim — only top-level +``OR`` chunks losslessly into independent sub-queries whose result sets +can be union'd. """ from __future__ import annotations -import functools import re -from collections.abc import Callable -from typing import Any, Literal, TypeVar -from urllib.parse import quote_plus - -import pandas as pd -import requests +from typing import Any, Literal FILTER_LANG = Literal["cql-text", "cql-json"] -# Conservative fallback budget when ``_chunk_cql_or`` is called without -# an explicit ``max_len``. The ``chunked`` decorator computes a tighter -# per-request budget from ``_WATERDATA_URL_BYTE_LIMIT``. -_CQL_FILTER_CHUNK_LEN = 5000 - -# Empirically the API replies HTTP 414 above ~8200 bytes of full URL — -# matches nginx's default ``large_client_header_buffers`` of 8 KB. 8000 -# leaves ~200 bytes for request-line framing and proxy variance. -_WATERDATA_URL_BYTE_LIMIT = 8000 - -# Conservative over-estimate of URL bytes used by everything *except* -# the filter value. Used only by the fast path in -# ``_effective_filter_budget`` to skip the probe when the encoded filter -# clearly already fits. -_NON_FILTER_URL_HEADROOM = 1000 - _NUM = r"-?(?:\d+(?:\.\d+)?|\.\d+)(?:[eE][+-]?\d+)?" _IDENT = r"[A-Za-z_]\w*" @@ -120,69 +100,6 @@ def _split_top_level_or(expr: str) -> list[str]: return [p for p in parts if p] -def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]: - """Split ``expr`` into OR-chunks each under ``max_len`` characters. - - Only top-level ``OR`` chains can be recombined losslessly as a disjunction - of independent sub-queries. Returns ``[expr]`` unchanged when the whole - expression already fits, when there is no top-level ``OR``, or when any - single clause exceeds ``max_len`` (sending it as-is and surfacing the - server's 414 is clearer than silently dropping data). - """ - if len(expr) <= max_len: - return [expr] - parts = _split_top_level_or(expr) - if len(parts) < 2 or any(len(p) > max_len for p in parts): - return [expr] - - chunks = [] - current: list[str] = [] - current_len = 0 - for part in parts: - join_cost = len(" OR ") if current else 0 - if current and current_len + join_cost + len(part) > max_len: - chunks.append(" OR ".join(current)) - current = [part] - current_len = len(part) - else: - current.append(part) - current_len += join_cost + len(part) - if current: - chunks.append(" OR ".join(current)) - return chunks - - -def _effective_filter_budget( - args: dict[str, Any], - filter_expr: str, - build_request: Callable[..., Any], -) -> int: - """Raw-CQL byte budget that, after URL-encoding, fits the URL byte limit. - - The server caps total URL length, not raw CQL length. We probe the - non-filter URL bytes by building the request with a 1-byte placeholder - filter, subtract from the URL limit to get the bytes available for the - encoded filter, then convert back to raw CQL bytes via the *maximum* - per-clause encoding ratio (a chunk could contain only the heavier-encoding - clauses, so budgeting by the average ratio could overflow). - """ - # Fast path: encoded filter clearly fits with room for any plausible - # non-filter URL. Skips the PreparedRequest build and splitter scan. - encoded_len = len(quote_plus(filter_expr)) - if encoded_len + _NON_FILTER_URL_HEADROOM <= _WATERDATA_URL_BYTE_LIMIT: - return len(filter_expr) + 1 - - probe = build_request(**{**args, "filter": "x"}) - available_url_bytes = _WATERDATA_URL_BYTE_LIMIT - (len(probe.url) - 1) - if available_url_bytes <= 0: - # Non-filter URL already over the limit. Pass through unchanged so - # the caller sees one 414 instead of N parallel sub-request failures. - return len(filter_expr) + 1 - parts = _split_top_level_or(filter_expr) or [filter_expr] - encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts) - return max(100, int(available_url_bytes / encoding_ratio)) - - def _check_numeric_filter_pitfall(filter_expr: str) -> None: """Raise if the filter pairs a field with an unquoted numeric literal. @@ -243,92 +160,3 @@ def _is_chunkable(filter_expr: Any, filter_lang: Any) -> bool: and bool(filter_expr) and filter_lang in {None, "cql-text"} ) - - -def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: - """Concatenate per-chunk frames, dropping empties and deduping by ``id``. - - ``_get_resp_data`` returns a plain ``pd.DataFrame()`` on empty responses; - concat'ing it with real GeoDataFrames downgrades the result to plain - DataFrame and strips geometry/CRS, so empties are dropped first. Dedup - on the pre-rename feature ``id`` keeps overlapping user OR-clauses from - producing duplicate rows across chunks. - """ - non_empty = [f for f in frames if not f.empty] - if not non_empty: - return pd.DataFrame() - if len(non_empty) == 1: - return non_empty[0] - combined = pd.concat(non_empty, ignore_index=True) - if "id" in combined.columns: - combined = combined.drop_duplicates(subset="id", ignore_index=True) - return combined - - -def _combine_chunk_responses( - responses: list[requests.Response], -) -> requests.Response: - """Return one response: first chunk's URL/headers + summed ``elapsed``. - - Mutates the first response in place (only ``elapsed``); downstream only - reads ``elapsed`` (in ``BaseMetadata.query_time``), URL, and headers. - """ - head = responses[0] - if len(responses) > 1: - head.elapsed = sum((r.elapsed for r in responses[1:]), start=head.elapsed) - return head - - -_FetchOnce = TypeVar( - "_FetchOnce", - bound=Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]], -) - - -def chunked(*, build_request: Callable[..., Any]) -> Callable[[_FetchOnce], _FetchOnce]: - """Decorator that adds CQL-filter chunking to a single-request fetch. - - The wrapped function has signature ``(args: dict) -> (frame, response)`` - and represents one HTTP round-trip. The decorator inspects ``args``: - - - No chunkable filter: pass through unchanged. - - Chunkable cql-text filter: run the lexicographic-pitfall guard, split - into URL-length-safe sub-expressions, call the wrapped function once - per chunk, concatenate frames (drop empties, dedup by feature ``id``), - and return an aggregated response (first chunk's URL/headers, summed - ``elapsed``). - - Either way the return shape matches the undecorated function's, so the - caller wraps the response in ``BaseMetadata`` the same way in both paths. - - ``build_request`` is injected so the decorator can probe URL length - without importing any specific HTTP builder; it receives the same kwargs - the wrapped function's ``args`` would and returns a prepared-request-like - object with a ``.url`` attribute. - """ - - def decorator(fetch_once: _FetchOnce) -> _FetchOnce: - @functools.wraps(fetch_once) - def wrapper( - args: dict[str, Any], - ) -> tuple[pd.DataFrame, requests.Response]: - filter_expr = args.get("filter") - if not _is_chunkable(filter_expr, args.get("filter_lang")): - return fetch_once(args) - - _check_numeric_filter_pitfall(filter_expr) - budget = _effective_filter_budget(args, filter_expr, build_request) - chunks = _chunk_cql_or(filter_expr, max_len=budget) - - frames: list[pd.DataFrame] = [] - responses: list[requests.Response] = [] - for chunk in chunks: - frame, response = fetch_once({**args, "filter": chunk}) - frames.append(frame) - responses.append(response) - - return _combine_chunk_frames(frames), _combine_chunk_responses(responses) - - return wrapper # type: ignore[return-value] - - return decorator diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 9245bb92..9af20d66 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -5,7 +5,7 @@ import os import re from collections.abc import Iterable, Mapping -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, get_args from zoneinfo import ZoneInfo @@ -14,7 +14,7 @@ from dataretrieval import __version__ from dataretrieval.utils import BaseMetadata -from dataretrieval.waterdata import filters +from dataretrieval.waterdata import chunking from dataretrieval.waterdata.types import ( PROFILE_LOOKUP, PROFILES, @@ -410,16 +410,30 @@ def _error_body(resp: requests.Response): ) +class RateLimited(RuntimeError): + """A USGS Water Data API request was rejected with HTTP 429. Exposed + as a typed exception so callers (notably the multi-value chunker) + can detect rate-limit failures via ``isinstance`` instead of + string-matching error messages.""" + + def _raise_for_non_200(resp: requests.Response) -> None: - """Raise ``RuntimeError(_error_body(resp))`` if ``resp`` is not 200. + """Raise on a non-200 response. ``RateLimited`` for 429 (so the + chunker can branch on it without parsing the message); plain + ``RuntimeError`` for every other failure. Routes through ``_error_body`` (USGS-API-aware: handles 429/403 specially, extracts ``code``/``description`` from JSON error bodies) rather than ``Response.raise_for_status``, which raises ``HTTPError`` with a generic message. """ - if resp.status_code != 200: - raise RuntimeError(_error_body(resp)) + status = resp.status_code + if status == 200: + return + body = _error_body(resp) + if status == 429: + raise RateLimited(body) + raise RuntimeError(body) def _paginated_failure_message(pages_collected: int, cause: BaseException) -> str: @@ -436,7 +450,7 @@ def _paginated_failure_message(pages_collected: int, cause: BaseException) -> st # message is always informative. if not cause_str.strip(): cause_str = type(cause).__name__ - if cause_str.startswith("429"): + if isinstance(cause, RateLimited): action = "wait for the rate-limit window to reset and retry" else: action = "retry the request (possibly after a short backoff)" @@ -585,7 +599,7 @@ def _next_req_url(resp: requests.Response) -> str | None: if os.getenv("API_USGS_PAT", ""): logger.info( "Remaining requests this hour: %s", - header_info.get("x-ratelimit-remaining", ""), + header_info.get(chunking._QUOTA_HEADER, ""), ) for link in body.get("links", []): if link.get("rel") == "next": @@ -644,6 +658,26 @@ def _get_resp_data(resp: requests.Response, geopd: bool) -> pd.DataFrame: return df +def _finalize_paginated_response( + initial: requests.Response, + last: requests.Response, + total_elapsed: timedelta, +) -> None: + """Carry the last page's headers + cumulative elapsed onto the initial + response in place. + + The initial response stays canonical for ``md.url`` (user's original + query), but its ``.headers`` and ``.elapsed`` are overwritten so the + multi-value chunker's ``QuotaExhausted`` guard sees current + ``x-ratelimit-remaining`` and ``md.query_time`` reflects total + wall-clock across pages. No-op when ``initial is last`` (single page). + """ + if last is initial: + return + initial.headers = last.headers + initial.elapsed = total_elapsed + + def _walk_pages( geopd: bool, req: requests.PreparedRequest, @@ -669,7 +703,11 @@ def _walk_pages( pd.DataFrame A DataFrame containing the aggregated results from all pages. requests.Response - The initial response object containing metadata about the first request. + Aggregated response: the initial request's URL (for query + identity), the final page's headers (so downstream callers see + current rate-limit state, which the multi-value chunker's + ``QuotaExhausted`` guard relies on), and cumulative ``elapsed`` + summed across every page. Raises ------ @@ -700,9 +738,11 @@ def _walk_pages( try: resp = client.send(req) _raise_for_non_200(resp) - - # Store the initial response for metadata + # Keep the original-request response as the "canonical" one for + # ``md.url`` reproducibility; ``.headers`` and ``.elapsed`` get + # overwritten with latest/cumulative values below. initial_response = resp + total_elapsed = resp.elapsed # Grab some aspects of the original request: headers and the # request type (GET or POST) @@ -723,6 +763,7 @@ def _walk_pages( ) _raise_for_non_200(resp) dfs.append(_get_resp_data(resp, geopd=geopd)) + total_elapsed += resp.elapsed curr_url = _next_req_url(resp) except Exception as e: # noqa: BLE001 logger.warning( @@ -730,6 +771,8 @@ def _walk_pages( ) raise RuntimeError(_paginated_failure_message(len(dfs), e)) from e + _finalize_paginated_response(initial_response, resp, total_elapsed) + # Concatenate all pages at once for efficiency return pd.concat(dfs, ignore_index=True), initial_response finally: @@ -957,17 +1000,18 @@ def get_ogc_data( return return_list, BaseMetadata(response) -@filters.chunked(build_request=_construct_api_requests) +@chunking.multi_value_chunked(build_request=_construct_api_requests) def _fetch_once( args: dict[str, Any], ) -> tuple[pd.DataFrame, requests.Response]: """Send one prepared-args OGC request; return the frame + response. - Filter chunking is added orthogonally by the ``@filters.chunked`` - decorator: with no filter (or an un-chunkable one) the decorator - passes ``args`` through to this body; with a chunkable filter it - fans out and calls this body once per sub-filter, then combines. - Either way the return shape is ``(frame, response)``. + ``@chunking.multi_value_chunked`` plans list-chunking and + filter-chunking jointly and iterates the cartesian product, picking + the allocation between list chunks and filter chunks that minimizes + total sub-requests. With no chunkable inputs the decorator passes + args through unchanged. Either way the return shape is + ``(frame, response)``. """ req = _construct_api_requests(**args) return _walk_pages(geopd=GEOPANDAS, req=req) @@ -1158,9 +1202,11 @@ def get_stats_data( try: resp = client.send(req) _raise_for_non_200(resp) - - # Store the initial response for metadata + # Keep the original-request response as the "canonical" one for + # ``md.url`` reproducibility; ``.headers`` and ``.elapsed`` get + # overwritten with latest/cumulative values below. initial_response = resp + total_elapsed = resp.elapsed # Grab some aspects of the original request: headers and the # request type (GET or POST) @@ -1186,6 +1232,7 @@ def get_stats_data( _raise_for_non_200(resp) body = resp.json() all_dfs.append(_handle_stats_nesting(body, geopd=GEOPANDAS)) + total_elapsed += resp.elapsed next_token = body["next"] except Exception as e: # noqa: BLE001 logger.warning( @@ -1196,6 +1243,8 @@ def get_stats_data( ) raise RuntimeError(_paginated_failure_message(len(all_dfs), e)) from e + _finalize_paginated_response(initial_response, resp, total_elapsed) + dfs = pd.concat(all_dfs, ignore_index=True) if len(all_dfs) > 1 else all_dfs[0] # . If expand percentiles is True, make each percentile diff --git a/tests/waterdata_chunking_test.py b/tests/waterdata_chunking_test.py new file mode 100644 index 00000000..220b5643 --- /dev/null +++ b/tests/waterdata_chunking_test.py @@ -0,0 +1,670 @@ +"""Tests for ``dataretrieval.waterdata.chunking``. + +These tests exercise the joint planner with a fake ``build_request`` +whose URL byte length is a deterministic function of its inputs: + +- non-chunkable args contribute ``base_bytes``, +- every multi-value list contributes ``len(",".join(map(str, v)))``, +- the ``filter`` kwarg contributes ``len(filter)``. + +That isolates planner behaviour from the real HTTP request builder. +The one exception is +``test_joint_planner_url_construction_long_filter_and_long_sites``, +which uses the real ``_construct_api_requests`` so URL-encoding +surprises (``%``, ``+``, ``/``, ``&``, …) can't pass against a fake +and then fail in production. +""" + +import datetime +import sys +from unittest import mock +from urllib.parse import quote_plus + +import pandas as pd +import pytest + +if sys.version_info < (3, 10): + pytest.skip("Skip entire module on Python < 3.10", allow_module_level=True) + +from dataretrieval.waterdata import chunking as _chunking +from dataretrieval.waterdata.chunking import ( + _QUOTA_HEADER, + ChunkPlan, + QuotaExhausted, + RequestExceedsQuota, + RequestTooLarge, + _chunkable_params, + _filter_chunk_counts, + _partition_clauses, + _plan_list_chunks, + _read_remaining, + multi_value_chunked, +) +from dataretrieval.waterdata.utils import RateLimited, _construct_api_requests + + +class _FakeReq: + __slots__ = ("url", "body") + + def __init__(self, url, body=None): + self.url = url + self.body = body + + +def _fake_build(*, base=200, **kwargs): + """Fake build_request: URL length deterministic in its inputs. + + Mirrors the GET-routed shape: payload goes in the URL, body is None. + List/string values are URL-encoded via ``quote_plus`` so the fake's + byte count matches what the real ``_construct_api_requests`` would + produce; otherwise an alphanumeric test could pass against the fake + but fail in production once values containing ``%``, ``+``, ``/``, + ``&`` etc. (which expand under encoding) reach the same code path. + """ + bytes_ = base + for v in kwargs.values(): + if isinstance(v, (list, tuple)): + bytes_ += len(quote_plus(",".join(map(str, v)))) + elif isinstance(v, str): + bytes_ += len(quote_plus(v)) + return _FakeReq("x" * bytes_) + + +def test_partition_clauses_balanced(): + """k roughly-balanced groups of atoms, with the remainder distributed + across the first groups so no group differs from another by more + than one clause. Returns raw groups (not joined) so the planner can + size candidates without materializing discarded partitions.""" + clauses = ["a='1'", "b='2'", "c='3'", "d='4'", "e='5'"] + assert _partition_clauses(clauses, 1) == [clauses] + # 5 clauses into 2 groups → sizes 3, 2 (remainder lands in the first). + assert _partition_clauses(clauses, 2) == [ + ["a='1'", "b='2'", "c='3'"], + ["d='4'", "e='5'"], + ] + # Singletons. + assert _partition_clauses(clauses, 5) == [[c] for c in clauses] + # k > len(clauses) → singletons (don't synthesize empty groups). + assert _partition_clauses(clauses, 99) == [[c] for c in clauses] + + +def test_filter_chunk_counts_powers_of_two_plus_n(): + """Candidate counts cover the trade-off at powers of two, and always + include ``n_clauses`` itself so the fully-singleton case is always + evaluated even when n isn't a power of two.""" + assert _filter_chunk_counts(0) == [1] + assert _filter_chunk_counts(1) == [1] + assert _filter_chunk_counts(5) == [1, 2, 4, 5] + assert _filter_chunk_counts(16) == [1, 2, 4, 8, 16] + + +def test_plan_chunks_returns_none_when_request_fits(): + """URL under limit → planner returns None, decorator passes through.""" + args = {"monitoring_location_id": ["A", "B", "C"]} + plan = _plan_list_chunks(args, _fake_build, url_limit=8000) + assert plan is None + + +def test_plan_chunks_returns_none_when_no_chunkable_lists(): + """No multi-value lists, however over-limit → planner can't help, returns None + (decorator falls through; server may 414 but that's not chunker's job).""" + args = {"monitoring_location_id": "scalar-only"} + plan = _plan_list_chunks(args, _fake_build, url_limit=10) + assert plan is None + + +def test_plan_chunks_greedy_halving_targets_largest_dim(): + """Two dims with one much larger — the heavy dim halves first.""" + args = { + "monitoring_location_id": ["X" * 30, "Y" * 30, "Z" * 30, "W" * 30], + "parameter_code": ["00060", "00065"], + } + # full URL ≈ 200 + 123 + 12 = 335; force splitting heavy dim only. + plan = _plan_list_chunks(args, _fake_build, url_limit=310) + assert len(plan["monitoring_location_id"]) > 1 + assert len(plan["parameter_code"]) == 1 # heavy-dim split was enough + + +def test_plan_chunks_raises_request_too_large_at_singleton_floor(): + """Limit below singleton-per-dim floor (with no chunkable filter to + fall back on) → RequestTooLarge with a clear message.""" + args = {"monitoring_location_id": ["A", "B"]} + # base=200 alone exceeds limit; no relief possible. + with pytest.raises(RequestTooLarge, match="smallest reducible"): + _plan_list_chunks(args, _fake_build, url_limit=100) + + +def test_chunk_plan_fans_out_filter_when_list_alone_cannot_fit(): + """When the request can only fit by chunking BOTH the list and the + filter, the plan must touch both dims.""" + clauses = [f"f='{i}'" for i in range(10)] + args = { + "monitoring_location_id": ["A" * 10, "B" * 10, "C" * 10, "D" * 10], + "filter": " OR ".join(clauses), + } + # Singleton list + full filter ≈ 200 + 10 + 86 = 296 (over limit 240). + # Joint planner must split the filter into k >= 2 groups. + plan = ChunkPlan.from_args(args, _fake_build, url_limit=240) + # Either the filter was chunked, the list was chunked, or both. + assert len(plan.filter_chunks) > 1 or any( + len(v) > 1 for v in plan.list_chunks.values() + ) + + +def test_chunk_plan_minimizes_total_sub_requests(): + """When both dims need shrinking, picking smaller filter chunks + frees URL budget for larger list chunks, and vice versa. The + planner should pick the allocation with the *fewest* total + sub-requests, not just the first allocation that fits.""" + # 16 short clauses (no inflation under URL encoding so the math is + # tractable). Each clause = 5 bytes (e.g. "f='0'"); full filter ≈ + # 16*5 + 15*4 = 140 bytes raw. + clauses = [f"f='{i}'" for i in range(16)] + args = { + "sites": ["S" * 30 for _ in range(8)], # 8 sites @ 30 chars + "filter": " OR ".join(clauses), + } + # Tight limit forces both dims to participate. + plan = ChunkPlan.from_args(args, _fake_build, url_limit=380) + # Plan must beat the bail-floor-style worst case (8 singletons × 16 + # filter chunks = 128 sub-requests) by a healthy margin. + assert plan.total < 128 + + +def test_chunk_plan_raises_when_smallest_plan_doesnt_fit(): + """If even the most aggressive joint plan (singleton lists + + singleton filter clauses) still exceeds the limit, surface + RequestTooLarge — there's nothing left to shrink.""" + args = { + "monitoring_location_id": ["A" * 10, "B" * 10], + "filter": "x='12345' OR x='67890'", # min clause is 9 chars + } + # Base 200 + singleton site (10) + singleton clause (9) = 219; limit + # below 219 → no joint plan can fit. + with pytest.raises(RequestTooLarge): + ChunkPlan.from_args(args, _fake_build, url_limit=210) + + +def test_chunk_plan_passthrough_when_request_fits(): + """A request that already fits gets a trivial single-step plan: + no list chunks, ``filter_chunks=[None]``, ``total == 1``. The + wrapper still iterates it through one fetch_once call.""" + args = {"monitoring_location_id": ["A", "B", "C"]} + plan = ChunkPlan.from_args(args, _fake_build, url_limit=8000) + assert plan.list_chunks == {} + assert plan.filter_chunks == [None] + assert plan.total == 1 + + +def test_chunk_plan_passthrough_when_nothing_chunkable(): + """A request with no multi-value lists and no top-level-OR filter + is also a passthrough plan, even if the URL is technically over + the limit (the server may 414, but the chunker has nothing to + split).""" + args = {"monitoring_location_id": "scalar-only"} + plan = ChunkPlan.from_args(args, _fake_build, url_limit=10) + assert plan.list_chunks == {} + assert plan.filter_chunks == [None] + assert plan.total == 1 + + +def test_chunk_plan_iter_sub_args_passthrough_yields_original_args_once(): + """The passthrough plan's ``iter_sub_args`` yields exactly one + sub-args dict equal to the original args (modulo dict identity).""" + args = {"monitoring_location_id": ["A", "B", "C"], "limit": 100} + plan = ChunkPlan.from_args(args, _fake_build, url_limit=8000) + subs = list(plan.iter_sub_args()) + assert len(subs) == 1 + assert subs[0] == args + + +def test_multi_value_chunked_passes_through_when_url_fits(): + """No planning needed → decorator calls underlying function exactly once + with the original args.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch({"monitoring_location_id": ["A", "B"]}) + assert len(calls) == 1 + assert calls[0]["monitoring_location_id"] == ["A", "B"] + + +def test_multi_value_chunked_emits_cartesian_product(): + """Two chunkable dims, each split into 2 chunks → exactly 4 sub-calls, + each pairing one chunk from each dim.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + calls.append({k: v for k, v in args.items() if k in ("sites", "pcodes")}) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch( + { + "sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10], + "pcodes": ["P1" * 10, "P2" * 10, "P3" * 10, "P4" * 10], + } + ) + # Both heavy → planner should split both dims. Confirm a cartesian shape: + # every unique site-chunk pairs with every unique pcode-chunk. + sites_seen = {tuple(c["sites"]) for c in calls} + pcodes_seen = {tuple(c["pcodes"]) for c in calls} + assert len(calls) == len(sites_seen) * len(pcodes_seen) + assert len(sites_seen) > 1 + assert len(pcodes_seen) > 1 + + +def test_multi_value_chunked_emits_3d_cartesian_product(): + """Three chunkable dims, each forced to split → exhaustive cartesian + product across all three. Verifies the planner's halving loop handles + N>2 dims uniformly and the wrapper's ``itertools.product`` enumerates + every combination exactly once.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + calls.append(tuple(tuple(args[k]) for k in ("sites", "pcodes", "stats"))) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch( + { + "sites": ["S" * 12 + str(i) for i in range(4)], + "pcodes": ["P" * 12 + str(i) for i in range(4)], + "stats": ["T" * 12 + str(i) for i in range(4)], + } + ) + + # Three independent axes — every (site_chunk, pcode_chunk, stat_chunk) + # triple must appear exactly once. Confirm: + sites_seen = {c[0] for c in calls} + pcodes_seen = {c[1] for c in calls} + stats_seen = {c[2] for c in calls} + + assert len(sites_seen) > 1, "sites dim was not split" + assert len(pcodes_seen) > 1, "pcodes dim was not split" + assert len(stats_seen) > 1, "stats dim was not split" + + # Cartesian shape: # sub-calls == product of unique chunks across dims + expected = len(sites_seen) * len(pcodes_seen) * len(stats_seen) + assert len(calls) == expected, ( + f"expected {expected} cartesian-product sub-calls, got {len(calls)}" + ) + # And no triple repeats (exhaustive enumeration, no duplicates). + assert len(set(calls)) == len(calls) + # The chunked values, when unioned across calls, recover the original list. + assert {x for tup in sites_seen for x in tup} == { + "S" * 12 + str(i) for i in range(4) + } + assert {x for tup in pcodes_seen for x in tup} == { + "P" * 12 + str(i) for i in range(4) + } + assert {x for tup in stats_seen for x in tup} == { + "T" * 12 + str(i) for i in range(4) + } + + +def test_multi_value_chunked_lazy_url_limit(monkeypatch): + """``url_limit=None`` → resolve chunking._WATERDATA_URL_BYTE_LIMIT at call + time, so tests that patch the constant affect this decorator too.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build) # url_limit defaults to None + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + monkeypatch.setattr(_chunking, "_WATERDATA_URL_BYTE_LIMIT", 240) + # 4 sites of 10 chars → exceeds 240 → planner splits. + fetch({"sites": ["S" * 10 + str(i) for i in range(4)]}) + assert len(calls) > 1, "patched constant should drive chunking" + + +def _quota_response(remaining: int | str | None) -> mock.Mock: + """A mock requests.Response-like object whose ``x-ratelimit-remaining`` + header reflects the given value (None → header absent).""" + resp = mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + resp.headers = {} if remaining is None else {_QUOTA_HEADER: str(remaining)} + return resp + + +def test_read_remaining_parses_header(): + assert _read_remaining(_quota_response(42)) == 42 + + +def test_read_remaining_returns_none_when_header_missing(): + """No rate-limit header → ``None`` so the wrapper can branch on + ``is None`` instead of comparing against a magic sentinel.""" + assert _read_remaining(_quota_response(None)) is None + + +def test_read_remaining_returns_none_on_malformed_header(): + """Non-integer header value → ``None`` so a parse failure doesn't + trip the quota check.""" + assert _read_remaining(_quota_response("not-a-number")) is None + + +def test_request_exceeds_quota_after_first_chunk(): + """Plan totals 4 sub-requests. The first response reports + ``x-ratelimit-remaining=1`` — only 2 sub-requests fit total + (the one just issued + 1 more). The wrapper must raise + ``RequestExceedsQuota`` *before* issuing chunk 2.""" + calls: list[dict] = [] + + def fetch(args): + calls.append(args) + return pd.DataFrame({"sites": list(args["sites"])}), _quota_response(1) + + decorated = multi_value_chunked(build_request=_fake_build, url_limit=240)(fetch) + + with pytest.raises(RequestExceedsQuota) as excinfo: + decorated({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + + err = excinfo.value + assert err.planned_chunks == 4 + assert err.available == 2 # remaining=1 + the chunk we just spent + assert err.deficit == 2 + assert len(calls) == 1, "only the first chunk should have been issued" + + +def test_request_exceeds_quota_message_reports_deficit(): + """The error must surface planned / available / deficit so callers + know precisely how far over budget the call is.""" + e = RequestExceedsQuota(planned_chunks=10, available=4, deficit=6) + msg = str(e) + assert "10" in msg + assert "4" in msg + assert "6" in msg + + +def test_request_exceeds_quota_not_raised_when_plan_fits(): + """If ``x-ratelimit-remaining`` is large enough to cover the rest + of the plan, the wrapper proceeds normally.""" + remaining_seq = iter([100, 99, 98, 97]) + + def fetch(args): + return ( + pd.DataFrame({"sites": list(args["sites"])}), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked(build_request=_fake_build, url_limit=240)(fetch) + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + assert len(df) == 4 + + +def test_no_quota_check_when_header_absent(): + """Without an ``x-ratelimit-remaining`` header the wrapper has no + quota signal and must NOT synthesize a ``RequestExceedsQuota``; + every planned sub-request runs.""" + + def fetch(args): + return pd.DataFrame({"sites": list(args["sites"])}), _quota_response(None) + + decorated = multi_value_chunked(build_request=_fake_build, url_limit=240)(fetch) + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + assert len(df) == 4 + + +def test_quota_exhausted_on_mid_call_429(): + """Mid-call 429 (a concurrent caller drained the bucket) surfaces + as ``QuotaExhausted`` carrying the partial frame plus the chunk + offset so callers can resume after the window resets.""" + state = {"i": 0} + + def fetch(args): + i = state["i"] + state["i"] += 1 + if i == 2: + # Match _walk_pages's wrapping: a generic mid-pagination + # RuntimeError with the typed RateLimited as __cause__. + try: + raise RateLimited("429: Too many requests made.") + except RateLimited as cause: + raise RuntimeError( + "Paginated request failed after collecting 0 page(s): " + "429: Too many requests made." + ) from cause + return ( + pd.DataFrame({"i": [i], "sites": list(args["sites"])}), + _quota_response(500), + ) + + decorated = multi_value_chunked(build_request=_fake_build, url_limit=240)(fetch) + with pytest.raises(QuotaExhausted) as excinfo: + decorated({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10, "S5" * 10]}) + + err = excinfo.value + assert err.completed_chunks == 2 # chunks 0 and 1 banked; 429 hit on i=2 + assert err.total_chunks == 5 + assert err.partial_frame is not None + assert set(err.partial_frame["i"]) == {0, 1} + + +def test_quota_exhausted_on_first_chunk_429_has_no_partial_response(): + """A 429 on the very first sub-request means no responses are + banked; ``partial_response`` is ``None`` (and ``partial_frame`` is + empty) so callers can branch on that to distinguish "abort before + any data arrived" from "abort after partial collection".""" + + def fetch(args): + raise RateLimited("429: Too many requests made.") + + decorated = multi_value_chunked(build_request=_fake_build, url_limit=240)(fetch) + with pytest.raises(QuotaExhausted) as excinfo: + decorated({"sites": ["S1" * 10, "S2" * 10]}) + err = excinfo.value + assert err.completed_chunks == 0 + assert err.partial_response is None + assert err.partial_frame.empty + + +def test_chunker_passes_through_non_429_runtime_error(): + """A non-429 ``RuntimeError`` (e.g. a 500) is not a quota signal; + it must propagate unchanged so callers see the real cause.""" + state = {"i": 0} + + def fetch(args): + i = state["i"] + state["i"] += 1 + if i == 2: + raise RuntimeError("500: Internal server error.") + return ( + pd.DataFrame({"sites": list(args["sites"])}), + _quota_response(500), + ) + + decorated = multi_value_chunked(build_request=_fake_build, url_limit=240)(fetch) + with pytest.raises(RuntimeError, match=r"^500:"): + decorated({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10, "S5" * 10]}) + + +def test_quota_exhausted_message_points_at_resume(): + """The error message must surface the chunk offset and the resume + affordance — ``partial_frame`` is a footgun without it.""" + e = QuotaExhausted( + partial_frame=pd.DataFrame(), + partial_response=mock.Mock(), + completed_chunks=7, + total_chunks=20, + ) + msg = str(e) + assert "7/20" in msg + assert "429" in msg + assert "resume" in msg + + +def test_request_bytes_rejects_non_sizable_body(): + """``_request_bytes`` requires a deterministic byte count up front; + silently treating an unknown body as zero would under-chunk and let + the request blow past the server's POST-body limit. Generators, + iterables, and file-like objects must surface as ``TypeError``.""" + from dataretrieval.waterdata.chunking import _request_bytes + + class _FakeReqWithGenBody: + url = "https://example.com/foo" + body = (b"x" for _ in range(3)) + + with pytest.raises(TypeError, match="cannot size a request body"): + _request_bytes(_FakeReqWithGenBody()) + + +def test_request_bytes_handles_supported_body_types(): + """Sanity-check the supported body types: None (GET), bytes (raw + POST), str (JSON-as-string POST).""" + from dataretrieval.waterdata.chunking import _request_bytes + + class _Req: + def __init__(self, url, body): + self.url = url + self.body = body + + assert _request_bytes(_Req("ab", None)) == 2 + assert _request_bytes(_Req("ab", b"cd")) == 4 + assert _request_bytes(_Req("ab", "cd")) == 4 + assert _request_bytes(_Req("ab", bytearray(b"cd"))) == 4 + + +def test_multi_value_chunked_restores_canonical_url(): + """When chunking fans out, the aggregated response's ``.url`` must + reflect the *user's original* query (rebuilt from the unchunked + args), not the first chunk's URL. Callers logging ``md.url`` for + reproducibility need the full query.""" + sites = ["S" * 10 + str(i) for i in range(4)] + sub_urls: list[str] = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + # Each sub-response carries the chunked sub_args's URL, so + # without canonical restoration the first chunk's URL would + # leak through to md.url. + sub_url = _fake_build(**args).url + sub_urls.append(sub_url) + resp = mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + resp.headers = {} + resp.url = sub_url + return pd.DataFrame(), resp + + _df, md = fetch({"sites": sites}) + + assert len(sub_urls) > 1, "test setup error: chunker didn't fan out" + # md.url must equal the URL the unchunked query would have produced. + assert md.url == _fake_build(sites=sites).url + # And differ from every sub-request's URL (each carries a smaller list). + assert all(md.url != u for u in sub_urls) + # The canonical URL is strictly bigger byte-wise than any sub-request. + assert all(len(md.url) > len(u) for u in sub_urls) + + +def test_chunkable_params_skips_filter_passed_as_list(): + """Defensive guard: ``filter`` is documented as a string. If a caller + mistakenly passes it as a list, the chunker must NOT treat it as a + multi-value dim — comma-joining CQL clauses inside the URL would + produce a malformed filter expression. The joint planner partitions + ``filter`` via top-level ``OR`` splitting in ``_plan_joint``; it must + never be sliced as a list dim.""" + args = { + "monitoring_location_id": ["USGS-A", "USGS-B"], + "filter": ["a='1'", "a='2'"], # malformed input + "filter_lang": ["cql-text", "cql-json"], # ditto + } + chunkable = _chunkable_params(args) + assert "monitoring_location_id" in chunkable + assert "filter" not in chunkable + assert "filter_lang" not in chunkable + + +def test_chunkable_params_skips_scalar_contract_params(): + """``limit`` and ``skip_geometry`` are scalars by contract + (``int | None`` and ``bool | None`` respectively). If a caller smuggles + a list through type erasure (e.g. ``limit=["100","200"]`` after a bad + cast), the chunker must NOT treat it as a multi-value dim. Chunking + ``limit`` would silently fan into separate paginated queries with + different per-request caps; chunking ``skip_geometry`` would emit + sub-requests with conflicting geometry-output settings.""" + args = { + "monitoring_location_id": ["USGS-A", "USGS-B"], + "limit": ["100", "200"], + "skip_geometry": ["true", "false"], + } + chunkable = _chunkable_params(args) + assert "monitoring_location_id" in chunkable + assert "limit" not in chunkable + assert "skip_geometry" not in chunkable + + +def test_joint_planner_url_construction_long_filter_and_long_sites(): + """Realistic stress: 20 datetime OR-clauses combined with 100 USGS + site IDs. Every sub-request URL built from the plan must fit the + 8000-byte limit, the joint planner must beat the naive "filter at + bail-floor, chunk lists" approach, and the partitioned filters + must union to the user's original filter expression. + + Uses the real ``_construct_api_requests`` builder so the test + catches URL-encoding surprises that a fake builder would miss. + """ + # Realistic AGENCY-ID site format: USGS-{8 digits}. 500 sites is + # enough to force the URL well past the 8000-byte server limit + # without any filter contribution. + sites = [f"USGS-{i:08d}" for i in range(500)] + # 20 datetime equality clauses; each ~30 bytes raw, more after URL + # encoding (the apostrophes and `:` characters expand). + clauses = [ + f"time='2024-{m:02d}-{d:02d}T00:00:00Z'" + for m in range(1, 6) + for d in (1, 8, 15, 22) + ] + assert len(clauses) == 20 + filter_expr = " OR ".join(clauses) + + args = { + "service": "daily", + "monitoring_location_id": sites, + "filter": filter_expr, + } + url_limit = 8000 + + plan = ChunkPlan.from_args(args, _construct_api_requests, url_limit) + assert plan.total > 1, "expected non-trivial plan for over-limit request" + list_plan = plan.list_chunks + filter_chunks = plan.filter_chunks + + # Walk every sub-request the plan would issue and assert URL fits. + over_limit = [] + for sub_args in plan.iter_sub_args(): + req = _construct_api_requests(**sub_args) + url_len = len(req.url) + (len(req.body) if req.body else 0) + if url_len > url_limit: + over_limit.append((url_len, sub_args)) + assert not over_limit, ( + f"{len(over_limit)} sub-request(s) exceeded the URL limit; " + f"first: {over_limit[0]}" + ) + + # Filter partitions must union back to the original (modulo + # whitespace around `OR`). Each clause must appear exactly once. + union_clauses: list[str] = [] + for chunk in filter_chunks: + if chunk is None: + continue + union_clauses.extend(c.strip() for c in chunk.split(" OR ")) + assert union_clauses == clauses, ( + "filter partitioning must cover every original clause exactly once" + ) + + # List partitions: every original site appears in exactly one list + # chunk for each dim. Joined sites across all chunks recover the + # input set. + if "monitoring_location_id" in list_plan: + seen = [s for chunk in list_plan["monitoring_location_id"] for s in chunk] + assert sorted(seen) == sorted(sites) + + # Joint plan must beat the bail-floor-style worst case (singleton + # sites × all filter clauses singleton = 100 * 20 = 2000) — joint + # planning of these inputs cuts that by at least a factor of 4. + assert plan.total < 500, ( + f"joint plan emitted {plan.total} sub-requests (expected <500)" + ) diff --git a/tests/waterdata_filters_test.py b/tests/waterdata_filters_test.py index 545f7039..9d9d183e 100644 --- a/tests/waterdata_filters_test.py +++ b/tests/waterdata_filters_test.py @@ -7,11 +7,7 @@ import pytest from dataretrieval.waterdata.filters import ( - _CQL_FILTER_CHUNK_LEN, - _WATERDATA_URL_BYTE_LIMIT, _check_numeric_filter_pitfall, - _chunk_cql_or, - _effective_filter_budget, _split_top_level_or, ) from dataretrieval.waterdata.utils import _construct_api_requests @@ -35,11 +31,6 @@ def _fake_response(url="https://example.test", elapsed_ms=1): ) -def _build_request(**kwargs): - """Wrapper that matches the ``build_request`` callable shape.""" - return _construct_api_requests(**kwargs) - - def test_construct_filter_passthrough(): """`filter` is forwarded verbatim as a query parameter.""" expr = ( @@ -113,35 +104,6 @@ def test_split_top_level_or_single_clause(): ] -def test_chunk_cql_or_short_passthrough(): - expr = "time >= '2023-01-01T00:00:00Z'" - assert _chunk_cql_or(expr, max_len=1000) == [expr] - - -def test_chunk_cql_or_splits_into_multiple(): - clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" - expr = " OR ".join([clause] * 200) - chunks = _chunk_cql_or(expr, max_len=1000) - # each chunk must be under the budget - assert all(len(c) <= 1000 for c in chunks) - # rejoined chunks must cover every clause - rejoined_clauses = sum(len(c.split(" OR ")) for c in chunks) - assert rejoined_clauses == 200 - # and must be a valid OR chain (each chunk is itself a top-level OR of clauses) - assert len(chunks) > 1 - - -def test_chunk_cql_or_unsplittable_returns_input(): - big = "value > 0 AND " + ("A " * 4000) - assert _chunk_cql_or(big, max_len=1000) == [big] - - -def test_chunk_cql_or_single_clause_over_budget_returns_input(): - huge_clause = "(value > " + "9" * 6000 + ")" - expr = f"{huge_clause} OR (value > 0)" - assert _chunk_cql_or(expr, max_len=1000) == [expr] - - @pytest.mark.parametrize( "service", [ @@ -167,41 +129,47 @@ def test_construct_filter_on_all_ogc_services(service): assert qs["filter-lang"] == ["cql-text"] -def test_long_filter_fans_out_into_multiple_requests(): - """An oversized top-level OR filter triggers multiple HTTP requests - whose results are concatenated.""" - from dataretrieval.waterdata import get_continuous - +def _filter_chunking_clauses(n: int = 300) -> str: + """Stock long filter used by the end-to-end fan-out tests below.""" clause = ( "(time >= '2023-01-{day:02d}T00:00:00Z' " "AND time <= '2023-01-{day:02d}T00:30:00Z')" ) - expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) - assert len(expr) > _CQL_FILTER_CHUNK_LEN + return " OR ".join(clause.format(day=(i % 28) + 1) for i in range(n)) - sent_filters = [] - def fake_construct_api_requests(**kwargs): - sent_filters.append(kwargs.get("filter")) - return _fake_prepared_request() +def _filter_size_aware_build(**kwargs): + """Fake ``_construct_api_requests`` whose returned URL length scales + with the request's ``filter`` value, so the joint planner naturally + triggers chunking on long filters.""" + return _fake_prepared_request( + url=f"https://example.test/?filter={kwargs.get('filter', '')}", + ) - def fake_walk_pages(*_args, **_kwargs): + +def test_long_filter_fans_out_into_multiple_requests(): + """An oversized top-level OR filter triggers multiple HTTP + sub-requests via the joint planner; every original clause is + preserved across sub-requests; results concatenate to one row per + sub-request given the one-row-per-chunk mock.""" + from dataretrieval.waterdata import get_continuous + + expr = _filter_chunking_clauses() + sent_filters: list[str] = [] + + def fake_walk_pages(*, geopd, req): idx = len(sent_filters) - frame = pd.DataFrame({"id": [f"chunk-{idx}"], "value": [idx]}) - return frame, _fake_response() + sent_filters.append(_query_params(req).get("filter", [None])[0]) + return pd.DataFrame({"id": [f"chunk-{idx}"], "value": [idx]}), _fake_response() with ( mock.patch( "dataretrieval.waterdata.utils._construct_api_requests", - side_effect=fake_construct_api_requests, + side_effect=_filter_size_aware_build, ), mock.patch( "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages ), - mock.patch( - "dataretrieval.waterdata.filters._effective_filter_budget", - return_value=_CQL_FILTER_CHUNK_LEN, - ), ): df, _ = get_continuous( monitoring_location_id="USGS-07374525", @@ -210,51 +178,38 @@ def fake_walk_pages(*_args, **_kwargs): filter_lang="cql-text", ) - # Mocking _effective_filter_budget bypasses the URL-length probe, so - # sent_filters contains only real chunk requests. Assert invariants: - # chunking happened, every original clause is preserved exactly once - # in order, each chunk stays under the budget, and the mock's - # one-row-per-chunk responses concatenate to a row per chunk. expected_parts = _split_top_level_or(expr) assert len(sent_filters) > 1 - rejoined_parts = [] + rejoined_parts: list[str] = [] for chunk in sent_filters: rejoined_parts.extend(_split_top_level_or(chunk)) assert rejoined_parts == expected_parts assert len(df) == len(sent_filters) - assert all(len(chunk) <= _CQL_FILTER_CHUNK_LEN for chunk in sent_filters) def test_long_filter_deduplicates_cross_chunk_overlap(): - """Features returned by multiple chunks (same feature `id`) are - deduplicated in the concatenated result.""" + """Features returned by multiple sub-requests with the same ``id`` + are deduplicated in the concatenated result.""" from dataretrieval.waterdata import get_continuous - clause = ( - "(time >= '2023-01-{day:02d}T00:00:00Z' " - "AND time <= '2023-01-{day:02d}T00:30:00Z')" - ) - expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) - + expr = _filter_chunking_clauses() call_count = {"n": 0} def fake_walk_pages(*_args, **_kwargs): call_count["n"] += 1 - frame = pd.DataFrame({"id": ["shared-feature"], "value": [1]}) - return frame, _fake_response() + return ( + pd.DataFrame({"id": ["shared-feature"], "value": [1]}), + _fake_response(), + ) with ( mock.patch( "dataretrieval.waterdata.utils._construct_api_requests", - return_value=_fake_prepared_request(), + side_effect=_filter_size_aware_build, ), mock.patch( "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages ), - mock.patch( - "dataretrieval.waterdata.filters._effective_filter_budget", - return_value=_CQL_FILTER_CHUNK_LEN, - ), ): df, _ = get_continuous( monitoring_location_id="USGS-07374525", @@ -263,56 +218,46 @@ def fake_walk_pages(*_args, **_kwargs): filter_lang="cql-text", ) - # Chunking must have happened (otherwise dedup wouldn't be exercised). - assert call_count["n"] > 1 - # Even though each chunk returned a feature, dedup by id collapses them. - assert len(df) == 1 + assert call_count["n"] > 1 # chunking must have happened + assert len(df) == 1 # dedup by ``id`` collapses the duplicates def test_empty_chunks_do_not_downgrade_geodataframe(): - """A mix of empty and non-empty chunk responses must not downgrade a - GeoDataFrame-typed result to a plain DataFrame. ``_get_resp_data`` - returns ``pd.DataFrame()`` on empty responses, which would otherwise - strip geometry/CRS from the concatenated output.""" + """A mix of empty and non-empty sub-request responses must not + downgrade a GeoDataFrame-typed result to a plain DataFrame. + ``_get_resp_data`` returns ``pd.DataFrame()`` on empty responses, + which would otherwise strip geometry/CRS from the concatenated + output.""" pytest.importorskip("geopandas") import geopandas as gpd from shapely.geometry import Point from dataretrieval.waterdata import get_continuous - clause = ( - "(time >= '2023-01-{day:02d}T00:00:00Z' " - "AND time <= '2023-01-{day:02d}T00:30:00Z')" - ) - expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) - + expr = _filter_chunking_clauses() call_count = {"n": 0} def fake_walk_pages(*_args, **_kwargs): call_count["n"] += 1 - # Chunk 2 returns empty; chunks 1 and 3 return GeoDataFrames. if call_count["n"] == 2: - frame = pd.DataFrame() - else: - frame = gpd.GeoDataFrame( + return pd.DataFrame(), _fake_response() + return ( + gpd.GeoDataFrame( {"id": [f"feat-{call_count['n']}"], "value": [call_count["n"]]}, geometry=[Point(call_count["n"], call_count["n"])], crs="EPSG:4326", - ) - return frame, _fake_response() + ), + _fake_response(), + ) with ( mock.patch( "dataretrieval.waterdata.utils._construct_api_requests", - return_value=_fake_prepared_request(), + side_effect=_filter_size_aware_build, ), mock.patch( "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages ), - mock.patch( - "dataretrieval.waterdata.filters._effective_filter_budget", - return_value=_CQL_FILTER_CHUNK_LEN, - ), ): df, _ = get_continuous( monitoring_location_id="USGS-07374525", @@ -321,119 +266,11 @@ def fake_walk_pages(*_args, **_kwargs): filter_lang="cql-text", ) - # The empty chunk must not have stripped the GeoDataFrame type. assert isinstance(df, gpd.GeoDataFrame) assert "geometry" in df.columns assert df.crs is not None -def test_effective_filter_budget_respects_url_limit(): - """The computed budget, once encoded, fits within the URL byte limit - alongside the other query params.""" - from urllib.parse import quote_plus - - filter_expr = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" - args = { - "service": "continuous", - "monitoring_location_id": "USGS-02238500", - "parameter_code": "00060", - "filter": filter_expr, - "filter_lang": "cql-text", - } - raw_budget = _effective_filter_budget(args, filter_expr, _build_request) - - # Build a chunk exactly at the raw budget (padded with the clause repeated) - # and confirm the full URL it produces stays under the URL byte limit. - padded = (" OR ".join([filter_expr] * 200))[:raw_budget] - req = _construct_api_requests(**{**args, "filter": padded}) - assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT - # And the budget scales inversely with encoding ratio (sanity). - assert raw_budget < _WATERDATA_URL_BYTE_LIMIT - # Quick sanity on the encoding math itself. - assert len(quote_plus(padded)) <= _WATERDATA_URL_BYTE_LIMIT - - -def test_effective_filter_budget_uses_max_clause_ratio(): - """Heavy clauses clustered in one part of the filter must not be able - to push any chunk over the URL limit. The budget is computed against - the max per-clause encoding ratio, not the whole-filter average, so - a chunk of only-heaviest-clauses still fits.""" - from urllib.parse import quote_plus - - heavy = ( - "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z' " - "AND approval_status IN ('Approved','Provisional','Revised'))" - ) - light = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" - # Heavy ratio < light ratio for these shapes; cluster them at opposite - # ends so the chunker must produce at least one light-only chunk. - clauses = [heavy] * 100 + [light] * 400 - expr = " OR ".join(clauses) - args = { - "service": "continuous", - "monitoring_location_id": "USGS-02238500", - "filter": expr, - "filter_lang": "cql-text", - } - budget = _effective_filter_budget(args, expr, _build_request) - chunks = _chunk_cql_or(expr, max_len=budget) - assert len(chunks) > 1 - - # Every chunk, once built into a full request, fits under the URL byte - # limit — even the all-light chunks that have a higher-than-average ratio. - for chunk in chunks: - req = _construct_api_requests(**{**args, "filter": chunk}) - assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT, ( - f"chunk url {len(req.url)} exceeds {_WATERDATA_URL_BYTE_LIMIT}" - ) - - # Budget should be tight enough that a chunk of only-light clauses - # (the heavier-encoding shape here) still fits. - assert len(quote_plus(light)) * (budget // len(light)) < _WATERDATA_URL_BYTE_LIMIT - - -def test_effective_filter_budget_passes_through_when_no_url_space(): - """If the non-filter URL already exceeds the byte limit, chunking - cannot make the request succeed. The budget helper should signal - pass-through (return a budget larger than the filter) so - ``_chunk_cql_or`` emits one chunk — one 414 from the server is - clearer than a burst of N guaranteed-414 sub-requests.""" - expr = " OR ".join( - ["(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')"] * 50 - ) - fake_build = mock.Mock( - return_value=_fake_prepared_request(url="https://example.test/" + "A" * 9000) - ) - budget = _effective_filter_budget({"filter": expr}, expr, fake_build) - # Budget is large enough that _chunk_cql_or returns the expression - # unchanged (passthrough) rather than producing many small chunks. - assert budget > len(expr) - assert _chunk_cql_or(expr, max_len=budget) == [expr] - - -def test_effective_filter_budget_shrinks_with_more_url_params(): - """Adding more scalar query params consumes URL bytes and should - shrink the raw filter budget accordingly. Use a filter large enough - to skip the short-circuit fast path so the probe actually runs.""" - clause = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" - expr = " OR ".join([clause] * 100) - sparse_args = { - "service": "continuous", - "monitoring_location_id": "USGS-02238500", - "filter": expr, - "filter_lang": "cql-text", - } - dense_args = { - **sparse_args, - "parameter_code": "00060", - "statistic_id": "00003", - "last_modified": "2023-01-01T00:00:00Z/2023-12-31T23:59:59Z", - } - sparse_budget = _effective_filter_budget(sparse_args, expr, _build_request) - dense_budget = _effective_filter_budget(dense_args, expr, _build_request) - assert dense_budget < sparse_budget - - def test_cql_json_filter_is_not_chunked(): """Chunking applies only to cql-text; cql-json is passed through unchanged.""" from dataretrieval.waterdata import get_continuous diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 18e78594..39a9a4ee 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -49,7 +49,7 @@ reruns=2, reruns_delay=5, only_rerun=[ - r"RuntimeError:\s*(?:429|5\d\d):", # _raise_for_non_200 output + r"(?:RateLimited|RuntimeError):\s*(?:429|5\d\d):", # _raise_for_non_200 output r"ConnectionError", r"ReadTimeout|ConnectTimeout|Timeout", ],