Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,28 @@ public Integer getMaxSubtasks() {
return maxSubtasks;
}

/**
* Adds a change hook that will be triggered whenever the plan changes.
*
* <p>The hook receives the PlanNotebook instance and the current plan (which may be null if the
* plan was finished or cleared).
*
* @param id unique identifier for the hook (used for removal)
* @param hook the callback to execute when plan changes
*/
public void addChangeHook(String id, BiConsumer<PlanNotebook, Plan> hook) {
changeHooks.put(id, hook);
}

/**
* Removes a previously registered change hook.
*
* @param id the identifier of the hook to remove
*/
public void removeChangeHook(String id) {
changeHooks.remove(id);
}

private Mono<Void> triggerPlanChangeHooks() {
return Flux.fromIterable(changeHooks.values())
.flatMap(hook -> Mono.fromRunnable(() -> hook.accept(this, currentPlan)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,4 +716,94 @@ void testReviseCurrentPlanDeleteDoesNotCheckMaxSubtasks() {
assertTrue(result.contains("successfully"));
assertEquals(2, limitedNotebook.getCurrentPlan().getSubtasks().size());
}

// ==================== Change Hook Tests ====================

@Test
void testChangeHookTriggeredOnAllPlanMutations() {
int[] callCount = {0};
Plan[] capturedPlan = {null};
notebook.addChangeHook(
"testHook",
(nb, plan) -> {
callCount[0]++;
capturedPlan[0] = plan;
});

List<SubTask> subtasks = List.of(new SubTask("Task1", "Desc1", "Outcome1"));

// createPlan
notebook.createPlanWithSubTasks("Test Plan", "Desc", "Outcome", subtasks).block();
assertEquals(1, callCount[0], "createPlan");
assertEquals("Test Plan", capturedPlan[0].getName());

// updatePlanInfo
notebook.updatePlanInfo("New Name", null, null).block();
assertEquals(2, callCount[0], "updatePlanInfo");

// reviseCurrentPlan (add)
notebook.reviseCurrentPlan(
1,
"add",
PlanNotebook.subtaskToMap(new SubTask("Task2", "Desc2", "Outcome2")))
.block();
assertEquals(3, callCount[0], "reviseCurrentPlan add");

// reviseCurrentPlan (revise)
notebook.reviseCurrentPlan(
0,
"revise",
PlanNotebook.subtaskToMap(new SubTask("Updated", "Desc", "Outcome")))
.block();
assertEquals(4, callCount[0], "reviseCurrentPlan revise");

// reviseCurrentPlan (delete)
notebook.reviseCurrentPlan(1, "delete", null).block();
assertEquals(5, callCount[0], "reviseCurrentPlan delete");

// updateSubtaskState
notebook.updateSubtaskState(0, "in_progress").block();
assertEquals(6, callCount[0], "updateSubtaskState");

// finishSubtask
notebook.finishSubtask(0, "Done").block();
assertEquals(7, callCount[0], "finishSubtask");

// finishPlan
notebook.finishPlan("done", "All done").block();
assertEquals(8, callCount[0], "finishPlan");
}

@Test
void testRemoveChangeHookStopsNotifications() {
int[] callCount = {0};
notebook.addChangeHook("testHook", (nb, plan) -> callCount[0]++);

List<SubTask> subtasks = List.of(new SubTask("Task1", "Desc1", "Outcome1"));
notebook.createPlanWithSubTasks("Plan 1", "Desc", "Outcome", subtasks).block();
assertEquals(1, callCount[0]);

notebook.removeChangeHook("testHook");
notebook.createPlanWithSubTasks("Plan 2", "Desc", "Outcome", subtasks).block();
assertEquals(1, callCount[0], "Hook should not be called after removal");
}

@Test
void testMultipleHooksAndIdOverwrite() {
int[] count1 = {0};
int[] count2 = {0};
int[] count3 = {0};

notebook.addChangeHook("hook1", (nb, plan) -> count1[0]++);
notebook.addChangeHook("hook2", (nb, plan) -> count2[0]++);
// Overwrite hook1
notebook.addChangeHook("hook1", (nb, plan) -> count3[0]++);

List<SubTask> subtasks = List.of(new SubTask("Task1", "Desc1", "Outcome1"));
notebook.createPlanWithSubTasks("Test Plan", "Desc", "Outcome", subtasks).block();

assertEquals(0, count1[0], "Original hook1 should be overwritten");
assertEquals(1, count2[0], "hook2 should be called");
assertEquals(1, count3[0], "New hook1 should be called");
}
}
9 changes: 9 additions & 0 deletions agentscope-examples/plan-notebook/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ Open your browser and navigate to: **http://localhost:8080**
| GET | `/api/health` | Health check |
| POST | `/api/reset` | Reset agent and clear all data |

### Human-in-the-Loop (HITL) API

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/resume` | Resume agent execution after user review (SSE streaming) |
| GET | `/api/paused` | Check if the agent is currently paused |
| POST | `/api/stop` | Request the agent to pause after the next plan tool execution |
| GET | `/api/stop-requested` | Check if a stop has been requested |

### Plan API

| Method | Endpoint | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.agentscope.examples.plannotebook.controller;

import io.agentscope.examples.plannotebook.service.AgentService;
import java.util.Map;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
Expand Down Expand Up @@ -51,6 +52,56 @@ public Flux<String> chat(
return agentService.chat(sessionId, message);
}

/**
* Resume agent execution after user review.
* This endpoint is called when user clicks "Continue" button after reviewing/modifying the plan.
*
* @param sessionId Session ID (optional, defaults to "default")
* @return Flux of streaming text chunks
*/
@GetMapping(path = "/resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> resume(@RequestParam(defaultValue = "default") String sessionId) {
return agentService.resume(sessionId);
}

/**
* Get the current pause state of the agent.
*
* @param sessionId Session ID (optional, defaults to "default", reserved for future multi-session support)
* @return Map containing isPaused boolean
*/
@GetMapping("/paused")
public Map<String, Boolean> isPaused(@RequestParam(defaultValue = "default") String sessionId) {
return Map.of("isPaused", agentService.isPaused());
}

/**
* Request the agent to stop after the next plan tool execution.
* The agent will continue running until a plan-related tool is executed, then pause.
*
* @param sessionId Session ID (optional, defaults to "default", reserved for future multi-session support)
* @return Map containing stopRequested status
*/
@PostMapping("/stop")
public Map<String, Object> requestStop(
@RequestParam(defaultValue = "default") String sessionId) {
agentService.requestStop();
return Map.of(
"stopRequested", true, "message", "Will pause after next plan tool execution");
}

/**
* Get the current stop requested state.
*
* @param sessionId Session ID (optional, defaults to "default", reserved for future multi-session support)
* @return Map containing stopRequested boolean
*/
@GetMapping("/stop-requested")
public Map<String, Boolean> isStopRequested(
@RequestParam(defaultValue = "default") String sessionId) {
return Map.of("stopRequested", agentService.isStopRequested());
}

/**
* Health check endpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
package io.agentscope.examples.plannotebook.service;

import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.Event;
import io.agentscope.core.agent.EventType;
import io.agentscope.core.agent.StreamOptions;
import io.agentscope.core.formatter.dashscope.DashScopeChatFormatter;
import io.agentscope.core.hook.Hook;
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostActingEvent;
import io.agentscope.core.memory.InMemoryMemory;
import io.agentscope.core.message.GenerateReason;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
Expand All @@ -31,6 +33,7 @@
import io.agentscope.core.tool.Toolkit;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
Expand Down Expand Up @@ -67,6 +70,12 @@ public class AgentService implements InitializingBean {
private InMemoryMemory memory;
private Toolkit toolkit;

// Track if agent is paused waiting for user to continue
private final AtomicBoolean isPaused = new AtomicBoolean(false);

// Track if user has requested to stop (will pause on next plan tool execution)
private final AtomicBoolean stopRequested = new AtomicBoolean(false);

public AgentService(PlanService planService) {
this.planService = planService;
}
Expand All @@ -91,16 +100,26 @@ private void initializeAgent() {
PlanNotebook planNotebook = PlanNotebook.builder().build();
planService.setPlanNotebook(planNotebook);

// Create hook to detect plan changes
// Register change hook to broadcast plan changes via SSE
planNotebook.addChangeHook(
"planServiceBroadcast", (notebook, plan) -> planService.broadcastPlanChange());

// Create hook to pause agent for user review when stop is requested
Hook planChangeHook =
new Hook() {
@Override
public <T extends HookEvent> Mono<T> onEvent(T event) {
if (event instanceof PostActingEvent postActing) {
String toolName = postActing.getToolUse().getName();
if (PLAN_TOOL_NAMES.contains(toolName)) {
// Broadcast plan change
planService.broadcastPlanChange();
// Only stop if user has requested it
if (stopRequested.compareAndSet(true, false)) {
log.info(
"Plan tool '{}' executed, pausing for user review",
toolName);
isPaused.set(true);
postActing.stopAgent();
}
}
}
return Mono.just(event);
Expand Down Expand Up @@ -132,38 +151,104 @@ public <T extends HookEvent> Mono<T> onEvent(T event) {
* Send a message to the agent and get streaming response.
*/
public Flux<String> chat(String sessionId, String message) {
// Clear paused state when user sends a new message
isPaused.set(false);

Msg userMsg =
Msg.builder()
.role(MsgRole.USER)
.content(TextBlock.builder().text(message).build())
.build();

StreamOptions streamOptions =
StreamOptions.builder()
.eventTypes(EventType.REASONING, EventType.TOOL_RESULT)
.incremental(true)
.build();

return agent.stream(userMsg, streamOptions)
return agent.stream(userMsg, createStreamOptions())
.subscribeOn(Schedulers.boundedElastic())
.filter(event -> !event.isLast())
.map(
event -> {
List<TextBlock> textBlocks =
event.getMessage().getContentBlocks(TextBlock.class);
if (!textBlocks.isEmpty()) {
return textBlocks.get(0).getText();
}
return "";
})
.map(this::mapEventToString)
.filter(text -> text != null && !text.isEmpty());
}

/**
* Resume agent execution after user review.
* This is called when user clicks "Continue" button after reviewing/modifying the plan.
*/
public Flux<String> resume(String sessionId) {
if (isPaused.compareAndSet(true, false)) {
log.info("Resuming agent execution after user review");

// Resume by calling agent.stream() with no input message
return agent.stream(createStreamOptions())
.subscribeOn(Schedulers.boundedElastic())
.map(this::mapEventToString)
.filter(text -> text != null && !text.isEmpty());
} else {
log.warn("Tried to resume but agent is not paused or already resuming");
return Flux.just("Agent is not paused or is already resuming.");
}
}

private StreamOptions createStreamOptions() {
return StreamOptions.builder()
.eventTypes(EventType.REASONING, EventType.TOOL_RESULT, EventType.AGENT_RESULT)
.incremental(true)
.build();
}

/**
* Map a stream event to a string for SSE output.
*/
private String mapEventToString(Event event) {
// Handle AGENT_RESULT events (agent execution ended)
if (event.getType() == EventType.AGENT_RESULT) {
Msg msg = event.getMessage();
if (msg != null && msg.getGenerateReason() == GenerateReason.ACTING_STOP_REQUESTED) {
isPaused.set(true);
return "[PAUSED]";
}
// Normal completion - content already streamed via REASONING chunks
return "";
}

// Skip final accumulated messages in incremental mode to avoid duplicate output
if (event.isLast()) {
return "";
}

List<TextBlock> textBlocks = event.getMessage().getContentBlocks(TextBlock.class);
if (!textBlocks.isEmpty()) {
return textBlocks.get(0).getText();
}
return "";
}

/**
* Check if the agent is currently paused.
*/
public boolean isPaused() {
return isPaused.get();
}

/**
* Request the agent to stop after the next plan tool execution.
* This sets a flag that will cause the agent to pause after executing any plan-related tool.
*/
public void requestStop() {
log.info("User requested stop - will pause after next plan tool execution");
stopRequested.set(true);
}

/**
* Check if a stop has been requested.
*/
public boolean isStopRequested() {
return stopRequested.get();
}

/**
* Reset the agent, clearing all conversations and plans.
*/
public void reset() {
log.info("Resetting agent and clearing all data");
isPaused.set(false);
stopRequested.set(false);
FileToolMock.clearStorage();
initializeAgent();
planService.broadcastPlanChange();
Expand Down
Loading
Loading