diff --git a/client/base/src/main/java/io/a2a/client/MessageEvent.java b/client/base/src/main/java/io/a2a/client/MessageEvent.java
index b5970ab78..94c8a1058 100644
--- a/client/base/src/main/java/io/a2a/client/MessageEvent.java
+++ b/client/base/src/main/java/io/a2a/client/MessageEvent.java
@@ -21,6 +21,19 @@ public MessageEvent(Message message) {
public Message getMessage() {
return message;
}
-}
-
+ @Override
+ public String toString() {
+ String messageAsString = "{"
+ + "role=" + message.getRole()
+ + ", parts=" + message.getParts()
+ + ", messageId=" + message.getMessageId()
+ + ", contextId=" + message.getContextId()
+ + ", taskId=" + message.getTaskId()
+ + ", metadata=" + message.getMetadata()
+ + ", kind=" + message.getKind()
+ + ", referenceTaskIds=" + message.getReferenceTaskIds()
+ + ", extensions=" + message.getExtensions() + '}';
+ return "MessageEvent{" + "message=" + messageAsString + '}';
+ }
+}
diff --git a/examples/helloworld/client/pom.xml b/examples/helloworld/client/pom.xml
index 8f5b63406..eb1b747be 100644
--- a/examples/helloworld/client/pom.xml
+++ b/examples/helloworld/client/pom.xml
@@ -12,7 +12,7 @@
a2a-java-sdk-examples-client
- Java SDK A2A Examples
+ Java SDK A2A HelloWorld Example - Client
Examples for the Java SDK for the Agent2Agent Protocol (A2A)
diff --git a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java
index a82438a35..5922624fd 100644
--- a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java
+++ b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java
@@ -12,16 +12,18 @@
import io.a2a.A2A;
import io.a2a.client.Client;
-import io.a2a.client.ClientBuilder;
import io.a2a.client.ClientEvent;
import io.a2a.client.MessageEvent;
import io.a2a.client.http.A2ACardResolver;
import io.a2a.client.transport.jsonrpc.JSONRPCTransport;
import io.a2a.client.transport.jsonrpc.JSONRPCTransportConfig;
+import io.a2a.client.transport.spi.interceptors.ClientCallContext;
+import io.a2a.common.A2AHeaders;
import io.a2a.spec.AgentCard;
import io.a2a.spec.Message;
import io.a2a.spec.Part;
import io.a2a.spec.TextPart;
+import java.util.Collections;
/**
* A simple example of using the A2A Java SDK to communicate with an A2A server.
@@ -61,6 +63,7 @@ public static void main(String[] args) {
List> consumers = new ArrayList<>();
consumers.add((event, agentCard) -> {
if (event instanceof MessageEvent messageEvent) {
+ System.out.println("Received client MessageEvent: " + messageEvent);
Message responseMessage = messageEvent.getMessage();
StringBuilder textBuilder = new StringBuilder();
if (responseMessage.getParts() != null) {
@@ -89,11 +92,11 @@ public static void main(String[] args) {
.streamingErrorHandler(streamingErrorHandler)
.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig())
.build();
+ ClientCallContext clientContext = new ClientCallContext(Collections.emptyMap(), Map.of(A2AHeaders.X_A2A_EXTENSIONS, "https://github.com/a2aproject/a2a-samples/extensions/timestamp/v1"));
Message message = A2A.toUserMessage(MESSAGE_TEXT); // the message ID will be automatically generated for you
-
System.out.println("Sending message: " + MESSAGE_TEXT);
- client.sendMessage(message);
+ client.sendMessage(message, clientContext);
System.out.println("Message sent successfully. Responses will be handled by the configured consumers.");
try {
diff --git a/examples/helloworld/server/pom.xml b/examples/helloworld/server/pom.xml
index 5c660336f..92792fb7f 100644
--- a/examples/helloworld/server/pom.xml
+++ b/examples/helloworld/server/pom.xml
@@ -12,7 +12,7 @@
a2a-java-sdk-examples-server
- Java SDK A2A Examples
+ Java SDK A2A HelloWorld Example - Server
Examples for the Java SDK for the Agent2Agent Protocol (A2A)
@@ -20,6 +20,11 @@
io.github.a2asdk
a2a-java-sdk-client
+
+ io.github.a2asdk
+ a2a-java-timestamp-extension
+ ${project.version}
+
io.github.a2asdk
a2a-java-sdk-reference-jsonrpc
diff --git a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java
index 1d7519b60..dcc246580 100644
--- a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java
+++ b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java
@@ -7,6 +7,7 @@
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
import io.a2a.A2A;
+import io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.UnsupportedOperationError;
@@ -15,7 +16,7 @@ public class AgentExecutorProducer {
@Produces
public AgentExecutor agentExecutor() {
- return new AgentExecutor() {
+ return new TimeStampAgentExecutorWrapper(new AgentExecutor() {
@Override
public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
eventQueue.enqueueEvent(A2A.toAgentMessage("Hello World"));
@@ -25,6 +26,6 @@ public void execute(RequestContext context, EventQueue eventQueue) throws JSONRP
public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
throw new UnsupportedOperationError();
}
- };
+ });
}
}
diff --git a/extensions/timestamp/pom.xml b/extensions/timestamp/pom.xml
new file mode 100644
index 000000000..0ee8f4a9c
--- /dev/null
+++ b/extensions/timestamp/pom.xml
@@ -0,0 +1,40 @@
+
+
+ 4.0.0
+
+
+ io.github.a2asdk
+ a2a-java-sdk-parent
+ 0.4.0.Alpha1-SNAPSHOT
+ ../../pom.xml
+
+
+ a2a-java-timestamp-extension
+ A2A Java SDK :: Extension :: Timestamp
+ Simple Timestamp Extension
+
+
+
+ io.github.a2asdk
+ a2a-java-sdk-server-common
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
+
diff --git a/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampAgentExecutorWrapper.java b/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampAgentExecutorWrapper.java
new file mode 100644
index 000000000..ccee969ca
--- /dev/null
+++ b/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampAgentExecutorWrapper.java
@@ -0,0 +1,51 @@
+package io.a2a.extension.timestamp;
+
+import io.a2a.server.agentexecution.AgentExecutor;
+import io.a2a.server.agentexecution.RequestContext;
+import io.a2a.server.events.EventQueue;
+import io.a2a.spec.JSONRPCError;
+import java.util.logging.Logger;
+
+public class TimeStampAgentExecutorWrapper implements AgentExecutor {
+
+ public static final String CORE_PATH = "github.com/a2aproject/a2a-samples/extensions/timestamp/v1";
+ public static final String URI = "https://" + CORE_PATH;
+ public static final String TIMESTAMP_FIELD = CORE_PATH + "/timestamp";
+
+ private static final Logger logger = Logger.getLogger(TimeStampAgentExecutorWrapper.class.getName());
+ private final AgentExecutor delegate;
+
+ public TimeStampAgentExecutorWrapper(AgentExecutor delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
+ if(isActivated(context)) {
+ delegate.execute(context, new TimeStampEventQueue(eventQueue));
+ } else {
+ delegate.execute(context, eventQueue);
+ }
+ }
+
+ @Override
+ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
+ if(isActivated(context)) {
+ delegate.cancel(context, new TimeStampEventQueue(eventQueue));
+ } else {
+ delegate.cancel(context, eventQueue);
+ }
+ }
+
+ private boolean isActivated(final RequestContext context) {
+ if (context.getCallContext().isExtensionActivated(URI)) {
+ return true;
+ }
+ if (context.getCallContext().isExtensionRequested(URI)) {
+ logger.info("Activated extension: " + URI);
+ context.getCallContext().activateExtension(URI);
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampEventQueue.java b/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampEventQueue.java
new file mode 100644
index 000000000..e02a4ca1f
--- /dev/null
+++ b/extensions/timestamp/src/main/java/io/a2a/extension/timestamp/TimeStampEventQueue.java
@@ -0,0 +1,133 @@
+package io.a2a.extension.timestamp;
+
+import static io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper.TIMESTAMP_FIELD;
+import static io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper.URI;
+
+import io.a2a.server.events.EventQueue;
+import io.a2a.spec.Artifact;
+import io.a2a.spec.Event;
+import io.a2a.spec.Message;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
+import io.a2a.spec.TaskStatusUpdateEvent;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TimeStampEventQueue extends EventQueue {
+
+ private final EventQueue delegate;
+
+ public TimeStampEventQueue(EventQueue delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void enqueueEvent(Event event) {
+ this.delegate.enqueueEvent(timestampEvent(event));
+ }
+
+ private Event timestampEvent(Event event) {
+ if (event instanceof Message message) {
+ return processMessage(message);
+ }
+ if (event instanceof TaskArtifactUpdateEvent taskArtifactUpdateEvent) {
+ return processTaskArtifactUpdateEvent(taskArtifactUpdateEvent);
+ }
+ if (event instanceof TaskStatusUpdateEvent taskStatusUpdateEvent) {
+ return processTaskStatusUpdateEvent(taskStatusUpdateEvent);
+ }
+ if (event instanceof Task task) {
+ return processTask(task);
+ }
+ return event;
+ }
+
+ private Message processMessage(Message message) {
+ Map metadata = message.getMetadata() == null ? new HashMap<>() : new HashMap<>(message.getMetadata());
+ if (!metadata.containsKey(TIMESTAMP_FIELD)) {
+ metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC));
+ }
+ List extensions = message.getExtensions() == null ? new ArrayList<>() : new ArrayList<>(message.getExtensions());
+ if (!extensions.contains(URI)) {
+ extensions.add(URI);
+ }
+ return new Message.Builder(message).metadata(metadata).extensions(extensions).build();
+ }
+
+ private Task processTask(Task task) {
+ Map metadata = task.getMetadata() == null ? new HashMap<>() : new HashMap<>(task.getMetadata());
+ if (!metadata.containsKey(TIMESTAMP_FIELD)) {
+ metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC));
+ }
+ List artifacts = new ArrayList<>();
+ for (Artifact artifact : task.getArtifacts()) {
+ artifacts.add(processArtifact(artifact));
+ }
+ return new Task.Builder(task).artifacts(artifacts).metadata(metadata).build();
+ }
+
+ private TaskStatusUpdateEvent processTaskStatusUpdateEvent(TaskStatusUpdateEvent taskStatusUpdateEvent) {
+ Map metadata = taskStatusUpdateEvent.getMetadata() == null ? new HashMap<>() : new HashMap<>(taskStatusUpdateEvent.getMetadata());
+ if (!metadata.containsKey(TIMESTAMP_FIELD)) {
+ metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC));
+ }
+ return new TaskStatusUpdateEvent.Builder(taskStatusUpdateEvent).metadata(metadata).build();
+ }
+
+ private TaskArtifactUpdateEvent processTaskArtifactUpdateEvent(TaskArtifactUpdateEvent taskArtifactUpdateEvent) {
+ Map metadata = taskArtifactUpdateEvent.getMetadata() == null ? new HashMap<>() : new HashMap<>(taskArtifactUpdateEvent.getMetadata());
+ if (!metadata.containsKey(TIMESTAMP_FIELD)) {
+ metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC));
+ }
+ if (taskArtifactUpdateEvent.getArtifact() != null) {
+ return new TaskArtifactUpdateEvent.Builder(taskArtifactUpdateEvent).artifact(processArtifact(taskArtifactUpdateEvent.getArtifact())).metadata(metadata).build();
+ }
+ return new TaskArtifactUpdateEvent.Builder(taskArtifactUpdateEvent).metadata(metadata).build();
+ }
+
+ private Artifact processArtifact(Artifact artifact) {
+ Map metadata = artifact.metadata() == null ? new HashMap<>() : new HashMap<>(artifact.metadata());
+ if (!metadata.containsKey(TIMESTAMP_FIELD)) {
+ metadata.put(TIMESTAMP_FIELD, OffsetDateTime.now(ZoneOffset.UTC));
+ }
+ List extensions = artifact.extensions() == null ? new ArrayList<>() : new ArrayList<>(artifact.extensions());
+ if (!extensions.contains(URI)) {
+ extensions.add(URI);
+ }
+ return new Artifact.Builder(artifact).metadata(metadata).extensions(extensions).build();
+ }
+
+ @Override
+ public void awaitQueuePollerStart() throws InterruptedException {
+ this.delegate.awaitQueuePollerStart();
+ }
+
+ @Override
+ public void close() {
+ this.delegate.close();
+ }
+
+ @Override
+ public void close(boolean immediate) {
+ this.delegate.close(immediate);
+ }
+
+ @Override
+ public void close(boolean immediate, boolean notifyParent) {
+ this.delegate.close(immediate, notifyParent);
+ }
+
+ @Override
+ public void signalQueuePollerStarted() {
+ this.delegate.signalQueuePollerStarted();
+ }
+
+ @Override
+ public EventQueue tap() {
+ return this;
+ }
+}
diff --git a/extensions/timestamp/src/test/java/io/a2a/extension/timestamp/TimeStampEventQueueTest.java b/extensions/timestamp/src/test/java/io/a2a/extension/timestamp/TimeStampEventQueueTest.java
new file mode 100644
index 000000000..4bbb1536f
--- /dev/null
+++ b/extensions/timestamp/src/test/java/io/a2a/extension/timestamp/TimeStampEventQueueTest.java
@@ -0,0 +1,437 @@
+package io.a2a.extension.timestamp;
+
+import static io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper.TIMESTAMP_FIELD;
+import static io.a2a.extension.timestamp.TimeStampAgentExecutorWrapper.URI;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.a2a.server.events.EventQueue;
+import io.a2a.spec.Artifact;
+import io.a2a.spec.Event;
+import io.a2a.spec.Message;
+import io.a2a.spec.Message.Role;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
+import io.a2a.spec.TaskState;
+import io.a2a.spec.TaskStatus;
+import io.a2a.spec.TaskStatusUpdateEvent;
+import io.a2a.spec.TextPart;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TimeStampEventQueueTest {
+
+ private EventQueue delegateQueue;
+ private TimeStampEventQueue timestampQueue;
+
+ @BeforeEach
+ public void setUp() {
+ delegateQueue = mock(EventQueue.class);
+ timestampQueue = new TimeStampEventQueue(delegateQueue);
+ }
+
+ @Test
+ public void testEnqueueEvent_delegatesEvent() {
+ Event event = mock(Event.class);
+
+ timestampQueue.enqueueEvent(event);
+
+ verify(delegateQueue).enqueueEvent(any(Event.class));
+ }
+
+ @Test
+ public void testProcessMessage_withNullMetadataAndExtensions() {
+ Message message = new Message.Builder()
+ .role(Role.USER)
+ .parts(List.of(new TextPart("test message")))
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(message);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ Message processedMessage = (Message) eventCaptor.getValue();
+
+ assertNotNull(processedMessage.getMetadata());
+ assertTrue(processedMessage.getMetadata().containsKey(TIMESTAMP_FIELD));
+ assertNotNull(processedMessage.getMetadata().get(TIMESTAMP_FIELD));
+ assertTrue(processedMessage.getMetadata().get(TIMESTAMP_FIELD) instanceof OffsetDateTime);
+
+ assertNotNull(processedMessage.getExtensions());
+ assertTrue(processedMessage.getExtensions().contains(URI));
+ }
+
+ @Test
+ public void testProcessMessage_withEmptyMetadata() {
+ Map metadata = new HashMap<>();
+ List extensions = new ArrayList<>();
+
+ Message message = new Message.Builder()
+ .role(Role.USER)
+ .parts(List.of(new TextPart("test message")))
+ .metadata(metadata)
+ .extensions(extensions)
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(message);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ Message processedMessage = (Message) eventCaptor.getValue();
+
+ assertTrue(processedMessage.getMetadata().containsKey(TIMESTAMP_FIELD));
+ assertTrue(processedMessage.getExtensions().contains(URI));
+ }
+
+ @Test
+ public void testProcessMessage_withExistingMetadata() {
+ Map metadata = new HashMap<>();
+ metadata.put("existing", "value");
+ List extensions = new ArrayList<>();
+ extensions.add("existing-extension");
+
+ Message message = new Message.Builder()
+ .role(Role.USER)
+ .parts(List.of(new TextPart("test message")))
+ .metadata(metadata)
+ .extensions(extensions)
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(message);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ Message processedMessage = (Message) eventCaptor.getValue();
+
+ assertTrue(processedMessage.getMetadata().containsKey(TIMESTAMP_FIELD));
+ assertTrue(processedMessage.getMetadata().containsKey("existing"));
+ assertTrue(processedMessage.getExtensions().contains(URI));
+ assertTrue(processedMessage.getExtensions().contains("existing-extension"));
+ }
+
+ @Test
+ public void testProcessMessage_withExistingTimestamp() {
+ OffsetDateTime existingTimestamp = OffsetDateTime.now();
+ Map metadata = new HashMap<>();
+ metadata.put(TIMESTAMP_FIELD, existingTimestamp);
+ List extensions = new ArrayList<>();
+ extensions.add(URI);
+
+ Message message = new Message.Builder()
+ .role(Role.USER)
+ .parts(List.of(new TextPart("test message")))
+ .metadata(metadata)
+ .extensions(extensions)
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(message);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ Message processedMessage = (Message) eventCaptor.getValue();
+
+ assertEquals(existingTimestamp, processedMessage.getMetadata().get(TIMESTAMP_FIELD));
+ }
+
+ @Test
+ public void testProcessTask_withNullMetadata() {
+ Task task = new Task.Builder()
+ .id("test task")
+ .contextId("context-id")
+ .status(new TaskStatus(TaskState.COMPLETED))
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(task);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ Task processedTask = (Task) eventCaptor.getValue();
+
+ assertNotNull(processedTask.getMetadata());
+ assertTrue(processedTask.getMetadata().containsKey(TIMESTAMP_FIELD));
+ assertNotNull(processedTask.getMetadata().get(TIMESTAMP_FIELD));
+ }
+
+ @Test
+ public void testProcessTask_withExistingMetadata() {
+ Map metadata = new HashMap<>();
+ metadata.put("existing", "value");
+
+ Task task = new Task.Builder()
+ .id("test task")
+ .contextId("context-id")
+ .metadata(metadata)
+ .status(new TaskStatus(TaskState.COMPLETED))
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(task);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ Task processedTask = (Task) eventCaptor.getValue();
+
+ assertTrue(processedTask.getMetadata().containsKey(TIMESTAMP_FIELD));
+ assertTrue(processedTask.getMetadata().containsKey("existing"));
+ }
+
+ @Test
+ public void testProcessTask_withExistingTimestamp() {
+ OffsetDateTime existingTimestamp = OffsetDateTime.now();
+ Map metadata = new HashMap<>();
+ metadata.put(TIMESTAMP_FIELD, existingTimestamp);
+
+ Task task = new Task.Builder()
+ .id("test task")
+ .contextId("context-id")
+ .metadata(metadata)
+ .status(new TaskStatus(TaskState.COMPLETED))
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(task);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ Task processedTask = (Task) eventCaptor.getValue();
+
+ assertEquals(existingTimestamp, processedTask.getMetadata().get(TIMESTAMP_FIELD));
+ }
+
+ @Test
+ public void testProcessTaskStatusUpdateEvent_withNullMetadata() {
+ TaskStatusUpdateEvent event = new TaskStatusUpdateEvent.Builder()
+ .taskId("task-1")
+ .contextId("context-1")
+ .status(new TaskStatus(TaskState.COMPLETED))
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(event);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ TaskStatusUpdateEvent processedEvent = (TaskStatusUpdateEvent) eventCaptor.getValue();
+
+ assertNotNull(processedEvent.getMetadata());
+ assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD));
+ }
+
+ @Test
+ public void testProcessTaskStatusUpdateEvent_withExistingMetadata() {
+ Map metadata = new HashMap<>();
+ metadata.put("existing", "value");
+
+ TaskStatusUpdateEvent event = new TaskStatusUpdateEvent.Builder()
+ .taskId("task-1")
+ .contextId("context-1")
+ .metadata(metadata)
+ .status(new TaskStatus(TaskState.COMPLETED))
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(event);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ TaskStatusUpdateEvent processedEvent = (TaskStatusUpdateEvent) eventCaptor.getValue();
+
+ assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD));
+ assertTrue(processedEvent.getMetadata().containsKey("existing"));
+ }
+
+ @Test
+ public void testProcessTaskStatusUpdateEvent_withExistingTimestamp() {
+ OffsetDateTime existingTimestamp = OffsetDateTime.now();
+ Map metadata = new HashMap<>();
+ metadata.put(TIMESTAMP_FIELD, existingTimestamp);
+
+ TaskStatusUpdateEvent event = new TaskStatusUpdateEvent.Builder()
+ .taskId("task-1")
+ .contextId("context-1")
+ .metadata(metadata)
+ .status(new TaskStatus(TaskState.COMPLETED))
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(event);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ TaskStatusUpdateEvent processedEvent = (TaskStatusUpdateEvent) eventCaptor.getValue();
+
+ assertEquals(existingTimestamp, processedEvent.getMetadata().get(TIMESTAMP_FIELD));
+ }
+
+ @Test
+ public void testProcessTaskArtifactUpdateEvent_withNullMetadata() {
+ TaskArtifactUpdateEvent event = new TaskArtifactUpdateEvent.Builder()
+ .taskId("task-1")
+ .contextId("context-1")
+ .append(false)
+ .artifact(new Artifact.Builder()
+ .artifactId("artifact-id")
+ .description("Test artifact")
+ .name("Artifact")
+ .parts(List.of(new TextPart("test message")))
+ .build())
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(event);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ TaskArtifactUpdateEvent processedEvent = (TaskArtifactUpdateEvent) eventCaptor.getValue();
+
+ assertNotNull(processedEvent.getMetadata());
+ assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD));
+ }
+
+ @Test
+ public void testProcessTaskArtifactUpdateEvent_withExistingMetadata() {
+ Map metadata = new HashMap<>();
+ metadata.put("existing", "value");
+
+ TaskArtifactUpdateEvent event = new TaskArtifactUpdateEvent.Builder()
+ .taskId("task-1")
+ .contextId("context-1")
+ .append(false)
+ .metadata(metadata)
+ .artifact(new Artifact.Builder()
+ .artifactId("artifact-id")
+ .description("Test artifact")
+ .name("Artifact")
+ .parts(List.of(new TextPart("test message")))
+ .build())
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(event);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ TaskArtifactUpdateEvent processedEvent = (TaskArtifactUpdateEvent) eventCaptor.getValue();
+
+ assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD));
+ assertTrue(processedEvent.getMetadata().containsKey("existing"));
+ }
+
+ @Test
+ public void testProcessTaskArtifactUpdateEvent_withExistingTimestamp() {
+ OffsetDateTime existingTimestamp = OffsetDateTime.now();
+ Map metadata = new HashMap<>();
+ metadata.put(TIMESTAMP_FIELD, existingTimestamp);
+
+ TaskArtifactUpdateEvent event = new TaskArtifactUpdateEvent.Builder()
+ .taskId("task-1")
+ .contextId("context-1")
+ .append(false)
+ .metadata(metadata)
+ .artifact(new Artifact.Builder()
+ .artifactId("artifact-id")
+ .description("Test artifact")
+ .name("Artifact")
+ .parts(List.of(new TextPart("test message")))
+ .build())
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(event);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ TaskArtifactUpdateEvent processedEvent = (TaskArtifactUpdateEvent) eventCaptor.getValue();
+
+ assertEquals(existingTimestamp, processedEvent.getMetadata().get(TIMESTAMP_FIELD));
+ }
+
+ @Test
+ public void testProcessTaskArtifactUpdateEvent_withArtifact() {
+ Map artifactMetadata = new HashMap<>();
+ List extensions = new ArrayList<>();
+
+ Artifact artifact = new Artifact.Builder()
+ .artifactId("artifact-id")
+ .parts(List.of(new TextPart("test part")))
+ .metadata(artifactMetadata)
+ .extensions(extensions)
+ .build();
+
+ Map metadata = new HashMap<>();
+
+ TaskArtifactUpdateEvent event = new TaskArtifactUpdateEvent.Builder()
+ .taskId("task-1")
+ .contextId("context-1")
+ .append(false)
+ .artifact(artifact)
+ .metadata(metadata)
+ .build();
+
+ ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class);
+ timestampQueue.enqueueEvent(event);
+
+ verify(delegateQueue).enqueueEvent(eventCaptor.capture());
+ TaskArtifactUpdateEvent processedEvent = (TaskArtifactUpdateEvent) eventCaptor.getValue();
+
+ assertTrue(processedEvent.getMetadata().containsKey(TIMESTAMP_FIELD));
+
+ // Verify artifact was processed
+ assertNotNull(processedEvent.getArtifact());
+ assertTrue(processedEvent.getArtifact().metadata().containsKey(TIMESTAMP_FIELD));
+ assertTrue(processedEvent.getArtifact().extensions().contains(URI));
+ }
+
+ @Test
+ public void testUnknownEventType_passesThrough() {
+ Event unknownEvent = mock(Event.class);
+
+ timestampQueue.enqueueEvent(unknownEvent);
+
+ verify(delegateQueue).enqueueEvent(unknownEvent);
+ }
+
+ @Test
+ public void testAwaitQueuePollerStart() throws InterruptedException {
+ timestampQueue.awaitQueuePollerStart();
+
+ verify(delegateQueue).awaitQueuePollerStart();
+ }
+
+ @Test
+ public void testClose() {
+ timestampQueue.close();
+
+ verify(delegateQueue).close();
+ }
+
+ @Test
+ public void testCloseWithImmediate() {
+ timestampQueue.close(true);
+
+ verify(delegateQueue).close(true);
+ }
+
+ @Test
+ public void testCloseWithImmediateAndNotifyParent() {
+ timestampQueue.close(true, true);
+
+ verify(delegateQueue).close(true, true);
+ }
+
+ @Test
+ public void testSignalQueuePollerStarted() {
+ timestampQueue.signalQueuePollerStarted();
+
+ verify(delegateQueue).signalQueuePollerStarted();
+ }
+
+ @Test
+ public void testTap() {
+ EventQueue tappedQueue = timestampQueue.tap();
+
+ assertEquals(timestampQueue, tappedQueue);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 02129d631..59fbf8426 100644
--- a/pom.xml
+++ b/pom.xml
@@ -457,6 +457,7 @@
transport/jsonrpc
transport/grpc
transport/rest
+ extensions/timestamp
diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
index d590d8890..6a8a154ac 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
@@ -96,7 +96,7 @@ public int getQueueSize() {
public abstract void awaitQueuePollerStart() throws InterruptedException ;
- abstract void signalQueuePollerStarted();
+ public abstract void signalQueuePollerStarted();
public void enqueueEvent(Event event) {
enqueueItem(new LocalEventQueueItem(event));
@@ -119,7 +119,7 @@ public void enqueueItem(EventQueueItem item) {
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
}
- abstract EventQueue tap();
+ public abstract EventQueue tap();
/**
* Dequeues an EventQueueItem from the queue.
@@ -265,7 +265,7 @@ static class MainQueue extends EventQueue {
taskId, onCloseCallbacks.size(), taskStateProvider != null);
}
- EventQueue tap() {
+ public EventQueue tap() {
ChildQueue child = new ChildQueue(this);
children.add(child);
return child;
@@ -310,7 +310,7 @@ public void awaitQueuePollerStart() throws InterruptedException {
}
@Override
- void signalQueuePollerStarted() {
+ public void signalQueuePollerStarted() {
if (pollingStarted.get()) {
return;
}
@@ -415,7 +415,7 @@ private void internalEnqueueItem(EventQueueItem item) {
}
@Override
- EventQueue tap() {
+ public EventQueue tap() {
throw new IllegalStateException("Can only tap the main queue");
}
@@ -425,7 +425,7 @@ public void awaitQueuePollerStart() throws InterruptedException {
}
@Override
- void signalQueuePollerStarted() {
+ public void signalQueuePollerStarted() {
parent.signalQueuePollerStarted();
}
diff --git a/spec/src/main/java/io/a2a/spec/Message.java b/spec/src/main/java/io/a2a/spec/Message.java
index dd7e860a5..3e9df40dd 100644
--- a/spec/src/main/java/io/a2a/spec/Message.java
+++ b/spec/src/main/java/io/a2a/spec/Message.java
@@ -128,7 +128,6 @@ public String asString() {
return this.role;
}
}
-
public static class Builder {
private Role role;
diff --git a/spec/src/main/java/io/a2a/spec/TaskStatusUpdateEvent.java b/spec/src/main/java/io/a2a/spec/TaskStatusUpdateEvent.java
index 25e2cd170..10050dfa5 100644
--- a/spec/src/main/java/io/a2a/spec/TaskStatusUpdateEvent.java
+++ b/spec/src/main/java/io/a2a/spec/TaskStatusUpdateEvent.java
@@ -96,6 +96,7 @@ public Builder(TaskStatusUpdateEvent existingTaskStatusUpdateEvent) {
this.isFinal = existingTaskStatusUpdateEvent.isFinal;
this.metadata = existingTaskStatusUpdateEvent.metadata;
}
+
public Builder taskId(String id) {
this.taskId = id;
return this;