diff --git a/agentscope-core/src/main/java/io/agentscope/core/plan/PlanNotebook.java b/agentscope-core/src/main/java/io/agentscope/core/plan/PlanNotebook.java
index df42d9538..0bc63bbea 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/plan/PlanNotebook.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/plan/PlanNotebook.java
@@ -1011,6 +1011,28 @@ public Integer getMaxSubtasks() {
return maxSubtasks;
}
+ /**
+ * Adds a change hook that will be triggered whenever the plan changes.
+ *
+ *
The hook receives the PlanNotebook instance and the current plan (which may be null if the
+ * plan was finished or cleared).
+ *
+ * @param id unique identifier for the hook (used for removal)
+ * @param hook the callback to execute when plan changes
+ */
+ public void addChangeHook(String id, BiConsumer hook) {
+ changeHooks.put(id, hook);
+ }
+
+ /**
+ * Removes a previously registered change hook.
+ *
+ * @param id the identifier of the hook to remove
+ */
+ public void removeChangeHook(String id) {
+ changeHooks.remove(id);
+ }
+
private Mono triggerPlanChangeHooks() {
return Flux.fromIterable(changeHooks.values())
.flatMap(hook -> Mono.fromRunnable(() -> hook.accept(this, currentPlan)))
diff --git a/agentscope-core/src/test/java/io/agentscope/core/plan/PlanNotebookToolTest.java b/agentscope-core/src/test/java/io/agentscope/core/plan/PlanNotebookToolTest.java
index 5821a40e5..559519969 100644
--- a/agentscope-core/src/test/java/io/agentscope/core/plan/PlanNotebookToolTest.java
+++ b/agentscope-core/src/test/java/io/agentscope/core/plan/PlanNotebookToolTest.java
@@ -716,4 +716,94 @@ void testReviseCurrentPlanDeleteDoesNotCheckMaxSubtasks() {
assertTrue(result.contains("successfully"));
assertEquals(2, limitedNotebook.getCurrentPlan().getSubtasks().size());
}
+
+ // ==================== Change Hook Tests ====================
+
+ @Test
+ void testChangeHookTriggeredOnAllPlanMutations() {
+ int[] callCount = {0};
+ Plan[] capturedPlan = {null};
+ notebook.addChangeHook(
+ "testHook",
+ (nb, plan) -> {
+ callCount[0]++;
+ capturedPlan[0] = plan;
+ });
+
+ List subtasks = List.of(new SubTask("Task1", "Desc1", "Outcome1"));
+
+ // createPlan
+ notebook.createPlanWithSubTasks("Test Plan", "Desc", "Outcome", subtasks).block();
+ assertEquals(1, callCount[0], "createPlan");
+ assertEquals("Test Plan", capturedPlan[0].getName());
+
+ // updatePlanInfo
+ notebook.updatePlanInfo("New Name", null, null).block();
+ assertEquals(2, callCount[0], "updatePlanInfo");
+
+ // reviseCurrentPlan (add)
+ notebook.reviseCurrentPlan(
+ 1,
+ "add",
+ PlanNotebook.subtaskToMap(new SubTask("Task2", "Desc2", "Outcome2")))
+ .block();
+ assertEquals(3, callCount[0], "reviseCurrentPlan add");
+
+ // reviseCurrentPlan (revise)
+ notebook.reviseCurrentPlan(
+ 0,
+ "revise",
+ PlanNotebook.subtaskToMap(new SubTask("Updated", "Desc", "Outcome")))
+ .block();
+ assertEquals(4, callCount[0], "reviseCurrentPlan revise");
+
+ // reviseCurrentPlan (delete)
+ notebook.reviseCurrentPlan(1, "delete", null).block();
+ assertEquals(5, callCount[0], "reviseCurrentPlan delete");
+
+ // updateSubtaskState
+ notebook.updateSubtaskState(0, "in_progress").block();
+ assertEquals(6, callCount[0], "updateSubtaskState");
+
+ // finishSubtask
+ notebook.finishSubtask(0, "Done").block();
+ assertEquals(7, callCount[0], "finishSubtask");
+
+ // finishPlan
+ notebook.finishPlan("done", "All done").block();
+ assertEquals(8, callCount[0], "finishPlan");
+ }
+
+ @Test
+ void testRemoveChangeHookStopsNotifications() {
+ int[] callCount = {0};
+ notebook.addChangeHook("testHook", (nb, plan) -> callCount[0]++);
+
+ List subtasks = List.of(new SubTask("Task1", "Desc1", "Outcome1"));
+ notebook.createPlanWithSubTasks("Plan 1", "Desc", "Outcome", subtasks).block();
+ assertEquals(1, callCount[0]);
+
+ notebook.removeChangeHook("testHook");
+ notebook.createPlanWithSubTasks("Plan 2", "Desc", "Outcome", subtasks).block();
+ assertEquals(1, callCount[0], "Hook should not be called after removal");
+ }
+
+ @Test
+ void testMultipleHooksAndIdOverwrite() {
+ int[] count1 = {0};
+ int[] count2 = {0};
+ int[] count3 = {0};
+
+ notebook.addChangeHook("hook1", (nb, plan) -> count1[0]++);
+ notebook.addChangeHook("hook2", (nb, plan) -> count2[0]++);
+ // Overwrite hook1
+ notebook.addChangeHook("hook1", (nb, plan) -> count3[0]++);
+
+ List subtasks = List.of(new SubTask("Task1", "Desc1", "Outcome1"));
+ notebook.createPlanWithSubTasks("Test Plan", "Desc", "Outcome", subtasks).block();
+
+ assertEquals(0, count1[0], "Original hook1 should be overwritten");
+ assertEquals(1, count2[0], "hook2 should be called");
+ assertEquals(1, count3[0], "New hook1 should be called");
+ }
}
diff --git a/agentscope-examples/plan-notebook/README.md b/agentscope-examples/plan-notebook/README.md
index f2644fd58..6ab3893ba 100644
--- a/agentscope-examples/plan-notebook/README.md
+++ b/agentscope-examples/plan-notebook/README.md
@@ -79,6 +79,15 @@ Open your browser and navigate to: **http://localhost:8080**
| GET | `/api/health` | Health check |
| POST | `/api/reset` | Reset agent and clear all data |
+### Human-in-the-Loop (HITL) API
+
+| Method | Endpoint | Description |
+|--------|----------|-------------|
+| GET | `/api/resume` | Resume agent execution after user review (SSE streaming) |
+| GET | `/api/paused` | Check if the agent is currently paused |
+| POST | `/api/stop` | Request the agent to pause after the next plan tool execution |
+| GET | `/api/stop-requested` | Check if a stop has been requested |
+
### Plan API
| Method | Endpoint | Description |
diff --git a/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/controller/ChatController.java b/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/controller/ChatController.java
index f3b7c6a09..21f78be7d 100644
--- a/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/controller/ChatController.java
+++ b/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/controller/ChatController.java
@@ -16,6 +16,7 @@
package io.agentscope.examples.plannotebook.controller;
import io.agentscope.examples.plannotebook.service.AgentService;
+import java.util.Map;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@@ -51,6 +52,56 @@ public Flux chat(
return agentService.chat(sessionId, message);
}
+ /**
+ * Resume agent execution after user review.
+ * This endpoint is called when user clicks "Continue" button after reviewing/modifying the plan.
+ *
+ * @param sessionId Session ID (optional, defaults to "default")
+ * @return Flux of streaming text chunks
+ */
+ @GetMapping(path = "/resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux resume(@RequestParam(defaultValue = "default") String sessionId) {
+ return agentService.resume(sessionId);
+ }
+
+ /**
+ * Get the current pause state of the agent.
+ *
+ * @param sessionId Session ID (optional, defaults to "default", reserved for future multi-session support)
+ * @return Map containing isPaused boolean
+ */
+ @GetMapping("/paused")
+ public Map isPaused(@RequestParam(defaultValue = "default") String sessionId) {
+ return Map.of("isPaused", agentService.isPaused());
+ }
+
+ /**
+ * Request the agent to stop after the next plan tool execution.
+ * The agent will continue running until a plan-related tool is executed, then pause.
+ *
+ * @param sessionId Session ID (optional, defaults to "default", reserved for future multi-session support)
+ * @return Map containing stopRequested status
+ */
+ @PostMapping("/stop")
+ public Map requestStop(
+ @RequestParam(defaultValue = "default") String sessionId) {
+ agentService.requestStop();
+ return Map.of(
+ "stopRequested", true, "message", "Will pause after next plan tool execution");
+ }
+
+ /**
+ * Get the current stop requested state.
+ *
+ * @param sessionId Session ID (optional, defaults to "default", reserved for future multi-session support)
+ * @return Map containing stopRequested boolean
+ */
+ @GetMapping("/stop-requested")
+ public Map isStopRequested(
+ @RequestParam(defaultValue = "default") String sessionId) {
+ return Map.of("stopRequested", agentService.isStopRequested());
+ }
+
/**
* Health check endpoint.
*/
diff --git a/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/service/AgentService.java b/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/service/AgentService.java
index a526ded88..00a76bba5 100644
--- a/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/service/AgentService.java
+++ b/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/service/AgentService.java
@@ -16,6 +16,7 @@
package io.agentscope.examples.plannotebook.service;
import io.agentscope.core.ReActAgent;
+import io.agentscope.core.agent.Event;
import io.agentscope.core.agent.EventType;
import io.agentscope.core.agent.StreamOptions;
import io.agentscope.core.formatter.dashscope.DashScopeChatFormatter;
@@ -23,6 +24,7 @@
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostActingEvent;
import io.agentscope.core.memory.InMemoryMemory;
+import io.agentscope.core.message.GenerateReason;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
@@ -31,6 +33,7 @@
import io.agentscope.core.tool.Toolkit;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -67,6 +70,12 @@ public class AgentService implements InitializingBean {
private InMemoryMemory memory;
private Toolkit toolkit;
+ // Track if agent is paused waiting for user to continue
+ private final AtomicBoolean isPaused = new AtomicBoolean(false);
+
+ // Track if user has requested to stop (will pause on next plan tool execution)
+ private final AtomicBoolean stopRequested = new AtomicBoolean(false);
+
public AgentService(PlanService planService) {
this.planService = planService;
}
@@ -91,7 +100,11 @@ private void initializeAgent() {
PlanNotebook planNotebook = PlanNotebook.builder().build();
planService.setPlanNotebook(planNotebook);
- // Create hook to detect plan changes
+ // Register change hook to broadcast plan changes via SSE
+ planNotebook.addChangeHook(
+ "planServiceBroadcast", (notebook, plan) -> planService.broadcastPlanChange());
+
+ // Create hook to pause agent for user review when stop is requested
Hook planChangeHook =
new Hook() {
@Override
@@ -99,8 +112,14 @@ public Mono onEvent(T event) {
if (event instanceof PostActingEvent postActing) {
String toolName = postActing.getToolUse().getName();
if (PLAN_TOOL_NAMES.contains(toolName)) {
- // Broadcast plan change
- planService.broadcastPlanChange();
+ // Only stop if user has requested it
+ if (stopRequested.compareAndSet(true, false)) {
+ log.info(
+ "Plan tool '{}' executed, pausing for user review",
+ toolName);
+ isPaused.set(true);
+ postActing.stopAgent();
+ }
}
}
return Mono.just(event);
@@ -132,38 +151,104 @@ public Mono onEvent(T event) {
* Send a message to the agent and get streaming response.
*/
public Flux chat(String sessionId, String message) {
+ // Clear paused state when user sends a new message
+ isPaused.set(false);
+
Msg userMsg =
Msg.builder()
.role(MsgRole.USER)
.content(TextBlock.builder().text(message).build())
.build();
- StreamOptions streamOptions =
- StreamOptions.builder()
- .eventTypes(EventType.REASONING, EventType.TOOL_RESULT)
- .incremental(true)
- .build();
-
- return agent.stream(userMsg, streamOptions)
+ return agent.stream(userMsg, createStreamOptions())
.subscribeOn(Schedulers.boundedElastic())
- .filter(event -> !event.isLast())
- .map(
- event -> {
- List textBlocks =
- event.getMessage().getContentBlocks(TextBlock.class);
- if (!textBlocks.isEmpty()) {
- return textBlocks.get(0).getText();
- }
- return "";
- })
+ .map(this::mapEventToString)
.filter(text -> text != null && !text.isEmpty());
}
+ /**
+ * Resume agent execution after user review.
+ * This is called when user clicks "Continue" button after reviewing/modifying the plan.
+ */
+ public Flux resume(String sessionId) {
+ if (isPaused.compareAndSet(true, false)) {
+ log.info("Resuming agent execution after user review");
+
+ // Resume by calling agent.stream() with no input message
+ return agent.stream(createStreamOptions())
+ .subscribeOn(Schedulers.boundedElastic())
+ .map(this::mapEventToString)
+ .filter(text -> text != null && !text.isEmpty());
+ } else {
+ log.warn("Tried to resume but agent is not paused or already resuming");
+ return Flux.just("Agent is not paused or is already resuming.");
+ }
+ }
+
+ private StreamOptions createStreamOptions() {
+ return StreamOptions.builder()
+ .eventTypes(EventType.REASONING, EventType.TOOL_RESULT, EventType.AGENT_RESULT)
+ .incremental(true)
+ .build();
+ }
+
+ /**
+ * Map a stream event to a string for SSE output.
+ */
+ private String mapEventToString(Event event) {
+ // Handle AGENT_RESULT events (agent execution ended)
+ if (event.getType() == EventType.AGENT_RESULT) {
+ Msg msg = event.getMessage();
+ if (msg != null && msg.getGenerateReason() == GenerateReason.ACTING_STOP_REQUESTED) {
+ isPaused.set(true);
+ return "[PAUSED]";
+ }
+ // Normal completion - content already streamed via REASONING chunks
+ return "";
+ }
+
+ // Skip final accumulated messages in incremental mode to avoid duplicate output
+ if (event.isLast()) {
+ return "";
+ }
+
+ List textBlocks = event.getMessage().getContentBlocks(TextBlock.class);
+ if (!textBlocks.isEmpty()) {
+ return textBlocks.get(0).getText();
+ }
+ return "";
+ }
+
+ /**
+ * Check if the agent is currently paused.
+ */
+ public boolean isPaused() {
+ return isPaused.get();
+ }
+
+ /**
+ * Request the agent to stop after the next plan tool execution.
+ * This sets a flag that will cause the agent to pause after executing any plan-related tool.
+ */
+ public void requestStop() {
+ log.info("User requested stop - will pause after next plan tool execution");
+ stopRequested.set(true);
+ }
+
+ /**
+ * Check if a stop has been requested.
+ */
+ public boolean isStopRequested() {
+ return stopRequested.get();
+ }
+
/**
* Reset the agent, clearing all conversations and plans.
*/
public void reset() {
log.info("Resetting agent and clearing all data");
+ isPaused.set(false);
+ stopRequested.set(false);
FileToolMock.clearStorage();
initializeAgent();
planService.broadcastPlanChange();
diff --git a/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/service/PlanService.java b/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/service/PlanService.java
index 7d2bd479b..a0c07ab4e 100644
--- a/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/service/PlanService.java
+++ b/agentscope-examples/plan-notebook/src/main/java/io/agentscope/examples/plannotebook/service/PlanService.java
@@ -104,9 +104,7 @@ public Mono addSubtask(int index, SubTaskRequest subtask) {
subtaskMap.put("description", subtask.getDescription());
subtaskMap.put("expected_outcome", subtask.getExpectedOutcome());
- return planNotebook
- .reviseCurrentPlan(index, "add", subtaskMap)
- .doOnSuccess(result -> broadcastPlanChange());
+ return planNotebook.reviseCurrentPlan(index, "add", subtaskMap);
}
/**
@@ -118,52 +116,42 @@ public Mono reviseSubtask(int index, SubTaskRequest subtask) {
subtaskMap.put("description", subtask.getDescription());
subtaskMap.put("expected_outcome", subtask.getExpectedOutcome());
- return planNotebook
- .reviseCurrentPlan(index, "revise", subtaskMap)
- .doOnSuccess(result -> broadcastPlanChange());
+ return planNotebook.reviseCurrentPlan(index, "revise", subtaskMap);
}
/**
* Delete a subtask at the specified index.
*/
public Mono deleteSubtask(int index) {
- return planNotebook
- .reviseCurrentPlan(index, "delete", null)
- .doOnSuccess(result -> broadcastPlanChange());
+ return planNotebook.reviseCurrentPlan(index, "delete", null);
}
/**
* Update subtask state.
*/
public Mono updateSubtaskState(int index, String state) {
- return planNotebook
- .updateSubtaskState(index, state)
- .doOnSuccess(result -> broadcastPlanChange());
+ return planNotebook.updateSubtaskState(index, state);
}
/**
* Finish a subtask with outcome.
*/
public Mono finishSubtask(int index, String outcome) {
- return planNotebook
- .finishSubtask(index, outcome)
- .doOnSuccess(result -> broadcastPlanChange());
+ return planNotebook.finishSubtask(index, outcome);
}
/**
* Finish the current plan.
*/
public Mono finishPlan(String state, String outcome) {
- return planNotebook.finishPlan(state, outcome).doOnSuccess(result -> broadcastPlanChange());
+ return planNotebook.finishPlan(state, outcome);
}
/**
* Update the current plan's name, description, or expected outcome.
*/
public Mono updatePlanInfo(String name, String description, String expectedOutcome) {
- return planNotebook
- .updatePlanInfo(name, description, expectedOutcome)
- .doOnSuccess(result -> broadcastPlanChange());
+ return planNotebook.updatePlanInfo(name, description, expectedOutcome);
}
/**
@@ -174,8 +162,6 @@ public Mono createPlan(
String description,
String expectedOutcome,
List