Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 81 additions & 82 deletions python/numbersprotocol_capture/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import httpx

from .crypto import create_integrity_proof, sha256, sign_integrity_proof
from .errors import CaptureError, ValidationError, create_api_error
from .errors import CaptureError, NetworkError, ValidationError, create_api_error
from .types import (
Asset,
AssetSearchOptions,
Expand All @@ -36,6 +36,9 @@
ASSET_SEARCH_API_URL = "https://us-central1-numbers-protocol-api.cloudfunctions.net/asset-search"
NFT_SEARCH_API_URL = "https://eofveg1f59hrbn.m.pipedream.net"

# Maximum allowed response body size (10 MB)
MAX_RESPONSE_SIZE = 10 * 1024 * 1024

# Common MIME types by extension
MIME_TYPES: dict[str, str] = {
"jpg": "image/jpeg",
Expand Down Expand Up @@ -168,6 +171,20 @@ def close(self) -> None:
"""Close the HTTP client."""
self._client.close()

def _read_response_body(self, response: httpx.Response) -> bytes:
"""Reads response body with a maximum size limit."""
content_length = response.headers.get("content-length")
if content_length and int(content_length) > MAX_RESPONSE_SIZE:
raise NetworkError("Response body too large")
chunks: list[bytes] = []
total_size = 0
for chunk in response.iter_bytes():
total_size += len(chunk)
if total_size > MAX_RESPONSE_SIZE:
raise NetworkError("Response body too large")
chunks.append(chunk)
return b"".join(chunks)

def _request(
self,
method: str,
Expand All @@ -179,47 +196,37 @@ def _request(
nid: str | None = None,
) -> dict[str, Any]:
"""Makes an authenticated API request."""
headers = {"Authorization": f"token {self._token}"}
headers: dict[str, str] = {"Authorization": f"token {self._token}"}
if json_body:
headers["Content-Type"] = "application/json"

kwargs: dict[str, Any] = {"headers": headers}
if files:
kwargs["data"] = data
kwargs["files"] = files
elif json_body:
kwargs["json"] = json_body
elif data:
kwargs["data"] = data

try:
if files:
response = self._client.request(
method,
url,
headers=headers,
data=data,
files=files,
)
elif json_body:
headers["Content-Type"] = "application/json"
response = self._client.request(
method,
url,
headers=headers,
json=json_body,
)
else:
response = self._client.request(
method,
url,
headers=headers,
data=data,
)
with self._client.stream(method, url, **kwargs) as response:
body = self._read_response_body(response)

if not response.is_success:
message = f"API request failed with status {response.status_code}"
try:
error_data = json.loads(body)
message = error_data.get("detail") or error_data.get("message") or message
except Exception:
pass
raise create_api_error(response.status_code, message, nid)

result: dict[str, Any] = json.loads(body)
return result
except httpx.RequestError as e:
raise create_api_error(0, f"Network error: {e}", nid) from e

if not response.is_success:
message = f"API request failed with status {response.status_code}"
try:
error_data = response.json()
message = error_data.get("detail") or error_data.get("message") or message
except Exception:
pass
raise create_api_error(response.status_code, message, nid)

result: dict[str, Any] = response.json()
return result

def register(
self,
file: FileInput,
Expand Down Expand Up @@ -678,37 +685,30 @@ def search_asset(
headers = {"Authorization": f"token {self._token}"}

try:
request_kwargs: dict[str, Any] = {
"headers": headers,
"data": form_data,
}
if files_data:
response = self._client.post(
ASSET_SEARCH_API_URL,
headers=headers,
data=form_data,
files=files_data,
)
else:
response = self._client.post(
ASSET_SEARCH_API_URL,
headers=headers,
data=form_data,
)
request_kwargs["files"] = files_data
with self._client.stream("POST", ASSET_SEARCH_API_URL, **request_kwargs) as response:
body = self._read_response_body(response)
if not response.is_success:
message = f"Asset search failed with status {response.status_code}"
try:
error_data = json.loads(body)
message = (
error_data.get("message")
or error_data.get("error")
or message
)
except Exception:
pass
raise create_api_error(response.status_code, message)
data = json.loads(body)
except httpx.RequestError as e:
raise create_api_error(0, f"Network error: {e}") from e

if not response.is_success:
message = f"Asset search failed with status {response.status_code}"
try:
error_data = response.json()
message = (
error_data.get("message")
or error_data.get("error")
or message
)
except Exception:
pass
raise create_api_error(response.status_code, message)

data = response.json()

# Map response to our type
similar_matches = [
SimilarMatch(nid=m["nid"], distance=m["distance"])
Expand Down Expand Up @@ -742,33 +742,32 @@ def search_nft(self, nid: str) -> NftSearchResult:

headers = {
"Content-Type": "application/json",
"Authorization": f"token {self._token}",
}

try:
response = self._client.post(
with self._client.stream(
"POST",
NFT_SEARCH_API_URL,
headers=headers,
json={"nid": nid},
)
) as response:
body = self._read_response_body(response)
if not response.is_success:
message = f"NFT search failed with status {response.status_code}"
try:
error_data = json.loads(body)
message = (
error_data.get("message")
or error_data.get("error")
or message
)
except Exception:
pass
raise create_api_error(response.status_code, message, nid)
data = json.loads(body)
except httpx.RequestError as e:
raise create_api_error(0, f"Network error: {e}", nid) from e

if not response.is_success:
message = f"NFT search failed with status {response.status_code}"
try:
error_data = response.json()
message = (
error_data.get("message")
or error_data.get("error")
or message
)
except Exception:
pass
raise create_api_error(response.status_code, message, nid)

data = response.json()

# Map response to our type
records = [
NftRecord(
Expand Down
63 changes: 63 additions & 0 deletions python/tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Tests for the Capture client."""

import pytest
import respx
from httpx import Response as HttpxResponse

from numbersprotocol_capture import Capture, ValidationError

Expand Down Expand Up @@ -76,3 +78,64 @@ def test_update_empty_nid_raises_error(self) -> None:
with Capture(token="test-token") as capture:
with pytest.raises(ValidationError, match="nid is required"):
capture.update("", caption="test")


class TestNftSearchSecurity:
"""Tests verifying NFT search does not leak the auth token to third-party endpoint."""

@respx.mock
def test_search_nft_does_not_send_authorization_header(self) -> None:
"""Verify token is NOT sent in Authorization header to the Pipedream endpoint."""
nft_search_url = "https://eofveg1f59hrbn.m.pipedream.net"
test_token = "my-secret-token"

route = respx.post(nft_search_url).mock(
return_value=HttpxResponse(200, json={"records": [], "order_id": "order_1"})
)

with Capture(token=test_token) as capture:
capture.search_nft("bafybeitest")

request = route.calls[0].request
assert "Authorization" not in request.headers


class TestResponseSizeLimit:
"""Tests for HTTP response body size limits."""

@respx.mock
def test_search_nft_raises_on_oversized_response(self) -> None:
"""Verify NetworkError is raised when NFT search response exceeds size limit."""
from numbersprotocol_capture.errors import NetworkError
from numbersprotocol_capture.client import MAX_RESPONSE_SIZE

nft_search_url = "https://eofveg1f59hrbn.m.pipedream.net"
oversized_body = b"x" * (MAX_RESPONSE_SIZE + 1)

respx.post(nft_search_url).mock(
return_value=HttpxResponse(200, content=oversized_body)
)

with Capture(token="test-token") as capture:
with pytest.raises(NetworkError, match="Response body too large"):
capture.search_nft("bafybeitest")

@respx.mock
def test_search_nft_raises_on_large_content_length_header(self) -> None:
"""Verify NetworkError is raised when Content-Length header exceeds size limit."""
from numbersprotocol_capture.errors import NetworkError
from numbersprotocol_capture.client import MAX_RESPONSE_SIZE

nft_search_url = "https://eofveg1f59hrbn.m.pipedream.net"

respx.post(nft_search_url).mock(
return_value=HttpxResponse(
200,
content=b'{"records":[]}',
headers={"content-length": str(MAX_RESPONSE_SIZE + 1)},
)
)

with Capture(token="test-token") as capture:
with pytest.raises(NetworkError, match="Response body too large"):
capture.search_nft("bafybeitest")
Loading