diff --git a/python-sdk/commandlayer/__init__.py b/python-sdk/commandlayer/__init__.py index 778d227..37903b9 100644 --- a/python-sdk/commandlayer/__init__.py +++ b/python-sdk/commandlayer/__init__.py @@ -19,8 +19,8 @@ ) from .verify import ( canonicalize_stable_json_v1, - parse_ed25519_pubkey, extract_receipt_verb, + parse_ed25519_pubkey, recompute_receipt_hash_sha256, resolve_signer_key, sha256_hex_utf8, diff --git a/python-sdk/commandlayer/client.py b/python-sdk/commandlayer/client.py index e12f863..302f564 100644 --- a/python-sdk/commandlayer/client.py +++ b/python-sdk/commandlayer/client.py @@ -8,7 +8,7 @@ import httpx from .errors import CommandLayerError -from .types import CommandResponse, RuntimeMetadata, VerifyOptions +from .types import CanonicalReceipt, CommandResponse, RuntimeMetadata, VerifyOptions from .verify import verify_receipt COMMONS_VERSION = "1.1.0" @@ -42,7 +42,6 @@ def build_commons_request( request_actor = str(body.get("actor") or actor or "sdk-user") return { "x402": {"verb": verb, "version": version}, - "actor": request_actor, **body, "actor": request_actor, } @@ -75,10 +74,10 @@ def normalize_command_response(payload: Any) -> CommandResponse: return response receipt = dict(payload) runtime_metadata = receipt.pop("trace", None) - response: CommandResponse = {"receipt": receipt} + normalized_response: CommandResponse = {"receipt": cast(CanonicalReceipt, receipt)} if isinstance(runtime_metadata, dict): - response["runtime_metadata"] = cast(RuntimeMetadata, runtime_metadata) - return response + normalized_response["runtime_metadata"] = cast(RuntimeMetadata, runtime_metadata) + return normalized_response class CommandLayerClient: @@ -112,7 +111,10 @@ def __init__( def _ensure_verify_config_if_enabled(self) -> None: if not self.verify_receipts: return - explicit_public_key = self.verify_defaults.get("public_key") or self.verify_defaults.get("publicKey") + explicit_public_key = ( + self.verify_defaults.get("public_key") + or self.verify_defaults.get("publicKey") + ) has_explicit = bool(str(explicit_public_key or "").strip()) ens = self.verify_defaults.get("ens") or {} has_ens = bool(ens.get("name") and (ens.get("rpcUrl") or ens.get("rpc_url"))) @@ -123,10 +125,34 @@ def _ensure_verify_config_if_enabled(self) -> None: 400, ) - def summarize(self, *, content: str, style: str | None = None, format: str | None = None, max_tokens: int = 1000) -> CommandResponse: - return self.call("summarize", {"input": {"content": content, "summary_style": style, "format_hint": format}, "limits": {"max_output_tokens": max_tokens}}) + def summarize( + self, + *, + content: str, + style: str | None = None, + format: str | None = None, + max_tokens: int = 1000, + ) -> CommandResponse: + return self.call( + "summarize", + { + "input": { + "content": content, + "summary_style": style, + "format_hint": format, + }, + "limits": {"max_output_tokens": max_tokens}, + }, + ) - def analyze(self, *, content: str, goal: str | None = None, hints: list[str] | None = None, max_tokens: int = 1000) -> CommandResponse: + def analyze( + self, + *, + content: str, + goal: str | None = None, + hints: list[str] | None = None, + max_tokens: int = 1000, + ) -> CommandResponse: payload: dict[str, Any] = {"input": content, "limits": {"max_output_tokens": max_tokens}} if goal: payload["goal"] = goal @@ -134,20 +160,104 @@ def analyze(self, *, content: str, goal: str | None = None, hints: list[str] | N payload["hints"] = hints return self.call("analyze", payload) - def classify(self, *, content: str, max_labels: int = 5, max_tokens: int = 1000) -> CommandResponse: - return self.call("classify", {"input": {"content": content}, "limits": {"max_labels": max_labels, "max_output_tokens": max_tokens}}) + def classify( + self, + *, + content: str, + max_labels: int = 5, + max_tokens: int = 1000, + ) -> CommandResponse: + return self.call( + "classify", + { + "input": {"content": content}, + "limits": { + "max_labels": max_labels, + "max_output_tokens": max_tokens, + }, + }, + ) - def clean(self, *, content: str, operations: list[str] | None = None, max_tokens: int = 1000) -> CommandResponse: - return self.call("clean", {"input": {"content": content, "operations": operations or ["normalize_newlines", "collapse_whitespace", "trim"]}, "limits": {"max_output_tokens": max_tokens}}) + def clean( + self, + *, + content: str, + operations: list[str] | None = None, + max_tokens: int = 1000, + ) -> CommandResponse: + return self.call( + "clean", + { + "input": { + "content": content, + "operations": operations + or ["normalize_newlines", "collapse_whitespace", "trim"], + }, + "limits": {"max_output_tokens": max_tokens}, + }, + ) - def convert(self, *, content: str, from_format: str, to_format: str, max_tokens: int = 1000) -> CommandResponse: - return self.call("convert", {"input": {"content": content, "source_format": from_format, "target_format": to_format}, "limits": {"max_output_tokens": max_tokens}}) + def convert( + self, + *, + content: str, + from_format: str, + to_format: str, + max_tokens: int = 1000, + ) -> CommandResponse: + return self.call( + "convert", + { + "input": { + "content": content, + "source_format": from_format, + "target_format": to_format, + }, + "limits": {"max_output_tokens": max_tokens}, + }, + ) - def describe(self, *, subject: str, audience: str = "general", detail: str = "medium", max_tokens: int = 1000) -> CommandResponse: - return self.call("describe", {"input": {"subject": (subject or "")[:140], "audience": audience, "detail_level": detail}, "limits": {"max_output_tokens": max_tokens}}) + def describe( + self, + *, + subject: str, + audience: str = "general", + detail: str = "medium", + max_tokens: int = 1000, + ) -> CommandResponse: + return self.call( + "describe", + { + "input": { + "subject": (subject or "")[:140], + "audience": audience, + "detail_level": detail, + }, + "limits": {"max_output_tokens": max_tokens}, + }, + ) - def explain(self, *, subject: str, audience: str = "general", style: str = "step-by-step", detail: str = "medium", max_tokens: int = 1000) -> CommandResponse: - return self.call("explain", {"input": {"subject": (subject or "")[:140], "audience": audience, "style": style, "detail_level": detail}, "limits": {"max_output_tokens": max_tokens}}) + def explain( + self, + *, + subject: str, + audience: str = "general", + style: str = "step-by-step", + detail: str = "medium", + max_tokens: int = 1000, + ) -> CommandResponse: + return self.call( + "explain", + { + "input": { + "subject": (subject or "")[:140], + "audience": audience, + "style": style, + "detail_level": detail, + }, + "limits": {"max_output_tokens": max_tokens}, + }, + ) def format(self, *, content: str, to: str, max_tokens: int = 1000) -> CommandResponse: return self.call( @@ -180,7 +290,14 @@ def parse( payload["input"]["schema"] = schema or target_schema return self.call("parse", payload) - def fetch(self, *, source: str, query: str | None = None, include_metadata: bool | None = None, max_tokens: int = 1000) -> CommandResponse: + def fetch( + self, + *, + source: str, + query: str | None = None, + include_metadata: bool | None = None, + max_tokens: int = 1000, + ) -> CommandResponse: input_obj: dict[str, Any] = {"source": source} if query is not None: input_obj["query"] = query @@ -225,7 +342,11 @@ def call(self, verb: str, body: dict[str, Any]) -> CommandResponse: if not response.is_success: message = ( (data.get("message") if isinstance(data, dict) else None) - or ((data.get("error") or {}).get("message") if isinstance(data, dict) and isinstance(data.get("error"), dict) else None) + or ( + (data.get("error") or {}).get("message") + if isinstance(data, dict) and isinstance(data.get("error"), dict) + else None + ) or f"HTTP {response.status_code}" ) raise CommandLayerError(str(message), response.status_code, data) @@ -233,7 +354,10 @@ def call(self, verb: str, body: dict[str, Any]) -> CommandResponse: if self.verify_receipts: verify_result = verify_receipt( normalized["receipt"], - public_key=self.verify_defaults.get("public_key") or self.verify_defaults.get("publicKey"), + public_key=( + self.verify_defaults.get("public_key") + or self.verify_defaults.get("publicKey") + ), ens=self.verify_defaults.get("ens"), ) if not verify_result["ok"]: diff --git a/python-sdk/commandlayer/verify.py b/python-sdk/commandlayer/verify.py index bd480ab..5e3040c 100644 --- a/python-sdk/commandlayer/verify.py +++ b/python-sdk/commandlayer/verify.py @@ -5,13 +5,19 @@ import hashlib import json import re -from typing import Any, Protocol +from typing import Any, Protocol, cast from nacl.exceptions import BadSignatureError from nacl.signing import VerifyKey from web3 import Web3 -from .types import CanonicalReceipt, CommandResponse, EnsVerifyOptions, SignerKeyResolution, VerifyResult +from .types import ( + CanonicalReceipt, + CommandResponse, + EnsVerifyOptions, + SignerKeyResolution, + VerifyResult, +) CANONICAL_ALG = "ed25519-sha256" CANONICAL_FORMAT = "cl-stable-json-v1" @@ -45,9 +51,10 @@ def _is_mapping(value: Any) -> bool: def extract_receipt(subject: CanonicalReceipt | CommandResponse) -> CanonicalReceipt: - if isinstance(subject, dict) and isinstance(subject.get("receipt"), dict): - return dict(subject["receipt"]) - return dict(subject) + receipt = subject.get("receipt") if isinstance(subject, dict) else None + if isinstance(receipt, dict): + return cast(CanonicalReceipt, dict(receipt)) + return cast(CanonicalReceipt, dict(subject)) def extract_receipt_verb(subject: CanonicalReceipt | CommandResponse) -> str | None: @@ -114,7 +121,9 @@ def parse_ed25519_pubkey(text: str) -> bytes: return decoded -def verify_ed25519_signature_over_utf8_hash_string(hash_hex: str, signature_b64: str, pubkey32: bytes) -> bool: +def verify_ed25519_signature_over_utf8_hash_string( + hash_hex: str, signature_b64: str, pubkey32: bytes +) -> bool: if len(pubkey32) != 32: raise ValueError("ed25519: pubkey must be 32 bytes") try: @@ -131,7 +140,9 @@ def verify_ed25519_signature_over_utf8_hash_string(hash_hex: str, signature_b64: return False -def resolve_signer_key(name: str, rpc_url: str, *, resolver: EnsTextResolver | None = None) -> SignerKeyResolution: +def resolve_signer_key( + name: str, rpc_url: str, *, resolver: EnsTextResolver | None = None +) -> SignerKeyResolution: if not rpc_url: raise ValueError("rpcUrl is required for ENS verification") txt_resolver = resolver or Web3EnsTextResolver(rpc_url) @@ -147,12 +158,18 @@ def resolve_signer_key(name: str, rpc_url: str, *, resolver: EnsTextResolver | N try: raw_public_key_bytes = parse_ed25519_pubkey(pub_key_text) except ValueError as err: - raise ValueError(f"ENS TXT cl.sig.pub malformed for signer ENS name: {signer_name}. {err}") from err - return SignerKeyResolution(algorithm="ed25519", kid=kid, raw_public_key_bytes=raw_public_key_bytes) + raise ValueError( + f"ENS TXT cl.sig.pub malformed for signer ENS name: {signer_name}. {err}" + ) from err + return SignerKeyResolution( + algorithm="ed25519", + kid=kid, + raw_public_key_bytes=raw_public_key_bytes, + ) def to_unsigned_receipt(receipt: CanonicalReceipt | CommandResponse) -> CanonicalReceipt: - unsigned = copy.deepcopy(extract_receipt(receipt)) + unsigned = cast(dict[str, Any], copy.deepcopy(extract_receipt(receipt))) if not _is_mapping(unsigned): raise ValueError("receipt must be an object") unsigned.pop("receipt_id", None) @@ -161,12 +178,15 @@ def to_unsigned_receipt(receipt: CanonicalReceipt | CommandResponse) -> Canonica metadata.pop("receipt_id", None) proof = metadata.get("proof") if isinstance(proof, dict): - metadata["proof"] = { + metadata["proof"] = cast( + Any, + { key: proof[key] for key in ("alg", "canonical", "signer_id") if isinstance(proof.get(key), str) - } - return unsigned + }, + ) + return cast(CanonicalReceipt, unsigned) def recompute_receipt_hash_sha256(receipt: CanonicalReceipt | CommandResponse) -> dict[str, str]: @@ -179,7 +199,11 @@ def _extract_rpc_url(ens: EnsVerifyOptions) -> str: return str(ens.get("rpcUrl") or ens.get("rpc_url") or "") -def verify_receipt(receipt: CanonicalReceipt | CommandResponse, public_key: str | None = None, ens: EnsVerifyOptions | None = None) -> VerifyResult: +def verify_receipt( + receipt: CanonicalReceipt | CommandResponse, + public_key: str | None = None, + ens: EnsVerifyOptions | None = None, +) -> VerifyResult: target = extract_receipt(receipt) try: metadata = target.get("metadata") if isinstance(target, dict) else None @@ -187,8 +211,14 @@ def verify_receipt(receipt: CanonicalReceipt | CommandResponse, public_key: str if not isinstance(proof, dict): proof = {} - claimed_hash = proof.get("hash_sha256") if isinstance(proof.get("hash_sha256"), str) else None - signature_b64 = proof.get("signature_b64") if isinstance(proof.get("signature_b64"), str) else None + claimed_hash = ( + proof.get("hash_sha256") if isinstance(proof.get("hash_sha256"), str) else None + ) + signature_b64 = ( + proof.get("signature_b64") + if isinstance(proof.get("signature_b64"), str) + else None + ) alg = proof.get("alg") if isinstance(proof.get("alg"), str) else None canonical = proof.get("canonical") if isinstance(proof.get("canonical"), str) else None signer_id = proof.get("signer_id") if isinstance(proof.get("signer_id"), str) else None @@ -231,15 +261,26 @@ def verify_receipt(receipt: CanonicalReceipt | CommandResponse, public_key: str elif not claimed_hash or not signature_b64: signature_error = "missing proof.hash_sha256 or proof.signature_b64" elif not pubkey: - signature_error = ens_error or "no public key available (provide public_key/publicKey or ens)" + signature_error = ( + ens_error + or "no public key available (provide public_key/publicKey or ens)" + ) else: try: - signature_valid = verify_ed25519_signature_over_utf8_hash_string(claimed_hash, signature_b64, pubkey) + signature_valid = verify_ed25519_signature_over_utf8_hash_string( + claimed_hash, signature_b64, pubkey + ) except Exception as err: # noqa: BLE001 signature_error = str(err) return { - "ok": alg_matches and canonical_matches and hash_matches and receipt_id_matches and signature_valid, + "ok": ( + alg_matches + and canonical_matches + and hash_matches + and receipt_id_matches + and signature_valid + ), "checks": { "hash_matches": hash_matches, "signature_valid": signature_valid, @@ -265,7 +306,11 @@ def verify_receipt(receipt: CanonicalReceipt | CommandResponse, public_key: str }, } except Exception as err: # noqa: BLE001 - proof = ((target.get("metadata") or {}).get("proof") or {}) if isinstance(target, dict) else {} + proof = ( + ((target.get("metadata") or {}).get("proof") or {}) + if isinstance(target, dict) + else {} + ) metadata = target.get("metadata") if isinstance(target, dict) else None return { "ok": False, @@ -278,12 +323,24 @@ def verify_receipt(receipt: CanonicalReceipt | CommandResponse, public_key: str }, "values": { "verb": extract_receipt_verb(target), - "signer_id": proof.get("signer_id") if isinstance(proof.get("signer_id"), str) else None, + "signer_id": ( + proof.get("signer_id") if isinstance(proof.get("signer_id"), str) else None + ), "alg": proof.get("alg") if isinstance(proof.get("alg"), str) else None, - "canonical": proof.get("canonical") if isinstance(proof.get("canonical"), str) else None, - "claimed_hash": proof.get("hash_sha256") if isinstance(proof.get("hash_sha256"), str) else None, + "canonical": ( + proof.get("canonical") if isinstance(proof.get("canonical"), str) else None + ), + "claimed_hash": ( + proof.get("hash_sha256") + if isinstance(proof.get("hash_sha256"), str) + else None + ), "recomputed_hash": None, - "receipt_id": metadata.get("receipt_id") if isinstance(metadata, dict) and isinstance(metadata.get("receipt_id"), str) else None, + "receipt_id": ( + metadata.get("receipt_id") + if isinstance(metadata, dict) and isinstance(metadata.get("receipt_id"), str) + else None + ), "pubkey_source": None, "ens_txt_key": None, }, diff --git a/python-sdk/tests/test_client.py b/python-sdk/tests/test_client.py index aed1c8b..3c020cd 100644 --- a/python-sdk/tests/test_client.py +++ b/python-sdk/tests/test_client.py @@ -75,7 +75,10 @@ def test_parse_uses_current_schema_field() -> None: def handler(request: httpx.Request) -> httpx.Response: captured["json"] = json.loads(request.content.decode("utf-8")) - return httpx.Response(200, json={"receipt": {"status": "success", "metadata": {"proof": {}}}}) + return httpx.Response( + 200, + json={"receipt": {"status": "success", "metadata": {"proof": {}}}}, + ) client = CommandLayerClient( runtime="https://runtime.commandlayer.org",