Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions crates/agentos-actor-plugin/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub struct Vars {
pub live_sessions: HashMap<String, String>,
/// `live_session_id -> capture pump task`.
pub capture_tasks: HashMap<String, JoinHandle<()>>,
/// `live_session_id -> permission-request pump task`.
pub permission_tasks: HashMap<String, JoinHandle<()>>,
}

impl Vars {
Expand All @@ -51,12 +53,15 @@ impl Vars {
.unwrap_or(external_session_id)
}

/// Abort and clear all in-flight capture tasks. Called on VM teardown
/// (sleep / destroy / run-loop exit).
/// Abort and clear all in-flight pump tasks (event capture + permission
/// requests). Called on VM teardown (sleep / destroy / run-loop exit).
pub fn clear(&mut self) {
for (_, task) in self.capture_tasks.drain() {
task.abort();
}
for (_, task) in self.permission_tasks.drain() {
task.abort();
}
self.live_sessions.clear();
}
}
Expand Down Expand Up @@ -336,6 +341,17 @@ pub(crate) async fn dispatch(
},
Err(error) => reply_err(host, token, error),
},
"respondPermission" => match decode_as::<(String, String, String)>(args) {
Ok((session_id, permission_id, reply)) => {
match session::respond_permission(vm, vars, &session_id, &permission_id, &reply)
.await
{
Ok(()) => reply_ok(host, token, &()),
Err(error) => reply_err(host, token, error),
}
}
Err(error) => reply_err(host, token, error),
},
"createSignedPreviewUrl" => match decode_as::<(u16, u64)>(args) {
Ok((port, ttl_seconds)) => match preview::create(host, port, ttl_seconds).await {
Ok(dto) => reply_ok(host, token, &dto),
Expand Down
172 changes: 169 additions & 3 deletions crates/agentos-actor-plugin/src/actions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::collections::BTreeMap;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::host_ctx::HostCtx;
use agentos_client::{AgentOs, CreateSessionOptions};
use agentos_client::{AgentOs, CreateSessionOptions, PermissionReply};
use anyhow::{anyhow, Result};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -135,6 +135,111 @@ fn spawn_event_capture(
.insert(live_session_id.to_owned(), handle);
}

/// Build the `permissionRequest` broadcast body for one request.
///
/// The RivetKit event wire is CBOR and the body is the array of handler
/// ARGUMENTS the client spreads into the listener (`handler(...body)`). The
/// documented listener is `(data) => …`, so the single argument is the TS
/// `PermissionRequestPayload` — `{ sessionId, request: { permissionId,
/// description?, params } }` — and the body is `[ <that object> ]`. The
/// `sessionId` is the client-facing external id (== live for native sessions).
fn permission_event_body(
external_session_id: &str,
permission_id: &str,
description: Option<&str>,
params: &JsonValue,
) -> JsonValue {
json!([{
"sessionId": external_session_id,
"request": {
"permissionId": permission_id,
"description": description,
"params": params,
},
}])
}

/// Map the wire reply string to a [`PermissionReply`] (`"once"` / `"always"` /
/// `"reject"`), matching the TS `PermissionReply` union.
fn parse_permission_reply(reply: &str) -> Result<PermissionReply> {
match reply {
"once" => Ok(PermissionReply::Once),
"always" => Ok(PermissionReply::Always),
"reject" => Ok(PermissionReply::Reject),
other => Err(anyhow!(
"invalid permission reply {other:?} (expected \"once\" | \"always\" | \"reject\")"
)),
}
}

/// Subscribe to the session's permission-request stream and spawn a task that
/// broadcasts each request to connected clients as a `permissionRequest` event
/// (`conn.on("permissionRequest", …)`).
///
/// Mirrors [`spawn_event_capture`]. A subscriber MUST exist before the guest
/// agent raises a permission request, otherwise the client auto-rejects it
/// (`deliver_sidecar_permission_request` checks `receiver_count() == 0`) — so
/// this is started at session-create time. Clients answer via the
/// `respondPermission` action (→ [`respond_permission`]), which resolves the
/// pending reply slot; this pump only fans the request out, so dropping the
/// broadcast item's responder clone here is harmless.
fn spawn_permission_pump(
ctx: &HostCtx,
vm: &AgentOs,
vars: &mut Vars,
external_session_id: &str,
live_session_id: &str,
) {
let (mut stream, subscription) = match vm.on_permission_request(live_session_id) {
Ok(sub) => sub,
Err(error) => {
tracing::warn!(?error, live_session_id, "on_permission_request subscribe failed");
return;
}
};
if let Some(old) = vars.permission_tasks.remove(live_session_id) {
old.abort();
}
let ctx = ctx.clone();
let external = external_session_id.to_owned();
let handle = tokio::spawn(async move {
// Keep the RAII guard alive for the pump's lifetime; dropping the stream
// (on abort / channel close) is the unsubscribe.
let _subscription = subscription;
while let Some(request) = stream.next().await {
let body = permission_event_body(
&external,
&request.permission_id,
request.description.as_deref(),
&request.params,
);
let mut cbor = Vec::new();
if ciborium::into_writer(&body, &mut cbor).is_ok() {
let _ = ctx.broadcast(b"permissionRequest".to_vec(), cbor);
}
}
});
vars.permission_tasks
.insert(live_session_id.to_owned(), handle);
}

/// Answer a permission request raised by the session's guest agent
/// (`respondPermission`). Resolves the pending reply slot through the client's
/// `respond_permission`, keyed by the live session id.
pub async fn respond_permission(
vm: &AgentOs,
vars: &Vars,
session_id: &str,
permission_id: &str,
reply: &str,
) -> Result<()> {
let reply = parse_permission_reply(reply)?;
let live_session_id = vars.live_id(session_id).to_owned();
vm.respond_permission(&live_session_id, permission_id, reply)
.await?;
Ok(())
}

pub async fn create_session(
ctx: &HostCtx,
vm: &AgentOs,
Expand Down Expand Up @@ -176,8 +281,11 @@ pub async fn create_session(
)
.await?;
// At create time `external == live`; capture every `session/update` for this
// session under the external id (spec §3/§5).
// session under the external id (spec §3/§5), and start fanning the guest's
// permission requests out to connected clients. The permission pump must be
// subscribed before the agent runs, or requests would auto-reject.
spawn_event_capture(ctx, vm, vars, &session_id, &session_id);
spawn_permission_pump(ctx, vm, vars, &session_id, &session_id);
Ok(SessionIdDto { session_id })
}

Expand Down Expand Up @@ -227,11 +335,14 @@ pub async fn close_session(
vars: &mut Vars,
session_id: &str,
) -> Result<()> {
// Stop event capture + drop the remap for this external session.
// Stop event capture + the permission pump + drop the remap for this session.
let live_session_id = vars.live_id(session_id).to_owned();
if let Some(task) = vars.capture_tasks.remove(&live_session_id) {
task.abort();
}
if let Some(task) = vars.permission_tasks.remove(&live_session_id) {
task.abort();
}
vars.live_sessions.remove(session_id);
vm.close_session(&live_session_id).map_err(|e| anyhow!(e))?;
// Drop persisted metadata + events (explicit, since SQLite FK cascade is
Expand Down Expand Up @@ -397,3 +508,58 @@ pub async fn resume_session(
AcpResumeSessionRequest contract"
))
}

#[cfg(test)]
mod tests {
use super::*;
use agentos_client::PermissionReply;

#[test]
fn permission_event_body_matches_ts_payload_shape() {
// The TS client listener is `(data) => …` where data is
// `PermissionRequestPayload { sessionId, request: { permissionId,
// description?, params } }`, delivered as the single broadcast arg.
let params = json!({
"toolCall": { "title": "Bash", "kind": "execute" },
"options": [{ "optionId": "allow_once" }],
});
let body = permission_event_body("sess-1", "perm-7", Some("run a command"), &params);

// Body is the args array spread into the listener: exactly one argument.
let args = body.as_array().expect("body is an array");
assert_eq!(args.len(), 1, "exactly one handler argument");
let data = &args[0];

assert_eq!(data["sessionId"], json!("sess-1"));
assert_eq!(data["request"]["permissionId"], json!("perm-7"));
assert_eq!(data["request"]["description"], json!("run a command"));
// params are forwarded verbatim so the client can inspect the tool/paths.
assert_eq!(data["request"]["params"], params);
}

#[test]
fn permission_event_body_serializes_absent_description_as_null() {
let body = permission_event_body("sess-1", "perm-1", None, &json!({}));
assert_eq!(body[0]["request"]["description"], JsonValue::Null);
}

#[test]
fn parse_permission_reply_maps_each_wire_value() {
assert_eq!(parse_permission_reply("once").unwrap(), PermissionReply::Once);
assert_eq!(
parse_permission_reply("always").unwrap(),
PermissionReply::Always
);
assert_eq!(
parse_permission_reply("reject").unwrap(),
PermissionReply::Reject
);
}

#[test]
fn parse_permission_reply_rejects_unknown_value() {
let err = parse_permission_reply("maybe").unwrap_err().to_string();
assert!(err.contains("invalid permission reply"), "got: {err}");
assert!(err.contains("maybe"), "names the bad value: {err}");
}
}
8 changes: 7 additions & 1 deletion examples/docs/approvals/auto-approve-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import type { registry } from "./server";
const client = createClient<typeof registry>({ endpoint: "http://localhost:6420" });
const agent = client.vm.getOrCreate("my-agent");

// No need to handle permissions on the client. The server hook handles them.
// Auto-approve every request as it arrives. `"always"` also approves future
// requests of the same type, so a multi-step agent run is not interrupted.
const conn = agent.connect();
conn.on("permissionRequest", async (data) => {
await agent.respondPermission(data.sessionId, data.request.permissionId, "always");
});

const session = await agent.createSession("claude", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
Expand Down
6 changes: 0 additions & 6 deletions examples/docs/approvals/auto-approve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@ import pi from "./software/pi";

const vm = agentOS({
software: [pi],
// The onPermissionRequest hook runs server-side for every request before it
// is forwarded to clients. Use it to inspect requests in fully automated
// pipelines without a client round-trip.
onPermissionRequest: async (sessionId, request) => {
console.log("auto-approving", sessionId, request.permissionId);
},
});

export const registry = setup({ use: { vm } });
Expand Down
11 changes: 9 additions & 2 deletions examples/docs/approvals/selective-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ import type { registry } from "./server";
const client = createClient<typeof registry>({ endpoint: "http://localhost:6420" });
const agent = client.vm.getOrCreate("my-agent");

// Permission requests forwarded by the server reach the client here. The
// payload is inferred from the actor's event schema, so no cast is needed.
const conn = agent.connect();
conn.on("permissionRequest", async (data) => {
// Inspect the request and decide per-request. `request.description` /
// `request.params` carry the raw ACP details (the requested tool, paths, etc.).
const description = data.request.description?.toLowerCase() ?? "";
if (description.includes("read")) {
// Auto-approve reads.
await agent.respondPermission(data.sessionId, data.request.permissionId, "always");
return;
}
// Forward everything else to a human.
const approved = confirm(`Allow: ${JSON.stringify(data.request)}?`);
await agent.respondPermission(
data.sessionId,
Expand Down
9 changes: 0 additions & 9 deletions examples/docs/approvals/selective.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,6 @@ import pi from "./software/pi";

const vm = agentOS({
software: [pi],
onPermissionRequest: async (sessionId, request) => {
// `request.description` and `request.params` carry the raw ACP permission
// details (the requested tool, paths, etc.). Inspect them to decide which
// requests to handle server-side and which to forward to clients.
const description = request.description ?? "";
if (description.toLowerCase().includes("read")) {
console.log("read request handled server-side", sessionId, request.permissionId);
}
},
});

export const registry = setup({ use: { vm } });
Expand Down
6 changes: 4 additions & 2 deletions examples/docs/crash-course/permissions-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import type { registry } from "./server";
const client = createClient<typeof registry>({ endpoint: "http://localhost:6420" });
const agent = client.vm.getOrCreate("my-agent");

// Or handle permissions client-side for human-in-the-loop
// Subscribe and reply to permission requests. Permissions are fail-closed, so
// the agent waits until you reply.
const conn = agent.connect();
conn.on("permissionRequest", async (data) => {
console.log("Permission requested:", data.request);
// "once" | "always" | "reject"
// "once" | "always" | "reject". Reply "always" to auto-approve trusted
// workloads, or prompt a human for human-in-the-loop.
await agent.respondPermission(data.sessionId, data.request.permissionId, "once");
});
4 changes: 0 additions & 4 deletions examples/docs/crash-course/permissions-server.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import { agentOS, setup } from "@rivet-dev/agentos";
import pi from "./software/pi";

// Auto-approve all permissions server-side
const vm = agentOS({
software: [pi],
onPermissionRequest: async (sessionId, request) => {
console.log("Auto-approving", sessionId, request.permissionId);
},
});

export const registry = setup({ use: { vm } });
Expand Down
Loading
Loading