From 6e5b36b53652e531351305cda54b9a681c5fca6b Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 6 May 2026 17:02:44 +0100 Subject: [PATCH 1/2] feat: add version-aware push notification support The protocol version used when creating a TaskPushNotificationConfig dictates the wire format of subsequent push notifications. v0.3 configs receive plain Task payloads; v1.0 configs receive StreamResponse-wrapped events. Key changes: - PushNotificationPayloadFormatter SPI for version-specific serialization - Protocol version tracked in PushNotificationConfigStore alongside configs - BasePushNotificationSender resolves formatter per config at dispatch time - JPA store persists protocol_version column Co-Authored-By: Claude Opus 4.6 --- .../apps/quarkus/A2AServerRoutes_v0_3.java | 2 +- .../apps/quarkus/CallContextFactory_v0_3.java | 6 + .../rest/quarkus/A2AServerRoutes_v0_3.java | 2 +- .../rest/quarkus/CallContextFactory_v0_3.java | 6 + ...PushNotificationPayloadFormatter_v0_3.java | 44 ++++ .../AbstractA2ARequestHandlerTest_v0_3.java | 2 +- ...otificationPayloadFormatter_v0_3_Test.java | 94 ++++++++ .../grpc/handler/CallContextFactory_v0_3.java | 3 + .../grpc/handler/GrpcHandler_v0_3.java | 2 +- .../grpc/handler/GrpcHandler_v0_3_Test.java | 2 +- .../handler/JSONRPCHandler_v0_3_Test.java | 2 +- .../rest/handler/RestHandler_v0_3_Test.java | 2 +- ...paDatabasePushNotificationConfigStore.java | 19 +- .../jpa/JpaPushNotificationConfig.java | 16 +- ...otificationConfigStoreIntegrationTest.java | 2 +- .../JpaPushNotificationConfigStoreTest.java | 6 +- .../jpa/MockPushNotificationSender.java | 2 +- .../core/ReplicatedQueueManagerTest.java | 2 +- .../server/events/MainEventBusProcessor.java | 22 +- .../DefaultRequestHandler.java | 13 +- .../tasks/BasePushNotificationSender.java | 97 ++++++-- .../InMemoryPushNotificationConfigStore.java | 17 +- .../tasks/PushNotificationConfigStore.java | 35 +++ .../PushNotificationPayloadFormatter.java | 12 + .../server/tasks/PushNotificationSender.java | 30 +-- .../sdk/server/tasks/TaskManager.java | 38 ++- .../sdk/server/events/EventConsumerTest.java | 2 +- .../sdk/server/events/EventQueueTest.java | 2 +- .../events/InMemoryQueueManagerTest.java | 2 +- .../MainEventBusProcessorExceptionTest.java | 2 +- .../AbstractA2ARequestHandlerTest.java | 2 +- .../DefaultRequestHandlerTest.java | 219 +++++++++++++++++- .../sdk/server/tasks/AgentEmitterTest.java | 2 +- ...MemoryPushNotificationConfigStoreTest.java | 43 +++- .../tasks/PushNotificationSenderTest.java | 82 ++++++- .../server/tasks/ResultAggregatorTest.java | 10 +- .../server/tasks/TaskManagerSnapshotTest.java | 75 ++++++ 37 files changed, 815 insertions(+), 104 deletions(-) create mode 100644 compat-0.3/server-conversion/src/main/java/org/a2aproject/sdk/compat03/conversion/PushNotificationPayloadFormatter_v0_3.java create mode 100644 compat-0.3/server-conversion/src/test/java/org/a2aproject/sdk/compat03/conversion/PushNotificationPayloadFormatter_v0_3_Test.java create mode 100644 server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationPayloadFormatter.java create mode 100644 server-common/src/test/java/org/a2aproject/sdk/server/tasks/TaskManagerSnapshotTest.java diff --git a/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java b/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java index 5e268ca9c..06a2bb4b6 100644 --- a/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java +++ b/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java @@ -327,7 +327,7 @@ public String getUsername() { List extensionHeaderValues = rc.request().headers().getAll(A2AHeaders_v0_3.X_A2A_EXTENSIONS); Set requestedExtensions = A2AExtensions.getRequestedExtensions(extensionHeaderValues); - return new ServerCallContext(user, state, requestedExtensions); + return new ServerCallContext(user, state, requestedExtensions, "0.3"); } else { CallContextFactory_v0_3 builder = callContextFactory.get(); return builder.build(rc); diff --git a/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/CallContextFactory_v0_3.java b/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/CallContextFactory_v0_3.java index f00bce4db..4c3725a0a 100644 --- a/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/CallContextFactory_v0_3.java +++ b/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/CallContextFactory_v0_3.java @@ -3,6 +3,12 @@ import org.a2aproject.sdk.server.ServerCallContext; import io.vertx.ext.web.RoutingContext; +/** + * Factory interface for creating ServerCallContext from a Vert.x RoutingContext. + * + *

Implementations MUST pass {@code "0.3"} as the protocol version when constructing + * {@link ServerCallContext} so that push notification payloads are formatted correctly.

+ */ public interface CallContextFactory_v0_3 { ServerCallContext build(RoutingContext rc); } diff --git a/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java b/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java index bc76641cf..0501ba994 100644 --- a/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java +++ b/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java @@ -431,7 +431,7 @@ public String getUsername() { List extensionHeaderValues = rc.request().headers().getAll(A2AHeaders_v0_3.X_A2A_EXTENSIONS); Set requestedExtensions = A2AExtensions.getRequestedExtensions(extensionHeaderValues); - return new ServerCallContext(user, state, requestedExtensions); + return new ServerCallContext(user, state, requestedExtensions, "0.3"); } else { CallContextFactory_v0_3 builder = callContextFactory.get(); return builder.build(rc); diff --git a/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/CallContextFactory_v0_3.java b/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/CallContextFactory_v0_3.java index a1f3922c2..fbe8d79f3 100644 --- a/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/CallContextFactory_v0_3.java +++ b/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/CallContextFactory_v0_3.java @@ -3,6 +3,12 @@ import org.a2aproject.sdk.server.ServerCallContext; import io.vertx.ext.web.RoutingContext; +/** + * Factory interface for creating ServerCallContext from a Vert.x RoutingContext. + * + *

Implementations MUST pass {@code "0.3"} as the protocol version when constructing + * {@link ServerCallContext} so that push notification payloads are formatted correctly.

+ */ public interface CallContextFactory_v0_3 { ServerCallContext build(RoutingContext rc); } diff --git a/compat-0.3/server-conversion/src/main/java/org/a2aproject/sdk/compat03/conversion/PushNotificationPayloadFormatter_v0_3.java b/compat-0.3/server-conversion/src/main/java/org/a2aproject/sdk/compat03/conversion/PushNotificationPayloadFormatter_v0_3.java new file mode 100644 index 000000000..9e97e5a60 --- /dev/null +++ b/compat-0.3/server-conversion/src/main/java/org/a2aproject/sdk/compat03/conversion/PushNotificationPayloadFormatter_v0_3.java @@ -0,0 +1,44 @@ +package org.a2aproject.sdk.compat03.conversion; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.a2aproject.sdk.compat03.conversion.mappers.domain.TaskMapper_v0_3; +import org.a2aproject.sdk.compat03.json.JsonProcessingException_v0_3; +import org.a2aproject.sdk.compat03.json.JsonUtil_v0_3; +import org.a2aproject.sdk.compat03.spec.Task_v0_3; +import org.a2aproject.sdk.server.tasks.PushNotificationPayloadFormatter; +import org.a2aproject.sdk.spec.Message; +import org.a2aproject.sdk.spec.StreamingEventKind; +import org.a2aproject.sdk.spec.Task; +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class PushNotificationPayloadFormatter_v0_3 implements PushNotificationPayloadFormatter { + + private static final Logger LOGGER = LoggerFactory.getLogger(PushNotificationPayloadFormatter_v0_3.class); + + @Override + public String targetVersion() { + return "0.3"; + } + + @Override + public @Nullable String formatPayload(StreamingEventKind event, @Nullable Task taskSnapshot) { + if (event instanceof Message) { + return null; + } + if (taskSnapshot == null) { + LOGGER.warn("Cannot format v0.3 push notification: no task snapshot available"); + return null; + } + Task_v0_3 v03Task = TaskMapper_v0_3.INSTANCE.fromV10(taskSnapshot); + try { + return JsonUtil_v0_3.toJson(v03Task); + } catch (JsonProcessingException_v0_3 e) { + LOGGER.error("Failed to serialize v0.3 task for push notification", e); + return null; + } + } +} diff --git a/compat-0.3/server-conversion/src/test/java/org/a2aproject/sdk/compat03/conversion/AbstractA2ARequestHandlerTest_v0_3.java b/compat-0.3/server-conversion/src/test/java/org/a2aproject/sdk/compat03/conversion/AbstractA2ARequestHandlerTest_v0_3.java index 9e4c3b692..f61fc8ef9 100644 --- a/compat-0.3/server-conversion/src/test/java/org/a2aproject/sdk/compat03/conversion/AbstractA2ARequestHandlerTest_v0_3.java +++ b/compat-0.3/server-conversion/src/test/java/org/a2aproject/sdk/compat03/conversion/AbstractA2ARequestHandlerTest_v0_3.java @@ -83,7 +83,7 @@ public abstract class AbstractA2ARequestHandlerTest_v0_3 { .parts(new TextPart_v0_3("test message")) .build(); - private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = (event, snapshot) -> {}; // V1.0 backend infrastructure protected AgentExecutor agentExecutor; diff --git a/compat-0.3/server-conversion/src/test/java/org/a2aproject/sdk/compat03/conversion/PushNotificationPayloadFormatter_v0_3_Test.java b/compat-0.3/server-conversion/src/test/java/org/a2aproject/sdk/compat03/conversion/PushNotificationPayloadFormatter_v0_3_Test.java new file mode 100644 index 000000000..8861cd901 --- /dev/null +++ b/compat-0.3/server-conversion/src/test/java/org/a2aproject/sdk/compat03/conversion/PushNotificationPayloadFormatter_v0_3_Test.java @@ -0,0 +1,94 @@ +package org.a2aproject.sdk.compat03.conversion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.a2aproject.sdk.spec.Message; +import org.a2aproject.sdk.spec.Task; +import org.a2aproject.sdk.spec.TaskState; +import org.a2aproject.sdk.spec.TaskStatus; +import org.a2aproject.sdk.spec.TaskStatusUpdateEvent; +import org.a2aproject.sdk.spec.TextPart; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class PushNotificationPayloadFormatter_v0_3_Test { + + private PushNotificationPayloadFormatter_v0_3 formatter; + + @BeforeEach + void setUp() { + formatter = new PushNotificationPayloadFormatter_v0_3(); + } + + @Test + void targetVersionIs03() { + assertEquals("0.3", formatter.targetVersion()); + } + + @Test + void formatsTaskEventAsV03Task() { + Task task = Task.builder() + .id("t1").contextId("c1") + .status(new TaskStatus(TaskState.TASK_STATE_COMPLETED)) + .build(); + + String payload = formatter.formatPayload(task, task); + + assertNotNull(payload); + assertTrue(payload.contains("\"kind\":\"task\"")); + assertTrue(payload.contains("\"id\":\"t1\"")); + assertTrue(payload.contains("\"status\"")); + } + + @Test + void formatsStatusUpdateUsingTaskSnapshot() { + Task snapshot = Task.builder() + .id("t1").contextId("c1") + .status(new TaskStatus(TaskState.TASK_STATE_WORKING)) + .build(); + + TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder() + .taskId("t1").contextId("c1") + .status(new TaskStatus(TaskState.TASK_STATE_WORKING)) + .build(); + + String payload = formatter.formatPayload(event, snapshot); + + assertNotNull(payload); + assertTrue(payload.contains("\"kind\":\"task\"")); + assertTrue(payload.contains("\"id\":\"t1\"")); + } + + @Test + void skipsMessageEvents() { + Task snapshot = Task.builder() + .id("t1").contextId("c1") + .status(new TaskStatus(TaskState.TASK_STATE_WORKING)) + .build(); + + Message message = Message.builder() + .messageId("m1") + .role(Message.Role.ROLE_AGENT) + .parts(new TextPart("hello")) + .build(); + + String payload = formatter.formatPayload(message, snapshot); + + assertNull(payload); + } + + @Test + void returnsNullWhenSnapshotIsNull() { + Task task = Task.builder() + .id("t1").contextId("c1") + .status(new TaskStatus(TaskState.TASK_STATE_COMPLETED)) + .build(); + + String payload = formatter.formatPayload(task, null); + + assertNull(payload); + } +} diff --git a/compat-0.3/transport/grpc/src/main/java/org/a2aproject/sdk/compat03/transport/grpc/handler/CallContextFactory_v0_3.java b/compat-0.3/transport/grpc/src/main/java/org/a2aproject/sdk/compat03/transport/grpc/handler/CallContextFactory_v0_3.java index c5dc184cc..fe34ba4a1 100644 --- a/compat-0.3/transport/grpc/src/main/java/org/a2aproject/sdk/compat03/transport/grpc/handler/CallContextFactory_v0_3.java +++ b/compat-0.3/transport/grpc/src/main/java/org/a2aproject/sdk/compat03/transport/grpc/handler/CallContextFactory_v0_3.java @@ -6,6 +6,9 @@ /** * Factory interface for creating ServerCallContext from gRPC StreamObserver. * Implementations can provide custom context creation logic. + * + *

Implementations MUST pass {@code "0.3"} as the protocol version when constructing + * {@link ServerCallContext} so that push notification payloads are formatted correctly.

*/ public interface CallContextFactory_v0_3 { ServerCallContext create(StreamObserver responseObserver); diff --git a/compat-0.3/transport/grpc/src/main/java/org/a2aproject/sdk/compat03/transport/grpc/handler/GrpcHandler_v0_3.java b/compat-0.3/transport/grpc/src/main/java/org/a2aproject/sdk/compat03/transport/grpc/handler/GrpcHandler_v0_3.java index 12694af91..fe7f77f17 100644 --- a/compat-0.3/transport/grpc/src/main/java/org/a2aproject/sdk/compat03/transport/grpc/handler/GrpcHandler_v0_3.java +++ b/compat-0.3/transport/grpc/src/main/java/org/a2aproject/sdk/compat03/transport/grpc/handler/GrpcHandler_v0_3.java @@ -363,7 +363,7 @@ private ServerCallContext createCallContext(StreamObserver responseObserv Map state = new HashMap<>(); state.put("grpc_response_observer", responseObserver); Set requestedExtensions = new HashSet<>(); - return new ServerCallContext(user, state, requestedExtensions); + return new ServerCallContext(user, state, requestedExtensions, "0.3"); } else { return factory.create(responseObserver); } diff --git a/compat-0.3/transport/grpc/src/test/java/org/a2aproject/sdk/compat03/transport/grpc/handler/GrpcHandler_v0_3_Test.java b/compat-0.3/transport/grpc/src/test/java/org/a2aproject/sdk/compat03/transport/grpc/handler/GrpcHandler_v0_3_Test.java index 5a422c3bd..6333a3f52 100644 --- a/compat-0.3/transport/grpc/src/test/java/org/a2aproject/sdk/compat03/transport/grpc/handler/GrpcHandler_v0_3_Test.java +++ b/compat-0.3/transport/grpc/src/test/java/org/a2aproject/sdk/compat03/transport/grpc/handler/GrpcHandler_v0_3_Test.java @@ -65,7 +65,7 @@ public class GrpcHandler_v0_3_Test extends AbstractA2ARequestHandlerTest_v0_3 { .build(); private final ServerCallContext callContext = new ServerCallContext( - UnauthenticatedUser.INSTANCE, Map.of("foo", "bar"), new HashSet<>()); + UnauthenticatedUser.INSTANCE, Map.of("foo", "bar"), new HashSet<>(), "0.3"); // ======================================== // GetTask Tests diff --git a/compat-0.3/transport/jsonrpc/src/test/java/org/a2aproject/sdk/compat03/transport/jsonrpc/handler/JSONRPCHandler_v0_3_Test.java b/compat-0.3/transport/jsonrpc/src/test/java/org/a2aproject/sdk/compat03/transport/jsonrpc/handler/JSONRPCHandler_v0_3_Test.java index c031e8a4f..e1d3c3ae8 100644 --- a/compat-0.3/transport/jsonrpc/src/test/java/org/a2aproject/sdk/compat03/transport/jsonrpc/handler/JSONRPCHandler_v0_3_Test.java +++ b/compat-0.3/transport/jsonrpc/src/test/java/org/a2aproject/sdk/compat03/transport/jsonrpc/handler/JSONRPCHandler_v0_3_Test.java @@ -85,7 +85,7 @@ public class JSONRPCHandler_v0_3_Test extends AbstractA2ARequestHandlerTest_v0_3 { private final ServerCallContext callContext = new ServerCallContext( - UnauthenticatedUser.INSTANCE, Map.of("foo", "bar"), new HashSet<>()); + UnauthenticatedUser.INSTANCE, Map.of("foo", "bar"), new HashSet<>(), "0.3"); // ======================================== // GetTask Tests diff --git a/compat-0.3/transport/rest/src/test/java/org/a2aproject/sdk/compat03/transport/rest/handler/RestHandler_v0_3_Test.java b/compat-0.3/transport/rest/src/test/java/org/a2aproject/sdk/compat03/transport/rest/handler/RestHandler_v0_3_Test.java index a76f2259d..90ecd7081 100644 --- a/compat-0.3/transport/rest/src/test/java/org/a2aproject/sdk/compat03/transport/rest/handler/RestHandler_v0_3_Test.java +++ b/compat-0.3/transport/rest/src/test/java/org/a2aproject/sdk/compat03/transport/rest/handler/RestHandler_v0_3_Test.java @@ -32,7 +32,7 @@ public class RestHandler_v0_3_Test extends AbstractA2ARequestHandlerTest_v0_3 { private final ServerCallContext callContext = new ServerCallContext( - UnauthenticatedUser.INSTANCE, Map.of("foo", "bar"), new HashSet<>()); + UnauthenticatedUser.INSTANCE, Map.of("foo", "bar"), new HashSet<>(), "0.3"); // ======================================== // GetTask Tests diff --git a/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java b/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java index 43adeac31..83a53cb81 100644 --- a/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java +++ b/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java @@ -18,6 +18,7 @@ import org.a2aproject.sdk.util.Assert; import org.a2aproject.sdk.util.PageToken; import org.a2aproject.sdk.spec.TaskPushNotificationConfig; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,12 @@ public class JpaDatabasePushNotificationConfigStore implements PushNotificationC @Transactional @Override public TaskPushNotificationConfig setInfo(TaskPushNotificationConfig notificationConfig) { + return setInfo(notificationConfig, null); + } + + @Transactional + @Override + public TaskPushNotificationConfig setInfo(TaskPushNotificationConfig notificationConfig, @Nullable String protocolVersion) { String taskId = Assert.checkNotNullParam("taskId", notificationConfig.taskId()); // Ensure config has an ID - default to taskId if not provided (mirroring InMemoryPushNotificationConfigStore behavior) if (notificationConfig.id().isEmpty()) { @@ -44,6 +51,7 @@ public TaskPushNotificationConfig setInfo(TaskPushNotificationConfig notificatio notificationConfig = TaskPushNotificationConfig.builder(notificationConfig).id(taskId).build(); } + String resolvedVersion = PushNotificationConfigStore.resolveProtocolVersion(protocolVersion); LOGGER.debug("Saving PushNotificationConfig for Task '{}' with ID: {}", taskId, notificationConfig.id()); try { TaskConfigId configId = new TaskConfigId(taskId, notificationConfig.id()); @@ -54,11 +62,12 @@ public TaskPushNotificationConfig setInfo(TaskPushNotificationConfig notificatio if (existingJpaConfig != null) { // Update existing entity existingJpaConfig.setConfig(notificationConfig); + existingJpaConfig.setProtocolVersion(resolvedVersion); LOGGER.debug("Updated existing PushNotificationConfig for Task '{}' with ID: {}", taskId, notificationConfig.id()); } else { // Create new entity - JpaPushNotificationConfig jpaConfig = JpaPushNotificationConfig.createFromConfig(taskId, notificationConfig); + JpaPushNotificationConfig jpaConfig = JpaPushNotificationConfig.createFromConfig(taskId, notificationConfig, resolvedVersion); em.persist(jpaConfig); LOGGER.debug("Persisted new PushNotificationConfig for Task '{}' with ID: {}", taskId, notificationConfig.id()); @@ -164,4 +173,12 @@ public void deleteInfo(String taskId, String configId) { } } + @Transactional + @Override + public @Nullable String getProtocolVersion(String taskId, String configId) { + JpaPushNotificationConfig jpaConfig = em.find(JpaPushNotificationConfig.class, + new TaskConfigId(taskId, configId)); + return jpaConfig != null ? jpaConfig.getProtocolVersion() : null; + } + } diff --git a/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfig.java b/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfig.java index 5d513c0f8..fe005f22e 100644 --- a/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfig.java +++ b/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfig.java @@ -10,6 +10,8 @@ import org.a2aproject.sdk.jsonrpc.common.json.JsonProcessingException; import org.a2aproject.sdk.jsonrpc.common.json.JsonUtil; import org.a2aproject.sdk.spec.TaskPushNotificationConfig; +import org.jspecify.annotations.Nullable; + import java.time.Instant; @Entity @@ -21,6 +23,9 @@ public class JpaPushNotificationConfig { @Column(name = "task_data", columnDefinition = "TEXT", nullable = false) private String configJson; + @Column(name = "protocol_version") + private String protocolVersion; + @Column(name = "created_at") private Instant createdAt; @@ -79,11 +84,20 @@ public void setCreatedAt(Instant createdAt) { this.createdAt = createdAt; } - static JpaPushNotificationConfig createFromConfig(String taskId, TaskPushNotificationConfig config) throws JsonProcessingException { + public @Nullable String getProtocolVersion() { + return protocolVersion; + } + + public void setProtocolVersion(String protocolVersion) { + this.protocolVersion = protocolVersion; + } + + static JpaPushNotificationConfig createFromConfig(String taskId, TaskPushNotificationConfig config, @Nullable String protocolVersion) throws JsonProcessingException { String json = JsonUtil.toJson(config); JpaPushNotificationConfig jpaPushNotificationConfig = new JpaPushNotificationConfig(new TaskConfigId(taskId, config.id()), json); jpaPushNotificationConfig.config = config; + jpaPushNotificationConfig.protocolVersion = protocolVersion; return jpaPushNotificationConfig; } } diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java b/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java index 3fc330215..4ff600268 100644 --- a/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java +++ b/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java @@ -91,7 +91,7 @@ public void testDirectNotificationTrigger() { .build(); // Directly trigger the mock - mockPushNotificationSender.sendNotification(testTask); + mockPushNotificationSender.sendNotification(testTask, null); // Verify it was captured Queue captured = mockPushNotificationSender.getCapturedTasks(); diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfigStoreTest.java b/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfigStoreTest.java index 1ae2bfe9d..057dc304a 100644 --- a/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfigStoreTest.java +++ b/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfigStoreTest.java @@ -249,7 +249,7 @@ public void testSendNotificationSuccess() throws Exception { when(mockPostBuilder.post()).thenReturn(mockHttpResponse); when(mockHttpResponse.success()).thenReturn(true); - notificationSender.sendNotification(task); + notificationSender.sendNotification(task, null); // Verify HTTP client was called ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(String.class); @@ -281,7 +281,7 @@ public void testSendNotificationWithToken() throws Exception { when(mockPostBuilder.post()).thenReturn(mockHttpResponse); when(mockHttpResponse.success()).thenReturn(true); - notificationSender.sendNotification(task); + notificationSender.sendNotification(task, null); // TODO: Once token authentication is implemented, verify that: // 1. The token is included in request headers (e.g., X-A2A-Notification-Token) @@ -307,7 +307,7 @@ public void testSendNotificationNoConfig() throws Exception { String taskId = "task_send_no_config"; Task task = createSampleTask(taskId, TaskState.TASK_STATE_COMPLETED); - notificationSender.sendNotification(task); + notificationSender.sendNotification(task, null); // Verify HTTP client was never called verify(mockHttpClient, never()).createPost(); diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java b/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java index 7e0edca46..d431f934c 100644 --- a/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java +++ b/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java @@ -23,7 +23,7 @@ public class MockPushNotificationSender implements PushNotificationSender { private final Queue capturedEvents = new ConcurrentLinkedQueue<>(); @Override - public void sendNotification(StreamingEventKind event) { + public void sendNotification(StreamingEventKind event, Task taskSnapshot) { capturedEvents.add(event); } diff --git a/extras/queue-manager-replicated/core/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java b/extras/queue-manager-replicated/core/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java index 8998c2195..926b9497d 100644 --- a/extras/queue-manager-replicated/core/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java +++ b/extras/queue-manager-replicated/core/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java @@ -44,7 +44,7 @@ class ReplicatedQueueManagerTest { private StreamingEventKind testEvent; private MainEventBus mainEventBus; private MainEventBusProcessor mainEventBusProcessor; - private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = (event, snapshot) -> {}; @BeforeEach void setUp() { diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java b/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java index 9bb9b9c26..006252125 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java @@ -1,6 +1,7 @@ package org.a2aproject.sdk.server.events; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; @@ -16,8 +17,8 @@ import org.a2aproject.sdk.spec.A2AError; import org.a2aproject.sdk.spec.Event; import org.a2aproject.sdk.spec.InternalError; -import org.a2aproject.sdk.spec.Task; import org.a2aproject.sdk.spec.StreamingEventKind; +import org.a2aproject.sdk.spec.Task; import org.a2aproject.sdk.spec.TaskArtifactUpdateEvent; import org.a2aproject.sdk.spec.TaskStatusUpdateEvent; import org.slf4j.Logger; @@ -58,6 +59,8 @@ public class MainEventBusProcessor implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessor.class); + private record UpdateResult(boolean isFinal, @Nullable Task taskSnapshot) {} + /** * Callback for testing synchronization with async event processing. * Default is NOOP to avoid null checks in production code. @@ -200,13 +203,15 @@ private void processEvent(MainEventBusContext context) { taskId, event.getClass().getSimpleName()); Event eventToDistribute = null; + UpdateResult updateResult = null; boolean isReplicated = context.eventQueueItem().isReplicated(); try { // Step 1: Update TaskStore FIRST (persistence before clients see it) // If this throws, we distribute an error to ensure "persist before client visibility" try { - boolean isFinal = updateTaskStore(taskId, event, isReplicated); + updateResult = updateTaskStore(taskId, event, isReplicated); + boolean isFinal = updateResult.isFinal(); eventToDistribute = event; // Success - distribute original event @@ -239,7 +244,7 @@ private void processEvent(MainEventBusContext context) { // per A2A spec section 4.3.3 if (!isReplicated && event instanceof StreamingEventKind streamingEvent) { // Send the streaming event directly - it will be wrapped in StreamResponse format by PushNotificationSender - sendPushNotification(taskId, streamingEvent); + sendPushNotification(taskId, streamingEvent, updateResult != null ? updateResult.taskSnapshot() : null); } // Step 3: Then distribute to ChildQueues (clients see either event or error AFTER persistence attempt) @@ -293,7 +298,7 @@ private void processEvent(MainEventBusContext context) { * @return true if the task reached a final state, false otherwise * @throws InternalError if persistence fails */ - private boolean updateTaskStore(String taskId, Event event, boolean isReplicated) throws InternalError { + private UpdateResult updateTaskStore(String taskId, Event event, boolean isReplicated) throws InternalError { try { // Extract contextId from event (all relevant events have it) String contextId = extractContextId(event); @@ -302,10 +307,11 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated TaskManager taskManager = new TaskManager(taskId, contextId, taskStore, null); // Use TaskManager.process() - handles all event types with existing logic - boolean isFinal = taskManager.process(event, isReplicated); + AtomicReference taskSnapshot = new AtomicReference<>(); + boolean isFinal = taskManager.process(event, isReplicated, taskSnapshot); LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})", taskId, event.getClass().getSimpleName(), isFinal, isReplicated); - return isFinal; + return new UpdateResult(isFinal, taskSnapshot.get()); } catch (TaskSerializationException e) { // Data corruption or schema mismatch - ALWAYS permanent @@ -362,12 +368,12 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated * @param taskId the task ID * @param event the streaming event to send (Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent) */ - private void sendPushNotification(String taskId, StreamingEventKind event) { + private void sendPushNotification(String taskId, StreamingEventKind event, @Nullable Task taskSnapshot) { Runnable pushTask = () -> { try { if (event != null) { LOGGER.debug("Sending push notification for task {}", taskId); - pushSender.sendNotification(event); + pushSender.sendNotification(event, taskSnapshot); } else { LOGGER.debug("Skipping push notification - event is null for task {}", taskId); } diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java index b81cc22c7..d2c3aa028 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java @@ -504,8 +504,9 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte if (mss.task() == null && kind instanceof Task createdTask && shouldAddPushInfo(params)) { LOGGER.debug("Storing push notification config for new task {} (original taskId from params: {})", createdTask.id(), params.message().taskId()); + String version = context != null ? context.getRequestedProtocolVersion() : null; pushConfigStore.setInfo(TaskPushNotificationConfig.builder(params.configuration().taskPushNotificationConfig()) - .taskId(createdTask.id()).build()); + .taskId(createdTask.id()).build(), version); } // Check if task requires immediate return (AUTH_REQUIRED) @@ -662,8 +663,9 @@ public Flow.Publisher onMessageSendStream( Objects.requireNonNull(taskId.get(), "taskId was null"); LOGGER.debug("Storing push notification config for new streaming task {} EARLY (original taskId from params: {})", taskId.get(), params.message().taskId()); + String version = context != null ? context.getRequestedProtocolVersion() : null; pushConfigStore.setInfo(TaskPushNotificationConfig.builder(params.configuration().taskPushNotificationConfig()) - .taskId(taskId.get()).build()); + .taskId(taskId.get()).build(), version); } ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor, eventConsumerExecutor); @@ -796,7 +798,9 @@ public TaskPushNotificationConfig onCreateTaskPushNotificationConfig( throw new TaskNotFoundError(); } - return pushConfigStore.setInfo(params); + String version = context != null ? context.getRequestedProtocolVersion() : null; + TaskPushNotificationConfig result = pushConfigStore.setInfo(params, version); + return result; } @Override @@ -1051,8 +1055,9 @@ private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallCon if (pushConfigStore != null && params.configuration() != null && params.configuration().taskPushNotificationConfig() != null) { LOGGER.debug("Adding push info"); + String version = context != null ? context.getRequestedProtocolVersion() : null; pushConfigStore.setInfo(TaskPushNotificationConfig.builder(params.configuration().taskPushNotificationConfig()) - .taskId(task.id()).build()); + .taskId(task.id()).build(), version); } requestContext = requestContextBuilder.get() diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/BasePushNotificationSender.java b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/BasePushNotificationSender.java index 53f1a38d3..02d40c980 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/BasePushNotificationSender.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/BasePushNotificationSender.java @@ -4,17 +4,18 @@ import static org.a2aproject.sdk.client.http.A2AHttpClient.CONTENT_TYPE; import static org.a2aproject.sdk.common.A2AHeaders.X_A2A_NOTIFICATION_TOKEN; -import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - import org.a2aproject.sdk.client.http.A2AHttpClient; import org.a2aproject.sdk.client.http.A2AHttpClientFactory; import org.a2aproject.sdk.jsonrpc.common.json.JsonUtil; @@ -26,6 +27,7 @@ import org.a2aproject.sdk.spec.TaskArtifactUpdateEvent; import org.a2aproject.sdk.spec.TaskPushNotificationConfig; import org.a2aproject.sdk.spec.TaskStatusUpdateEvent; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +42,7 @@ public class BasePushNotificationSender implements PushNotificationSender { // final, is not proxyable in all runtimes private A2AHttpClient httpClient; private PushNotificationConfigStore configStore; + private Map formattersByVersion; /** @@ -52,21 +55,44 @@ protected BasePushNotificationSender() { // For CDI proxy creation this.httpClient = null; this.configStore = null; + this.formattersByVersion = Map.of(); } - @Inject public BasePushNotificationSender(PushNotificationConfigStore configStore) { this.httpClient = A2AHttpClientFactory.create(); this.configStore = configStore; + this.formattersByVersion = Map.of(); + } + + @Inject + public BasePushNotificationSender(PushNotificationConfigStore configStore, + Instance formatters) { + this.httpClient = A2AHttpClientFactory.create(); + this.configStore = configStore; + this.formattersByVersion = new HashMap<>(); + for (PushNotificationPayloadFormatter f : formatters) { + this.formattersByVersion.put(f.targetVersion(), f); + } } public BasePushNotificationSender(PushNotificationConfigStore configStore, A2AHttpClient httpClient) { this.configStore = configStore; this.httpClient = httpClient; + this.formattersByVersion = Map.of(); + } + + public BasePushNotificationSender(PushNotificationConfigStore configStore, A2AHttpClient httpClient, + List formatters) { + this.configStore = configStore; + this.httpClient = httpClient; + this.formattersByVersion = new HashMap<>(); + for (PushNotificationPayloadFormatter f : formatters) { + formattersByVersion.put(f.targetVersion(), f); + } } @Override - public void sendNotification(StreamingEventKind event) { + public void sendNotification(StreamingEventKind event, @Nullable Task taskSnapshot) { String taskId = extractTaskId(event); if (taskId == null) { LOGGER.warn("Cannot send push notification: event does not contain taskId"); @@ -84,9 +110,20 @@ public void sendNotification(StreamingEventKind event) { nextPageToken = pageResult.nextPageToken(); } while (nextPageToken != null); + Map versionsByConfigKey = new HashMap<>(); + for (TaskPushNotificationConfig config : configs) { + String configTaskId = config.taskId(); + if (configTaskId != null) { + String version = configStore.getProtocolVersion(configTaskId, config.id()); + if (version != null) { + versionsByConfigKey.put(configTaskId + ":" + config.id(), version); + } + } + } + List> dispatchResults = configs .stream() - .map(pushConfig -> dispatch(event, pushConfig)) + .map(pushConfig -> dispatch(event, taskSnapshot, pushConfig, versionsByConfigKey)) .toList(); CompletableFuture allFutures = CompletableFuture.allOf(dispatchResults.toArray(new CompletableFuture[0])); CompletableFuture dispatchResult = allFutures.thenApply(v -> dispatchResults.stream() @@ -123,14 +160,48 @@ public void sendNotification(StreamingEventKind event) { throw new IllegalStateException("Unknown StreamingEventKind: " + event); } - private CompletableFuture dispatch(StreamingEventKind event, TaskPushNotificationConfig pushInfo) { - return CompletableFuture.supplyAsync(() -> dispatchNotification(event, pushInfo)); + private CompletableFuture dispatch(StreamingEventKind event, + @Nullable Task taskSnapshot, + TaskPushNotificationConfig pushInfo, + Map versionsByConfigKey) { + return CompletableFuture.supplyAsync(() -> dispatchNotification(event, taskSnapshot, pushInfo, versionsByConfigKey)); } - private boolean dispatchNotification(StreamingEventKind event, TaskPushNotificationConfig pushInfo) { + private boolean dispatchNotification(StreamingEventKind event, + @Nullable Task taskSnapshot, + TaskPushNotificationConfig pushInfo, + Map versionsByConfigKey) { String url = pushInfo.url(); String token = pushInfo.token(); + String taskId = pushInfo.taskId(); + String version = taskId != null ? versionsByConfigKey.get(taskId + ":" + pushInfo.id()) : null; + PushNotificationPayloadFormatter formatter = version != null + ? formattersByVersion.get(version) : null; + + String body; + if (formatter != null) { + try { + body = formatter.formatPayload(event, taskSnapshot); + } catch (Throwable throwable) { + LOGGER.error("Error formatting payload with {} formatter: {}", + version, throwable.getMessage(), throwable); + return false; + } + if (body == null) { + LOGGER.debug("Formatter for version {} returned null, skipping notification for {}", + version, url); + return true; + } + } else { + try { + body = JsonUtil.toJson(event); + } catch (Throwable throwable) { + LOGGER.error("Error serializing StreamingEventKind to JSON: {}", throwable.getMessage(), throwable); + return false; + } + } + A2AHttpClient.PostBuilder postBuilder = httpClient.createPost(); if (token != null && !token.isBlank()) { postBuilder.addHeader(X_A2A_NOTIFICATION_TOKEN, token); @@ -140,16 +211,6 @@ private boolean dispatchNotification(StreamingEventKind event, TaskPushNotificat pushInfo.authentication().scheme() + " " + pushInfo.authentication().credentials()); } - String body; - try { - // JsonUtil.toJson automatically wraps StreamingEventKind in StreamResponse format - // (task/message/statusUpdate/artifactUpdate) per A2A spec section 4.3.3 - body = JsonUtil.toJson(event); - } catch (Throwable throwable) { - LOGGER.error("Error serializing StreamingEventKind to JSON: {}", throwable.getMessage(), throwable); - return false; - } - try { postBuilder .url(url) diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStore.java b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStore.java index cd02cf5d6..4a32406a5 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStore.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStore.java @@ -10,10 +10,11 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import org.a2aproject.sdk.util.Assert; import org.a2aproject.sdk.spec.ListTaskPushNotificationConfigsParams; import org.a2aproject.sdk.spec.ListTaskPushNotificationConfigsResult; import org.a2aproject.sdk.spec.TaskPushNotificationConfig; +import org.a2aproject.sdk.util.Assert; +import org.jspecify.annotations.Nullable; /** * In-memory implementation of the PushNotificationConfigStore interface. @@ -24,6 +25,7 @@ public class InMemoryPushNotificationConfigStore implements PushNotificationConfigStore { private final Map> pushNotificationInfos = Collections.synchronizedMap(new HashMap<>()); + private final Map protocolVersions = Collections.synchronizedMap(new HashMap<>()); @Inject public InMemoryPushNotificationConfigStore() { @@ -52,6 +54,13 @@ public TaskPushNotificationConfig setInfo(TaskPushNotificationConfig notificatio return notificationConfig; } + @Override + public TaskPushNotificationConfig setInfo(TaskPushNotificationConfig config, @Nullable String protocolVersion) { + TaskPushNotificationConfig result = setInfo(config); + protocolVersions.put(result.taskId() + ":" + result.id(), PushNotificationConfigStore.resolveProtocolVersion(protocolVersion)); + return result; + } + @Override public ListTaskPushNotificationConfigsResult getInfo(ListTaskPushNotificationConfigsParams params) { List configs = pushNotificationInfos.get(params.id()); @@ -106,8 +115,14 @@ public void deleteInfo(String taskId, String configId) { break; } } + protocolVersions.remove(taskId + ":" + configId); if (notificationConfigList.isEmpty()) { pushNotificationInfos.remove(taskId); } } + + @Override + public @Nullable String getProtocolVersion(String taskId, String configId) { + return protocolVersions.get(taskId + ":" + configId); + } } diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationConfigStore.java b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationConfigStore.java index fd1460b9a..54f44768d 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationConfigStore.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationConfigStore.java @@ -3,6 +3,7 @@ import org.a2aproject.sdk.spec.ListTaskPushNotificationConfigsParams; import org.a2aproject.sdk.spec.ListTaskPushNotificationConfigsResult; import org.a2aproject.sdk.spec.TaskPushNotificationConfig; +import org.jspecify.annotations.Nullable; /** * Interface for storing and retrieving push notification configurations for tasks. @@ -81,6 +82,29 @@ public interface PushNotificationConfigStore { */ TaskPushNotificationConfig setInfo(TaskPushNotificationConfig notificationConfig); + /** + * Sets or updates the push notification configuration for a task, along with the + * protocol version that registered it. + *

+ * This merged method ensures the protocol version is stored using the normalized + * config ID (after {@link #setInfo(TaskPushNotificationConfig)} applies defaults). + *

+ * + * @param notificationConfig the task push notification configuration + * @param protocolVersion the protocol version string, or null to use {@link org.a2aproject.sdk.spec.AgentInterface#CURRENT_PROTOCOL_VERSION} + * @return the potentially updated configuration (with ID set if it was null) + */ + default TaskPushNotificationConfig setInfo(TaskPushNotificationConfig notificationConfig, @Nullable String protocolVersion) { + return setInfo(notificationConfig); + } + + /** + * Resolves the protocol version, defaulting null to the current protocol version. + */ + static String resolveProtocolVersion(@Nullable String protocolVersion) { + return protocolVersion != null ? protocolVersion : org.a2aproject.sdk.spec.AgentInterface.CURRENT_PROTOCOL_VERSION; + } + /** * Retrieves push notification configurations for a task with pagination support. *

@@ -119,4 +143,15 @@ public interface PushNotificationConfigStore { */ void deleteInfo(String taskId, String configId); + /** + * Gets the protocol version associated with a push notification configuration. + * + * @param taskId the task ID + * @param configId the push notification configuration ID + * @return the protocol version string, or null if not set + */ + default @Nullable String getProtocolVersion(String taskId, String configId) { + return null; + } + } diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationPayloadFormatter.java b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationPayloadFormatter.java new file mode 100644 index 000000000..6a75a96a7 --- /dev/null +++ b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationPayloadFormatter.java @@ -0,0 +1,12 @@ +package org.a2aproject.sdk.server.tasks; + +import org.a2aproject.sdk.spec.StreamingEventKind; +import org.a2aproject.sdk.spec.Task; +import org.jspecify.annotations.Nullable; + +public interface PushNotificationPayloadFormatter { + + String targetVersion(); + + @Nullable String formatPayload(StreamingEventKind event, @Nullable Task taskSnapshot); +} diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationSender.java b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationSender.java index 304fed683..bc3bf5fc4 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationSender.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationSender.java @@ -2,6 +2,7 @@ import org.a2aproject.sdk.spec.StreamingEventKind; import org.a2aproject.sdk.spec.Task; +import org.jspecify.annotations.Nullable; /** * Interface for delivering push notifications containing task state updates to external systems. @@ -52,7 +53,7 @@ * KafkaProducer producer; * * @Override - * public void sendNotification(StreamingEventKind event) { + * public void sendNotification(StreamingEventKind event, Task taskSnapshot) { * String taskId = extractTaskId(event); * producer.send("task-updates", taskId, event); * } @@ -79,30 +80,5 @@ */ public interface PushNotificationSender { - /** - * Sends a push notification containing a streaming event. - *

- * Called after the event has been persisted to {@link TaskStore}. The event is wrapped - * in a StreamResponse format (per A2A spec section 4.3.3) with the appropriate oneof - * field set (task, message, statusUpdate, or artifactUpdate). - *

- *

- * Retrieve push notification URLs or messaging configurations from - * {@link PushNotificationConfigStore} using the task ID extracted from the event. - *

- * Supported event types: - *
    - *
  • {@link Task} - wrapped in StreamResponse.task
  • - *
  • {@link org.a2aproject.sdk.spec.Message} - wrapped in StreamResponse.message
  • - *
  • {@link org.a2aproject.sdk.spec.TaskStatusUpdateEvent} - wrapped in StreamResponse.statusUpdate
  • - *
  • {@link org.a2aproject.sdk.spec.TaskArtifactUpdateEvent} - wrapped in StreamResponse.artifactUpdate
  • - *
- *

- * Error Handling: Log errors but don't throw exceptions. Notifications are - * best-effort and should not fail the primary request. - *

- * - * @param event the streaming event to send (Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent) - */ - void sendNotification(StreamingEventKind event); + void sendNotification(StreamingEventKind event, @Nullable Task taskSnapshot); } diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/TaskManager.java b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/TaskManager.java index 53c929c26..0bbd399b9 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/TaskManager.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/TaskManager.java @@ -10,6 +10,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.a2aproject.sdk.spec.A2AError; import org.a2aproject.sdk.spec.A2AServerException; @@ -62,12 +63,25 @@ public TaskManager(@Nullable String taskId, @Nullable String contextId, TaskStor } boolean saveTaskEvent(Task task, boolean isReplicated) throws A2AServerException { + return saveTaskEvent(task, isReplicated, null); + } + + boolean saveTaskEvent(Task task, boolean isReplicated, @Nullable AtomicReference taskSnapshot) + throws A2AServerException { checkIdsAndUpdateIfNecessary(task.id(), task.contextId()); Task savedTask = saveTask(task, isReplicated); + if (taskSnapshot != null) { + taskSnapshot.set(savedTask); + } return savedTask.status() != null && savedTask.status().state() != null && savedTask.status().state().isFinal(); } boolean saveTaskEvent(TaskStatusUpdateEvent event, boolean isReplicated) throws A2AServerException { + return saveTaskEvent(event, isReplicated, null); + } + + boolean saveTaskEvent(TaskStatusUpdateEvent event, boolean isReplicated, @Nullable AtomicReference taskSnapshot) + throws A2AServerException { checkIdsAndUpdateIfNecessary(event.taskId(), event.contextId()); Task task = ensureTask(event.taskId(), event.contextId()); @@ -90,10 +104,18 @@ boolean saveTaskEvent(TaskStatusUpdateEvent event, boolean isReplicated) throws task = builder.build(); Task savedTask = saveTask(task, isReplicated); + if (taskSnapshot != null) { + taskSnapshot.set(savedTask); + } return savedTask.status() != null && savedTask.status().state() != null && savedTask.status().state().isFinal(); } boolean saveTaskEvent(TaskArtifactUpdateEvent event, boolean isReplicated) throws A2AServerException { + return saveTaskEvent(event, isReplicated, null); + } + + boolean saveTaskEvent(TaskArtifactUpdateEvent event, boolean isReplicated, @Nullable AtomicReference taskSnapshot) + throws A2AServerException { checkIdsAndUpdateIfNecessary(event.taskId(), event.contextId()); Task task = ensureTask(event.taskId(), event.contextId()); // taskId is guaranteed to be non-null after checkIdsAndUpdateIfNecessary @@ -103,17 +125,25 @@ boolean saveTaskEvent(TaskArtifactUpdateEvent event, boolean isReplicated) throw } task = appendArtifactToTask(task, event, nonNullTaskId); Task savedTask = saveTask(task, isReplicated); + if (taskSnapshot != null) { + taskSnapshot.set(savedTask); + } return savedTask.status() != null && savedTask.status().state() != null && savedTask.status().state().isFinal(); } public boolean process(Event event, boolean isReplicated) throws A2AServerException { + return process(event, isReplicated, null); + } + + public boolean process(Event event, boolean isReplicated, @Nullable AtomicReference taskSnapshot) + throws A2AServerException { boolean isFinal = false; if (event instanceof Task task) { - isFinal = saveTaskEvent(task, isReplicated); + isFinal = saveTaskEvent(task, isReplicated, taskSnapshot); } else if (event instanceof TaskStatusUpdateEvent taskStatusUpdateEvent) { - isFinal = saveTaskEvent(taskStatusUpdateEvent, isReplicated); + isFinal = saveTaskEvent(taskStatusUpdateEvent, isReplicated, taskSnapshot); } else if (event instanceof TaskArtifactUpdateEvent taskArtifactUpdateEvent) { - isFinal = saveTaskEvent(taskArtifactUpdateEvent, isReplicated); + isFinal = saveTaskEvent(taskArtifactUpdateEvent, isReplicated, taskSnapshot); } else if (event instanceof A2AError) { // A2AError events trigger automatic transition to FAILED state // Error details are NOT persisted in TaskStore (client-specific) @@ -144,7 +174,7 @@ public boolean process(Event event, boolean isReplicated) throws A2AServerExcept .contextId(errorContextId) .status(new TaskStatus(TASK_STATE_FAILED)) .build(); - isFinal = saveTaskEvent(failedEvent, isReplicated); + isFinal = saveTaskEvent(failedEvent, isReplicated, taskSnapshot); } else { // Can't update status without contextId, but error is still terminal LOGGER.debug("A2AError event for task {} without contextId - skipping state update", taskId); diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/events/EventConsumerTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/events/EventConsumerTest.java index eee402df5..41389c3ec 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/events/EventConsumerTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/events/EventConsumerTest.java @@ -35,7 +35,7 @@ public class EventConsumerTest { - private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = (event, snapshot) -> {}; private static final String TASK_ID = "123"; // Must match MINIMAL_TASK id private EventQueue eventQueue; diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/events/EventQueueTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/events/EventQueueTest.java index 11fade0a2..3b49f6524 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/events/EventQueueTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/events/EventQueueTest.java @@ -55,7 +55,7 @@ public class EventQueueTest { } """; - private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = (event, snapshot) -> {}; @BeforeEach public void init() { diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/events/InMemoryQueueManagerTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/events/InMemoryQueueManagerTest.java index 3f2be111e..71dbaa868 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/events/InMemoryQueueManagerTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/events/InMemoryQueueManagerTest.java @@ -28,7 +28,7 @@ public class InMemoryQueueManagerTest { private InMemoryTaskStore taskStore; private MainEventBus mainEventBus; private MainEventBusProcessor mainEventBusProcessor; - private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = (event, snapshot) -> {}; @BeforeEach public void setUp() { diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/events/MainEventBusProcessorExceptionTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/events/MainEventBusProcessorExceptionTest.java index 986958a14..23264977b 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/events/MainEventBusProcessorExceptionTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/events/MainEventBusProcessorExceptionTest.java @@ -45,7 +45,7 @@ */ public class MainEventBusProcessorExceptionTest { - private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = (event, snapshot) -> {}; private static final String TASK_ID = "test-task-123"; private MainEventBus mainEventBus; diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/AbstractA2ARequestHandlerTest.java index 7ee97b094..0934808c2 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/AbstractA2ARequestHandlerTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/AbstractA2ARequestHandlerTest.java @@ -69,7 +69,7 @@ public class AbstractA2ARequestHandlerTest { private static final String PREFERRED_TRANSPORT = "preferred-transport"; private static final String A2A_REQUESTHANDLER_TEST_PROPERTIES = "/a2a-requesthandler-test.properties"; - private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = (event, snapshot) -> {}; protected AgentExecutor executor; protected TaskStore taskStore; diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java index 8914586f5..be84073ba 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java @@ -8,9 +8,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -36,9 +39,11 @@ import org.a2aproject.sdk.spec.Message; import org.a2aproject.sdk.spec.MessageSendConfiguration; import org.a2aproject.sdk.spec.MessageSendParams; +import org.a2aproject.sdk.spec.StreamingEventKind; import org.a2aproject.sdk.spec.Task; import org.a2aproject.sdk.spec.TaskArtifactUpdateEvent; import org.a2aproject.sdk.spec.TaskNotFoundError; +import org.a2aproject.sdk.spec.TaskPushNotificationConfig; import org.a2aproject.sdk.spec.TaskState; import org.a2aproject.sdk.spec.TaskStatus; import org.a2aproject.sdk.spec.TaskStatusUpdateEvent; @@ -72,11 +77,12 @@ public class DefaultRequestHandlerTest { .parts(new TextPart("test message")) .build(); - private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = (event, snapshot) -> {}; // Test infrastructure components protected AgentExecutor executor; protected TaskStore taskStore; + protected PushNotificationConfigStore pushConfigStore; protected RequestHandler requestHandler; protected InMemoryQueueManager queueManager; protected MainEventBus mainEventBus; @@ -109,7 +115,7 @@ public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2A InMemoryTaskStore inMemoryTaskStore = new InMemoryTaskStore(); taskStore = inMemoryTaskStore; - PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore(); + pushConfigStore = new InMemoryPushNotificationConfigStore(); // Create MainEventBus and MainEventBusProcessor mainEventBus = new MainEventBus(); @@ -905,4 +911,213 @@ void testSendMessage_FollowUpWithTaskIdOnly_PreservesOriginalContextId() throws // Cleanup: release the first agent releaseFirstAgent.countDown(); } + + // ── Protocol version propagation tests ────────────────────────────────────── + + private static ServerCallContext contextWithVersion(String version) { + return new ServerCallContext(null, Map.of(), Set.of(), version); + } + + /** + * Verify that onCreateTaskPushNotificationConfig stores the protocol version + * from the ServerCallContext in the PushNotificationConfigStore. + */ + @Test + void testVersionStored_OnCreateTaskPushNotificationConfig() throws Exception { + // Arrange: create a task directly in the store so the handler can find it + String taskId = "version-test-task-1"; + Task task = new Task( + taskId, + "ctx-1", + new TaskStatus(TaskState.TASK_STATE_WORKING), + null, + null, + null + ); + taskStore.save(task, false); + + TaskPushNotificationConfig pushConfig = TaskPushNotificationConfig.builder() + .id("") + .taskId(taskId) + .url("http://example.com/webhook") + .build(); + + // Act + requestHandler.onCreateTaskPushNotificationConfig(pushConfig, contextWithVersion("1.0")); + + // Assert: version is stored; configId defaults to taskId when id is empty + assertEquals("1.0", pushConfigStore.getProtocolVersion(taskId, taskId), + "Protocol version should be stored for the push notification config"); + } + + /** + * Verify that onMessageSend stores the protocol version when the request + * includes a push notification config (new task path). + */ + @Test + void testVersionStored_OnMessageSend_NewTask() throws Exception { + // Arrange: agent completes immediately + agentExecutorExecute = (context, emitter) -> emitter.complete(); + + TaskPushNotificationConfig pushConfig = TaskPushNotificationConfig.builder() + .id("") + .url("http://example.com/webhook") + .build(); + MessageSendConfiguration config = MessageSendConfiguration.builder() + .returnImmediately(true) + .acceptedOutputModes(List.of()) + .taskPushNotificationConfig(pushConfig) + .build(); + MessageSendParams params = MessageSendParams.builder() + .message(MESSAGE) + .configuration(config) + .build(); + + // Act + EventKind eventKind = requestHandler.onMessageSend(params, contextWithVersion("1.0")); + + // Assert + assertInstanceOf(Task.class, eventKind); + Task result = (Task) eventKind; + String taskId = result.id(); + + assertEquals("1.0", pushConfigStore.getProtocolVersion(taskId, taskId), + "Protocol version should be stored when push config is provided via onMessageSend"); + } + + /** + * Verify that onMessageSend stores the protocol version when the request + * includes a push notification config on a follow-up to an existing task. + */ + @Test + void testVersionStored_OnMessageSend_ExistingTask() throws Exception { + // Arrange: create an initial task (no push config) — agent stays active + CountDownLatch agentStarted = new CountDownLatch(1); + CountDownLatch agentRelease = new CountDownLatch(1); + + agentExecutorExecute = (context, emitter) -> { + emitter.startWork(); + agentStarted.countDown(); + try { + agentRelease.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + emitter.complete(); + }; + + MessageSendParams initialParams = MessageSendParams.builder() + .message(MESSAGE) + .configuration(DEFAULT_CONFIG) + .build(); + + EventKind initialResult = requestHandler.onMessageSend(initialParams, NULL_CONTEXT); + assertInstanceOf(Task.class, initialResult); + Task existingTask = (Task) initialResult; + assertTrue(agentStarted.await(5, TimeUnit.SECONDS), "Agent should start"); + + try { + // Now set up agent for the follow-up + agentExecutorExecute = (context, emitter) -> emitter.complete(); + + // Follow-up message WITH push config and version context + TaskPushNotificationConfig pushConfig = TaskPushNotificationConfig.builder() + .id("") + .url("http://example.com/webhook") + .build(); + MessageSendConfiguration followUpConfig = MessageSendConfiguration.builder() + .returnImmediately(true) + .acceptedOutputModes(List.of()) + .taskPushNotificationConfig(pushConfig) + .build(); + Message followUpMsg = Message.builder() + .messageId("follow-up-version-test") + .role(Message.Role.ROLE_USER) + .taskId(existingTask.id()) + .parts(new TextPart("follow up")) + .build(); + MessageSendParams followUpParams = MessageSendParams.builder() + .message(followUpMsg) + .configuration(followUpConfig) + .build(); + + // Act + EventKind result = requestHandler.onMessageSend(followUpParams, contextWithVersion("1.0")); + + // Assert + assertInstanceOf(Task.class, result); + String taskId = existingTask.id(); + assertEquals("1.0", pushConfigStore.getProtocolVersion(taskId, taskId), + "Protocol version should be stored for push config on existing task"); + } finally { + agentRelease.countDown(); + } + } + + /** + * Verify that onMessageSendStream stores the protocol version when the request + * includes a push notification config (new task, streaming path). + */ + @Test + void testVersionStored_OnMessageSendStream_NewTask() throws Exception { + // Arrange: agent completes immediately + agentExecutorExecute = (context, emitter) -> emitter.complete(); + + TaskPushNotificationConfig pushConfig = TaskPushNotificationConfig.builder() + .id("") + .url("http://example.com/webhook") + .build(); + MessageSendConfiguration config = MessageSendConfiguration.builder() + .returnImmediately(true) + .acceptedOutputModes(List.of()) + .taskPushNotificationConfig(pushConfig) + .build(); + MessageSendParams params = MessageSendParams.builder() + .message(MESSAGE) + .configuration(config) + .build(); + + // Act + Flow.Publisher publisher = requestHandler.onMessageSendStream( + params, contextWithVersion("1.0")); + + AtomicReference taskIdRef = new AtomicReference<>(); + CountDownLatch streamDone = new CountDownLatch(1); + publisher.subscribe(new Flow.Subscriber<>() { + Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription s) { + subscription = s; + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(StreamingEventKind item) { + if (item instanceof Task t) { + taskIdRef.set(t.id()); + } else if (item instanceof TaskStatusUpdateEvent e) { + taskIdRef.set(e.taskId()); + } + } + + @Override + public void onError(Throwable t) { + streamDone.countDown(); + } + + @Override + public void onComplete() { + streamDone.countDown(); + } + }); + + assertTrue(streamDone.await(5, TimeUnit.SECONDS), "Stream should complete"); + String taskId = taskIdRef.get(); + assertNotNull(taskId, "Should have received a task ID from the stream"); + + // Assert + assertEquals("1.0", pushConfigStore.getProtocolVersion(taskId, taskId), + "Protocol version should be stored when push config is provided via onMessageSendStream"); + } } diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/tasks/AgentEmitterTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/AgentEmitterTest.java index e69863286..b63f9b54c 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/tasks/AgentEmitterTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/AgentEmitterTest.java @@ -43,7 +43,7 @@ public class AgentEmitterTest { private static final List> SAMPLE_PARTS = List.of(new TextPart("Test message")); - private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = (event, snapshot) -> {}; EventQueue eventQueue; private MainEventBus mainEventBus; diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStoreTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStoreTest.java index b407c9f33..ecc7afed7 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStoreTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStoreTest.java @@ -249,7 +249,7 @@ public void testSendNotificationSuccess() throws Exception { // Mock successful HTTP response setupBasicMockHttpResponse(); - notificationSender.sendNotification(task); + notificationSender.sendNotification(task, null); // Verify HTTP client was called ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(String.class); @@ -279,7 +279,7 @@ public void testSendNotificationWithToken() throws Exception { when(mockPostBuilder.post()).thenReturn(mockHttpResponse); when(mockHttpResponse.success()).thenReturn(true); - notificationSender.sendNotification(task); + notificationSender.sendNotification(task, null); // Verify HTTP client was called with proper authentication ArgumentCaptor bodyCaptor = ArgumentCaptor.forClass(String.class); @@ -301,7 +301,7 @@ public void testSendNotificationNoConfig() throws Exception { String taskId = "task_send_no_config"; Task task = createSampleTask(taskId, TaskState.TASK_STATE_COMPLETED); - notificationSender.sendNotification(task); + notificationSender.sendNotification(task, null); // Verify HTTP client was never called verify(mockHttpClient, never()).createPost(); @@ -315,7 +315,7 @@ public void testSendNotificationWithEmptyToken() throws Exception { configStore.setInfo(config); setupBasicMockHttpResponse(); - notificationSender.sendNotification(task); + notificationSender.sendNotification(task, null); verifyHttpCallWithoutToken(config, task, ""); } @@ -327,7 +327,7 @@ public void testSendNotificationWithBlankToken() throws Exception { configStore.setInfo(config); setupBasicMockHttpResponse(); - notificationSender.sendNotification(task); + notificationSender.sendNotification(task, null); verifyHttpCallWithoutToken(config, task, " "); } @@ -593,6 +593,39 @@ public void testPaginationEmptyTaskWithPageSize() { assertNull(result.nextPageToken(), "Should not have nextPageToken for empty result"); } + @Test + public void testSetAndGetProtocolVersion() { + String taskId = "task1"; + String configId = "cfg1"; + TaskPushNotificationConfig config = TaskPushNotificationConfig.builder() + .id(configId).taskId(taskId).url("http://example.com/hook").build(); + configStore.setInfo(config); + + assertNull(configStore.getProtocolVersion(taskId, configId)); + + configStore.setInfo(config, "0.3"); + assertEquals("0.3", configStore.getProtocolVersion(taskId, configId)); + } + + @Test + public void testDeleteInfoCleansUpProtocolVersion() { + String taskId = "task1"; + String configId = "cfg1"; + TaskPushNotificationConfig config = TaskPushNotificationConfig.builder() + .id(configId).taskId(taskId).url("http://example.com/hook").build(); + configStore.setInfo(config, "0.3"); + + assertEquals("0.3", configStore.getProtocolVersion(taskId, configId)); + + configStore.deleteInfo(taskId, configId); + assertNull(configStore.getProtocolVersion(taskId, configId)); + } + + @Test + public void testGetProtocolVersionReturnsNullForUnknownConfig() { + assertNull(configStore.getProtocolVersion("nonexistent", "nonexistent")); + } + @Test public void testPaginationFullIteration() { String taskId = "task_pagination_full"; diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/tasks/PushNotificationSenderTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/PushNotificationSenderTest.java index 979a95a9f..1c3cef00e 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/tasks/PushNotificationSenderTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/PushNotificationSenderTest.java @@ -18,6 +18,7 @@ import org.a2aproject.sdk.client.http.A2AHttpClient; import org.a2aproject.sdk.client.http.A2AHttpResponse; +import org.jspecify.annotations.Nullable; import org.a2aproject.sdk.common.A2AHeaders; import org.a2aproject.sdk.jsonrpc.common.json.JsonProcessingException; import org.a2aproject.sdk.jsonrpc.common.json.JsonUtil; @@ -163,7 +164,7 @@ private void testSendNotificationWithInvalidToken(String token, String testName) // Set up latch to wait for async completion testHttpClient.latch = new CountDownLatch(1); - sender.sendNotification(taskData); + sender.sendNotification(taskData, null); // Wait for the async operation to complete assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP call should complete within 5 seconds"); @@ -217,7 +218,7 @@ public void testSendNotificationSuccess() throws InterruptedException { // Set up latch to wait for async completion testHttpClient.latch = new CountDownLatch(1); - sender.sendNotification(taskData); + sender.sendNotification(taskData, null); // Wait for the async operation to complete assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP call should complete within 5 seconds"); @@ -248,7 +249,7 @@ public void testSendNotificationWithTokenSuccess() throws InterruptedException { // Set up latch to wait for async completion testHttpClient.latch = new CountDownLatch(1); - sender.sendNotification(taskData); + sender.sendNotification(taskData, null); // Wait for the async operation to complete assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP call should complete within 5 seconds"); @@ -278,7 +279,7 @@ public void testSendNotificationNoConfig() { Task taskData = createSampleTask(taskId, TaskState.TASK_STATE_COMPLETED); // Don't set any configuration in the store - sender.sendNotification(taskData); + sender.sendNotification(taskData, null); // Verify no HTTP calls were made assertEquals(0, testHttpClient.events.size()); @@ -308,7 +309,7 @@ public void testSendNotificationMultipleConfigs() throws InterruptedException { // Set up latch to wait for async completion (2 calls expected) testHttpClient.latch = new CountDownLatch(2); - sender.sendNotification(taskData); + sender.sendNotification(taskData, null); // Wait for the async operations to complete assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP calls should complete within 5 seconds"); @@ -341,7 +342,7 @@ public void testSendNotificationHttpError() { testHttpClient.shouldThrowException = true; // This should not throw an exception - errors should be handled gracefully - sender.sendNotification(taskData); + sender.sendNotification(taskData, null); // Verify no events were successfully processed due to the error assertEquals(0, testHttpClient.events.size()); @@ -363,7 +364,7 @@ public void testSendNotificationMessage() throws InterruptedException { // Set up latch to wait for async completion testHttpClient.latch = new CountDownLatch(1); - sender.sendNotification(message); + sender.sendNotification(message, null); // Wait for the async operation to complete assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP call should complete within 5 seconds"); @@ -396,7 +397,7 @@ public void testSendNotificationTaskStatusUpdate() throws InterruptedException { // Set up latch to wait for async completion testHttpClient.latch = new CountDownLatch(1); - sender.sendNotification(statusUpdate); + sender.sendNotification(statusUpdate, null); // Wait for the async operation to complete assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP call should complete within 5 seconds"); @@ -435,7 +436,7 @@ public void testSendNotificationTaskArtifactUpdate() throws InterruptedException // Set up latch to wait for async completion testHttpClient.latch = new CountDownLatch(1); - sender.sendNotification(artifactUpdate); + sender.sendNotification(artifactUpdate, null); // Wait for the async operation to complete assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP call should complete within 5 seconds"); @@ -451,4 +452,67 @@ public void testSendNotificationTaskArtifactUpdate() throws InterruptedException String rawBody = testHttpClient.rawBodies.get(0); assertTrue(rawBody.contains("\"artifactUpdate\""), "Raw body should contain 'artifactUpdate' discriminator for StreamResponse"); } + + @Test + public void testSendNotificationUsesFormatterForVersionedConfig() throws InterruptedException { + String taskId = "task_formatter_test"; + Task taskData = createSampleTask(taskId, TaskState.TASK_STATE_COMPLETED); + + TaskPushNotificationConfig config = TaskPushNotificationConfig.builder() + .url("http://notify.me/here") + .id("cfg1") + .taskId(taskId) + .build(); + configStore.setInfo(config, "0.3"); + + PushNotificationPayloadFormatter formatter = new PushNotificationPayloadFormatter() { + @Override + public String targetVersion() { return "0.3"; } + + @Override + public @Nullable String formatPayload(StreamingEventKind event, @Nullable Task snapshot) { + return "{\"id\":\"" + taskId + "\",\"kind\":\"task\",\"formatted\":true}"; + } + }; + + BasePushNotificationSender formatterSender = new BasePushNotificationSender( + configStore, testHttpClient, List.of(formatter)); + testHttpClient.latch = new CountDownLatch(1); + + formatterSender.sendNotification(taskData, taskData); + + assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS)); + assertEquals(1, testHttpClient.rawBodies.size()); + assertTrue(testHttpClient.rawBodies.get(0).contains("\"formatted\":true")); + } + + @Test + public void testSendNotificationSkipsWhenFormatterReturnsNull() throws InterruptedException { + String taskId = "task_formatter_skip"; + Task taskData = createSampleTask(taskId, TaskState.TASK_STATE_COMPLETED); + + TaskPushNotificationConfig config = TaskPushNotificationConfig.builder() + .url("http://notify.me/here") + .id("cfg1") + .taskId(taskId) + .build(); + configStore.setInfo(config, "0.3"); + + PushNotificationPayloadFormatter formatter = new PushNotificationPayloadFormatter() { + @Override + public String targetVersion() { return "0.3"; } + + @Override + public @Nullable String formatPayload(StreamingEventKind event, @Nullable Task snapshot) { + return null; + } + }; + + BasePushNotificationSender formatterSender = new BasePushNotificationSender( + configStore, testHttpClient, List.of(formatter)); + + formatterSender.sendNotification(taskData, taskData); + + assertTrue(testHttpClient.rawBodies.isEmpty()); + } } diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/tasks/ResultAggregatorTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/ResultAggregatorTest.java index e641c5631..7b206e01a 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/tasks/ResultAggregatorTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/ResultAggregatorTest.java @@ -248,7 +248,7 @@ void testConsumeAndBreakNonBlocking() throws Exception { InMemoryTaskStore taskStore = new InMemoryTaskStore(); InMemoryQueueManager queueManager = new InMemoryQueueManager(new MockTaskStateProvider(), mainEventBus); - MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, task -> {}, queueManager); + MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, (event, snapshot) -> {}, queueManager); EventQueueUtil.start(processor); EventQueue queue = queueManager.getEventQueueBuilder(taskId).build().tap(); @@ -296,7 +296,7 @@ void testConsumeAndBreakOnAuthRequired_Blocking() throws Exception { InMemoryTaskStore taskStore = new InMemoryTaskStore(); InMemoryQueueManager queueManager = new InMemoryQueueManager(new MockTaskStateProvider(), mainEventBus); - MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, task -> {}, queueManager); + MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, (event, snapshot) -> {}, queueManager); EventQueueUtil.start(processor); EventQueue queue = queueManager.getEventQueueBuilder(taskId).build().tap(); @@ -338,7 +338,7 @@ void testConsumeAndBreakOnAuthRequired_NonBlocking() throws Exception { InMemoryTaskStore taskStore = new InMemoryTaskStore(); InMemoryQueueManager queueManager = new InMemoryQueueManager(new MockTaskStateProvider(), mainEventBus); - MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, task -> {}, queueManager); + MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, (event, snapshot) -> {}, queueManager); EventQueueUtil.start(processor); EventQueue queue = queueManager.getEventQueueBuilder(taskId).build().tap(); @@ -385,7 +385,7 @@ void testAuthRequiredWithTaskStatusUpdateEvent() throws Exception { InMemoryTaskStore taskStore = new InMemoryTaskStore(); InMemoryQueueManager queueManager = new InMemoryQueueManager(new MockTaskStateProvider(), mainEventBus); - MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, task -> {}, queueManager); + MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, (event, snapshot) -> {}, queueManager); EventQueueUtil.start(processor); EventQueue queue = queueManager.getEventQueueBuilder(taskId).build().tap(); @@ -431,7 +431,7 @@ void testAuthRequiredWithTaskEvent() throws Exception { InMemoryTaskStore taskStore = new InMemoryTaskStore(); InMemoryQueueManager queueManager = new InMemoryQueueManager(new MockTaskStateProvider(), mainEventBus); - MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, task -> {}, queueManager); + MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, (event, snapshot) -> {}, queueManager); EventQueueUtil.start(processor); EventQueue queue = queueManager.getEventQueueBuilder(taskId).build().tap(); diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/tasks/TaskManagerSnapshotTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/TaskManagerSnapshotTest.java new file mode 100644 index 000000000..0e54caf8f --- /dev/null +++ b/server-common/src/test/java/org/a2aproject/sdk/server/tasks/TaskManagerSnapshotTest.java @@ -0,0 +1,75 @@ +package org.a2aproject.sdk.server.tasks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.concurrent.atomic.AtomicReference; + +import org.a2aproject.sdk.spec.Task; +import org.a2aproject.sdk.spec.TaskState; +import org.a2aproject.sdk.spec.TaskStatus; +import org.a2aproject.sdk.spec.TaskStatusUpdateEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TaskManagerSnapshotTest { + + private InMemoryTaskStore taskStore; + + @BeforeEach + void setUp() { + taskStore = new InMemoryTaskStore(); + } + + @Test + void snapshotCapturedForTaskEvent() throws Exception { + Task task = Task.builder() + .id("t1").contextId("c1") + .status(new TaskStatus(TaskState.TASK_STATE_SUBMITTED)) + .build(); + + AtomicReference snapshot = new AtomicReference<>(); + TaskManager tm = new TaskManager("t1", "c1", taskStore, null); + tm.process(task, false, snapshot); + + assertNotNull(snapshot.get()); + assertEquals("t1", snapshot.get().id()); + } + + @Test + void snapshotCapturedForStatusUpdateEvent() throws Exception { + // Seed a task first + Task task = Task.builder() + .id("t1").contextId("c1") + .status(new TaskStatus(TaskState.TASK_STATE_SUBMITTED)) + .build(); + taskStore.save(task, false); + + TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder() + .taskId("t1").contextId("c1") + .status(new TaskStatus(TaskState.TASK_STATE_WORKING)) + .build(); + + AtomicReference snapshot = new AtomicReference<>(); + TaskManager tm = new TaskManager("t1", "c1", taskStore, null); + tm.process(event, false, snapshot); + + assertNotNull(snapshot.get()); + assertEquals(TaskState.TASK_STATE_WORKING, snapshot.get().status().state()); + } + + @Test + void snapshotNullWhenNotProvided() throws Exception { + Task task = Task.builder() + .id("t1").contextId("c1") + .status(new TaskStatus(TaskState.TASK_STATE_SUBMITTED)) + .build(); + + TaskManager tm = new TaskManager("t1", "c1", taskStore, null); + // Old signature still works, no snapshot + tm.process(task, false); + + // Just confirm the old method doesn't crash + assertNotNull(taskStore.get("t1")); + } +} From f1cb0f0b5044688bc0d3cb1d363dacc3429f93c3 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 6 May 2026 17:24:00 +0100 Subject: [PATCH 2/2] Review fixes --- ...paDatabasePushNotificationConfigStore.java | 15 +++++++++++++ .../server/events/MainEventBusProcessor.java | 2 +- .../tasks/BasePushNotificationSender.java | 22 +++++-------------- .../InMemoryPushNotificationConfigStore.java | 12 ++++++++++ .../tasks/PushNotificationConfigStore.java | 10 +++++++++ 5 files changed, 44 insertions(+), 17 deletions(-) diff --git a/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java b/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java index 83a53cb81..84421f2ca 100644 --- a/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java +++ b/extras/push-notification-config-store-database-jpa/src/main/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java @@ -181,4 +181,19 @@ public void deleteInfo(String taskId, String configId) { return jpaConfig != null ? jpaConfig.getProtocolVersion() : null; } + @Transactional + @Override + public java.util.Map getProtocolVersions(String taskId) { + List results = em.createQuery( + "SELECT c.id.configId, c.protocolVersion FROM JpaPushNotificationConfig c " + + "WHERE c.id.taskId = :taskId AND c.protocolVersion IS NOT NULL", Object[].class) + .setParameter("taskId", taskId) + .getResultList(); + java.util.Map versions = new java.util.HashMap<>(); + for (Object[] row : results) { + versions.put((String) row[0], (String) row[1]); + } + return versions; + } + } diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java b/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java index 006252125..eab991ff7 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java @@ -242,7 +242,7 @@ private void processEvent(MainEventBusContext context) { // Skip push notifications for replicated events to avoid duplicate notifications in multi-instance deployments // Push notifications are sent for all StreamingEventKind events (Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent) // per A2A spec section 4.3.3 - if (!isReplicated && event instanceof StreamingEventKind streamingEvent) { + if (!isReplicated && eventToDistribute == event && event instanceof StreamingEventKind streamingEvent) { // Send the streaming event directly - it will be wrapped in StreamResponse format by PushNotificationSender sendPushNotification(taskId, streamingEvent, updateResult != null ? updateResult.taskSnapshot() : null); } diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/BasePushNotificationSender.java b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/BasePushNotificationSender.java index 02d40c980..5605c9001 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/BasePushNotificationSender.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/BasePushNotificationSender.java @@ -110,20 +110,11 @@ public void sendNotification(StreamingEventKind event, @Nullable Task taskSnapsh nextPageToken = pageResult.nextPageToken(); } while (nextPageToken != null); - Map versionsByConfigKey = new HashMap<>(); - for (TaskPushNotificationConfig config : configs) { - String configTaskId = config.taskId(); - if (configTaskId != null) { - String version = configStore.getProtocolVersion(configTaskId, config.id()); - if (version != null) { - versionsByConfigKey.put(configTaskId + ":" + config.id(), version); - } - } - } + Map versionsByConfigId = configStore.getProtocolVersions(taskId); List> dispatchResults = configs .stream() - .map(pushConfig -> dispatch(event, taskSnapshot, pushConfig, versionsByConfigKey)) + .map(pushConfig -> dispatch(event, taskSnapshot, pushConfig, versionsByConfigId)) .toList(); CompletableFuture allFutures = CompletableFuture.allOf(dispatchResults.toArray(new CompletableFuture[0])); CompletableFuture dispatchResult = allFutures.thenApply(v -> dispatchResults.stream() @@ -163,19 +154,18 @@ public void sendNotification(StreamingEventKind event, @Nullable Task taskSnapsh private CompletableFuture dispatch(StreamingEventKind event, @Nullable Task taskSnapshot, TaskPushNotificationConfig pushInfo, - Map versionsByConfigKey) { - return CompletableFuture.supplyAsync(() -> dispatchNotification(event, taskSnapshot, pushInfo, versionsByConfigKey)); + Map versionsByConfigId) { + return CompletableFuture.supplyAsync(() -> dispatchNotification(event, taskSnapshot, pushInfo, versionsByConfigId)); } private boolean dispatchNotification(StreamingEventKind event, @Nullable Task taskSnapshot, TaskPushNotificationConfig pushInfo, - Map versionsByConfigKey) { + Map versionsByConfigId) { String url = pushInfo.url(); String token = pushInfo.token(); - String taskId = pushInfo.taskId(); - String version = taskId != null ? versionsByConfigKey.get(taskId + ":" + pushInfo.id()) : null; + String version = versionsByConfigId.get(pushInfo.id()); PushNotificationPayloadFormatter formatter = version != null ? formattersByVersion.get(version) : null; diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStore.java b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStore.java index 4a32406a5..21e6ce100 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStore.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/InMemoryPushNotificationConfigStore.java @@ -125,4 +125,16 @@ public void deleteInfo(String taskId, String configId) { public @Nullable String getProtocolVersion(String taskId, String configId) { return protocolVersions.get(taskId + ":" + configId); } + + @Override + public Map getProtocolVersions(String taskId) { + String prefix = taskId + ":"; + Map result = new HashMap<>(); + protocolVersions.forEach((key, version) -> { + if (key.startsWith(prefix)) { + result.put(key.substring(prefix.length()), version); + } + }); + return result; + } } diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationConfigStore.java b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationConfigStore.java index 54f44768d..54aac9d37 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationConfigStore.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/tasks/PushNotificationConfigStore.java @@ -154,4 +154,14 @@ static String resolveProtocolVersion(@Nullable String protocolVersion) { return null; } + /** + * Gets all protocol versions for a task's push notification configurations in a single call. + * + * @param taskId the task ID + * @return a map of config ID to protocol version (only includes configs with a version set) + */ + default java.util.Map getProtocolVersions(String taskId) { + return java.util.Map.of(); + } + }