Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .turbo
1 change: 1 addition & 0 deletions dist
1 change: 1 addition & 0 deletions node_modules
1 change: 1 addition & 0 deletions server/packages/sandbox-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ tracing-logfmt.workspace = true
tracing-subscriber.workspace = true
include_dir.workspace = true
base64.workspace = true
portable-pty = "0.8"
tempfile = { workspace = true, optional = true }

[target.'cfg(unix)'.dependencies]
Expand Down
1 change: 1 addition & 0 deletions server/packages/sandbox-agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mod agent_server_logs;
pub mod credentials;
pub mod opencode_compat;
pub mod pty;
pub mod router;
pub mod server_logs;
pub mod telemetry;
Expand Down
240 changes: 167 additions & 73 deletions server/packages/sandbox-agent/src/opencode_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::str::FromStr;

use axum::extract::{Path, Query, State};
use axum::extract::{ws::{Message, WebSocket, WebSocketUpgrade}, Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::sse::{Event, KeepAlive};
use axum::response::{IntoResponse, Sse};
use axum::routing::{get, patch, post, put};
use axum::{Json, Router};
use futures::stream;
use futures::{stream, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::sync::{broadcast, Mutex};
use tokio::time::interval;
use utoipa::{IntoParams, OpenApi, ToSchema};

use crate::pty::{PtyConnection, PtyInfo, PtySpawnRequest, PtyUpdateRequest};
use crate::router::{AppState, CreateSessionRequest, PermissionReply};
use sandbox_agent_error::SandboxError;
use sandbox_agent_agent_management::agents::AgentId;
Expand Down Expand Up @@ -125,31 +126,6 @@ struct OpenCodeMessageRecord {
parts: Vec<Value>,
}

#[derive(Clone, Debug)]
struct OpenCodePtyRecord {
id: String,
title: String,
command: String,
args: Vec<String>,
cwd: String,
status: String,
pid: i64,
}

impl OpenCodePtyRecord {
fn to_value(&self) -> Value {
json!({
"id": self.id,
"title": self.title,
"command": self.command,
"args": self.args,
"cwd": self.cwd,
"status": self.status,
"pid": self.pid,
})
}
}

#[derive(Clone, Debug)]
struct OpenCodePermissionRecord {
id: String,
Expand Down Expand Up @@ -219,7 +195,6 @@ pub struct OpenCodeState {
default_project_id: String,
sessions: Mutex<HashMap<String, OpenCodeSessionRecord>>,
messages: Mutex<HashMap<String, Vec<OpenCodeMessageRecord>>>,
ptys: Mutex<HashMap<String, OpenCodePtyRecord>>,
permissions: Mutex<HashMap<String, OpenCodePermissionRecord>>,
questions: Mutex<HashMap<String, OpenCodeQuestionRecord>>,
session_runtime: Mutex<HashMap<String, OpenCodeSessionRuntime>>,
Expand All @@ -236,7 +211,6 @@ impl OpenCodeState {
default_project_id: project_id,
sessions: Mutex::new(HashMap::new()),
messages: Mutex::new(HashMap::new()),
ptys: Mutex::new(HashMap::new()),
permissions: Mutex::new(HashMap::new()),
questions: Mutex::new(HashMap::new()),
session_runtime: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -568,6 +542,7 @@ struct PtyCreateRequest {
args: Option<Vec<String>>,
cwd: Option<String>,
title: Option<String>,
env: Option<HashMap<String, String>>,
}

fn next_id(prefix: &str, counter: &AtomicU64) -> String {
Expand Down Expand Up @@ -1075,6 +1050,18 @@ fn build_file_part_from_path(
Value::Object(map)
}

fn pty_value(info: &PtyInfo) -> Value {
json!({
"id": info.id,
"title": info.title,
"command": info.command,
"args": info.args,
"cwd": info.cwd,
"status": info.status.as_str(),
"pid": info.pid,
})
}

fn session_event(event_type: &str, session: &Value) -> Value {
json!({
"type": event_type,
Expand Down Expand Up @@ -3614,15 +3601,81 @@ async fn oc_auth_remove(Path(_provider_id): Path<String>) -> impl IntoResponse {
bool_ok(true)
}

fn spawn_pty_exit_listener(state: Arc<OpenCodeAppState>, pty_id: String) {
tokio::spawn(async move {
let mut exit_rx = match state
.inner
.session_manager()
.pty_manager()
.subscribe_exit(&pty_id)
.await
{
Some(receiver) => receiver,
None => return,
};
loop {
match exit_rx.recv().await {
Ok(exit) => {
state.opencode.emit_event(json!({
"type": "pty.exited",
"properties": {"id": exit.id, "exitCode": exit.exit_code}
}));
break;
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
}

async fn handle_pty_socket(mut socket: WebSocket, mut connection: PtyConnection) {
loop {
tokio::select! {
incoming = socket.recv() => {
match incoming {
Some(Ok(Message::Text(text))) => {
let _ = connection.input.send(text.into_bytes()).await;
}
Some(Ok(Message::Binary(bytes))) => {
let _ = connection.input.send(bytes).await;
}
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(_)) => {}
Some(Err(_)) => break,
}
}
outgoing = connection.output.recv() => {
match outgoing {
Ok(bytes) => {
if socket.send(Message::Binary(bytes)).await.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
}

#[utoipa::path(
get,
path = "/pty",
responses((status = 200)),
tag = "opencode"
)]
async fn oc_pty_list(State(state): State<Arc<OpenCodeAppState>>) -> impl IntoResponse {
let ptys = state.opencode.ptys.lock().await;
let values: Vec<Value> = ptys.values().map(|p| p.to_value()).collect();
let values: Vec<Value> = state
.inner
.session_manager()
.pty_manager()
.list()
.await
.iter()
.map(pty_value)
.collect();
(StatusCode::OK, Json(json!(values)))
}

Expand All @@ -3641,25 +3694,38 @@ async fn oc_pty_create(
) -> impl IntoResponse {
let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
let id = next_id("pty_", &PTY_COUNTER);
let record = OpenCodePtyRecord {
id: id.clone(),
title: body.title.unwrap_or_else(|| "PTY".to_string()),
command: body.command.unwrap_or_else(|| "bash".to_string()),
args: body.args.unwrap_or_default(),
cwd: body.cwd.unwrap_or_else(|| directory),
status: "running".to_string(),
pid: 0,
};
let value = record.to_value();
let mut ptys = state.opencode.ptys.lock().await;
ptys.insert(id, record);
drop(ptys);

state
.opencode
.emit_event(json!({"type": "pty.created", "properties": {"pty": value}}));
let spawn = state
.inner
.session_manager()
.pty_manager()
.spawn(PtySpawnRequest {
id: id.clone(),
title: body.title.unwrap_or_else(|| "PTY".to_string()),
command: body.command.unwrap_or_else(|| "bash".to_string()),
args: body.args.unwrap_or_default(),
cwd: body.cwd.unwrap_or_else(|| directory),
env: body.env,
owner_session_id: headers
.get("x-opencode-session-id")
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string()),
})
.await;

(StatusCode::OK, Json(value))
match spawn {
Ok(info) => {
let value = pty_value(&info);
state
.opencode
.emit_event(json!({"type": "pty.created", "properties": {"info": value}}));
spawn_pty_exit_listener(state.clone(), info.id.clone());
(StatusCode::OK, Json(value)).into_response()
}
Err(SandboxError::InvalidRequest { message }) => {
bad_request(&message).into_response()
}
Err(err) => internal_error(&err.to_string()).into_response(),
}
}

#[utoipa::path(
Expand All @@ -3673,9 +3739,14 @@ async fn oc_pty_get(
State(state): State<Arc<OpenCodeAppState>>,
Path(pty_id): Path<String>,
) -> impl IntoResponse {
let ptys = state.opencode.ptys.lock().await;
if let Some(pty) = ptys.get(&pty_id) {
return (StatusCode::OK, Json(pty.to_value())).into_response();
if let Some(pty) = state
.inner
.session_manager()
.pty_manager()
.get(&pty_id)
.await
{
return (StatusCode::OK, Json(pty_value(&pty))).into_response();
}
not_found("PTY not found").into_response()
}
Expand All @@ -3693,24 +3764,23 @@ async fn oc_pty_update(
Path(pty_id): Path<String>,
Json(body): Json<PtyCreateRequest>,
) -> impl IntoResponse {
let mut ptys = state.opencode.ptys.lock().await;
if let Some(pty) = ptys.get_mut(&pty_id) {
if let Some(title) = body.title {
pty.title = title;
}
if let Some(command) = body.command {
pty.command = command;
}
if let Some(args) = body.args {
pty.args = args;
}
if let Some(cwd) = body.cwd {
pty.cwd = cwd;
}
let value = pty.to_value();
let update = PtyUpdateRequest {
title: body.title,
command: body.command,
args: body.args,
cwd: body.cwd,
};
if let Some(pty) = state
.inner
.session_manager()
.pty_manager()
.update(&pty_id, update)
.await
{
let value = pty_value(&pty);
state
.opencode
.emit_event(json!({"type": "pty.updated", "properties": {"pty": value}}));
.emit_event(json!({"type": "pty.updated", "properties": {"info": value}}));
return (StatusCode::OK, Json(value)).into_response();
}
not_found("PTY not found").into_response()
Expand All @@ -3727,11 +3797,17 @@ async fn oc_pty_delete(
State(state): State<Arc<OpenCodeAppState>>,
Path(pty_id): Path<String>,
) -> impl IntoResponse {
let mut ptys = state.opencode.ptys.lock().await;
if let Some(pty) = ptys.remove(&pty_id) {
if state
.inner
.session_manager()
.pty_manager()
.remove(&pty_id)
.await
.is_some()
{
state
.opencode
.emit_event(json!({"type": "pty.deleted", "properties": {"pty": pty.to_value()}}));
.emit_event(json!({"type": "pty.deleted", "properties": {"id": pty_id}}));
return bool_ok(true).into_response();
}
not_found("PTY not found").into_response()
Expand All @@ -3744,8 +3820,26 @@ async fn oc_pty_delete(
responses((status = 200)),
tag = "opencode"
)]
async fn oc_pty_connect(Path(_pty_id): Path<String>) -> impl IntoResponse {
bool_ok(true)
async fn oc_pty_connect(
State(state): State<Arc<OpenCodeAppState>>,
Path(pty_id): Path<String>,
ws: Option<WebSocketUpgrade>,
) -> impl IntoResponse {
let connection = state
.inner
.session_manager()
.pty_manager()
.connect(&pty_id)
.await;
let Some(connection) = connection else {
return not_found("PTY not found").into_response();
};
if let Some(ws) = ws {
ws.on_upgrade(|socket| handle_pty_socket(socket, connection))
.into_response()
} else {
bool_ok(true).into_response()
}
}

#[utoipa::path(
Expand Down
Loading
Loading