Skip to content

Commit 72e272e

Browse files
George-iamclaude
andauthored
feat: add observe and waitFor poll-based methods (#22)
Add poll-based observe() and waitFor() methods to AxmeClient that use listIntentEvents internally. observe() polls in a loop and collects events until a terminal event is seen (COMPLETED, FAILED, CANCELED, TIMED_OUT). waitFor() is a convenience wrapper that returns only the terminal event. Supports configurable poll interval, since parameter for cursor tracking, and optional timeout via ObserveOptions. New files: - ObserveOptions.java: configuration for polling behavior - AxmeTimeoutException.java: thrown when observe/waitFor exceeds timeout Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 62877a8 commit 72e272e

4 files changed

Lines changed: 294 additions & 0 deletions

File tree

src/main/java/dev/axme/sdk/AxmeClient.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
import java.net.http.HttpRequest;
1010
import java.net.http.HttpResponse;
1111
import java.nio.charset.StandardCharsets;
12+
import java.util.ArrayList;
1213
import java.util.LinkedHashMap;
14+
import java.util.List;
1315
import java.util.Map;
16+
import java.util.Set;
1417
import java.util.UUID;
1518

1619
public final class AxmeClient {
@@ -733,6 +736,100 @@ public Map<String, Object> mcpCallTool(String name, Map<String, Object> argument
733736
return result instanceof Map ? (Map<String, Object>) result : response;
734737
}
735738

739+
private static final Set<String> TERMINAL_STATUSES =
740+
Set.of("COMPLETED", "FAILED", "CANCELED", "TIMED_OUT");
741+
742+
private static final Set<String> TERMINAL_EVENT_TYPES =
743+
Set.of("intent.completed", "intent.failed", "intent.canceled", "intent.timed_out");
744+
745+
/**
746+
* Polls {@code listIntentEvents} in a loop and returns all events up to and
747+
* including the first terminal event.
748+
*
749+
* <p>A terminal event is one whose {@code status} field is in
750+
* {@code COMPLETED, FAILED, CANCELED, TIMED_OUT} or whose {@code event_type}
751+
* field is {@code intent.completed, intent.failed, intent.canceled, intent.timed_out}.
752+
*
753+
* @param intentId the intent to observe
754+
* @param options polling options (since, interval, timeout); may be {@code null}
755+
* @return list of event maps in arrival order, ending with the terminal event
756+
* @throws AxmeTimeoutException if {@code timeoutSeconds} is set and elapsed
757+
* @throws IOException on HTTP transport errors
758+
* @throws InterruptedException if the polling thread is interrupted
759+
*/
760+
@SuppressWarnings("unchecked")
761+
public List<Map<String, Object>> observe(String intentId, ObserveOptions options)
762+
throws IOException, InterruptedException {
763+
ObserveOptions opts = options != null ? options : ObserveOptions.defaults();
764+
int nextSince = opts.getSince();
765+
long startNanos = System.nanoTime();
766+
List<Map<String, Object>> collected = new ArrayList<>();
767+
768+
while (true) {
769+
if (opts.getTimeoutSeconds() != null) {
770+
double elapsed = (System.nanoTime() - startNanos) / 1_000_000_000.0;
771+
if (elapsed >= opts.getTimeoutSeconds()) {
772+
throw new AxmeTimeoutException(intentId, opts.getTimeoutSeconds());
773+
}
774+
}
775+
776+
Map<String, Object> response = listIntentEvents(intentId, nextSince, RequestOptions.none());
777+
Object eventsObj = response.get("events");
778+
List<Map<String, Object>> events =
779+
eventsObj instanceof List ? (List<Map<String, Object>>) eventsObj : List.of();
780+
781+
for (Map<String, Object> event : events) {
782+
nextSince = maxSeenSeq(nextSince, event);
783+
collected.add(event);
784+
if (isTerminalIntentEvent(event)) {
785+
return collected;
786+
}
787+
}
788+
789+
if (events.isEmpty()) {
790+
long sleepMillis = (long) (opts.getPollIntervalSeconds() * 1000);
791+
Thread.sleep(sleepMillis);
792+
}
793+
}
794+
}
795+
796+
/**
797+
* Polls until a terminal event is seen and returns it.
798+
*
799+
* <p>This is a convenience wrapper around {@link #observe} that discards
800+
* intermediate events and returns only the terminal one.
801+
*
802+
* @param intentId the intent to wait for
803+
* @param options polling options; may be {@code null}
804+
* @return the terminal event map
805+
* @throws AxmeTimeoutException if {@code timeoutSeconds} is set and elapsed
806+
* @throws IOException on HTTP transport errors
807+
* @throws InterruptedException if the polling thread is interrupted
808+
*/
809+
public Map<String, Object> waitFor(String intentId, ObserveOptions options)
810+
throws IOException, InterruptedException {
811+
List<Map<String, Object>> events = observe(intentId, options);
812+
return events.get(events.size() - 1);
813+
}
814+
815+
private static int maxSeenSeq(int currentMax, Map<String, Object> event) {
816+
Object seqObj = event.get("seq");
817+
if (seqObj instanceof Number) {
818+
int seq = ((Number) seqObj).intValue();
819+
return Math.max(currentMax, seq);
820+
}
821+
return currentMax;
822+
}
823+
824+
private static boolean isTerminalIntentEvent(Map<String, Object> event) {
825+
Object status = event.get("status");
826+
if (status instanceof String && TERMINAL_STATUSES.contains(status)) {
827+
return true;
828+
}
829+
Object eventType = event.get("event_type");
830+
return eventType instanceof String && TERMINAL_EVENT_TYPES.contains(eventType);
831+
}
832+
736833
private Map<String, Object> requestJson(
737834
String method,
738835
String path,
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package dev.axme.sdk;
2+
3+
/**
4+
* Thrown when an {@link AxmeClient#observe} or {@link AxmeClient#waitFor} call
5+
* exceeds its configured timeout.
6+
*/
7+
public final class AxmeTimeoutException extends RuntimeException {
8+
private final String intentId;
9+
private final double timeoutSeconds;
10+
11+
public AxmeTimeoutException(String intentId, double timeoutSeconds) {
12+
super("observe timed out after " + timeoutSeconds + "s for intent " + intentId);
13+
this.intentId = intentId;
14+
this.timeoutSeconds = timeoutSeconds;
15+
}
16+
17+
public String getIntentId() {
18+
return intentId;
19+
}
20+
21+
public double getTimeoutSeconds() {
22+
return timeoutSeconds;
23+
}
24+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package dev.axme.sdk;
2+
3+
/**
4+
* Options for {@link AxmeClient#observe} and {@link AxmeClient#waitFor} polling methods.
5+
*/
6+
public final class ObserveOptions {
7+
private final int since;
8+
private final double pollIntervalSeconds;
9+
private final Double timeoutSeconds;
10+
11+
public ObserveOptions() {
12+
this(0, 1.0, null);
13+
}
14+
15+
public ObserveOptions(int since, double pollIntervalSeconds, Double timeoutSeconds) {
16+
if (pollIntervalSeconds <= 0) {
17+
throw new IllegalArgumentException("pollIntervalSeconds must be positive");
18+
}
19+
if (timeoutSeconds != null && timeoutSeconds <= 0) {
20+
throw new IllegalArgumentException("timeoutSeconds must be positive");
21+
}
22+
this.since = since;
23+
this.pollIntervalSeconds = pollIntervalSeconds;
24+
this.timeoutSeconds = timeoutSeconds;
25+
}
26+
27+
public int getSince() {
28+
return since;
29+
}
30+
31+
public double getPollIntervalSeconds() {
32+
return pollIntervalSeconds;
33+
}
34+
35+
public Double getTimeoutSeconds() {
36+
return timeoutSeconds;
37+
}
38+
39+
public static ObserveOptions defaults() {
40+
return new ObserveOptions();
41+
}
42+
}

src/test/java/dev/axme/sdk/AxmeClientTest.java

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package dev.axme.sdk;
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNotNull;
45
import static org.junit.jupiter.api.Assertions.assertThrows;
56
import static org.junit.jupiter.api.Assertions.assertTrue;
67

78
import com.fasterxml.jackson.core.type.TypeReference;
89
import com.fasterxml.jackson.databind.ObjectMapper;
10+
import java.util.List;
911
import java.util.Map;
1012
import okhttp3.mockwebserver.MockResponse;
1113
import okhttp3.mockwebserver.MockWebServer;
@@ -429,4 +431,133 @@ void organizationRoutingDeliveryAndBillingEndpointsAreReachable() throws Excepti
429431
assertEquals("/v1/billing/invoices?org_id=org_1&workspace_id=ws_1&status=open", server.takeRequest().getPath());
430432
assertEquals("/v1/billing/invoices/inv_1", server.takeRequest().getPath());
431433
}
434+
435+
@Test
436+
void observeCollectsEventsAndStopsAtTerminal() throws Exception {
437+
// First poll: two non-terminal events
438+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
439+
"{\"ok\":true,\"events\":["
440+
+ "{\"seq\":1,\"event_type\":\"intent.created\",\"status\":\"PENDING\"},"
441+
+ "{\"seq\":2,\"event_type\":\"intent.dispatched\",\"status\":\"IN_PROGRESS\"}"
442+
+ "]}"));
443+
// Second poll: empty (triggers sleep + re-poll)
444+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
445+
"{\"ok\":true,\"events\":[]}"));
446+
// Third poll: terminal event
447+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
448+
"{\"ok\":true,\"events\":["
449+
+ "{\"seq\":3,\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\"}"
450+
+ "]}"));
451+
452+
List<Map<String, Object>> events = client.observe("it_obs_1",
453+
new ObserveOptions(0, 0.05, null));
454+
455+
assertEquals(3, events.size());
456+
assertEquals("intent.created", events.get(0).get("event_type"));
457+
assertEquals("intent.dispatched", events.get(1).get("event_type"));
458+
assertEquals("intent.completed", events.get(2).get("event_type"));
459+
assertEquals("COMPLETED", events.get(2).get("status"));
460+
461+
// Verify since parameter advances: first call since=0, third since=2
462+
RecordedRequest req1 = server.takeRequest();
463+
assertEquals("/v1/intents/it_obs_1/events?since=0", req1.getPath());
464+
RecordedRequest req2 = server.takeRequest();
465+
assertEquals("/v1/intents/it_obs_1/events?since=2", req2.getPath());
466+
RecordedRequest req3 = server.takeRequest();
467+
assertEquals("/v1/intents/it_obs_1/events?since=2", req3.getPath());
468+
}
469+
470+
@Test
471+
void observeStopsOnEventTypeAlone() throws Exception {
472+
// Terminal via event_type only (no status field)
473+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
474+
"{\"ok\":true,\"events\":["
475+
+ "{\"seq\":1,\"event_type\":\"intent.failed\"}"
476+
+ "]}"));
477+
478+
List<Map<String, Object>> events = client.observe("it_obs_2",
479+
new ObserveOptions(0, 0.05, null));
480+
481+
assertEquals(1, events.size());
482+
assertEquals("intent.failed", events.get(0).get("event_type"));
483+
}
484+
485+
@Test
486+
void observeStopsOnStatusAlone() throws Exception {
487+
// Terminal via status only (no event_type field)
488+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
489+
"{\"ok\":true,\"events\":["
490+
+ "{\"seq\":1,\"status\":\"CANCELED\"}"
491+
+ "]}"));
492+
493+
List<Map<String, Object>> events = client.observe("it_obs_3",
494+
new ObserveOptions(0, 0.05, null));
495+
496+
assertEquals(1, events.size());
497+
assertEquals("CANCELED", events.get(0).get("status"));
498+
}
499+
500+
@Test
501+
void observeTimesOut() {
502+
// Enqueue enough empty responses to keep polling
503+
for (int i = 0; i < 20; i++) {
504+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
505+
"{\"ok\":true,\"events\":[]}"));
506+
}
507+
508+
AxmeTimeoutException ex = assertThrows(AxmeTimeoutException.class, () ->
509+
client.observe("it_timeout", new ObserveOptions(0, 0.05, 0.1)));
510+
511+
assertEquals("it_timeout", ex.getIntentId());
512+
assertTrue(ex.getTimeoutSeconds() > 0);
513+
}
514+
515+
@Test
516+
void waitForReturnsTerminalEvent() throws Exception {
517+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
518+
"{\"ok\":true,\"events\":["
519+
+ "{\"seq\":1,\"event_type\":\"intent.created\",\"status\":\"PENDING\"}"
520+
+ "]}"));
521+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
522+
"{\"ok\":true,\"events\":["
523+
+ "{\"seq\":2,\"event_type\":\"intent.timed_out\",\"status\":\"TIMED_OUT\"}"
524+
+ "]}"));
525+
526+
Map<String, Object> terminal = client.waitFor("it_wait_1",
527+
new ObserveOptions(0, 0.05, null));
528+
529+
assertNotNull(terminal);
530+
assertEquals("TIMED_OUT", terminal.get("status"));
531+
assertEquals("intent.timed_out", terminal.get("event_type"));
532+
}
533+
534+
@Test
535+
void observeWithSinceParameter() throws Exception {
536+
// Start from since=5, verify first request uses since=5
537+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
538+
"{\"ok\":true,\"events\":["
539+
+ "{\"seq\":6,\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\"}"
540+
+ "]}"));
541+
542+
List<Map<String, Object>> events = client.observe("it_since",
543+
new ObserveOptions(5, 0.05, null));
544+
545+
assertEquals(1, events.size());
546+
RecordedRequest req = server.takeRequest();
547+
assertEquals("/v1/intents/it_since/events?since=5", req.getPath());
548+
}
549+
550+
@Test
551+
void observeWithNullOptionsUsesDefaults() throws Exception {
552+
server.enqueue(new MockResponse().setResponseCode(200).setBody(
553+
"{\"ok\":true,\"events\":["
554+
+ "{\"seq\":1,\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\"}"
555+
+ "]}"));
556+
557+
List<Map<String, Object>> events = client.observe("it_null_opts", null);
558+
559+
assertEquals(1, events.size());
560+
RecordedRequest req = server.takeRequest();
561+
assertEquals("/v1/intents/it_null_opts/events?since=0", req.getPath());
562+
}
432563
}

0 commit comments

Comments
 (0)