Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
### Maven/Gradle Builds ###
target/
.m2repo/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
Expand Down Expand Up @@ -46,6 +47,12 @@ build.log
shell.log
derby.log

### Environment Files ###
.env
.env.*
**/.env
**/.env.*

### Compiled Files ###
*.class

Expand All @@ -57,6 +64,7 @@ derby.log
*.zip
*.tar.gz
*.rar
node_modules/

### Claude Code ###
.claude/
Expand All @@ -68,5 +76,7 @@ hs_err_pid*
replay_pid*

### Planning and Internal Documentation ###
plans/
plans/*
!plans/STREAMABLE-HTTP-TRANSPORT.md
!plans/STREAMABLE-HTTP-AGENT-TRANSPORT.md
learnings/
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ For WebSocket server support (agents accepting WebSocket connections):
</dependency>
```

For Streamable HTTP server support (agents accepting remote HTTP/SSE connections):
```xml
<dependency>
<groupId>com.agentclientprotocol</groupId>
<artifactId>acp-streamable-http-jetty</artifactId>
<version>0.11.0</version>
</dependency>
```

---

## Getting Started
Expand Down Expand Up @@ -368,7 +377,8 @@ agent.start().block(); // Starts WebSocket server on port 8080

| Artifact | Description |
|----------|-------------|
| [`acp-core`](https://central.sonatype.com/artifact/com.agentclientprotocol/acp-core) | Client and Agent SDKs, stdio and WebSocket client transports |
| [`acp-core`](https://central.sonatype.com/artifact/com.agentclientprotocol/acp-core) | Client and Agent SDKs, stdio, WebSocket, and Streamable HTTP client transports |
| `acp-streamable-http-jetty` | Jetty-backed Streamable HTTP agent transport for listener-backed remote agents |
| [`acp-annotations`](https://central.sonatype.com/artifact/com.agentclientprotocol/acp-annotations) | `@AcpAgent`, `@Prompt`, and other annotations |
| [`acp-agent-support`](https://central.sonatype.com/artifact/com.agentclientprotocol/acp-agent-support) | Annotation-based agent runtime |
| [`acp-test`](https://central.sonatype.com/artifact/com.agentclientprotocol/acp-test) | In-memory transport and mock utilities for testing |
Expand All @@ -380,6 +390,7 @@ agent.start().block(); // Starts WebSocket server on port 8080
|-----------|--------|-------|--------|
| Stdio | `StdioAcpClientTransport` | `StdioAcpAgentTransport` | acp-core |
| WebSocket | `WebSocketAcpClientTransport` | `WebSocketAcpAgentTransport` | acp-core / acp-websocket-jetty |
| Streamable HTTP | `StreamableHttpAcpClientTransport` | `StreamableHttpAcpAgentTransport` | acp-core / acp-streamable-http-jetty |

---

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2025-2026 the original author or authors.
*/

package com.agentclientprotocol.sdk.agent;

import java.util.function.Function;

import com.agentclientprotocol.sdk.spec.AcpAgentTransport;
import com.agentclientprotocol.sdk.util.Assert;

/**
* Factory for creating one ACP agent runtime for one agent-side transport.
*
* <p>
* Listener-backed transports such as remote HTTP transports accept multiple client
* connections over their lifetime. Each accepted connection needs its own
* connection-bound agent runtime while reusing the same agent definition. This factory
* is the explicit public seam for that relationship.
* </p>
*
* @author Kaiser Dandangi
*/
@FunctionalInterface
public interface AcpAgentFactory {

/**
* Creates a new asynchronous agent runtime for the supplied transport.
* @param transport per-connection transport
* @return a fresh asynchronous agent runtime
*/
AcpAsyncAgent create(AcpAgentTransport transport);

/**
* Creates a factory from an asynchronous agent builder function.
* @param factory function that creates a fresh asynchronous agent per transport
* @return an agent factory
*/
static AcpAgentFactory async(Function<AcpAgentTransport, AcpAsyncAgent> factory) {
Assert.notNull(factory, "The async factory can not be null");
return factory::apply;
}

/**
* Creates a factory from a synchronous agent builder function.
*
* <p>
* Synchronous agents are wrappers around asynchronous agents in this SDK, so the
* transport seam remains asynchronous underneath while callers may still author
* agents with the blocking API.
* </p>
* @param factory function that creates a fresh synchronous agent per transport
* @return an agent factory
*/
static AcpAgentFactory sync(Function<AcpAgentTransport, AcpSyncAgent> factory) {
Assert.notNull(factory, "The sync factory can not be null");
return transport -> factory.apply(transport).async();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/*
* Copyright 2025-2026 the original author or authors.
*/

package com.agentclientprotocol.sdk.agent.transport;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

import com.agentclientprotocol.sdk.agent.AcpAgentFactory;
import com.agentclientprotocol.sdk.agent.AcpAsyncAgent;
import com.agentclientprotocol.sdk.error.AcpConnectionException;
import com.agentclientprotocol.sdk.json.AcpJsonMapper;
import com.agentclientprotocol.sdk.json.TypeRef;
import com.agentclientprotocol.sdk.spec.AcpAgentTransport;
import com.agentclientprotocol.sdk.spec.AcpSchema;
import com.agentclientprotocol.sdk.spec.AcpSchema.JSONRPCMessage;
import com.agentclientprotocol.sdk.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
* Shared per-connection core for listener-backed remote ACP agent transports.
*
* <p>
* Remote transports such as Streamable HTTP and WebSocket have different wire-level
* framing, but they both need the same agent-side shape once a remote ACP connection
* exists: one connection-bound {@link AcpAgentTransport}, one fresh agent runtime from
* {@link AcpAgentFactory}, inbound JSON-RPC delivery to the agent, and outbound JSON-RPC
* delivery back to the wire adapter.
* </p>
*
* <p>
* This class intentionally does not know about HTTP headers, SSE streams, WebSocket
* sessions, or route maps. Those remain transport-adapter concerns.
* </p>
*
* @author Kaiser Dandangi
*/
public final class RemoteAcpConnection {

private static final Logger logger = LoggerFactory.getLogger(RemoteAcpConnection.class);

private final String id;

private final AcpJsonMapper jsonMapper;

private final ConnectionTransport transport;

private final AtomicBoolean started = new AtomicBoolean(false);

private final AtomicBoolean closing = new AtomicBoolean(false);

private volatile AcpAsyncAgent agent;

/**
* Creates a new remote ACP connection core.
* @param id stable transport connection id
* @param jsonMapper JSON mapper used by the connection transport
* @param outboundConsumer callback that receives agent-originated outbound messages
*/
public RemoteAcpConnection(String id, AcpJsonMapper jsonMapper, Consumer<JSONRPCMessage> outboundConsumer) {
Assert.hasText(id, "The id can not be empty");
Assert.notNull(jsonMapper, "The jsonMapper can not be null");
Assert.notNull(outboundConsumer, "The outboundConsumer can not be null");
this.id = id;
this.jsonMapper = jsonMapper;
this.transport = new ConnectionTransport(outboundConsumer);
}

/**
* Returns the transport-level connection id.
* @return connection id
*/
public String id() {
return id;
}

/**
* Starts a fresh agent runtime for this connection.
* @param agentFactory factory used to create the connection-bound agent runtime
* @return mono that completes when the agent runtime is started
*/
public Mono<Void> start(AcpAgentFactory agentFactory) {
Assert.notNull(agentFactory, "The agentFactory can not be null");
if (!started.compareAndSet(false, true)) {
return Mono.error(new IllegalStateException("Already started"));
}
return Mono.defer(() -> {
this.agent = agentFactory.create(transport);
return this.agent.start();
}).doOnError(this::signalException);
}

/**
* Accepts one client-originated JSON-RPC message for delivery to the connection's
* agent runtime.
* @param message inbound message
*/
public void acceptInbound(JSONRPCMessage message) {
transport.acceptInbound(message);
}

/**
* Reports a transport adapter exception to the agent transport exception handler.
* @param error exception to report
*/
public void signalException(Throwable error) {
transport.signalException(error);
}

/**
* Closes the connection and its agent runtime gracefully.
* @return mono that completes when close work has been requested
*/
public Mono<Void> closeGracefully() {
return Mono.defer(() -> {
if (!closing.compareAndSet(false, true)) {
return Mono.empty();
}
AcpAsyncAgent currentAgent = this.agent;
if (currentAgent != null) {
return currentAgent.closeGracefully()
.onErrorResume(error -> {
signalException(error);
return Mono.empty();
})
.then(transport.closeGracefully());
}
return transport.closeGracefully();
});
}

/**
* Closes the connection and its agent runtime immediately.
*/
public void close() {
if (!closing.compareAndSet(false, true)) {
return;
}
AcpAsyncAgent currentAgent = this.agent;
if (currentAgent != null) {
currentAgent.close();
}
transport.close();
}

private final class ConnectionTransport implements AcpAgentTransport {

private final Consumer<JSONRPCMessage> outboundConsumer;

private final Sinks.Many<JSONRPCMessage> inboundSink = Sinks.many().unicast().onBackpressureBuffer();

/*
* Streamable HTTP can deliver multiple POST requests for one ACP connection on
* different server threads. Reactor unicast sinks require serialized producers,
* so all transport-adapter ingress is funneled through this monitor before
* emission.
*/
private final Object inboundEmitMonitor = new Object();

private final Sinks.One<Void> terminationSink = Sinks.one();

private final AtomicBoolean transportStarted = new AtomicBoolean(false);

private final AtomicBoolean transportClosing = new AtomicBoolean(false);

private volatile Consumer<Throwable> exceptionHandler = t -> logger.error("Remote ACP transport error", t);

ConnectionTransport(Consumer<JSONRPCMessage> outboundConsumer) {
this.outboundConsumer = outboundConsumer;
}

@Override
public Mono<Void> start(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
Assert.notNull(handler, "The handler can not be null");
if (!transportStarted.compareAndSet(false, true)) {
return Mono.error(new IllegalStateException("Already started"));
}
inboundSink.asFlux()
.flatMap(message -> Mono.just(message).transform(handler))
.doOnNext(response -> {
if (response != null) {
outboundConsumer.accept(response);
}
})
.doOnError(this::signalException)
.doFinally(signal -> terminationSink.tryEmitValue(null))
.subscribe();
return Mono.empty();
}

void acceptInbound(JSONRPCMessage message) {
Assert.notNull(message, "The message can not be null");
if (transportClosing.get()) {
throw new AcpConnectionException("Remote ACP connection is closing");
}
synchronized (inboundEmitMonitor) {
Sinks.EmitResult result = inboundSink.tryEmitNext(message);
if (result.isFailure()) {
throw new AcpConnectionException("Failed to enqueue inbound message: " + result);
}
}
}

void signalException(Throwable error) {
exceptionHandler.accept(error);
}

@Override
public Mono<Void> sendMessage(JSONRPCMessage message) {
return Mono.fromRunnable(() -> {
if (transportClosing.get()) {
throw new AcpConnectionException("Remote ACP connection is closing");
}
outboundConsumer.accept(message);
});
}

@Override
public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
return jsonMapper.convertValue(data, typeRef);
}

@Override
public Mono<Void> closeGracefully() {
return Mono.fromRunnable(this::close);
}

@Override
public void close() {
if (transportClosing.compareAndSet(false, true)) {
inboundSink.tryEmitComplete();
terminationSink.tryEmitValue(null);
}
}

@Override
public void setExceptionHandler(Consumer<Throwable> handler) {
Assert.notNull(handler, "The handler can not be null");
this.exceptionHandler = handler;
}

@Override
public Mono<Void> awaitTermination() {
return terminationSink.asMono();
}

}

}
Loading