From 1a973f0944f37916466ace39cd678318dec00255 Mon Sep 17 00:00:00 2001 From: sokoliva Date: Fri, 5 Dec 2025 16:52:29 +0000 Subject: [PATCH 01/11] feat: Add Agent Card Signature support --- pyproject.toml | 2 + src/a2a/client/base_client.py | 9 +- src/a2a/client/client.py | 1 + src/a2a/client/transports/base.py | 3 +- src/a2a/client/transports/grpc.py | 6 +- src/a2a/client/transports/jsonrpc.py | 14 +- src/a2a/client/transports/rest.py | 9 +- src/a2a/utils/proto_utils.py | 28 ++ src/a2a/utils/signing.py | 161 ++++++++++ .../test_client_server_integration.py | 280 +++++++++++++++++- tests/utils/test_proto_utils.py | 153 +++++++++- tests/utils/test_signing.py | 145 +++++++++ 12 files changed, 800 insertions(+), 11 deletions(-) create mode 100644 src/a2a/utils/signing.py create mode 100644 tests/utils/test_signing.py diff --git a/pyproject.toml b/pyproject.toml index 46f7400a..44c53679 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ telemetry = ["opentelemetry-api>=1.33.0", "opentelemetry-sdk>=1.33.0"] postgresql = ["sqlalchemy[asyncio,postgresql-asyncpg]>=2.0.0"] mysql = ["sqlalchemy[asyncio,aiomysql]>=2.0.0"] sqlite = ["sqlalchemy[asyncio,aiosqlite]>=2.0.0"] +signing = ["python-jose>=3.0.0"] sql = ["a2a-sdk[postgresql,mysql,sqlite]"] @@ -45,6 +46,7 @@ all = [ "a2a-sdk[encryption]", "a2a-sdk[grpc]", "a2a-sdk[telemetry]", + "a2a-sdk[signing]", ] [project.urls] diff --git a/src/a2a/client/base_client.py b/src/a2a/client/base_client.py index fac7ecad..1a92d410 100644 --- a/src/a2a/client/base_client.py +++ b/src/a2a/client/base_client.py @@ -1,4 +1,4 @@ -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Callable from typing import Any from a2a.client.client import ( @@ -261,6 +261,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card. @@ -270,12 +271,16 @@ async def get_card( Args: context: The client call context. extensions: List of extensions to be activated. + key_provider: A callable that takes key-id (kid) and JSON web key url (jku) + and returns the verification key for signature verification. Returns: The `AgentCard` for the agent. """ card = await self._transport.get_card( - context=context, extensions=extensions + context=context, + extensions=extensions, + signature_verifier=signature_verifier, ) self._card = card return card diff --git a/src/a2a/client/client.py b/src/a2a/client/client.py index fd97b4d1..286641a7 100644 --- a/src/a2a/client/client.py +++ b/src/a2a/client/client.py @@ -185,6 +185,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card.""" diff --git a/src/a2a/client/transports/base.py b/src/a2a/client/transports/base.py index 8f114d95..0c54a28d 100644 --- a/src/a2a/client/transports/base.py +++ b/src/a2a/client/transports/base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable from a2a.client.middleware import ClientCallContext from a2a.types import ( @@ -103,6 +103,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the AgentCard.""" diff --git a/src/a2a/client/transports/grpc.py b/src/a2a/client/transports/grpc.py index 4e27953a..c5edf7a1 100644 --- a/src/a2a/client/transports/grpc.py +++ b/src/a2a/client/transports/grpc.py @@ -1,6 +1,6 @@ import logging -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable try: @@ -223,6 +223,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card.""" card = self.agent_card @@ -236,6 +237,9 @@ async def get_card( metadata=self._get_grpc_metadata(extensions), ) card = proto_utils.FromProto.agent_card(card_pb) + if signature_verifier is not None: + signature_verifier(card) + self.agent_card = card self._needs_extended_card = False return card diff --git a/src/a2a/client/transports/jsonrpc.py b/src/a2a/client/transports/jsonrpc.py index 6cce1eff..a5529c68 100644 --- a/src/a2a/client/transports/jsonrpc.py +++ b/src/a2a/client/transports/jsonrpc.py @@ -1,7 +1,7 @@ import json import logging -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable from typing import Any from uuid import uuid4 @@ -376,6 +376,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card.""" modified_kwargs = update_extension_header( @@ -383,9 +384,12 @@ async def get_card( extensions if extensions is not None else self.extensions, ) card = self.agent_card + if not card: resolver = A2ACardResolver(self.httpx_client, self.url) card = await resolver.get_agent_card(http_kwargs=modified_kwargs) + if signature_verifier is not None: + signature_verifier(card) self._needs_extended_card = ( card.supports_authenticated_extended_card ) @@ -410,9 +414,13 @@ async def get_card( ) if isinstance(response.root, JSONRPCErrorResponse): raise A2AClientJSONRPCError(response.root) - self.agent_card = response.root.result + card = response.root.result + if signature_verifier is not None: + signature_verifier(card) + + self.agent_card = card self._needs_extended_card = False - return self.agent_card + return card async def close(self) -> None: """Closes the httpx client.""" diff --git a/src/a2a/client/transports/rest.py b/src/a2a/client/transports/rest.py index 948f3f35..9c78a07f 100644 --- a/src/a2a/client/transports/rest.py +++ b/src/a2a/client/transports/rest.py @@ -1,7 +1,7 @@ import json import logging -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable from typing import Any import httpx @@ -368,6 +368,7 @@ async def get_card( *, context: ClientCallContext | None = None, extensions: list[str] | None = None, + signature_verifier: Callable[[AgentCard], None] | None = None, ) -> AgentCard: """Retrieves the agent's card.""" modified_kwargs = update_extension_header( @@ -375,9 +376,12 @@ async def get_card( extensions if extensions is not None else self.extensions, ) card = self.agent_card + if not card: resolver = A2ACardResolver(self.httpx_client, self.url) card = await resolver.get_agent_card(http_kwargs=modified_kwargs) + if signature_verifier is not None: + signature_verifier(card) self._needs_extended_card = ( card.supports_authenticated_extended_card ) @@ -395,6 +399,9 @@ async def get_card( '/v1/card', {}, modified_kwargs ) card = AgentCard.model_validate(response_data) + if signature_verifier is not None: + signature_verifier(card) + self.agent_card = card self._needs_extended_card = False return card diff --git a/src/a2a/utils/proto_utils.py b/src/a2a/utils/proto_utils.py index 8bf01eea..14ac098d 100644 --- a/src/a2a/utils/proto_utils.py +++ b/src/a2a/utils/proto_utils.py @@ -397,6 +397,21 @@ def agent_card( ] if card.additional_interfaces else None, + signatures=[cls.agent_card_signature(x) for x in card.signatures] + if card.signatures + else None, + ) + + @classmethod + def agent_card_signature( + cls, signature: types.AgentCardSignature + ) -> a2a_pb2.AgentCardSignature: + return a2a_pb2.AgentCardSignature( + protected=signature.protected, + signature=signature.signature, + header=dict_to_struct(signature.header) + if signature.header is not None + else None, ) @classmethod @@ -865,6 +880,19 @@ def agent_card( ] if card.additional_interfaces else None, + signatures=[cls.agent_card_signature(x) for x in card.signatures] + if card.signatures + else None, + ) + + @classmethod + def agent_card_signature( + cls, signature: a2a_pb2.AgentCardSignature + ) -> types.AgentCardSignature: + return types.AgentCardSignature( + protected=signature.protected, + signature=signature.signature, + header=json_format.MessageToDict(signature.header), ) @classmethod diff --git a/src/a2a/utils/signing.py b/src/a2a/utils/signing.py new file mode 100644 index 00000000..cd8dece6 --- /dev/null +++ b/src/a2a/utils/signing.py @@ -0,0 +1,161 @@ +import json + +from collections.abc import Callable +from typing import Any + + +try: + from jose import jws + from jose.backends.base import Key + from jose.exceptions import JOSEError + from jose.utils import base64url_decode, base64url_encode +except ImportError as e: + raise ImportError( + 'A2AUtilsSigning requires python-jose to be installed. ' + 'Install with: ' + "'pip install a2a-sdk[signing]'" + ) from e + +from a2a.types import AgentCard, AgentCardSignature + + +def clean_empty(d: Any) -> Any: + """Recursively remove empty lists, dicts, strings, and None values from a dictionary.""" + if isinstance(d, dict): + cleaned = {k: clean_empty(v) for k, v in d.items()} + return { + k: v + for k, v in cleaned.items() + if v is not None and (isinstance(v, (bool, int, float)) or v) + } + if isinstance(d, list): + cleaned = [clean_empty(v) for v in d] + return [ + v + for v in cleaned + if v is not None and (isinstance(v, (bool, int, float)) or v) + ] + return d if d not in [None, '', [], {}] else None + + +def canonicalize_agent_card(agent_card: AgentCard) -> str: + """Canonicalizes the Agent Card JSON according to RFC 8785 (JCS).""" + card_dict = agent_card.model_dump( + exclude={'signatures'}, + exclude_defaults=True, + by_alias=True, + ) + # Ensure 'protocol_version' is always included + protocol_version_alias = ( + AgentCard.model_fields['protocol_version'].alias or 'protocol_version' + ) + if protocol_version_alias not in card_dict: + card_dict[protocol_version_alias] = agent_card.protocol_version + + # Recursively remove empty/None values + cleaned_dict = clean_empty(card_dict) + + return json.dumps(cleaned_dict, separators=(',', ':'), sort_keys=True) + + +def create_agent_card_signer( + signing_key: str | bytes | dict[str, Any] | Key, + kid: str, + alg: str = 'HS256', + jku: str | None = None, +) -> Callable[[AgentCard], AgentCard]: + """Creates a function that signs an AgentCard and adds the signature. + + Args: + signing_key: The private key for signing. + kid: Key ID for the signing key. + alg: The algorithm to use (e.g., "ES256", "RS256"). + jku: Optional URL to the JWKS. + + Returns: + A callable that takes an AgentCard and returns the modified AgentCard with a signature. + """ + + def agent_card_signer(agent_card: AgentCard) -> AgentCard: + """The actual card_modifier function.""" + canonical_payload = canonicalize_agent_card(agent_card) + + headers = {'kid': kid, 'typ': 'JOSE'} + if jku: + headers['jku'] = jku + + jws_string = jws.sign( + payload=canonical_payload.encode('utf-8'), + key=signing_key, + headers=headers, + algorithm=alg, + ) + + # The result of jws.sign is a compact serialization: HEADER.PAYLOAD.SIGNATURE + protected_header, _, signature = jws_string.split('.') + + agent_card_signature = AgentCardSignature( + protected=protected_header, + signature=signature, + ) + + agent_card.signatures = (agent_card.signatures or []) + [ + agent_card_signature + ] + return agent_card + + return agent_card_signer + + +def create_signature_verifier( + key_provider: Callable[ + [str | None, str | None], str | bytes | dict[str, Any] | Key + ], +) -> Callable[[AgentCard], None]: + """Creates a function that verifies AgentCard signatures. + + Args: + key_provider: A callable that takes key-id (kid) and JSON web key url (jku) and returns the verification key. + + Returns: + A callable that takes an AgentCard, and raises an error if none of the signatures are valid. + """ + + def signature_verifier( + agent_card: AgentCard, + ) -> None: + """The actual signature_verifier function.""" + if not agent_card.signatures: + raise JOSEError('No signatures found on AgentCard') + + last_error = None + for agent_card_signature in agent_card.signatures: + try: + # fetch kid and jku from protected header + protected_header_json = base64url_decode( + agent_card_signature.protected.encode('utf-8') + ).decode('utf-8') + protected_header = json.loads(protected_header_json) + kid = protected_header.get('kid') + jku = protected_header.get('jku') + verification_key = key_provider(kid, jku) + + canonical_payload = canonicalize_agent_card(agent_card) + encoded_payload = base64url_encode( + canonical_payload.encode('utf-8') + ).decode('utf-8') + token = f'{agent_card_signature.protected}.{encoded_payload}.{agent_card_signature.signature}' + + jws.verify( + token=token, + key=verification_key, + algorithms=None, + ) + return # Found a valid signature + + except JOSEError as e: + last_error = e + continue + raise JOSEError('No valid signature found') from last_error + + return signature_verifier diff --git a/tests/integration/test_client_server_integration.py b/tests/integration/test_client_server_integration.py index e0a564ee..9cc2844d 100644 --- a/tests/integration/test_client_server_integration.py +++ b/tests/integration/test_client_server_integration.py @@ -1,6 +1,6 @@ import asyncio from collections.abc import AsyncGenerator -from typing import NamedTuple +from typing import NamedTuple, Any from unittest.mock import ANY, AsyncMock, patch import grpc @@ -8,6 +8,7 @@ import pytest import pytest_asyncio from grpc.aio import Channel +from jose.backends.base import Key from a2a.client import ClientConfig from a2a.client.base_client import BaseClient @@ -17,6 +18,10 @@ from a2a.grpc import a2a_pb2_grpc from a2a.server.apps import A2AFastAPIApplication, A2ARESTFastAPIApplication from a2a.server.request_handlers import GrpcHandler, RequestHandler +from a2a.utils.signing import ( + create_agent_card_signer, + create_signature_verifier, +) from a2a.types import ( AgentCapabilities, AgentCard, @@ -37,6 +42,7 @@ TextPart, TransportProtocol, ) +from cryptography.hazmat.primitives import asymmetric # --- Test Constants --- @@ -83,6 +89,15 @@ ) +def create_key_provider(verification_key: str | bytes | dict[str, Any] | Key): + """Creates a key provider function for testing.""" + + def key_provider(kid: str | None, jku: str | None): + return verification_key + + return key_provider + + # --- Test Fixtures --- @@ -772,7 +787,7 @@ def channel_factory(address: str) -> Channel: @pytest.mark.asyncio -async def test_base_client_sends_message_with_extensions( +async def test_json_transport_base_client_send_message_with_extensions( jsonrpc_setup: TransportSetup, agent_card: AgentCard ) -> None: """ @@ -827,3 +842,264 @@ async def test_base_client_sends_message_with_extensions( if hasattr(transport, 'close'): await transport.close() + + +@pytest.mark.asyncio +async def test_json_transport_get_signed_base_card_no_initial( + jsonrpc_setup: TransportSetup, agent_card: AgentCard +) -> None: + """Tests fetching and verifying a symmetrically signed AgentCard via JSON-RPC. + + The client transport is initialized without a card, forcing it to fetch + the base card from the server. The server signs the card using HS384. + The client then verifies the signature. + """ + mock_request_handler = jsonrpc_setup.handler + agent_card.supports_authenticated_extended_card = False + + # Setup signing on the server side + key = 'key12345' + signer = create_agent_card_signer( + signing_key=key, alg='HS384', kid='testkey' + ) + + app_builder = A2AFastAPIApplication( + agent_card, + mock_request_handler, + card_modifier=signer, # Sign the base card + ) + app = app_builder.build() + httpx_client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) + + transport = JsonRpcTransport( + httpx_client=httpx_client, + url=agent_card.url, + agent_card=None, + ) + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier(create_key_provider(key)) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.name == agent_card.name + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport.agent_card.name == agent_card.name + assert transport._needs_extended_card is False + + if hasattr(transport, 'close'): + await transport.close() + + +@pytest.mark.asyncio +async def test_json_transport_get_signed_extended_card( + jsonrpc_setup: TransportSetup, agent_card: AgentCard +) -> None: + """Tests fetching and verifying an asymmetrically signed extended AgentCard via JSON-RPC. + + The client has a base card and fetches the extended card, which is signed + by the server using ES256. The client verifies the signature on the + received extended card. + """ + mock_request_handler = jsonrpc_setup.handler + agent_card.supports_authenticated_extended_card = True + extended_agent_card = agent_card.model_copy(deep=True) + extended_agent_card.name = 'Extended Agent Card' + + # Setup signing on the server side + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + signer = create_agent_card_signer( + signing_key=private_key, alg='ES256', kid='testkey' + ) + + app_builder = A2AFastAPIApplication( + agent_card, + mock_request_handler, + extended_agent_card=extended_agent_card, + extended_card_modifier=lambda card, ctx: signer( + card + ), # Sign the extended card + ) + app = app_builder.build() + httpx_client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) + + transport = JsonRpcTransport( + httpx_client=httpx_client, agent_card=agent_card + ) + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier( + create_key_provider(public_key) + ) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.name == extended_agent_card.name + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport.agent_card.name == extended_agent_card.name + assert transport._needs_extended_card is False + + if hasattr(transport, 'close'): + await transport.close() + + +@pytest.mark.asyncio +async def test_json_transport_get_signed_base_and_extended_cards( + jsonrpc_setup: TransportSetup, agent_card: AgentCard +) -> None: + """Tests fetching and verifying both base and extended cards via JSON-RPC when no card is initially provided. + + The client starts with no card. It first fetches the base card, which is + signed. It then fetches the extended card, which is also signed. Both signatures + are verified independently upon retrieval. + """ + mock_request_handler = jsonrpc_setup.handler + assert agent_card.signatures is None + agent_card.supports_authenticated_extended_card = True + extended_agent_card = agent_card.model_copy(deep=True) + extended_agent_card.name = 'Extended Agent Card' + + # Setup signing on the server side + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + signer = create_agent_card_signer( + signing_key=private_key, alg='ES256', kid='testkey' + ) + + app_builder = A2AFastAPIApplication( + agent_card, + mock_request_handler, + extended_agent_card=extended_agent_card, + card_modifier=signer, # Sign the base card + extended_card_modifier=lambda card, ctx: signer( + card + ), # Sign the extended card + ) + app = app_builder.build() + httpx_client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) + + transport = JsonRpcTransport( + httpx_client=httpx_client, + url=agent_card.url, + agent_card=None, + ) + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier( + create_key_provider(public_key) + ) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.name == extended_agent_card.name + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport.agent_card.name == extended_agent_card.name + assert transport._needs_extended_card is False + + if hasattr(transport, 'close'): + await transport.close() + + +@pytest.mark.asyncio +async def test_rest_transport_get_signed_card( + rest_setup: TransportSetup, agent_card: AgentCard +) -> None: + """Tests fetching and verifying signed base and extended cards via REST. + + The client starts with no card. It first fetches the base card, which is + signed. It then fetches the extended card, which is also signed. Both signatures + are verified independently upon retrieval. + """ + mock_request_handler = rest_setup.handler + agent_card.supports_authenticated_extended_card = True + extended_agent_card = agent_card.model_copy(deep=True) + extended_agent_card.name = 'Extended Agent Card' + + # Setup signing on the server side + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + signer = create_agent_card_signer( + signing_key=private_key, alg='ES256', kid='testkey' + ) + + app_builder = A2ARESTFastAPIApplication( + agent_card, + mock_request_handler, + extended_agent_card=extended_agent_card, + card_modifier=signer, # Sign the base card + extended_card_modifier=lambda card, ctx: signer( + card + ), # Sign the extended card + ) + app = app_builder.build() + httpx_client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) + + transport = RestTransport( + httpx_client=httpx_client, + url=agent_card.url, + agent_card=None, + ) + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier( + create_key_provider(public_key) + ) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.name == extended_agent_card.name + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport.agent_card.name == extended_agent_card.name + assert transport._needs_extended_card is False + + if hasattr(transport, 'close'): + await transport.close() + + +@pytest.mark.asyncio +async def test_grpc_transport_get_signed_card( + mock_request_handler: AsyncMock, agent_card: AgentCard +) -> None: + """Tests fetching and verifying a signed AgentCard via gRPC.""" + # Setup signing on the server side + agent_card.supports_authenticated_extended_card = True + + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + signer = create_agent_card_signer( + signing_key=private_key, alg='ES256', kid='testkey' + ) + + server = grpc.aio.server() + port = server.add_insecure_port('[::]:0') + server_address = f'localhost:{port}' + agent_card.url = server_address + + servicer = GrpcHandler( + agent_card, + mock_request_handler, + card_modifier=signer, + ) + a2a_pb2_grpc.add_A2AServiceServicer_to_server(servicer, server) + await server.start() + + transport = None # Initialize transport + try: + + def channel_factory(address: str) -> Channel: + return grpc.aio.insecure_channel(address) + + channel = channel_factory(server_address) + transport = GrpcTransport(channel=channel, agent_card=agent_card) + transport.agent_card = None + assert transport._needs_extended_card is True + + # Get the card, this will trigger verification in get_card + signature_verifier = create_signature_verifier( + create_key_provider(public_key) + ) + result = await transport.get_card(signature_verifier=signature_verifier) + assert result.signatures is not None + assert len(result.signatures) == 1 + assert transport._needs_extended_card is False + finally: + if transport: + await transport.close() + await server.stop(0) # Gracefully stop the server diff --git a/tests/utils/test_proto_utils.py b/tests/utils/test_proto_utils.py index 33be1f3f..f68d5c10 100644 --- a/tests/utils/test_proto_utils.py +++ b/tests/utils/test_proto_utils.py @@ -108,6 +108,18 @@ def sample_agent_card() -> types.AgentCard: ) ), }, + signatures=[ + types.AgentCardSignature( + protected='protected_test', + signature='signature_test', + header={'alg': 'ES256'}, + ), + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={'alg': 'ES256', 'kid': 'unique-key-identifier-123'}, + ), + ], ) @@ -523,7 +535,7 @@ def test_task_conversion_roundtrip( assert roundtrip_task.status == types.TaskStatus( state=types.TaskState.working, message=sample_message ) - assert roundtrip_task.history == [sample_message] + assert roundtrip_task.history == sample_task.history assert roundtrip_task.artifacts == [ types.Artifact( artifact_id='art-1', @@ -536,3 +548,142 @@ def test_task_conversion_roundtrip( ) ] assert roundtrip_task.metadata == {'source': 'test'} + + def test_agent_card_conversion_roundtrip( + self, sample_agent_card: types.AgentCard + ): + """Test conversion of AgentCard to proto and back.""" + proto_card = proto_utils.ToProto.agent_card(sample_agent_card) + assert isinstance(proto_card, a2a_pb2.AgentCard) + + roundtrip_card = proto_utils.FromProto.agent_card(proto_card) + assert roundtrip_card.name == 'Test Agent' + assert roundtrip_card.description == 'A test agent' + assert roundtrip_card.url == 'http://localhost' + assert roundtrip_card.version == '1.0.0' + assert roundtrip_card.capabilities == types.AgentCapabilities( + extensions=[], streaming=True, push_notifications=True + ) + assert roundtrip_card.default_input_modes == ['text/plain'] + assert roundtrip_card.default_output_modes == ['text/plain'] + assert roundtrip_card.skills == [ + types.AgentSkill( + id='skill1', + name='Test Skill', + description='A test skill', + tags=['test'], + examples=[], + input_modes=[], + output_modes=[], + ) + ] + assert roundtrip_card.provider == types.AgentProvider( + organization='Test Org', url='http://test.org' + ) + assert roundtrip_card.security == [{'oauth_scheme': ['read', 'write']}] + + # Normalized version of security_schemes. None fields are filled with defaults. + expected_security_schemes = { + 'oauth_scheme': types.SecurityScheme( + root=types.OAuth2SecurityScheme( + description='', + flows=types.OAuthFlows( + client_credentials=types.ClientCredentialsOAuthFlow( + refresh_url='', + scopes={ + 'write': 'Write access', + 'read': 'Read access', + }, + token_url='http://token.url', + ), + ), + ) + ), + 'apiKey': types.SecurityScheme( + root=types.APIKeySecurityScheme( + description='', + in_=types.In.header, + name='X-API-KEY', + ) + ), + 'httpAuth': types.SecurityScheme( + root=types.HTTPAuthSecurityScheme( + bearer_format='', + description='', + scheme='bearer', + ) + ), + 'oidc': types.SecurityScheme( + root=types.OpenIdConnectSecurityScheme( + description='', + open_id_connect_url='http://oidc.url', + ) + ), + } + assert roundtrip_card.security_schemes == expected_security_schemes + assert roundtrip_card.signatures == [ + types.AgentCardSignature( + protected='protected_test', + signature='signature_test', + header={'alg': 'ES256'}, + ), + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={'alg': 'ES256', 'kid': 'unique-key-identifier-123'}, + ), + ] + + @pytest.mark.parametrize( + 'signature_data, expected_data', + [ + ( + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={'alg': 'ES256'}, + ), + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={'alg': 'ES256'}, + ), + ), + ( + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header=None, + ), + types.AgentCardSignature( + protected='protected_val', + signature='signature_val', + header={}, + ), + ), + ( + types.AgentCardSignature( + protected='', + signature='', + header={}, + ), + types.AgentCardSignature( + protected='', + signature='', + header={}, + ), + ), + ], + ) + def test_agent_card_signature_conversion_roundtrip( + self, signature_data, expected_data + ): + """Test conversion of AgentCardSignature to proto and back.""" + proto_signature = proto_utils.ToProto.agent_card_signature( + signature_data + ) + assert isinstance(proto_signature, a2a_pb2.AgentCardSignature) + roundtrip_signature = proto_utils.FromProto.agent_card_signature( + proto_signature + ) + assert roundtrip_signature == expected_data diff --git a/tests/utils/test_signing.py b/tests/utils/test_signing.py new file mode 100644 index 00000000..22d648b8 --- /dev/null +++ b/tests/utils/test_signing.py @@ -0,0 +1,145 @@ +from a2a.types import ( + AgentCard, + AgentCapabilities, + AgentSkill, +) +from a2a.types import ( + AgentCard, + AgentCapabilities, + AgentSkill, +) +from a2a.utils.signing import ( + canonicalize_agent_card, + create_agent_card_signer, + create_signature_verifier, +) +from typing import Any +from jose.backends.base import Key +from jose.exceptions import JOSEError + +import pytest +from cryptography.hazmat.primitives import asymmetric + + +def create_key_provider(verification_key: str | bytes | dict[str, Any] | Key): + """Creates a key provider function for testing.""" + + def key_provider(kid: str | None, jku: str | None): + return verification_key + + return key_provider + + +# Fixture for a complete sample AgentCard +@pytest.fixture +def sample_agent_card() -> AgentCard: + return AgentCard( + name='Test Agent', + description='A test agent', + url='http://localhost', + version='1.0.0', + capabilities=AgentCapabilities( + streaming=None, + push_notifications=True, + ), + default_input_modes=['text/plain'], + default_output_modes=['text/plain'], + skills=[ + AgentSkill( + id='skill1', + name='Test Skill', + description='A test skill', + tags=['test'], + ) + ], + ) + + +def test_signer_and_verifier_symmetric(sample_agent_card: AgentCard): + """Test the agent card signing and verification process with symmetric key encryption.""" + key = 'key12345' # Using a simple symmetric key for HS256 + wrong_key = 'wrongkey' + + agent_card_signer = create_agent_card_signer( + signing_key=key, alg='HS384', kid='key1' + ) + signed_card = agent_card_signer(sample_agent_card) + + assert signed_card.signatures is not None + assert len(signed_card.signatures) == 1 + signature = signed_card.signatures[0] + assert signature.protected is not None + assert signature.signature is not None + + # Verify the signature + verifier = create_signature_verifier(create_key_provider(key)) + try: + verifier(signed_card) + except JOSEError: + pytest.fail('Signature verification failed with correct key') + + # Verify with wrong key + verifier_wrong_key = create_signature_verifier( + create_key_provider(wrong_key) + ) + with pytest.raises(JOSEError): + verifier_wrong_key(signed_card) + + +def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): + """Test the agent card signing and verification process with an asymmetric key encryption.""" + # Generate a dummy EC private key for ES256 + private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) + public_key = private_key.public_key() + # Generate another key pair for negative test + private_key_error = asymmetric.ec.generate_private_key( + asymmetric.ec.SECP256R1() + ) + public_key_error = private_key_error.public_key() + + agent_card_signer = create_agent_card_signer( + signing_key=private_key, alg='ES256', kid='key1' + ) + signed_card = agent_card_signer(sample_agent_card) + + assert signed_card.signatures is not None + assert len(signed_card.signatures) == 1 + signature = signed_card.signatures[0] + assert signature.protected is not None + assert signature.signature is not None + + verifier = create_signature_verifier(create_key_provider(public_key)) + try: + verifier(signed_card) + except JOSEError: + pytest.fail('Signature verification failed with correct key') + + # Verify with wrong key + verifier_wrong_key = create_signature_verifier( + create_key_provider(public_key_error) + ) + with pytest.raises(JOSEError): + verifier_wrong_key(signed_card) + + +def test_canonicalize_agent_card( + sample_agent_card: AgentCard, +): + """Test canonicalize_agent_card with defaults, optionals, and exceptions. + + - extensions is omitted as it's not set and optional. + - protocolVersion is included because it's always added by canonicalize_agent_card. + - signatures should be omitted. + """ + sample_agent_card.signatures = ( + [{'protected': 'protected_header', 'signature': 'test_signature'}], + ) + expected_jcs = ( + '{"capabilities":{"pushNotifications":true},' + '"defaultInputModes":["text/plain"],"defaultOutputModes":["text/plain"],' + '"description":"A test agent","name":"Test Agent","protocolVersion":"0.3.0",' + '"skills":[{"description":"A test skill","id":"skill1","name":"Test Skill","tags":["test"]}],' + '"url":"http://localhost","version":"1.0.0"}' + ) + result = canonicalize_agent_card(sample_agent_card) + assert result == expected_jcs From a55e4f78c7de11728ea9a15d2ebcdb160004ae69 Mon Sep 17 00:00:00 2001 From: sokoliva Date: Mon, 8 Dec 2025 12:50:39 +0000 Subject: [PATCH 02/11] refactor: add argument description to get_card method, improve readability of clean_empty by declaring types, add a multiple signatures test to test_signing.py --- src/a2a/client/base_client.py | 3 +-- src/a2a/utils/signing.py | 20 +++++++++-------- tests/utils/test_signing.py | 42 +++++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/src/a2a/client/base_client.py b/src/a2a/client/base_client.py index 1a92d410..c870f329 100644 --- a/src/a2a/client/base_client.py +++ b/src/a2a/client/base_client.py @@ -271,8 +271,7 @@ async def get_card( Args: context: The client call context. extensions: List of extensions to be activated. - key_provider: A callable that takes key-id (kid) and JSON web key url (jku) - and returns the verification key for signature verification. + signature_verifier: A callable used to verify the agent card's signatures. Returns: The `AgentCard` for the agent. diff --git a/src/a2a/utils/signing.py b/src/a2a/utils/signing.py index cd8dece6..a2a1c368 100644 --- a/src/a2a/utils/signing.py +++ b/src/a2a/utils/signing.py @@ -20,22 +20,22 @@ def clean_empty(d: Any) -> Any: - """Recursively remove empty lists, dicts, strings, and None values from a dictionary.""" + """Recursively remove empty strings, lists, dicts, and None values from a dictionary.""" if isinstance(d, dict): - cleaned = {k: clean_empty(v) for k, v in d.items()} + cleaned_dict: dict[Any, Any] = {k: clean_empty(v) for k, v in d.items()} return { k: v - for k, v in cleaned.items() + for k, v in cleaned_dict.items() if v is not None and (isinstance(v, (bool, int, float)) or v) } if isinstance(d, list): - cleaned = [clean_empty(v) for v in d] + cleaned_list: list[Any] = [clean_empty(v) for v in d] return [ v - for v in cleaned + for v in cleaned_list if v is not None and (isinstance(v, (bool, int, float)) or v) ] - return d if d not in [None, '', [], {}] else None + return d if d not in ['', [], {}, None] else None def canonicalize_agent_card(agent_card: AgentCard) -> str: @@ -151,11 +151,13 @@ def signature_verifier( key=verification_key, algorithms=None, ) - return # Found a valid signature - + # Found a valid signature, exit the loop and function + break except JOSEError as e: last_error = e continue - raise JOSEError('No valid signature found') from last_error + else: + # This block runs only if the loop completes without a break + raise JOSEError('No valid signature found') from last_error return signature_verifier diff --git a/tests/utils/test_signing.py b/tests/utils/test_signing.py index 22d648b8..938558b5 100644 --- a/tests/utils/test_signing.py +++ b/tests/utils/test_signing.py @@ -7,6 +7,7 @@ AgentCard, AgentCapabilities, AgentSkill, + AgentCardSignature, ) from a2a.utils.signing import ( canonicalize_agent_card, @@ -16,6 +17,7 @@ from typing import Any from jose.backends.base import Key from jose.exceptions import JOSEError +from jose.utils import base64url_encode import pytest from cryptography.hazmat.primitives import asymmetric @@ -86,6 +88,46 @@ def test_signer_and_verifier_symmetric(sample_agent_card: AgentCard): verifier_wrong_key(signed_card) +def test_signer_and_verifier_symmetric_multiple_signatures( + sample_agent_card: AgentCard, +): + """Test the agent card signing and verification process with symmetric key encryption. + This test adds a signatures to the AgentCard before signing.""" + encoded_header = base64url_encode( + b'{"alg": "HS256", "kid": "old_key"}' + ).decode('utf-8') + sample_agent_card.signatures = [ + AgentCardSignature(protected=encoded_header, signature='old_signature') + ] + key = 'key12345' # Using a simple symmetric key for HS256 + wrong_key = 'wrongkey' + + agent_card_signer = create_agent_card_signer( + signing_key=key, alg='HS384', kid='key1' + ) + signed_card = agent_card_signer(sample_agent_card) + + assert signed_card.signatures is not None + assert len(signed_card.signatures) == 2 + signature = signed_card.signatures[1] + assert signature.protected is not None + assert signature.signature is not None + + # Verify the signature + verifier = create_signature_verifier(create_key_provider(key)) + try: + verifier(signed_card) + except JOSEError: + pytest.fail('Signature verification failed with correct key') + + # Verify with wrong key + verifier_wrong_key = create_signature_verifier( + create_key_provider(wrong_key) + ) + with pytest.raises(JOSEError): + verifier_wrong_key(signed_card) + + def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): """Test the agent card signing and verification process with an asymmetric key encryption.""" # Generate a dummy EC private key for ES256 From f915ff2ed7351fa994d4515a2cec485a10537a64 Mon Sep 17 00:00:00 2001 From: sokoliva Date: Mon, 8 Dec 2025 13:00:12 +0000 Subject: [PATCH 03/11] refactor: Pass the 'algorithms list' to 'jws.verify' to ensure only expected algorithms are used for signature validation. --- src/a2a/utils/signing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/a2a/utils/signing.py b/src/a2a/utils/signing.py index a2a1c368..b2efc262 100644 --- a/src/a2a/utils/signing.py +++ b/src/a2a/utils/signing.py @@ -138,6 +138,7 @@ def signature_verifier( protected_header = json.loads(protected_header_json) kid = protected_header.get('kid') jku = protected_header.get('jku') + alg = protected_header.get('alg') verification_key = key_provider(kid, jku) canonical_payload = canonicalize_agent_card(agent_card) @@ -149,7 +150,7 @@ def signature_verifier( jws.verify( token=token, key=verification_key, - algorithms=None, + algorithms=[alg] if alg else None, ) # Found a valid signature, exit the loop and function break From 634710ed117bd98d17721daa30e51f940717d8a7 Mon Sep 17 00:00:00 2001 From: sokoliva Date: Mon, 8 Dec 2025 13:24:25 +0000 Subject: [PATCH 04/11] Fix: add an algorithms parameter to create_signature_verifier function to security measures. --- src/a2a/utils/signing.py | 11 ++++---- .../test_client_server_integration.py | 15 ++++++----- tests/utils/test_signing.py | 27 ++++++++++++------- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/a2a/utils/signing.py b/src/a2a/utils/signing.py index b2efc262..c532b6d2 100644 --- a/src/a2a/utils/signing.py +++ b/src/a2a/utils/signing.py @@ -11,7 +11,7 @@ from jose.utils import base64url_decode, base64url_encode except ImportError as e: raise ImportError( - 'A2AUtilsSigning requires python-jose to be installed. ' + 'A2A Signing requires python-jose to be installed. ' 'Install with: ' "'pip install a2a-sdk[signing]'" ) from e @@ -70,7 +70,7 @@ def create_agent_card_signer( signing_key: The private key for signing. kid: Key ID for the signing key. alg: The algorithm to use (e.g., "ES256", "RS256"). - jku: Optional URL to the JWKS. + jku: Optional URL to the JSON Web Keys. Returns: A callable that takes an AgentCard and returns the modified AgentCard with a signature. @@ -111,11 +111,13 @@ def create_signature_verifier( key_provider: Callable[ [str | None, str | None], str | bytes | dict[str, Any] | Key ], + algorithms: list[str], ) -> Callable[[AgentCard], None]: """Creates a function that verifies AgentCard signatures. Args: - key_provider: A callable that takes key-id (kid) and JSON web key url (jku) and returns the verification key. + key_provider: A callable that takes key-id and JSON web key url and returns the verification key. + algorithms: List of acceptable algorithms for verification used to prevent algorithm confusion attacks. Returns: A callable that takes an AgentCard, and raises an error if none of the signatures are valid. @@ -138,7 +140,6 @@ def signature_verifier( protected_header = json.loads(protected_header_json) kid = protected_header.get('kid') jku = protected_header.get('jku') - alg = protected_header.get('alg') verification_key = key_provider(kid, jku) canonical_payload = canonicalize_agent_card(agent_card) @@ -150,7 +151,7 @@ def signature_verifier( jws.verify( token=token, key=verification_key, - algorithms=[alg] if alg else None, + algorithms=algorithms, ) # Found a valid signature, exit the loop and function break diff --git a/tests/integration/test_client_server_integration.py b/tests/integration/test_client_server_integration.py index 9cc2844d..9f9c69e9 100644 --- a/tests/integration/test_client_server_integration.py +++ b/tests/integration/test_client_server_integration.py @@ -878,7 +878,9 @@ async def test_json_transport_get_signed_base_card_no_initial( ) # Get the card, this will trigger verification in get_card - signature_verifier = create_signature_verifier(create_key_provider(key)) + signature_verifier = create_signature_verifier( + create_key_provider(key), ['HS384'] + ) result = await transport.get_card(signature_verifier=signature_verifier) assert result.name == agent_card.name assert result.signatures is not None @@ -929,7 +931,7 @@ async def test_json_transport_get_signed_extended_card( # Get the card, this will trigger verification in get_card signature_verifier = create_signature_verifier( - create_key_provider(public_key) + create_key_provider(public_key), ['HS384', 'ES256'] ) result = await transport.get_card(signature_verifier=signature_verifier) assert result.name == extended_agent_card.name @@ -985,7 +987,7 @@ async def test_json_transport_get_signed_base_and_extended_cards( # Get the card, this will trigger verification in get_card signature_verifier = create_signature_verifier( - create_key_provider(public_key) + create_key_provider(public_key), ['HS384', 'ES256', 'RS256'] ) result = await transport.get_card(signature_verifier=signature_verifier) assert result.name == extended_agent_card.name @@ -1039,8 +1041,9 @@ async def test_rest_transport_get_signed_card( ) # Get the card, this will trigger verification in get_card - signature_verifier = create_signature_verifier( - create_key_provider(public_key) + signature_verifier = ( + create_key_provider(public_key), + ['HS384', 'ES256', 'RS256'], ) result = await transport.get_card(signature_verifier=signature_verifier) assert result.name == extended_agent_card.name @@ -1093,7 +1096,7 @@ def channel_factory(address: str) -> Channel: # Get the card, this will trigger verification in get_card signature_verifier = create_signature_verifier( - create_key_provider(public_key) + create_key_provider(public_key), ['HS384', 'ES256', 'RS256'] ) result = await transport.get_card(signature_verifier=signature_verifier) assert result.signatures is not None diff --git a/tests/utils/test_signing.py b/tests/utils/test_signing.py index 938558b5..62d455c7 100644 --- a/tests/utils/test_signing.py +++ b/tests/utils/test_signing.py @@ -74,7 +74,9 @@ def test_signer_and_verifier_symmetric(sample_agent_card: AgentCard): assert signature.signature is not None # Verify the signature - verifier = create_signature_verifier(create_key_provider(key)) + verifier = create_signature_verifier( + create_key_provider(key), ['HS256', 'HS384', 'ES256', 'RS256'] + ) try: verifier(signed_card) except JOSEError: @@ -82,7 +84,7 @@ def test_signer_and_verifier_symmetric(sample_agent_card: AgentCard): # Verify with wrong key verifier_wrong_key = create_signature_verifier( - create_key_provider(wrong_key) + create_key_provider(wrong_key), ['HS256', 'HS384', 'ES256', 'RS256'] ) with pytest.raises(JOSEError): verifier_wrong_key(signed_card) @@ -114,7 +116,9 @@ def test_signer_and_verifier_symmetric_multiple_signatures( assert signature.signature is not None # Verify the signature - verifier = create_signature_verifier(create_key_provider(key)) + verifier = create_signature_verifier( + create_key_provider(key), ['HS256', 'HS384', 'ES256', 'RS256'] + ) try: verifier(signed_card) except JOSEError: @@ -122,7 +126,7 @@ def test_signer_and_verifier_symmetric_multiple_signatures( # Verify with wrong key verifier_wrong_key = create_signature_verifier( - create_key_provider(wrong_key) + create_key_provider(wrong_key), ['HS256', 'HS384', 'ES256', 'RS256'] ) with pytest.raises(JOSEError): verifier_wrong_key(signed_card) @@ -150,7 +154,9 @@ def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): assert signature.protected is not None assert signature.signature is not None - verifier = create_signature_verifier(create_key_provider(public_key)) + verifier = create_signature_verifier( + create_key_provider(public_key), ['HS256', 'HS384', 'ES256', 'RS256'] + ) try: verifier(signed_card) except JOSEError: @@ -158,7 +164,8 @@ def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): # Verify with wrong key verifier_wrong_key = create_signature_verifier( - create_key_provider(public_key_error) + create_key_provider(public_key_error), + ['HS256', 'HS384', 'ES256', 'RS256'], ) with pytest.raises(JOSEError): verifier_wrong_key(signed_card) @@ -173,9 +180,11 @@ def test_canonicalize_agent_card( - protocolVersion is included because it's always added by canonicalize_agent_card. - signatures should be omitted. """ - sample_agent_card.signatures = ( - [{'protected': 'protected_header', 'signature': 'test_signature'}], - ) + sample_agent_card.signatures = [ + AgentCardSignature( + protected='protected_header', signature='test_signature' + ) + ] expected_jcs = ( '{"capabilities":{"pushNotifications":true},' '"defaultInputModes":["text/plain"],"defaultOutputModes":["text/plain"],' From c5f69713eca525f5b81b456f6551ee64d95e5b08 Mon Sep 17 00:00:00 2001 From: sokoliva Date: Mon, 8 Dec 2025 13:29:05 +0000 Subject: [PATCH 05/11] Fix: fix accidental error --- tests/integration/test_client_server_integration.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_client_server_integration.py b/tests/integration/test_client_server_integration.py index 9f9c69e9..0d06e02e 100644 --- a/tests/integration/test_client_server_integration.py +++ b/tests/integration/test_client_server_integration.py @@ -1041,9 +1041,8 @@ async def test_rest_transport_get_signed_card( ) # Get the card, this will trigger verification in get_card - signature_verifier = ( - create_key_provider(public_key), - ['HS384', 'ES256', 'RS256'], + signature_verifier = create_signature_verifier( + create_key_provider(public_key), ['HS384', 'ES256', 'RS256'] ) result = await transport.get_card(signature_verifier=signature_verifier) assert result.name == extended_agent_card.name From 57e4a68dc8d747d29612aba1db2fa34372b3632b Mon Sep 17 00:00:00 2001 From: sokoliva Date: Mon, 8 Dec 2025 14:19:31 +0000 Subject: [PATCH 06/11] fix: add jku, jwk, jwks, jws and kid to allow.txt --- .github/actions/spelling/allow.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index a016962c..27b5cb4c 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -47,9 +47,14 @@ initdb inmemory INR isready +jku JPY JSONRPCt +jwk +jwks JWS +jws +kid kwarg langgraph lifecycles From 701eb3902922fe782f952e864187d8e99c458b17 Mon Sep 17 00:00:00 2001 From: sokoliva Date: Wed, 10 Dec 2025 13:39:49 +0000 Subject: [PATCH 07/11] fix: replace `python-jose` library with `PyJWK`. Add `signature verification errors`. Add `class ProtectedHeader(TypedDict)` to `signing.py`. --- pyproject.toml | 2 +- src/a2a/utils/signing.py | 127 ++++++++++-------- .../test_client_server_integration.py | 52 +++++-- tests/utils/test_signing.py | 47 +++++-- 4 files changed, 148 insertions(+), 80 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 44c53679..2f6b523d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ telemetry = ["opentelemetry-api>=1.33.0", "opentelemetry-sdk>=1.33.0"] postgresql = ["sqlalchemy[asyncio,postgresql-asyncpg]>=2.0.0"] mysql = ["sqlalchemy[asyncio,aiomysql]>=2.0.0"] sqlite = ["sqlalchemy[asyncio,aiosqlite]>=2.0.0"] -signing = ["python-jose>=3.0.0"] +signing = ["PyJWT>=2.0.0"] sql = ["a2a-sdk[postgresql,mysql,sqlite]"] diff --git a/src/a2a/utils/signing.py b/src/a2a/utils/signing.py index c532b6d2..9a4ef2af 100644 --- a/src/a2a/utils/signing.py +++ b/src/a2a/utils/signing.py @@ -1,17 +1,18 @@ import json from collections.abc import Callable -from typing import Any +from typing import Any, TypedDict try: - from jose import jws - from jose.backends.base import Key - from jose.exceptions import JOSEError - from jose.utils import base64url_decode, base64url_encode + import jwt + + from jwt.api_jwk import PyJWK + from jwt.exceptions import PyJWTError + from jwt.utils import base64url_decode, base64url_encode except ImportError as e: raise ImportError( - 'A2A Signing requires python-jose to be installed. ' + 'A2A Signing requires PyJWT to be installed. ' 'Install with: ' "'pip install a2a-sdk[signing]'" ) from e @@ -19,23 +20,43 @@ from a2a.types import AgentCard, AgentCardSignature +class SignatureVerificationError(Exception): + """Base exception for signature verification errors.""" + + +class NoSignatureError(SignatureVerificationError): + """Exception raised when no signature is found on an AgentCard.""" + + +class InvalidSignaturesError(SignatureVerificationError): + """Exception raised when all signatures are invalid.""" + + +class ProtectedHeader(TypedDict): + """Protected header parameters for JWS (JSON Web Signature).""" + + kid: str + """ Key identifier. """ + alg: str | None + """ Algorithm used for signing. """ + jku: str | None + """ JSON Web Key Set URL. """ + typ: str | None + """ Token type. + + Best practice: SHOULD be "JOSE" for JWS tokens. + """ + + def clean_empty(d: Any) -> Any: - """Recursively remove empty strings, lists, dicts, and None values from a dictionary.""" + """Recursively remove empty strings, lists and dicts from a dictionary.""" if isinstance(d, dict): cleaned_dict: dict[Any, Any] = {k: clean_empty(v) for k, v in d.items()} - return { - k: v - for k, v in cleaned_dict.items() - if v is not None and (isinstance(v, (bool, int, float)) or v) - } + return {k: v for k, v in cleaned_dict.items() if v} if isinstance(d, list): cleaned_list: list[Any] = [clean_empty(v) for v in d] - return [ - v - for v in cleaned_list - if v is not None and (isinstance(v, (bool, int, float)) or v) - ] - return d if d not in ['', [], {}, None] else None + return [v for v in cleaned_list if v] + return d if d not in ['', [], {}] else None def canonicalize_agent_card(agent_card: AgentCard) -> str: @@ -43,59 +64,48 @@ def canonicalize_agent_card(agent_card: AgentCard) -> str: card_dict = agent_card.model_dump( exclude={'signatures'}, exclude_defaults=True, + exclude_none=True, by_alias=True, ) - # Ensure 'protocol_version' is always included - protocol_version_alias = ( - AgentCard.model_fields['protocol_version'].alias or 'protocol_version' - ) - if protocol_version_alias not in card_dict: - card_dict[protocol_version_alias] = agent_card.protocol_version - - # Recursively remove empty/None values + # Recursively remove empty values cleaned_dict = clean_empty(card_dict) - return json.dumps(cleaned_dict, separators=(',', ':'), sort_keys=True) def create_agent_card_signer( - signing_key: str | bytes | dict[str, Any] | Key, - kid: str, - alg: str = 'HS256', - jku: str | None = None, + signing_key: PyJWK | str | bytes, + protected_header: ProtectedHeader, + header: dict[str, Any] | None = None, ) -> Callable[[AgentCard], AgentCard]: """Creates a function that signs an AgentCard and adds the signature. Args: signing_key: The private key for signing. - kid: Key ID for the signing key. - alg: The algorithm to use (e.g., "ES256", "RS256"). - jku: Optional URL to the JSON Web Keys. + protected_header: The protected header parameters. + header: Unprotected header parameters. Returns: A callable that takes an AgentCard and returns the modified AgentCard with a signature. """ def agent_card_signer(agent_card: AgentCard) -> AgentCard: - """The actual card_modifier function.""" + """Signs agent card.""" canonical_payload = canonicalize_agent_card(agent_card) + payload_dict = json.loads(canonical_payload) - headers = {'kid': kid, 'typ': 'JOSE'} - if jku: - headers['jku'] = jku - - jws_string = jws.sign( - payload=canonical_payload.encode('utf-8'), + jws_string = jwt.encode( + payload=payload_dict, key=signing_key, - headers=headers, - algorithm=alg, + algorithm=protected_header.get('alg', 'HS256'), + headers=protected_header, ) - # The result of jws.sign is a compact serialization: HEADER.PAYLOAD.SIGNATURE - protected_header, _, signature = jws_string.split('.') + # The result of jwt.encode is a compact serialization: HEADER.PAYLOAD.SIGNATURE + protected, _, signature = jws_string.split('.') agent_card_signature = AgentCardSignature( - protected=protected_header, + header=header, + protected=protected, signature=signature, ) @@ -108,9 +118,7 @@ def agent_card_signer(agent_card: AgentCard) -> AgentCard: def create_signature_verifier( - key_provider: Callable[ - [str | None, str | None], str | bytes | dict[str, Any] | Key - ], + key_provider: Callable[[str | None, str | None], PyJWK | str | bytes], algorithms: list[str], ) -> Callable[[AgentCard], None]: """Creates a function that verifies AgentCard signatures. @@ -126,14 +134,17 @@ def create_signature_verifier( def signature_verifier( agent_card: AgentCard, ) -> None: - """The actual signature_verifier function.""" + """Verifies agent card signatures. + + Checks if at least one signature matches the key, otherwise raises an error. + """ if not agent_card.signatures: - raise JOSEError('No signatures found on AgentCard') + raise NoSignatureError('AgentCard has no signatures to verify.') last_error = None for agent_card_signature in agent_card.signatures: try: - # fetch kid and jku from protected header + # get verification key protected_header_json = base64url_decode( agent_card_signature.protected.encode('utf-8') ).decode('utf-8') @@ -146,20 +157,22 @@ def signature_verifier( encoded_payload = base64url_encode( canonical_payload.encode('utf-8') ).decode('utf-8') - token = f'{agent_card_signature.protected}.{encoded_payload}.{agent_card_signature.signature}' - jws.verify( - token=token, + token = f'{agent_card_signature.protected}.{encoded_payload}.{agent_card_signature.signature}' + jwt.decode( + jwt=token, key=verification_key, algorithms=algorithms, ) # Found a valid signature, exit the loop and function break - except JOSEError as e: + except PyJWTError as e: last_error = e continue else: # This block runs only if the loop completes without a break - raise JOSEError('No valid signature found') from last_error + raise InvalidSignaturesError( + 'No valid signature found' + ) from last_error return signature_verifier diff --git a/tests/integration/test_client_server_integration.py b/tests/integration/test_client_server_integration.py index 0d06e02e..e6552fcb 100644 --- a/tests/integration/test_client_server_integration.py +++ b/tests/integration/test_client_server_integration.py @@ -8,8 +8,8 @@ import pytest import pytest_asyncio from grpc.aio import Channel -from jose.backends.base import Key +from jwt.api_jwk import PyJWK from a2a.client import ClientConfig from a2a.client.base_client import BaseClient from a2a.client.transports import JsonRpcTransport, RestTransport @@ -89,7 +89,7 @@ ) -def create_key_provider(verification_key: str | bytes | dict[str, Any] | Key): +def create_key_provider(verification_key: PyJWK | str | bytes): """Creates a key provider function for testing.""" def key_provider(kid: str | None, jku: str | None): @@ -754,6 +754,7 @@ async def test_http_transport_get_authenticated_card( transport = RestTransport(httpx_client=httpx_client, agent_card=agent_card) result = await transport.get_card() assert result.name == extended_agent_card.name + assert transport.agent_card is not None assert transport.agent_card.name == extended_agent_card.name assert transport._needs_extended_card is False @@ -776,6 +777,7 @@ def channel_factory(address: str) -> Channel: transport = GrpcTransport(channel=channel, agent_card=agent_card) # The transport starts with a minimal card, get_card() fetches the full one + assert transport.agent_card is not None transport.agent_card.supports_authenticated_extended_card = True result = await transport.get_card() @@ -845,7 +847,7 @@ async def test_json_transport_base_client_send_message_with_extensions( @pytest.mark.asyncio -async def test_json_transport_get_signed_base_card_no_initial( +async def test_json_transport_get_signed_base_card( jsonrpc_setup: TransportSetup, agent_card: AgentCard ) -> None: """Tests fetching and verifying a symmetrically signed AgentCard via JSON-RPC. @@ -860,7 +862,13 @@ async def test_json_transport_get_signed_base_card_no_initial( # Setup signing on the server side key = 'key12345' signer = create_agent_card_signer( - signing_key=key, alg='HS384', kid='testkey' + signing_key=key, + protected_header={ + 'alg': 'HS384', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, ) app_builder = A2AFastAPIApplication( @@ -885,6 +893,7 @@ async def test_json_transport_get_signed_base_card_no_initial( assert result.name == agent_card.name assert result.signatures is not None assert len(result.signatures) == 1 + assert transport.agent_card is not None assert transport.agent_card.name == agent_card.name assert transport._needs_extended_card is False @@ -911,7 +920,13 @@ async def test_json_transport_get_signed_extended_card( private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) public_key = private_key.public_key() signer = create_agent_card_signer( - signing_key=private_key, alg='ES256', kid='testkey' + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, ) app_builder = A2AFastAPIApplication( @@ -937,6 +952,7 @@ async def test_json_transport_get_signed_extended_card( assert result.name == extended_agent_card.name assert result.signatures is not None assert len(result.signatures) == 1 + assert transport.agent_card is not None assert transport.agent_card.name == extended_agent_card.name assert transport._needs_extended_card is False @@ -964,7 +980,13 @@ async def test_json_transport_get_signed_base_and_extended_cards( private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) public_key = private_key.public_key() signer = create_agent_card_signer( - signing_key=private_key, alg='ES256', kid='testkey' + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, ) app_builder = A2AFastAPIApplication( @@ -993,6 +1015,7 @@ async def test_json_transport_get_signed_base_and_extended_cards( assert result.name == extended_agent_card.name assert result.signatures is not None assert len(result.signatures) == 1 + assert transport.agent_card is not None assert transport.agent_card.name == extended_agent_card.name assert transport._needs_extended_card is False @@ -1019,7 +1042,13 @@ async def test_rest_transport_get_signed_card( private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) public_key = private_key.public_key() signer = create_agent_card_signer( - signing_key=private_key, alg='ES256', kid='testkey' + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, ) app_builder = A2ARESTFastAPIApplication( @@ -1048,6 +1077,7 @@ async def test_rest_transport_get_signed_card( assert result.name == extended_agent_card.name assert result.signatures is not None assert len(result.signatures) == 1 + assert transport.agent_card is not None assert transport.agent_card.name == extended_agent_card.name assert transport._needs_extended_card is False @@ -1066,7 +1096,13 @@ async def test_grpc_transport_get_signed_card( private_key = asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) public_key = private_key.public_key() signer = create_agent_card_signer( - signing_key=private_key, alg='ES256', kid='testkey' + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'testkey', + 'jku': None, + 'typ': 'JOSE', + }, ) server = grpc.aio.server() diff --git a/tests/utils/test_signing.py b/tests/utils/test_signing.py index 62d455c7..d8b0ac9c 100644 --- a/tests/utils/test_signing.py +++ b/tests/utils/test_signing.py @@ -13,17 +13,16 @@ canonicalize_agent_card, create_agent_card_signer, create_signature_verifier, + InvalidSignaturesError, ) from typing import Any -from jose.backends.base import Key -from jose.exceptions import JOSEError -from jose.utils import base64url_encode +from jwt.utils import base64url_encode import pytest from cryptography.hazmat.primitives import asymmetric -def create_key_provider(verification_key: str | bytes | dict[str, Any] | Key): +def create_key_provider(verification_key: str | bytes | dict[str, Any]): """Creates a key provider function for testing.""" def key_provider(kid: str | None, jku: str | None): @@ -46,6 +45,8 @@ def sample_agent_card() -> AgentCard: ), default_input_modes=['text/plain'], default_output_modes=['text/plain'], + documentation_url=None, + icon_url='', skills=[ AgentSkill( id='skill1', @@ -63,7 +64,13 @@ def test_signer_and_verifier_symmetric(sample_agent_card: AgentCard): wrong_key = 'wrongkey' agent_card_signer = create_agent_card_signer( - signing_key=key, alg='HS384', kid='key1' + signing_key=key, + protected_header={ + 'alg': 'HS384', + 'kid': 'key1', + 'jku': None, + 'typ': 'JOSE', + }, ) signed_card = agent_card_signer(sample_agent_card) @@ -79,14 +86,14 @@ def test_signer_and_verifier_symmetric(sample_agent_card: AgentCard): ) try: verifier(signed_card) - except JOSEError: + except InvalidSignaturesError: pytest.fail('Signature verification failed with correct key') # Verify with wrong key verifier_wrong_key = create_signature_verifier( create_key_provider(wrong_key), ['HS256', 'HS384', 'ES256', 'RS256'] ) - with pytest.raises(JOSEError): + with pytest.raises(InvalidSignaturesError): verifier_wrong_key(signed_card) @@ -105,7 +112,13 @@ def test_signer_and_verifier_symmetric_multiple_signatures( wrong_key = 'wrongkey' agent_card_signer = create_agent_card_signer( - signing_key=key, alg='HS384', kid='key1' + signing_key=key, + protected_header={ + 'alg': 'HS384', + 'kid': 'key1', + 'jku': None, + 'typ': 'JOSE', + }, ) signed_card = agent_card_signer(sample_agent_card) @@ -121,14 +134,14 @@ def test_signer_and_verifier_symmetric_multiple_signatures( ) try: verifier(signed_card) - except JOSEError: + except InvalidSignaturesError: pytest.fail('Signature verification failed with correct key') # Verify with wrong key verifier_wrong_key = create_signature_verifier( create_key_provider(wrong_key), ['HS256', 'HS384', 'ES256', 'RS256'] ) - with pytest.raises(JOSEError): + with pytest.raises(InvalidSignaturesError): verifier_wrong_key(signed_card) @@ -144,7 +157,13 @@ def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): public_key_error = private_key_error.public_key() agent_card_signer = create_agent_card_signer( - signing_key=private_key, alg='ES256', kid='key1' + signing_key=private_key, + protected_header={ + 'alg': 'ES256', + 'kid': 'key2', + 'jku': None, + 'typ': 'JOSE', + }, ) signed_card = agent_card_signer(sample_agent_card) @@ -159,7 +178,7 @@ def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): ) try: verifier(signed_card) - except JOSEError: + except InvalidSignaturesError: pytest.fail('Signature verification failed with correct key') # Verify with wrong key @@ -167,7 +186,7 @@ def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): create_key_provider(public_key_error), ['HS256', 'HS384', 'ES256', 'RS256'], ) - with pytest.raises(JOSEError): + with pytest.raises(InvalidSignaturesError): verifier_wrong_key(signed_card) @@ -188,7 +207,7 @@ def test_canonicalize_agent_card( expected_jcs = ( '{"capabilities":{"pushNotifications":true},' '"defaultInputModes":["text/plain"],"defaultOutputModes":["text/plain"],' - '"description":"A test agent","name":"Test Agent","protocolVersion":"0.3.0",' + '"description":"A test agent","name":"Test Agent",' '"skills":[{"description":"A test skill","id":"skill1","name":"Test Skill","tags":["test"]}],' '"url":"http://localhost","version":"1.0.0"}' ) From 257e4dd1860be6b64d7184c1d577bc128d19139a Mon Sep 17 00:00:00 2001 From: sokoliva Date: Fri, 12 Dec 2025 10:22:59 +0000 Subject: [PATCH 08/11] fix: make changes based on review comments. - Move `canonicalize_agent_card` from `signing` to `helpers' - Add `PyJWT` liberary to dev in pyproject.toml - Remove `last_error` from `signature_verifier` - Update tests --- pyproject.toml | 3 +- src/a2a/utils/helpers.py | 28 +++++++++++++++++ src/a2a/utils/signing.py | 34 +++----------------- tests/utils/test_helpers.py | 52 +++++++++++++++++++++++++++++++ tests/utils/test_signing.py | 62 ++++++++++--------------------------- 5 files changed, 102 insertions(+), 77 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2f6b523d..f5a3e98a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,8 +35,8 @@ grpc = ["grpcio>=1.60", "grpcio-tools>=1.60", "grpcio_reflection>=1.7.0"] telemetry = ["opentelemetry-api>=1.33.0", "opentelemetry-sdk>=1.33.0"] postgresql = ["sqlalchemy[asyncio,postgresql-asyncpg]>=2.0.0"] mysql = ["sqlalchemy[asyncio,aiomysql]>=2.0.0"] -sqlite = ["sqlalchemy[asyncio,aiosqlite]>=2.0.0"] signing = ["PyJWT>=2.0.0"] +sqlite = ["sqlalchemy[asyncio,aiosqlite]>=2.0.0"] sql = ["a2a-sdk[postgresql,mysql,sqlite]"] @@ -88,6 +88,7 @@ style = "pep440" dev = [ "datamodel-code-generator>=0.30.0", "mypy>=1.15.0", + "PyJWT>=2.0.0" "pytest>=8.3.5", "pytest-asyncio>=0.26.0", "pytest-cov>=6.1.1", diff --git a/src/a2a/utils/helpers.py b/src/a2a/utils/helpers.py index 96c1646a..96acdc1e 100644 --- a/src/a2a/utils/helpers.py +++ b/src/a2a/utils/helpers.py @@ -2,6 +2,7 @@ import functools import inspect +import json import logging from collections.abc import Callable @@ -9,6 +10,7 @@ from uuid import uuid4 from a2a.types import ( + AgentCard, Artifact, MessageSendParams, Part, @@ -340,3 +342,29 @@ def are_modalities_compatible( return True return any(x in server_output_modes for x in client_output_modes) + + +def _clean_empty(d: Any) -> Any: + """Recursively remove empty strings, lists and dicts from a dictionary.""" + if isinstance(d, dict): + cleaned_dict: dict[Any, Any] = { + k: _clean_empty(v) for k, v in d.items() + } + return {k: v for k, v in cleaned_dict.items() if v} + if isinstance(d, list): + cleaned_list: list[Any] = [_clean_empty(v) for v in d] + return [v for v in cleaned_list if v] + return d if d not in ['', [], {}] else None + + +def canonicalize_agent_card(agent_card: AgentCard) -> str: + """Canonicalizes the Agent Card JSON according to RFC 8785 (JCS).""" + card_dict = agent_card.model_dump( + exclude={'signatures'}, + exclude_defaults=True, + exclude_none=True, + by_alias=True, + ) + # Recursively remove empty values + cleaned_dict = _clean_empty(card_dict) + return json.dumps(cleaned_dict, separators=(',', ':'), sort_keys=True) diff --git a/src/a2a/utils/signing.py b/src/a2a/utils/signing.py index 9a4ef2af..d89db287 100644 --- a/src/a2a/utils/signing.py +++ b/src/a2a/utils/signing.py @@ -3,6 +3,8 @@ from collections.abc import Callable from typing import Any, TypedDict +from a2a.utils.helpers import canonicalize_agent_card + try: import jwt @@ -48,30 +50,6 @@ class ProtectedHeader(TypedDict): """ -def clean_empty(d: Any) -> Any: - """Recursively remove empty strings, lists and dicts from a dictionary.""" - if isinstance(d, dict): - cleaned_dict: dict[Any, Any] = {k: clean_empty(v) for k, v in d.items()} - return {k: v for k, v in cleaned_dict.items() if v} - if isinstance(d, list): - cleaned_list: list[Any] = [clean_empty(v) for v in d] - return [v for v in cleaned_list if v] - return d if d not in ['', [], {}] else None - - -def canonicalize_agent_card(agent_card: AgentCard) -> str: - """Canonicalizes the Agent Card JSON according to RFC 8785 (JCS).""" - card_dict = agent_card.model_dump( - exclude={'signatures'}, - exclude_defaults=True, - exclude_none=True, - by_alias=True, - ) - # Recursively remove empty values - cleaned_dict = clean_empty(card_dict) - return json.dumps(cleaned_dict, separators=(',', ':'), sort_keys=True) - - def create_agent_card_signer( signing_key: PyJWK | str | bytes, protected_header: ProtectedHeader, @@ -141,7 +119,6 @@ def signature_verifier( if not agent_card.signatures: raise NoSignatureError('AgentCard has no signatures to verify.') - last_error = None for agent_card_signature in agent_card.signatures: try: # get verification key @@ -166,13 +143,10 @@ def signature_verifier( ) # Found a valid signature, exit the loop and function break - except PyJWTError as e: - last_error = e + except PyJWTError: continue else: # This block runs only if the loop completes without a break - raise InvalidSignaturesError( - 'No valid signature found' - ) from last_error + raise InvalidSignaturesError('No valid signature found') return signature_verifier diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py index 28acd27c..f3227d32 100644 --- a/tests/utils/test_helpers.py +++ b/tests/utils/test_helpers.py @@ -7,6 +7,10 @@ from a2a.types import ( Artifact, + AgentCard, + AgentCardSignature, + AgentCapabilities, + AgentSkill, Message, MessageSendParams, Part, @@ -23,6 +27,7 @@ build_text_artifact, create_task_obj, validate, + canonicalize_agent_card, ) @@ -45,6 +50,34 @@ 'type': 'task', } +SAMPLE_AGENT_CARD: dict[str, Any] = { + 'name': 'Test Agent', + 'description': 'A test agent', + 'url': 'http://localhost', + 'version': '1.0.0', + 'capabilities': AgentCapabilities( + streaming=None, + push_notifications=True, + ), + 'default_input_modes': ['text/plain'], + 'default_output_modes': ['text/plain'], + 'documentation_url': None, + 'icon_url': '', + 'skills': [ + AgentSkill( + id='skill1', + name='Test Skill', + description='A test skill', + tags=['test'], + ) + ], + 'signatures': [ + AgentCardSignature( + protected='protected_header', signature='test_signature' + ) + ], +} + # Test create_task_obj def test_create_task_obj(): @@ -328,3 +361,22 @@ def test_are_modalities_compatible_both_empty(): ) is True ) + + +def test_canonicalize_agent_card(): + """Test canonicalize_agent_card with defaults, optionals, and exceptions. + + - extensions is omitted as it's not set and optional. + - protocolVersion is included because it's always added by canonicalize_agent_card. + - signatures should be omitted. + """ + agent_card = AgentCard(**SAMPLE_AGENT_CARD) + expected_jcs = ( + '{"capabilities":{"pushNotifications":true},' + '"defaultInputModes":["text/plain"],"defaultOutputModes":["text/plain"],' + '"description":"A test agent","name":"Test Agent",' + '"skills":[{"description":"A test skill","id":"skill1","name":"Test Skill","tags":["test"]}],' + '"url":"http://localhost","version":"1.0.0"}' + ) + result = canonicalize_agent_card(agent_card) + assert result == expected_jcs diff --git a/tests/utils/test_signing.py b/tests/utils/test_signing.py index d8b0ac9c..9a843d34 100644 --- a/tests/utils/test_signing.py +++ b/tests/utils/test_signing.py @@ -9,12 +9,7 @@ AgentSkill, AgentCardSignature, ) -from a2a.utils.signing import ( - canonicalize_agent_card, - create_agent_card_signer, - create_signature_verifier, - InvalidSignaturesError, -) +from a2a.utils import signing from typing import Any from jwt.utils import base64url_encode @@ -63,7 +58,7 @@ def test_signer_and_verifier_symmetric(sample_agent_card: AgentCard): key = 'key12345' # Using a simple symmetric key for HS256 wrong_key = 'wrongkey' - agent_card_signer = create_agent_card_signer( + agent_card_signer = signing.create_agent_card_signer( signing_key=key, protected_header={ 'alg': 'HS384', @@ -81,19 +76,19 @@ def test_signer_and_verifier_symmetric(sample_agent_card: AgentCard): assert signature.signature is not None # Verify the signature - verifier = create_signature_verifier( + verifier = signing.create_signature_verifier( create_key_provider(key), ['HS256', 'HS384', 'ES256', 'RS256'] ) try: verifier(signed_card) - except InvalidSignaturesError: + except signing.InvalidSignaturesError: pytest.fail('Signature verification failed with correct key') # Verify with wrong key - verifier_wrong_key = create_signature_verifier( + verifier_wrong_key = signing.create_signature_verifier( create_key_provider(wrong_key), ['HS256', 'HS384', 'ES256', 'RS256'] ) - with pytest.raises(InvalidSignaturesError): + with pytest.raises(signing.InvalidSignaturesError): verifier_wrong_key(signed_card) @@ -111,7 +106,7 @@ def test_signer_and_verifier_symmetric_multiple_signatures( key = 'key12345' # Using a simple symmetric key for HS256 wrong_key = 'wrongkey' - agent_card_signer = create_agent_card_signer( + agent_card_signer = signing.create_agent_card_signer( signing_key=key, protected_header={ 'alg': 'HS384', @@ -129,19 +124,19 @@ def test_signer_and_verifier_symmetric_multiple_signatures( assert signature.signature is not None # Verify the signature - verifier = create_signature_verifier( + verifier = signing.create_signature_verifier( create_key_provider(key), ['HS256', 'HS384', 'ES256', 'RS256'] ) try: verifier(signed_card) - except InvalidSignaturesError: + except signing.InvalidSignaturesError: pytest.fail('Signature verification failed with correct key') # Verify with wrong key - verifier_wrong_key = create_signature_verifier( + verifier_wrong_key = signing.create_signature_verifier( create_key_provider(wrong_key), ['HS256', 'HS384', 'ES256', 'RS256'] ) - with pytest.raises(InvalidSignaturesError): + with pytest.raises(signing.InvalidSignaturesError): verifier_wrong_key(signed_card) @@ -156,7 +151,7 @@ def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): ) public_key_error = private_key_error.public_key() - agent_card_signer = create_agent_card_signer( + agent_card_signer = signing.create_agent_card_signer( signing_key=private_key, protected_header={ 'alg': 'ES256', @@ -173,43 +168,18 @@ def test_signer_and_verifier_asymmetric(sample_agent_card: AgentCard): assert signature.protected is not None assert signature.signature is not None - verifier = create_signature_verifier( + verifier = signing.create_signature_verifier( create_key_provider(public_key), ['HS256', 'HS384', 'ES256', 'RS256'] ) try: verifier(signed_card) - except InvalidSignaturesError: + except signing.InvalidSignaturesError: pytest.fail('Signature verification failed with correct key') # Verify with wrong key - verifier_wrong_key = create_signature_verifier( + verifier_wrong_key = signing.create_signature_verifier( create_key_provider(public_key_error), ['HS256', 'HS384', 'ES256', 'RS256'], ) - with pytest.raises(InvalidSignaturesError): + with pytest.raises(signing.InvalidSignaturesError): verifier_wrong_key(signed_card) - - -def test_canonicalize_agent_card( - sample_agent_card: AgentCard, -): - """Test canonicalize_agent_card with defaults, optionals, and exceptions. - - - extensions is omitted as it's not set and optional. - - protocolVersion is included because it's always added by canonicalize_agent_card. - - signatures should be omitted. - """ - sample_agent_card.signatures = [ - AgentCardSignature( - protected='protected_header', signature='test_signature' - ) - ] - expected_jcs = ( - '{"capabilities":{"pushNotifications":true},' - '"defaultInputModes":["text/plain"],"defaultOutputModes":["text/plain"],' - '"description":"A test agent","name":"Test Agent",' - '"skills":[{"description":"A test skill","id":"skill1","name":"Test Skill","tags":["test"]}],' - '"url":"http://localhost","version":"1.0.0"}' - ) - result = canonicalize_agent_card(sample_agent_card) - assert result == expected_jcs From 900f0ee313b4c758919a1e123237320c6b7bee84 Mon Sep 17 00:00:00 2001 From: sokoliva Date: Fri, 12 Dec 2025 10:25:31 +0000 Subject: [PATCH 09/11] fix: fix parsing error --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f5a3e98a..561a5a45 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,7 +88,7 @@ style = "pep440" dev = [ "datamodel-code-generator>=0.30.0", "mypy>=1.15.0", - "PyJWT>=2.0.0" + "PyJWT>=2.0.0", "pytest>=8.3.5", "pytest-asyncio>=0.26.0", "pytest-cov>=6.1.1", From b680757f450c67f11c88736b94d6e4779ffc6b76 Mon Sep 17 00:00:00 2001 From: sokoliva Date: Fri, 12 Dec 2025 10:34:47 +0000 Subject: [PATCH 10/11] fix: covert argument type in `jwt.encode` from `ProtectedHeader` to `dict` --- src/a2a/utils/signing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/a2a/utils/signing.py b/src/a2a/utils/signing.py index d89db287..dd46f006 100644 --- a/src/a2a/utils/signing.py +++ b/src/a2a/utils/signing.py @@ -75,7 +75,7 @@ def agent_card_signer(agent_card: AgentCard) -> AgentCard: payload=payload_dict, key=signing_key, algorithm=protected_header.get('alg', 'HS256'), - headers=protected_header, + headers=dict(protected_header), ) # The result of jwt.encode is a compact serialization: HEADER.PAYLOAD.SIGNATURE From 054d187597bfce7305e44b32f87ae85dad01dc5a Mon Sep 17 00:00:00 2001 From: sokoliva Date: Fri, 12 Dec 2025 12:16:30 +0000 Subject: [PATCH 11/11] refractor: update `create_signature_verifier` description --- src/a2a/utils/signing.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/a2a/utils/signing.py b/src/a2a/utils/signing.py index dd46f006..6ea8c21b 100644 --- a/src/a2a/utils/signing.py +++ b/src/a2a/utils/signing.py @@ -99,23 +99,23 @@ def create_signature_verifier( key_provider: Callable[[str | None, str | None], PyJWK | str | bytes], algorithms: list[str], ) -> Callable[[AgentCard], None]: - """Creates a function that verifies AgentCard signatures. + """Creates a function that verifies the signatures on an AgentCard. + + The verifier succeeds if at least one signature is valid. Otherwise, it raises an error. Args: - key_provider: A callable that takes key-id and JSON web key url and returns the verification key. - algorithms: List of acceptable algorithms for verification used to prevent algorithm confusion attacks. + key_provider: A callable that accepts a key ID (kid) and a JWK Set URL (jku) and returns the verification key. + This function is responsible for fetching the correct key for a given signature. + algorithms: A list of acceptable algorithms (e.g., ['ES256', 'RS256']) for verification used to prevent algorithm confusion attacks. Returns: - A callable that takes an AgentCard, and raises an error if none of the signatures are valid. + A function that takes an AgentCard as input, and raises an error if none of the signatures are valid. """ def signature_verifier( agent_card: AgentCard, ) -> None: - """Verifies agent card signatures. - - Checks if at least one signature matches the key, otherwise raises an error. - """ + """Verifies agent card signatures.""" if not agent_card.signatures: raise NoSignatureError('AgentCard has no signatures to verify.')