Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions guests/pid0/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ use std::path::Path;

fn main() {
let manifest_dir = env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR not set");
let capnp_file = Path::new(&manifest_dir)
let capnp_dir = Path::new(&manifest_dir)
.join("..")
.join("..")
.join("capnp")
.join("peer.capnp");
.canonicalize()
.expect("capnp dir not found");

capnpc::CompilerCommand::new()
.file(&capnp_file)
.src_prefix(&capnp_dir)
.file(capnp_dir.join("peer.capnp"))
.file(capnp_dir.join("membrane.capnp"))
.file(capnp_dir.join("stem.capnp"))
.run()
.expect("failed to compile capnp schema");
println!("cargo:rerun-if-changed={}", capnp_file.display());
.expect("failed to compile capnp schemas");

println!("cargo:rerun-if-changed={}", capnp_dir.join("peer.capnp").display());
println!("cargo:rerun-if-changed={}", capnp_dir.join("membrane.capnp").display());
println!("cargo:rerun-if-changed={}", capnp_dir.join("stem.capnp").display());
}
82 changes: 52 additions & 30 deletions guests/pid0/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ use wasip2::exports::cli::run::Guest;

#[allow(dead_code)]
mod peer_capnp {
include!(concat!(
env!("OUT_DIR"),
"/Users/mikel/Code/github.com/wetware/rs/capnp/peer_capnp.rs"
));
include!(concat!(env!("OUT_DIR"), "/peer_capnp.rs"));
}

#[allow(dead_code)]
mod stem_capnp {
include!(concat!(env!("OUT_DIR"), "/stem_capnp.rs"));
}

#[allow(dead_code)]
mod membrane_capnp {
include!(concat!(env!("OUT_DIR"), "/membrane_capnp.rs"));
}

/// Bootstrap capability: a Membrane whose sessions carry our WetwareSession extension.
type Membrane = stem_capnp::membrane::Client<membrane_capnp::wetware_session::Owned>;

struct StderrLogger;

impl log::Log for StderrLogger {
Expand Down Expand Up @@ -52,32 +62,44 @@ fn run_impl() {
init_logging();
log::trace!("pid0: start");

wetware_guest::run(|host: peer_capnp::host::Client| async move {
log::trace!("pid0: rpc bootstrapped");
const CHILD_WASM: &[u8] =
include_bytes!("../../child-echo/target/wasm32-wasip2/release/child_echo.wasm");

let executor = host.executor_request().send().pipeline.get_executor();

let mut request = executor.run_bytes_request();
{
let mut params = request.get();
params.set_wasm(CHILD_WASM);
params.reborrow().init_args(0);
params.reborrow().init_env(0);
}
log::trace!("pid0: runBytes sent");

let run_resp = request.send().promise.await?;
let process = run_resp.get()?.get_process()?;
log::trace!("pid0: got process");

let wait_resp = process.wait_request().send().promise.await?;
let exit_code = wait_resp.get()?.get_exit_code();
log::trace!("pid0: child exited with code {}", exit_code);

Ok(())
});
// Bootstrap a Membrane(WetwareSession) instead of a bare Host.
// The membrane provides epoch-scoped sessions with Host + Executor.
wetware_guest::run(
|membrane: Membrane| async move {
log::trace!("pid0: rpc bootstrapped, grafting onto membrane");

// Graft onto the membrane to get an epoch-scoped session.
// No signer needed — stem's graft() currently ignores it.
let graft_resp = membrane.graft_request().send().promise.await?;
let session = graft_resp.get()?.get_session()?;
let ext = session.get_extension()?;
log::trace!("pid0: grafted, got session");

let executor = ext.get_executor()?;

const CHILD_WASM: &[u8] =
include_bytes!("../../child-echo/target/wasm32-wasip2/release/child_echo.wasm");

let mut request = executor.run_bytes_request();
{
let mut params = request.get();
params.set_wasm(CHILD_WASM);
params.reborrow().init_args(0);
params.reborrow().init_env(0);
}
log::trace!("pid0: runBytes sent");

let run_resp = request.send().promise.await?;
let process = run_resp.get()?.get_process()?;
log::trace!("pid0: got process");

let wait_resp = process.wait_request().send().promise.await?;
let exit_code = wait_resp.get()?.get_exit_code();
log::trace!("pid0: child exited with code {}", exit_code);

Ok(())
},
);

log::trace!("pid0: cleanup complete");
}
Expand Down
17 changes: 15 additions & 2 deletions src/cell/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,21 @@ impl Command {
let (reader, writer) = handles
.take_host_split()
.ok_or_else(|| anyhow::anyhow!("host stream missing; RPC streams already consumed"))?;
let rpc_system =
crate::rpc::build_peer_rpc(reader, writer, network_state, swarm_cmd_tx, wasm_debug);
// Static epoch (never advances) — real epoch wiring is a future concern.
let initial_epoch = stem::membrane::Epoch {
seq: 0,
head: vec![],
adopted_block: 0,
};
let (_epoch_tx, epoch_rx) = tokio::sync::watch::channel(initial_epoch);
let rpc_system = crate::rpc::membrane::build_membrane_rpc(
reader,
writer,
network_state,
swarm_cmd_tx,
wasm_debug,
epoch_rx,
);

info!("Starting streams RPC server for guest");
let local = tokio::task::LocalSet::new();
Expand Down