diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index a016962c..27b5cb4c 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -47,9 +47,14 @@ initdb inmemory INR isready +jku JPY JSONRPCt +jwk +jwks JWS +jws +kid kwarg langgraph lifecycles diff --git a/pyproject.toml b/pyproject.toml index 46f7400a..561a5a45 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ grpc = ["grpcio>=1.60", "grpcio-tools>=1.60", "grpcio_reflection>=1.7.0"] telemetry = ["opentelemetry-api>=1.33.0", "opentelemetry-sdk>=1.33.0"] postgresql = ["sqlalchemy[asyncio,postgresql-asyncpg]>=2.0.0"] mysql = ["sqlalchemy[asyncio,aiomysql]>=2.0.0"] +signing = ["PyJWT>=2.0.0"] sqlite = ["sqlalchemy[asyncio,aiosqlite]>=2.0.0"] sql = ["a2a-sdk[postgresql,mysql,sqlite]"] @@ -45,6 +46,7 @@ all = [ "a2a-sdk[encryption]", "a2a-sdk[grpc]", "a2a-sdk[telemetry]", + "a2a-sdk[signing]", ] [project.urls] @@ -86,6 +88,7 @@ style = "pep440" dev = [ "datamodel-code-generator>=0.30.0", "mypy>=1.15.0", + "PyJWT>=2.0.0", "pytest>=8.3.5", "pytest-asyncio>=0.26.0", "pytest-cov>=6.1.1", diff --git a/src/a2a/client/base_client.py b/src/a2a/client/base_client.py index fac7ecad..c870f329 100644 --- a/src/a2a/client/base_client.py +++ b/src/a2a/client/base_client.py @@ -1,4 +1,4 @@ -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Callable from typing import Any from a2a.client.client import ( @@ -261,6 +261,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card. @@ -270,12 +271,15 @@ async def get_card( Args: context: The client call context. extensions: List of extensions to be activated. + signature_verifier: A callable used to verify the agent card's signatures. Returns: The `AgentCard` for the agent. """ card = await self._transport.get_card( - context=context, extensions=extensions + context=context, + extensions=extensions, + signature_verifier=signature_verifier, ) self._card = card return card diff --git a/src/a2a/client/client.py b/src/a2a/client/client.py index fd97b4d1..286641a7 100644 --- a/src/a2a/client/client.py +++ b/src/a2a/client/client.py @@ -185,6 +185,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card.""" diff --git a/src/a2a/client/transports/base.py b/src/a2a/client/transports/base.py index 8f114d95..0c54a28d 100644 --- a/src/a2a/client/transports/base.py +++ b/src/a2a/client/transports/base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable from a2a.client.middleware import ClientCallContext from a2a.types import ( @@ -103,6 +103,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the AgentCard.""" diff --git a/src/a2a/client/transports/grpc.py b/src/a2a/client/transports/grpc.py index 4e27953a..c5edf7a1 100644 --- a/src/a2a/client/transports/grpc.py +++ b/src/a2a/client/transports/grpc.py @@ -1,6 +1,6 @@ import logging -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable try: @@ -223,6 +223,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card.""" card = self.agent_card @@ -236,6 +237,9 @@ async def get_card( metadata=self._get_grpc_metadata(extensions), ) card = proto_utils.FromProto.agent_card(card_pb) + if signature_verifier is not None: + signature_verifier(card) + self.agent_card = card self._needs_extended_card = False return card diff --git a/src/a2a/client/transports/jsonrpc.py b/src/a2a/client/transports/jsonrpc.py index 32cf74f2..54c758ff 100644 --- a/src/a2a/client/transports/jsonrpc.py +++ b/src/a2a/client/transports/jsonrpc.py @@ -1,7 +1,7 @@ import json import logging -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable from typing import Any from uuid import uuid4 @@ -379,6 +379,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card.""" modified_kwargs = update_extension_header( @@ -386,9 +387,12 @@ async def get_card( extensions if extensions is not None else self.extensions, ) card = self.agent_card + if not card: resolver = A2ACardResolver(self.httpx_client, self.url) card = await resolver.get_agent_card(http_kwargs=modified_kwargs) + if signature_verifier is not None: + signature_verifier(card) self._needs_extended_card = ( card.supports_authenticated_extended_card ) @@ -413,9 +417,13 @@ async def get_card( ) if isinstance(response.root, JSONRPCErrorResponse): raise A2AClientJSONRPCError(response.root) - self.agent_card = response.root.result + card = response.root.result + if signature_verifier is not None: + signature_verifier(card) + + self.agent_card = card self._needs_extended_card = False - return self.agent_card + return card async def close(self) -> None: """Closes the httpx client.""" diff --git a/src/a2a/client/transports/rest.py b/src/a2a/client/transports/rest.py index bdfcc8ba..1649be1c 100644 --- a/src/a2a/client/transports/rest.py +++ b/src/a2a/client/transports/rest.py @@ -1,7 +1,7 @@ import json import logging -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable from typing import Any import httpx @@ -371,6 +371,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card.""" modified_kwargs = update_extension_header( @@ -378,9 +379,12 @@ async def get_card( extensions if extensions is not None else self.extensions, ) card = self.agent_card + if not card: resolver = A2ACardResolver(self.httpx_client, self.url) card = await resolver.get_agent_card(http_kwargs=modified_kwargs) + if signature_verifier is not None: + signature_verifier(card) self._needs_extended_card = ( card.supports_authenticated_extended_card ) @@ -398,6 +402,9 @@ async def get_card( '/v1/card', {}, modified_kwargs ) card = AgentCard.model_validate(response_data) + if signature_verifier is not None: + signature_verifier(card) + self.agent_card = card self._needs_extended_card = False return card diff --git a/src/a2a/utils/helpers.py b/src/a2a/utils/helpers.py index 96c1646a..96acdc1e 100644 --- a/src/a2a/utils/helpers.py +++ b/src/a2a/utils/helpers.py @@ -2,6 +2,7 @@ import functools import inspect +import json import logging from collections.abc import Callable @@ -9,6 +10,7 @@ from uuid import uuid4 from a2a.types import ( + AgentCard, Artifact, MessageSendParams, Part, @@ -340,3 +342,29 @@ def are_modalities_compatible( return True return any(x in server_output_modes for x in client_output_modes) + + +def _clean_empty(d: Any) -> Any: + """Recursively remove empty strings, lists and dicts from a dictionary.""" + if isinstance(d, dict): + cleaned_dict: dict[Any, Any] = { + k: _clean_empty(v) for k, v in d.items() + } + return {k: v for k, v in cleaned_dict.items() if v} + if isinstance(d, list): + cleaned_list: list[Any] = [_clean_empty(v) for v in d] + return [v for v in cleaned_list if v] + return d if d not in ['', [], {}] else None + + +def canonicalize_agent_card(agent_card: AgentCard) -> str: + """Canonicalizes the Agent Card JSON according to RFC 8785 (JCS).""" + card_dict = agent_card.model_dump( + exclude={'signatures'}, + exclude_defaults=True, + exclude_none=True, + by_alias=True, + ) + # Recursively remove empty values + cleaned_dict = _clean_empty(card_dict) + return json.dumps(cleaned_dict, separators=(',', ':'), sort_keys=True) diff --git a/src/a2a/utils/proto_utils.py b/src/a2a/utils/proto_utils.py index 8bf01eea..14ac098d 100644 --- a/src/a2a/utils/proto_utils.py +++ b/src/a2a/utils/proto_utils.py @@ -397,6 +397,21 @@ def agent_card( ] if card.additional_interfaces else None, + signatures=[cls.agent_card_signature(x) for x in card.signatures] + if card.signatures + else None, + ) + + @classmethod + def agent_card_signature( + cls, signature: types.AgentCardSignature + ) -> a2a_pb2.AgentCardSignature: + return a2a_pb2.AgentCardSignature( + protected=signature.protected, + signature=signature.signature, + header=dict_to_struct(signature.header) + if signature.header is not None + else None, ) @classmethod @@ -865,6 +880,19 @@ def agent_card( ] if card.additional_interfaces else None, + signatures=[cls.agent_card_signature(x) for x in card.signatures] + if card.signatures + else None, + ) + + @classmethod + def agent_card_signature( + cls, signature: a2a_pb2.AgentCardSignature + ) -> types.AgentCardSignature: + return types.AgentCardSignature( + protected=signature.protected, + signature=signature.signature, + header=json_format.MessageToDict(signature.header), ) @classmethod diff --git a/src/a2a/utils/signing.py b/src/a2a/utils/signing.py new file mode 100644 index 00000000..6ea8c21b --- /dev/null +++ b/src/a2a/utils/signing.py @@ -0,0 +1,152 @@ +import json + +from collections.abc import Callable +from typing import Any, TypedDict + +from a2a.utils.helpers import canonicalize_agent_card + + +try: + import jwt + + from jwt.api_jwk import PyJWK + from jwt.exceptions import PyJWTError + from jwt.utils import base64url_decode, base64url_encode +except ImportError as e: + raise ImportError( + 'A2A Signing requires PyJWT to be installed. ' + 'Install with: ' + "'pip install a2a-sdk[signing]'" + ) from e + +from a2a.types import AgentCard, AgentCardSignature + + +class SignatureVerificationError(Exception): + """Base exception for signature verification errors.""" + + +class NoSignatureError(SignatureVerificationError): + """Exception raised when no signature is found on an AgentCard.""" + + +class InvalidSignaturesError(SignatureVerificationError): + """Exception raised when all signatures are invalid.""" + + +class ProtectedHeader(TypedDict): + """Protected header parameters for JWS (JSON Web Signature).""" + + kid: str + """ Key identifier. """ + alg: str | None + """ Algorithm used for signing. """ + jku: str | None + """ JSON Web Key Set URL. """ + typ: str | None + """ Token type. + + Best practice: SHOULD be "JOSE" for JWS tokens. + """ + + +def create_agent_card_signer( + signing_key: PyJWK | str | bytes, + protected_header: ProtectedHeader, + header: dict[str, Any] | None = None, +) -> Callable[[AgentCard], AgentCard]: + """Creates a function that signs an AgentCard and adds the signature. + + Args: + signing_key: The private key for signing. + protected_header: The protected header parameters. + header: Unprotected header parameters. + + Returns: + A callable that takes an AgentCard and returns the modified AgentCard with a signature. + """ + + def agent_card_signer(agent_card: AgentCard) -> AgentCard: + """Signs agent card.""" + canonical_payload = canonicalize_agent_card(agent_card) + payload_dict = json.loads(canonical_payload) + + jws_string = jwt.encode( + payload=payload_dict, + key=signing_key, + algorithm=protected_header.get('alg', 'HS256'), + headers=dict(protected_header), + ) + + # The result of jwt.encode is a compact serialization: HEADER.PAYLOAD.SIGNATURE + protected, _, signature = jws_string.split('.') + + agent_card_signature = AgentCardSignature( + header=header, + protected=protected, + signature=signature, + ) + + agent_card.signatures = (agent_card.signatures or []) + [ + agent_card_signature + ] + return agent_card + + return agent_card_signer + + +def create_signature_verifier( + key_provider: Callable[[str | None, str | None], PyJWK | str | bytes], + algorithms: list[str], +) -> Callable[[AgentCard], None]: + """Creates a function that verifies the signatures on an AgentCard. + + The verifier succeeds if at least one signature is valid. Otherwise, it raises an error. + + Args: + key_provider: A callable that accepts a key ID (kid) and a JWK Set URL (jku) and returns the verification key. + This function is responsible for fetching the correct key for a given signature. + algorithms: A list of acceptable algorithms (e.g., ['ES256', 'RS256']) for verification used to prevent algorithm confusion attacks. + + Returns: + A function that takes an AgentCard as input, and raises an error if none of the signatures are valid. + """ + + def signature_verifier( + agent_card: AgentCard, + ) -> None: + """Verifies agent card signatures.""" + if not agent_card.signatures: + raise NoSignatureError('AgentCard has no signatures to verify.') + + for agent_card_signature in agent_card.signatures: + try: + # get verification key + protected_header_json = base64url_decode( + agent_card_signature.protected.encode('utf-8') + ).decode('utf-8') + protected_header = json.loads(protected_header_json) + kid = protected_header.get('kid') + jku = protected_header.get('jku') + verification_key = key_provider(kid, jku) + + canonical_payload = canonicalize_agent_card(agent_card) + encoded_payload = base64url_encode( + canonical_payload.encode('utf-8') + ).decode('utf-8') + + token = f'{agent_card_signature.protected}.{encoded_payload}.{agent_card_signature.signature}' + jwt.decode( + jwt=token, + key=verification_key, + algorithms=algorithms, + ) + # Found a valid signature, exit the loop and function + break + except PyJWTError: + continue + else: + # This block runs only if the loop completes without a break + raise InvalidSignaturesError('No valid signature found') + + return signature_verifier diff --git a/tests/integration/test_client_server_integration.py b/tests/integration/test_client_server_integration.py index e0a564ee..e6552fcb 100644 --- a/tests/integration/test_client_server_integration.py +++ b/tests/integration/test_client_server_integration.py @@ -1,6 +1,6 @@ import asyncio from collections.abc import AsyncGenerator -from typing import NamedTuple +from typing import NamedTuple, Any from unittest.mock import ANY, AsyncMock, patch import grpc @@ -9,6 +9,7 @@ import pytest_asyncio from grpc.aio import Channel +from jwt.api_jwk import PyJWK from a2a.client import ClientConfig from a2a.client.base_client import BaseClient from a2a.client.transports import JsonRpcTransport, RestTransport @@ -17,6 +18,10 @@ from a2a.grpc import a2a_pb2_grpc from a2a.server.apps import A2AFastAPIApplication, A2ARESTFastAPIApplication from a2a.server.request_handlers import GrpcHandler, RequestHandler +from a2a.utils.signing import ( + create_agent_card_signer, + create_signature_verifier, +) from a2a.types import ( AgentCapabilities, AgentCard, @@ -37,6 +42,7 @@ TextPart, TransportProtocol, ) +from cryptography.hazmat.primitives import asymmetric # --- Test Constants --- @@ -83,6 +89,15 @@ ) +def create_key_provider(verification_key: PyJWK | str | bytes): + """Creates a key provider function for testing.""" + + def key_provider(kid: str | None, jku: str | None): + return verification_key + + return key_provider + + # --- Test Fixtures --- @@ -739,6 +754,7 @@ async def test_http_transport_get_authenticated_card( transport = RestTransport(httpx_client=httpx_client, agent_card=agent_card) result = await transport.get_card() assert result.name == extended_agent_card.name + assert transport.agent_card is not None assert transport.agent_card.name == extended_agent_card.name assert transport._needs_extended_card is False @@ -761,6 +777,7 @@ def channel_factory(address: str) -> Channel: transport = GrpcTransport(channel=channel, agent_card=agent_card) # The transport starts with a minimal card, get_card() fetches the full one + assert transport.agent_card is not None transport.agent_card.supports_authenticated_extended_card = True result = await transport.get_card() @@ -772,7 +789,7 @@ def channel_factory(address: str) -> Channel: @pytest.mark.asyncio -async def test_base_client_sends_message_with_extensions( +async def test_json_transport_base_client_send_message_with_extensions( jsonrpc_setup: TransportSetup, agent_card: AgentCard ) -> None: """ @@ -827,3 +844,300 @@ async def test_base_client_sends_message_with_extensions( if hasattr(transport, 'close'): await transport.close() + + +@pytest.mark.asyncio +async def test_json_transport_get_signed_base_card( + jsonrpc_setup: TransportSetup, agent_card: AgentCard +) -> None: + """Tests fetching and verifying a symmetrically signed AgentCard via JSON-RPC. + + The client transport is initialized without a card, forcing it to fetch + the base card from the server. The server signs the card using HS384. + The client then verifies the signature. + """ + mock_request_handler = jsonrpc_setup.handler + agent_card.supports_authenticated_extended_card = False + + # Setup signing on the server side + key = 'key12345' + signer = create_agent_card_signer( + signing_key=key, + protected_header={ + 'alg': 'HS384', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, + ) + + app_builder = A2AFastAPIApplication( + agent_card, + mock_request_handler, + card_modifier=signer, # Sign the base card + ) + app = app_builder.build() + httpx_client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) + + transport = JsonRpcTransport( + httpx_client=httpx_client, + url=agent_card.url, + agent_card=None, + ) + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier( + create_key_provider(key), ['HS384'] + ) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.name == agent_card.name + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport.agent_card is not None + assert transport.agent_card.name == agent_card.name + assert transport._needs_extended_card is False + + if hasattr(transport, 'close'): + await transport.close() + + +@pytest.mark.asyncio +async def test_json_transport_get_signed_extended_card( + jsonrpc_setup: TransportSetup, agent_card: AgentCard +) -> None: + """Tests fetching and verifying an asymmetrically signed extended AgentCard via JSON-RPC. + + The client has a base card and fetches the extended card, which is signed + by the server using ES256. The client verifies the signature on the + received extended card. + """ + mock_request_handler = jsonrpc_setup.handler + agent_card.supports_authenticated_extended_card = True + extended_agent_card = agent_card.model_copy(deep=True) + extended_agent_card.name = 'Extended Agent Card' + + # Setup signing on the server side + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + signer = create_agent_card_signer( + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, + ) + + app_builder = A2AFastAPIApplication( + agent_card, + mock_request_handler, + extended_agent_card=extended_agent_card, + extended_card_modifier=lambda card, ctx: signer( + card + ), # Sign the extended card + ) + app = app_builder.build() + httpx_client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) + + transport = JsonRpcTransport( + httpx_client=httpx_client, agent_card=agent_card + ) + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier( + create_key_provider(public_key), ['HS384', 'ES256'] + ) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.name == extended_agent_card.name + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport.agent_card is not None + assert transport.agent_card.name == extended_agent_card.name + assert transport._needs_extended_card is False + + if hasattr(transport, 'close'): + await transport.close() + + +@pytest.mark.asyncio +async def test_json_transport_get_signed_base_and_extended_cards( + jsonrpc_setup: TransportSetup, agent_card: AgentCard +) -> None: + """Tests fetching and verifying both base and extended cards via JSON-RPC when no card is initially provided. + + The client starts with no card. It first fetches the base card, which is + signed. It then fetches the extended card, which is also signed. Both signatures + are verified independently upon retrieval. + """ + mock_request_handler = jsonrpc_setup.handler + assert agent_card.signatures is None + agent_card.supports_authenticated_extended_card = True + extended_agent_card = agent_card.model_copy(deep=True) + extended_agent_card.name = 'Extended Agent Card' + + # Setup signing on the server side + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + signer = create_agent_card_signer( + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, + ) + + app_builder = A2AFastAPIApplication( + agent_card, + mock_request_handler, + extended_agent_card=extended_agent_card, + card_modifier=signer, # Sign the base card + extended_card_modifier=lambda card, ctx: signer( + card + ), # Sign the extended card + ) + app = app_builder.build() + httpx_client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) + + transport = JsonRpcTransport( + httpx_client=httpx_client, + url=agent_card.url, + agent_card=None, + ) + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier( + create_key_provider(public_key), ['HS384', 'ES256', 'RS256'] + ) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.name == extended_agent_card.name + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport.agent_card is not None + assert transport.agent_card.name == extended_agent_card.name + assert transport._needs_extended_card is False + + if hasattr(transport, 'close'): + await transport.close() + + +@pytest.mark.asyncio +async def test_rest_transport_get_signed_card( + rest_setup: TransportSetup, agent_card: AgentCard +) -> None: + """Tests fetching and verifying signed base and extended cards via REST. + + The client starts with no card. It first fetches the base card, which is + signed. It then fetches the extended card, which is also signed. Both signatures + are verified independently upon retrieval. + """ + mock_request_handler = rest_setup.handler + agent_card.supports_authenticated_extended_card = True + extended_agent_card = agent_card.model_copy(deep=True) + extended_agent_card.name = 'Extended Agent Card' + + # Setup signing on the server side + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + signer = create_agent_card_signer( + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, + ) + + app_builder = A2ARESTFastAPIApplication( + agent_card, + mock_request_handler, + extended_agent_card=extended_agent_card, + card_modifier=signer, # Sign the base card + extended_card_modifier=lambda card, ctx: signer( + card + ), # Sign the extended card + ) + app = app_builder.build() + httpx_client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) + + transport = RestTransport( + httpx_client=httpx_client, + url=agent_card.url, + agent_card=None, + ) + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier( + create_key_provider(public_key), ['HS384', 'ES256', 'RS256'] + ) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.name == extended_agent_card.name + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport.agent_card is not None + assert transport.agent_card.name == extended_agent_card.name + assert transport._needs_extended_card is False + + if hasattr(transport, 'close'): + await transport.close() + + +@pytest.mark.asyncio +async def test_grpc_transport_get_signed_card( + mock_request_handler: AsyncMock, agent_card: AgentCard +) -> None: + """Tests fetching and verifying a signed AgentCard via gRPC.""" + # Setup signing on the server side + agent_card.supports_authenticated_extended_card = True + + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + signer = create_agent_card_signer( + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, + ) + + server = grpc.aio.server() + port = server.add_insecure_port('[::]:0') + server_address = f'localhost:{port}' + agent_card.url = server_address + + servicer = GrpcHandler( + agent_card, + mock_request_handler, + card_modifier=signer, + ) + a2a_pb2_grpc.add_A2AServiceServicer_to_server(servicer, server) + await server.start() + + transport = None # Initialize transport + try: + + def channel_factory(address: str) -> Channel: + return grpc.aio.insecure_channel(address) + + channel = channel_factory(server_address) + transport = GrpcTransport(channel=channel, agent_card=agent_card) + transport.agent_card = None + assert transport._needs_extended_card is True + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier( + create_key_provider(public_key), ['HS384', 'ES256', 'RS256'] + ) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport._needs_extended_card is False + finally: + if transport: + await transport.close() + await server.stop(0) # Gracefully stop the server diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py index 28acd27c..f3227d32 100644 --- a/tests/utils/test_helpers.py +++ b/tests/utils/test_helpers.py @@ -7,6 +7,10 @@ from a2a.types import ( Artifact, + AgentCard, + AgentCardSignature, + AgentCapabilities, + AgentSkill, Message, MessageSendParams, Part, @@ -23,6 +27,7 @@ build_text_artifact, create_task_obj, validate, + canonicalize_agent_card, ) @@ -45,6 +50,34 @@ 'type': 'task', } +SAMPLE_AGENT_CARD: dict[str, Any] = { + 'name': 'Test Agent', + 'description': 'A test agent', + 'url': 'http://localhost', + 'version': '1.0.0', + 'capabilities': AgentCapabilities( + streaming=None, + push_notifications=True, + ), + 'default_input_modes': ['text/plain'], + 'default_output_modes': ['text/plain'], + 'documentation_url': None, + 'icon_url': '', + 'skills': [ + AgentSkill( + id='skill1', + name='Test Skill', + description='A test skill', + tags=['test'], + ) + ], + 'signatures': [ + AgentCardSignature( + protected='protected_header', signature='test_signature' + ) + ], +} + # Test create_task_obj def test_create_task_obj(): @@ -328,3 +361,22 @@ def test_are_modalities_compatible_both_empty(): ) is True ) + + +def test_canonicalize_agent_card(): + """Test canonicalize_agent_card with defaults, optionals, and exceptions. + + - extensions is omitted as it's not set and optional. + - protocolVersion is included because it's always added by canonicalize_agent_card. + - signatures should be omitted. + """ + agent_card = AgentCard(**SAMPLE_AGENT_CARD) + expected_jcs = ( + '{"capabilities":{"pushNotifications":true},' + '"defaultInputModes":["text/plain"],"defaultOutputModes":["text/plain"],' + '"description":"A test agent","name":"Test Agent",' + '"skills":[{"description":"A test skill","id":"skill1","name":"Test Skill","tags":["test"]}],' + '"url":"http://localhost","version":"1.0.0"}' + ) + result = canonicalize_agent_card(agent_card) + assert result == expected_jcs diff --git a/tests/utils/test_proto_utils.py b/tests/utils/test_proto_utils.py index 33be1f3f..f68d5c10 100644 --- a/tests/utils/test_proto_utils.py +++ b/tests/utils/test_proto_utils.py @@ -108,6 +108,18 @@ def sample_agent_card() -> types.AgentCard: ) ), }, + signatures=[ + types.AgentCardSignature( + protected='protected_test', + signature='signature_test', + header={'alg': 'ES256'}, + ), + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={'alg': 'ES256', 'kid': 'unique-key-identifier-123'}, + ), + ], ) @@ -523,7 +535,7 @@ def test_task_conversion_roundtrip( assert roundtrip_task.status == types.TaskStatus( state=types.TaskState.working, message=sample_message ) - assert roundtrip_task.history == [sample_message] + assert roundtrip_task.history == sample_task.history assert roundtrip_task.artifacts == [ types.Artifact( artifact_id='art-1', @@ -536,3 +548,142 @@ def test_task_conversion_roundtrip( ) ] assert roundtrip_task.metadata == {'source': 'test'} + + def test_agent_card_conversion_roundtrip( + self, sample_agent_card: types.AgentCard + ): + """Test conversion of AgentCard to proto and back.""" + proto_card = proto_utils.ToProto.agent_card(sample_agent_card) + assert isinstance(proto_card, a2a_pb2.AgentCard) + + roundtrip_card = proto_utils.FromProto.agent_card(proto_card) + assert roundtrip_card.name == 'Test Agent' + assert roundtrip_card.description == 'A test agent' + assert roundtrip_card.url == 'http://localhost' + assert roundtrip_card.version == '1.0.0' + assert roundtrip_card.capabilities == types.AgentCapabilities( + extensions=[], streaming=True, push_notifications=True + ) + assert roundtrip_card.default_input_modes == ['text/plain'] + assert roundtrip_card.default_output_modes == ['text/plain'] + assert roundtrip_card.skills == [ + types.AgentSkill( + id='skill1', + name='Test Skill', + description='A test skill', + tags=['test'], + examples=[], + input_modes=[], + output_modes=[], + ) + ] + assert roundtrip_card.provider == types.AgentProvider( + organization='Test Org', url='http://test.org' + ) + assert roundtrip_card.security == [{'oauth_scheme': ['read', 'write']}] + + # Normalized version of security_schemes. None fields are filled with defaults. + expected_security_schemes = { + 'oauth_scheme': types.SecurityScheme( + root=types.OAuth2SecurityScheme( + description='', + flows=types.OAuthFlows( + client_credentials=types.ClientCredentialsOAuthFlow( + refresh_url='', + scopes={ + 'write': 'Write access', + 'read': 'Read access', + }, + token_url='http://token.url', + ), + ), + ) + ), + 'apiKey': types.SecurityScheme( + root=types.APIKeySecurityScheme( + description='', + in_=types.In.header, + name='X-API-KEY', + ) + ), + 'httpAuth': types.SecurityScheme( + root=types.HTTPAuthSecurityScheme( + bearer_format='', + description='', + scheme='bearer', + ) + ), + 'oidc': types.SecurityScheme( + root=types.OpenIdConnectSecurityScheme( + description='', + open_id_connect_url='http://oidc.url', + ) + ), + } + assert roundtrip_card.security_schemes == expected_security_schemes + assert roundtrip_card.signatures == [ + types.AgentCardSignature( + protected='protected_test', + signature='signature_test', + header={'alg': 'ES256'}, + ), + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={'alg': 'ES256', 'kid': 'unique-key-identifier-123'}, + ), + ] + + @pytest.mark.parametrize( + 'signature_data, expected_data', + [ + ( + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={'alg': 'ES256'}, + ), + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={'alg': 'ES256'}, + ), + ), + ( + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header=None, + ), + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={}, + ), + ), + ( + types.AgentCardSignature( + protected='', + signature='', + header={}, + ), + types.AgentCardSignature( + protected='', + signature='', + header={}, + ), + ), + ], + ) + def test_agent_card_signature_conversion_roundtrip( + self, signature_data, expected_data + ): + """Test conversion of AgentCardSignature to proto and back.""" + proto_signature = proto_utils.ToProto.agent_card_signature( + signature_data + ) + assert isinstance(proto_signature, a2a_pb2.AgentCardSignature) + roundtrip_signature = proto_utils.FromProto.agent_card_signature( + proto_signature + ) + assert roundtrip_signature == expected_data diff --git a/tests/utils/test_signing.py b/tests/utils/test_signing.py new file mode 100644 index 00000000..9a843d34 --- /dev/null +++ b/tests/utils/test_signing.py @@ -0,0 +1,185 @@ +from a2a.types import ( + AgentCard, + AgentCapabilities, + AgentSkill, +) +from a2a.types import ( + AgentCard, + AgentCapabilities, + AgentSkill, + AgentCardSignature, +) +from a2a.utils import signing +from typing import Any +from jwt.utils import base64url_encode + +import pytest +from cryptography.hazmat.primitives import asymmetric + + +def create_key_provider(verification_key: str | bytes | dict[str, Any]): + """Creates a key provider function for testing.""" + + def key_provider(kid: str | None, jku: str | None): + return verification_key + + return key_provider + + +# Fixture for a complete sample AgentCard +@pytest.fixture +def sample_agent_card() -> AgentCard: + return AgentCard( + name='Test Agent', + description='A test agent', + url='http://localhost', + version='1.0.0', + capabilities=AgentCapabilities( + streaming=None, + push_notifications=True, + ), + default_input_modes=['text/plain'], + default_output_modes=['text/plain'], + documentation_url=None, + icon_url='', + skills=[ + AgentSkill( + id='skill1', + name='Test Skill', + description='A test skill', + tags=['test'], + ) + ], + ) + + +def test_signer_and_verifier_symmetric(sample_agent_card: AgentCard): + """Test the agent card signing and verification process with symmetric key encryption.""" + key = 'key12345' # Using a simple symmetric key for HS256 + wrong_key = 'wrongkey' + + agent_card_signer = signing.create_agent_card_signer( + signing_key=key, + protected_header={ + 'alg': 'HS384', + 'kid': 'key1', + 'jku': None, + 'typ': 'JOSE', + }, + ) + signed_card = agent_card_signer(sample_agent_card) + + assert signed_card.signatures is not None + assert len(signed_card.signatures) == 1 + signature = signed_card.signatures[0] + assert signature.protected is not None + assert signature.signature is not None + + # Verify the signature + verifier = signing.create_signature_verifier( + create_key_provider(key), ['HS256', 'HS384', 'ES256', 'RS256'] + ) + try: + verifier(signed_card) + except signing.InvalidSignaturesError: + pytest.fail('Signature verification failed with correct key') + + # Verify with wrong key + verifier_wrong_key = signing.create_signature_verifier( + create_key_provider(wrong_key), ['HS256', 'HS384', 'ES256', 'RS256'] + ) + with pytest.raises(signing.InvalidSignaturesError): + verifier_wrong_key(signed_card) + + +def test_signer_and_verifier_symmetric_multiple_signatures( + sample_agent_card: AgentCard, +): + """Test the agent card signing and verification process with symmetric key encryption. + This test adds a signatures to the AgentCard before signing.""" + encoded_header = base64url_encode( + b'{"alg": "HS256", "kid": "old_key"}' + ).decode('utf-8') + sample_agent_card.signatures = [ + AgentCardSignature(protected=encoded_header, signature='old_signature') + ] + key = 'key12345' # Using a simple symmetric key for HS256 + wrong_key = 'wrongkey' + + agent_card_signer = signing.create_agent_card_signer( + signing_key=key, + protected_header={ + 'alg': 'HS384', + 'kid': 'key1', + 'jku': None, + 'typ': 'JOSE', + }, + ) + signed_card = agent_card_signer(sample_agent_card) + + assert signed_card.signatures is not None + assert len(signed_card.signatures) == 2 + signature = signed_card.signatures[1] + assert signature.protected is not None + assert signature.signature is not None + + # Verify the signature + verifier = signing.create_signature_verifier( + create_key_provider(key), ['HS256', 'HS384', 'ES256', 'RS256'] + ) + try: + verifier(signed_card) + except signing.InvalidSignaturesError: + pytest.fail('Signature verification failed with correct key') + + # Verify with wrong key + verifier_wrong_key = signing.create_signature_verifier( + create_key_provider(wrong_key), ['HS256', 'HS384', 'ES256', 'RS256'] + ) + with pytest.raises(signing.InvalidSignaturesError): + verifier_wrong_key(signed_card) + + +def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): + """Test the agent card signing and verification process with an asymmetric key encryption.""" + # Generate a dummy EC private key for ES256 + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + # Generate another key pair for negative test + private_key_error = asymmetric.ec.generate_private_key( + asymmetric.ec.SECP256R1() + ) + public_key_error = private_key_error.public_key() + + agent_card_signer = signing.create_agent_card_signer( + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'key2', + 'jku': None, + 'typ': 'JOSE', + }, + ) + signed_card = agent_card_signer(sample_agent_card) + + assert signed_card.signatures is not None + assert len(signed_card.signatures) == 1 + signature = signed_card.signatures[0] + assert signature.protected is not None + assert signature.signature is not None + + verifier = signing.create_signature_verifier( + create_key_provider(public_key), ['HS256', 'HS384', 'ES256', 'RS256'] + ) + try: + verifier(signed_card) + except signing.InvalidSignaturesError: + pytest.fail('Signature verification failed with correct key') + + # Verify with wrong key + verifier_wrong_key = signing.create_signature_verifier( + create_key_provider(public_key_error), + ['HS256', 'HS384', 'ES256', 'RS256'], + ) + with pytest.raises(signing.InvalidSignaturesError): + verifier_wrong_key(signed_card)