66
77import java .time .Duration ;
88import java .util .Map ;
9+ import java .util .Set ;
910import java .util .UUID ;
1011import java .util .concurrent .ConcurrentHashMap ;
1112import java .util .concurrent .Executors ;
1213import java .util .concurrent .atomic .AtomicLong ;
13- import java .util .concurrent .atomic .AtomicReference ;
1414
1515import reactor .core .scheduler .Scheduler ;
1616import reactor .core .scheduler .Schedulers ;
@@ -78,10 +78,17 @@ public class AcpAgentSession implements AcpSession {
7878 private final AtomicLong requestCounter = new AtomicLong (0 );
7979
8080 /**
81- * Active prompt tracking for single-turn enforcement.
82- * Only ONE prompt can be active at a time per ACP session.
81+ * Active prompt tracking for single-turn enforcement, keyed by logical ACP
82+ * sessionId.
83+ *
84+ * <p>
85+ * Kotlin SDK precedent: its Agent.SessionWrapper owns a single active prompt guard
86+ * per logical session wrapper. This Java session can multiplex multiple logical ACP
87+ * sessionIds over one transport connection, so the same single-turn rule needs to
88+ * be applied per sessionId instead of once for the whole connection.
89+ * </p>
8390 */
84- private final AtomicReference < ActivePrompt > activePrompt = new AtomicReference <>(null );
91+ private final ConcurrentHashMap < String , ActivePrompt > activePrompts = new ConcurrentHashMap <>();
8592
8693 /**
8794 * Represents an active prompt session for single-turn enforcement.
@@ -235,12 +242,12 @@ private Mono<AcpSchema.JSONRPCResponse> handleIncomingRequest(AcpSchema.JSONRPCR
235242 String sessionId = extractSessionId (request .params ());
236243 ActivePrompt newPrompt = new ActivePrompt (sessionId , request .id ());
237244
238- // Try to set as active prompt - fails if another prompt is active
239- if (! activePrompt . compareAndSet ( null , newPrompt )) {
240- ActivePrompt current = activePrompt . get ( );
241- logger . warn ( "Rejected concurrent prompt request. Active prompt: sessionId={}, requestId={}" ,
242- current != null ? current . sessionId () : "unknown" ,
243- current != null ? current .requestId () : "unknown" );
245+ // Try to set as active prompt - fails if this logical session already has
246+ // a prompt active.
247+ ActivePrompt current = activePrompts . putIfAbsent ( sessionId , newPrompt );
248+ if ( current != null ) {
249+ logger . warn ( "Rejected concurrent prompt request for sessionId={}. Active requestId={}" , sessionId ,
250+ current .requestId ());
244251 return Mono .just (new AcpSchema .JSONRPCResponse (AcpSchema .JSONRPC_VERSION , request .id (), null ,
245252 new AcpSchema .JSONRPCError (-32000 , "There is already an active prompt execution" , null )));
246253 }
@@ -249,8 +256,8 @@ private Mono<AcpSchema.JSONRPCResponse> handleIncomingRequest(AcpSchema.JSONRPCR
249256 return handler .handle (request .params ())
250257 .map (result -> new AcpSchema .JSONRPCResponse (AcpSchema .JSONRPC_VERSION , request .id (), result , null ))
251258 .doFinally (signal -> {
252- activePrompt . compareAndSet ( newPrompt , null );
253- logger .debug ("Prompt completed with signal: {}" , signal );
259+ activePrompts . remove ( sessionId , newPrompt );
260+ logger .debug ("Prompt completed for sessionId={} with signal: {}" , sessionId , signal );
254261 });
255262 }
256263
@@ -262,8 +269,13 @@ private Mono<AcpSchema.JSONRPCResponse> handleIncomingRequest(AcpSchema.JSONRPCR
262269 /**
263270 * Extracts the sessionId from request parameters.
264271 */
265- @ SuppressWarnings ("unchecked" )
266272 private String extractSessionId (Object params ) {
273+ if (params instanceof AcpSchema .PromptRequest promptRequest ) {
274+ return promptRequest .sessionId () != null ? promptRequest .sessionId () : "unknown" ;
275+ }
276+ if (params instanceof AcpSchema .CancelNotification cancelNotification ) {
277+ return cancelNotification .sessionId () != null ? cancelNotification .sessionId () : "unknown" ;
278+ }
267279 if (params instanceof Map <?, ?> map ) {
268280 Object sessionId = map .get ("sessionId" );
269281 return sessionId != null ? sessionId .toString () : "unknown" ;
@@ -289,9 +301,8 @@ private Mono<Void> handleIncomingNotification(AcpSchema.JSONRPCNotification noti
289301 // Handle cancel notification specially
290302 if (AcpSchema .METHOD_SESSION_CANCEL .equals (notification .method ())) {
291303 String sessionId = extractSessionId (notification .params ());
292- ActivePrompt current = activePrompt .get ();
293- if (current != null && sessionId .equals (current .sessionId ())) {
294- activePrompt .compareAndSet (current , null );
304+ ActivePrompt current = activePrompts .remove (sessionId );
305+ if (current != null ) {
295306 logger .debug ("Cancelled active prompt for session: {}" , sessionId );
296307 }
297308 }
@@ -372,16 +383,39 @@ public Mono<Void> sendNotification(String method, Object params) {
372383 * @return true if a prompt is currently active
373384 */
374385 public boolean hasActivePrompt () {
375- return activePrompt .get () != null ;
386+ return !activePrompts .isEmpty ();
387+ }
388+
389+ /**
390+ * Checks if there is an active prompt being processed for the specified logical
391+ * ACP session.
392+ * @param sessionId the logical ACP session ID
393+ * @return true if a prompt is currently active for the session
394+ */
395+ public boolean hasActivePrompt (String sessionId ) {
396+ Assert .hasText (sessionId , "The sessionId can not be empty" );
397+ return activePrompts .containsKey (sessionId );
376398 }
377399
378400 /**
379- * Gets the session ID of the active prompt, if any.
380- * @return the session ID or null if no prompt is active
401+ * Gets one active prompt session ID, if any.
402+ *
403+ * <p>
404+ * This is a legacy aggregate view. When multiple logical ACP sessions are active on
405+ * the same transport connection, the returned session ID is arbitrary.
406+ * </p>
407+ * @return one active session ID or null if no prompt is active
381408 */
382409 public String getActivePromptSessionId () {
383- ActivePrompt current = activePrompt .get ();
384- return current != null ? current .sessionId () : null ;
410+ return activePrompts .keySet ().stream ().findFirst ().orElse (null );
411+ }
412+
413+ /**
414+ * Gets the logical ACP session IDs that currently have active prompts.
415+ * @return an immutable snapshot of active prompt session IDs
416+ */
417+ public Set <String > getActivePromptSessionIds () {
418+ return Set .copyOf (activePrompts .keySet ());
385419 }
386420
387421 /**
@@ -391,7 +425,7 @@ public String getActivePromptSessionId() {
391425 @ Override
392426 public Mono <Void > closeGracefully () {
393427 return Mono .fromRunnable (() -> {
394- activePrompt . set ( null );
428+ activePrompts . clear ( );
395429 dismissPendingResponses ();
396430 timeoutScheduler .dispose ();
397431 }).then (this .transport .closeGracefully ());
@@ -402,7 +436,7 @@ public Mono<Void> closeGracefully() {
402436 */
403437 @ Override
404438 public void close () {
405- activePrompt . set ( null );
439+ activePrompts . clear ( );
406440 dismissPendingResponses ();
407441 timeoutScheduler .dispose ();
408442 transport .close ();
0 commit comments