Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/opentake-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ tracing = "0.1"
regex = "1"
async-trait = "0.1"
futures = "0.3"
# Base64-encode composited `inspect_timeline` frame bytes into MCP image content.
base64 = "0.22"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "sync", "net", "time"] }

# MCP server transport (Streamable HTTP over axum/hyper) + in-app chat HTTP.
Expand Down
539 changes: 532 additions & 7 deletions crates/opentake-agent/src/mcp/dispatch.rs

Large diffs are not rendered by default.

200 changes: 200 additions & 0 deletions crates/opentake-agent/src/mcp/media_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
//! `MediaBridge` — the injected side-door to the two capabilities that live
//! outside `opentake-core`: GPU compositing (in `opentake-render`, driven from
//! `src-tauri`) and the user-facing media-import machinery (in
//! `src-tauri/src/media.rs`, behind `MediaState`/`MediaEngine`).
//!
//! Why a *separate* trait instead of widening [`CoreHandle`](super::core_handle):
//! [`CoreHandle`] is deliberately the narrow *document* surface (read timeline,
//! read media, apply one command, project dir, decode analysis PCM). Its
//! production impl wraps only [`opentake_core::AppCore`]. The two capabilities the
//! media tools need reach into crates the agent layer does **not** — and by design
//! should not — link: `opentake-render` (wgpu) for compositing, and the
//! `src-tauri` import path (posters / manifest / events) for `import_media`.
//! Folding them into `CoreHandle` would either drag a GPU dependency into this
//! crate or hide logic behind default methods that can't reach the real paths.
//!
//! So the bridge is injected exactly like the plugin registry is — an optional
//! collaborator the [`Dispatcher`](super::dispatch::Dispatcher) holds. In a plain
//! `vite dev` / non-Tauri build (and in unit tests) there is no bridge and the two
//! tools report an honest "not available" instead of failing to compile. The real
//! implementation is constructed and injected in `src-tauri/src/mcp.rs`, where the
//! render + import code already lives.
//!
//! Both methods default to `Err("unsupported")` so a hand-rolled bridge (or the
//! absence of one) never breaks the build.

use crate::tools::result::Block;

/// One composited timeline frame produced by [`MediaBridge::inspect_timeline`],
/// ready to become MCP image content. `bytes` are already-encoded image data
/// (JPEG in the production path) — the agent crate never links an image encoder;
/// the bridge (which does) hands back finished bytes plus their media type.
#[derive(Debug, Clone)]
pub struct InspectedFrame {
/// The project frame this image was rendered at.
pub frame: i32,
/// Encoded image bytes (e.g. JPEG).
pub bytes: Vec<u8>,
/// MIME type of `bytes` (e.g. `"image/jpeg"`).
pub media_type: String,
}

/// The rendered result of an `inspect_timeline` call: the sampled frames plus the
/// downscaled render dimensions, mirroring upstream `inspectTimeline`'s
/// `imageBlocks + [metaJSON]` result. `total_frames` is echoed back in the meta.
#[derive(Debug, Clone)]
pub struct InspectResult {
/// Composited frames, in sample order.
pub frames: Vec<InspectedFrame>,
/// Downscaled render width (px) — the `width` field of the meta block.
pub width: u32,
/// Downscaled render height (px).
pub height: u32,
}

/// The outcome of an `import_media` call, mirroring upstream's `.ok("…")` string
/// results. The dispatcher wraps `message` in a [`crate::tools::result::ToolResult`].
#[derive(Debug, Clone)]
pub struct ImportOutcome {
/// Human/LLM-facing confirmation line (same shape as upstream `.ok(...)`).
pub message: String,
}

/// One decoded `source` object for [`MediaBridge::import_media`]. The dispatcher
/// has already enforced *exactly one* of `url` / `path` / `bytes` is set and that
/// `mime_type` is present when `bytes` is; the bridge does the IO.
#[derive(Debug, Clone)]
pub enum ImportSource {
/// Absolute local file or directory path, imported in place (directories are
/// mirrored recursively).
Path(String),
/// Base64-encoded inline bytes written into the project bundle's `media/`.
Bytes {
/// Raw base64 (already length-checked by the dispatcher).
base64: String,
/// Required MIME type (drives the written file's extension).
mime_type: String,
},
/// HTTPS URL downloaded into the project bundle's `media/`.
Url {
/// The HTTPS URL (scheme already validated by the dispatcher).
url: String,
/// Optional MIME override for extension inference (signed URLs).
mime_type: Option<String>,
},
}

/// A user-visible import error the bridge raises. Carries an LLM-facing message
/// the dispatcher surfaces verbatim (upstream `ToolError` messages).
#[derive(Debug, Clone)]
pub struct BridgeError {
/// The message shown to the model.
pub message: String,
}

impl BridgeError {
/// Wrap a message.
pub fn new(message: impl Into<String>) -> Self {
BridgeError {
message: message.into(),
}
}
}

impl std::fmt::Display for BridgeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.message)
}
}

impl std::error::Error for BridgeError {}

/// The injected capability boundary for the render + import tools. `Send + Sync`
/// so the [`Dispatcher`](super::dispatch::Dispatcher) can hold `Arc<dyn
/// MediaBridge>` across threads (matching [`CoreHandle`](super::core_handle)).
pub trait MediaBridge: Send + Sync {
/// Composite the timeline at each `frames` value and return them as encoded
/// image bytes, downscaled so the longest edge is at most `max_longest_edge`.
/// Frame numbers are validated by the dispatcher; the bridge composites and
/// encodes. Frames that fail to render are dropped (upstream `continue`s past a
/// failed `generator.image(at:)`), so the returned `frames` may be shorter than
/// the request; an all-empty render is an `Err`.
fn inspect_timeline(
&self,
_frames: &[i32],
_max_longest_edge: u32,
) -> Result<InspectResult, BridgeError> {
Err(BridgeError::new(
"inspect_timeline: rendering is not available in this build",
))
}

/// Import media through the SAME path as the user-facing import (posters,
/// manifest entry, `MediaChanged` event). `folder_id`, when set, has already
/// been checked to exist by the dispatcher. Returns the confirmation message.
fn import_media(
&self,
_source: ImportSource,
_name: Option<String>,
_folder_id: Option<String>,
) -> Result<ImportOutcome, BridgeError> {
Err(BridgeError::new(
"import_media: importing is not available in this build",
))
}
}

/// Turn one [`InspectedFrame`] into an MCP image [`Block`], base64-encoding the
/// bytes (rmcp image content is base64). Kept here so the dispatcher stays free of
/// encoding concerns.
pub fn frame_to_block(frame: &InspectedFrame) -> Block {
use base64::Engine as _;
let b64 = base64::engine::general_purpose::STANDARD.encode(&frame.bytes);
Block::image(b64, frame.media_type.clone())
}

#[cfg(test)]
mod tests {
use super::*;

/// The default trait methods report "unsupported" without a real bridge — the
/// non-Tauri build path.
struct NoopBridge;
impl MediaBridge for NoopBridge {}

#[test]
fn default_inspect_timeline_is_unsupported() {
let b = NoopBridge;
let err = b.inspect_timeline(&[0], 512).unwrap_err();
assert!(err.message.contains("not available"), "{}", err.message);
}

#[test]
fn default_import_media_is_unsupported() {
let b = NoopBridge;
let err = b
.import_media(ImportSource::Path("/x.mp4".into()), None, None)
.unwrap_err();
assert!(err.message.contains("not available"), "{}", err.message);
}

#[test]
fn frame_to_block_base64_encodes_image_content() {
let f = InspectedFrame {
frame: 3,
bytes: vec![0xff, 0xd8, 0xff, 0xe0],
media_type: "image/jpeg".into(),
};
match frame_to_block(&f) {
Block::Image { base64, media_type } => {
use base64::Engine as _;
assert_eq!(media_type, "image/jpeg");
let decoded = base64::engine::general_purpose::STANDARD
.decode(base64)
.unwrap();
assert_eq!(decoded, vec![0xff, 0xd8, 0xff, 0xe0]);
}
_ => panic!("expected image block"),
}
}
}
1 change: 1 addition & 0 deletions crates/opentake-agent/src/mcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ pub mod convert;
pub mod core_handle;
pub mod dispatch;
pub mod gen_catalog;
pub mod media_bridge;
pub mod server;
55 changes: 48 additions & 7 deletions crates/opentake-agent/src/mcp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use serde_json::{Map, Value};
use crate::mcp::convert::to_call_tool_result;
use crate::mcp::core_handle::CoreHandle;
use crate::mcp::dispatch::Dispatcher;
use crate::mcp::media_bridge::MediaBridge;
use crate::plugin::registry::PluginRegistry;
use crate::prompt::assemble::assemble_system_prompt;
use crate::tools::descriptions::{description, input_schema};
Expand All @@ -43,14 +44,25 @@ pub struct McpServer {
}

impl McpServer {
/// Build a session server over the shared document handle + plugin registry.
/// Build a session server over the shared document handle + plugin registry,
/// with no media bridge (render/import tools then report "not available").
pub fn new(handle: Arc<dyn CoreHandle>, registry: Arc<RwLock<PluginRegistry>>) -> Self {
Self::with_bridge(handle, registry, None)
}

/// Build a session server with an optional [`MediaBridge`] injected, so
/// `inspect_timeline` / `import_media` reach the real GPU + import paths.
pub fn with_bridge(
handle: Arc<dyn CoreHandle>,
registry: Arc<RwLock<PluginRegistry>>,
bridge: Option<Arc<dyn MediaBridge>>,
) -> Self {
let instructions = registry
.read()
.map(|r| assemble_system_prompt(&r, "default"))
.unwrap_or_default();
McpServer {
dispatcher: Arc::new(Dispatcher::new(handle, registry)),
dispatcher: Arc::new(Dispatcher::with_bridge(handle, registry, bridge)),
instructions,
}
}
Expand Down Expand Up @@ -183,19 +195,36 @@ async fn oauth_protected_resource() -> axum::Json<Value> {
}))
}

/// Build the axum router: `StreamableHttpService` at `/mcp`, the OAuth
/// well-known endpoint, and the loopback guard layered over everything.
/// Build the axum router with no media bridge (render/import tools report "not
/// available"). See [`build_router_with_bridge`].
pub fn build_router(
handle: Arc<dyn CoreHandle>,
registry: Arc<RwLock<PluginRegistry>>,
) -> axum::Router {
build_router_with_bridge(handle, registry, None)
}

/// Build the axum router: `StreamableHttpService` at `/mcp`, the OAuth
/// well-known endpoint, and the loopback guard layered over everything. The
/// optional [`MediaBridge`] is cloned into each per-session [`McpServer`].
pub fn build_router_with_bridge(
handle: Arc<dyn CoreHandle>,
registry: Arc<RwLock<PluginRegistry>>,
bridge: Option<Arc<dyn MediaBridge>>,
) -> axum::Router {
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use rmcp::transport::streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService,
};

let service = StreamableHttpService::new(
move || Ok(McpServer::new(handle.clone(), registry.clone())),
move || {
Ok(McpServer::with_bridge(
handle.clone(),
registry.clone(),
bridge.clone(),
))
},
Arc::new(LocalSessionManager::default()),
StreamableHttpServerConfig::default(),
);
Expand All @@ -209,13 +238,25 @@ pub fn build_router(
.layer(axum::middleware::from_fn(localhost_guard))
}

/// Bind `addr` (loopback) and serve the MCP router until the process exits.
/// Bind `addr` (loopback) and serve the MCP router with no media bridge. See
/// [`serve_with_bridge`].
pub async fn serve(
addr: SocketAddr,
handle: Arc<dyn CoreHandle>,
registry: Arc<RwLock<PluginRegistry>>,
) -> std::io::Result<()> {
let router = build_router(handle, registry);
serve_with_bridge(addr, handle, registry, None).await
}

/// Bind `addr` (loopback) and serve the MCP router until the process exits, with
/// an optional [`MediaBridge`] injected (the Tauri shell passes `Some`).
pub async fn serve_with_bridge(
addr: SocketAddr,
handle: Arc<dyn CoreHandle>,
registry: Arc<RwLock<PluginRegistry>>,
bridge: Option<Arc<dyn MediaBridge>>,
) -> std::io::Result<()> {
let router = build_router_with_bridge(handle, registry, bridge);
let listener = tokio::net::TcpListener::bind(addr).await?;
tracing::info!("MCP server listening on http://{addr}/mcp");
axum::serve(listener, router).await
Expand Down
5 changes: 3 additions & 2 deletions src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ opentake-media = { workspace = true }
opentake-render = { workspace = true }
opentake-gen = { workspace = true }
opentake-agent = { workspace = true }
# Preview composite frames are returned to the WebView as a base64 PNG data URL.
image = { version = "0.25", default-features = false, features = ["png"] }
# Preview composite frames are returned to the WebView as a base64 PNG data URL;
# the agent's `inspect_timeline` bridge encodes composited frames as JPEG.
image = { version = "0.25", default-features = false, features = ["png", "jpeg"] }
base64 = "0.22"

# Streaming playback transport (#53 / #64), behind the `playback-engine` feature.
Expand Down
14 changes: 11 additions & 3 deletions src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,25 @@ pub fn run() {
.app_data_dir()
.unwrap_or_else(|_| std::env::temp_dir())
.join("models");
let engine = MediaEngine::new(cache_root, models_dir);
let engine = MediaEngine::new(cache_root.clone(), models_dir.clone());

// Bring up the loopback MCP server (#36) over a session-sharing clone
// of the core, before the core is moved into managed state. Bundled +
// user workflow plugins live under <app_data_dir>/workflows.
// user workflow plugins live under <app_data_dir>/workflows. The
// media bridge (agent inspect_timeline / import_media) is built from
// the SAME cache/models dirs as the UI's engine, so imports share the
// same poster/manifest caches.
let workflows_dir = app
.path()
.app_data_dir()
.unwrap_or_else(|_| std::env::temp_dir())
.join("workflows");
mcp::spawn(core.clone(), workflows_dir);
mcp::spawn(
core.clone(),
workflows_dir,
cache_root.clone(),
models_dir.clone(),
);

// Global asset library (#37/#54): a cross-project copy-on-favorite
// store under <app_data_dir>/OpenTake/Library, falling back to the OS
Expand Down
Loading
Loading