diff --git a/guests/pid0/build.rs b/guests/pid0/build.rs index 965901b3..b11403e8 100644 --- a/guests/pid0/build.rs +++ b/guests/pid0/build.rs @@ -3,15 +3,22 @@ use std::path::Path; fn main() { let manifest_dir = env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR not set"); - let capnp_file = Path::new(&manifest_dir) + let capnp_dir = Path::new(&manifest_dir) .join("..") .join("..") .join("capnp") - .join("peer.capnp"); + .canonicalize() + .expect("capnp dir not found"); capnpc::CompilerCommand::new() - .file(&capnp_file) + .src_prefix(&capnp_dir) + .file(capnp_dir.join("peer.capnp")) + .file(capnp_dir.join("membrane.capnp")) + .file(capnp_dir.join("stem.capnp")) .run() - .expect("failed to compile capnp schema"); - println!("cargo:rerun-if-changed={}", capnp_file.display()); + .expect("failed to compile capnp schemas"); + + println!("cargo:rerun-if-changed={}", capnp_dir.join("peer.capnp").display()); + println!("cargo:rerun-if-changed={}", capnp_dir.join("membrane.capnp").display()); + println!("cargo:rerun-if-changed={}", capnp_dir.join("stem.capnp").display()); } diff --git a/guests/pid0/src/lib.rs b/guests/pid0/src/lib.rs index cac36497..8fa97fd2 100644 --- a/guests/pid0/src/lib.rs +++ b/guests/pid0/src/lib.rs @@ -5,12 +5,22 @@ use wasip2::exports::cli::run::Guest; #[allow(dead_code)] mod peer_capnp { - include!(concat!( - env!("OUT_DIR"), - "/Users/mikel/Code/github.com/wetware/rs/capnp/peer_capnp.rs" - )); + include!(concat!(env!("OUT_DIR"), "/peer_capnp.rs")); } +#[allow(dead_code)] +mod stem_capnp { + include!(concat!(env!("OUT_DIR"), "/stem_capnp.rs")); +} + +#[allow(dead_code)] +mod membrane_capnp { + include!(concat!(env!("OUT_DIR"), "/membrane_capnp.rs")); +} + +/// Bootstrap capability: a Membrane whose sessions carry our WetwareSession extension. +type Membrane = stem_capnp::membrane::Client; + struct StderrLogger; impl log::Log for StderrLogger { @@ -52,32 +62,44 @@ fn run_impl() { init_logging(); log::trace!("pid0: start"); - wetware_guest::run(|host: peer_capnp::host::Client| async move { - log::trace!("pid0: rpc bootstrapped"); - const CHILD_WASM: &[u8] = - include_bytes!("../../child-echo/target/wasm32-wasip2/release/child_echo.wasm"); - - let executor = host.executor_request().send().pipeline.get_executor(); - - let mut request = executor.run_bytes_request(); - { - let mut params = request.get(); - params.set_wasm(CHILD_WASM); - params.reborrow().init_args(0); - params.reborrow().init_env(0); - } - log::trace!("pid0: runBytes sent"); - - let run_resp = request.send().promise.await?; - let process = run_resp.get()?.get_process()?; - log::trace!("pid0: got process"); - - let wait_resp = process.wait_request().send().promise.await?; - let exit_code = wait_resp.get()?.get_exit_code(); - log::trace!("pid0: child exited with code {}", exit_code); - - Ok(()) - }); + // Bootstrap a Membrane(WetwareSession) instead of a bare Host. + // The membrane provides epoch-scoped sessions with Host + Executor. + wetware_guest::run( + |membrane: Membrane| async move { + log::trace!("pid0: rpc bootstrapped, grafting onto membrane"); + + // Graft onto the membrane to get an epoch-scoped session. + // No signer needed — stem's graft() currently ignores it. + let graft_resp = membrane.graft_request().send().promise.await?; + let session = graft_resp.get()?.get_session()?; + let ext = session.get_extension()?; + log::trace!("pid0: grafted, got session"); + + let executor = ext.get_executor()?; + + const CHILD_WASM: &[u8] = + include_bytes!("../../child-echo/target/wasm32-wasip2/release/child_echo.wasm"); + + let mut request = executor.run_bytes_request(); + { + let mut params = request.get(); + params.set_wasm(CHILD_WASM); + params.reborrow().init_args(0); + params.reborrow().init_env(0); + } + log::trace!("pid0: runBytes sent"); + + let run_resp = request.send().promise.await?; + let process = run_resp.get()?.get_process()?; + log::trace!("pid0: got process"); + + let wait_resp = process.wait_request().send().promise.await?; + let exit_code = wait_resp.get()?.get_exit_code(); + log::trace!("pid0: child exited with code {}", exit_code); + + Ok(()) + }, + ); log::trace!("pid0: cleanup complete"); } diff --git a/src/cell/executor.rs b/src/cell/executor.rs index bc1cfdf7..665574fa 100644 --- a/src/cell/executor.rs +++ b/src/cell/executor.rs @@ -203,8 +203,21 @@ impl Command { let (reader, writer) = handles .take_host_split() .ok_or_else(|| anyhow::anyhow!("host stream missing; RPC streams already consumed"))?; - let rpc_system = - crate::rpc::build_peer_rpc(reader, writer, network_state, swarm_cmd_tx, wasm_debug); + // Static epoch (never advances) — real epoch wiring is a future concern. + let initial_epoch = stem::membrane::Epoch { + seq: 0, + head: vec![], + adopted_block: 0, + }; + let (_epoch_tx, epoch_rx) = tokio::sync::watch::channel(initial_epoch); + let rpc_system = crate::rpc::membrane::build_membrane_rpc( + reader, + writer, + network_state, + swarm_cmd_tx, + wasm_debug, + epoch_rx, + ); info!("Starting streams RPC server for guest"); let local = tokio::task::LocalSet::new();