Skip to content

Commit f89330e

Browse files
committed
feat: tcp-echo-server uses tasks, test runner uses wasmtime cli, tcp-listener supports v6
The tcp-echo-server example has been rewritten to spawn the echo part of each accept into a task, which means it can accept new connections while other echos are in flight. The tcp-echo-server test has been rewritten to test that connections can be accepted while other echoes are in flight. The tcp-echo-server test has been rewritten to use wasmtime cli as a process, rather than use wasmtime as a crate. This drops wasmtime from the dev-dependencies of the workspace, which is good because it was running a quite out-of-date wasmtime. Fix up the missing conversions to/from std::net::SocketAddr in wstd::net::tcp_listener, so that we can send a Display impl of the listening address from the guest to host, and parse it out in the host (see get_listening_address)
1 parent 16bafe6 commit f89330e

File tree

6 files changed

+165
-125
lines changed

6 files changed

+165
-125
lines changed

Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,6 @@ test-programs = { path = "test-programs" }
8484
test-programs-artifacts = { path = "test-programs/artifacts" }
8585
ureq = { version = "2.12.1", default-features = false }
8686
wasi = "0.14.0"
87-
wasmtime = "26"
88-
wasmtime-wasi = "26"
89-
wasmtime-wasi-http = "26"
9087
wstd = { path = "." }
9188
wstd-macro = { path = "macro", version = "=0.5.4" }
9289

examples/tcp_echo_server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ async fn main() -> io::Result<()> {
1212
while let Some(stream) = incoming.next().await {
1313
let stream = stream?;
1414
println!("Accepted from: {}", stream.peer_addr()?);
15-
io::copy(&stream, &stream).await?;
15+
wstd::runtime::spawn(async move {
16+
let _ = io::copy(&stream, &stream).await;
17+
})
18+
.detach();
1619
}
1720
Ok(())
1821
}

src/net/tcp_listener.rs

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,8 @@ impl TcpListener {
3333
wasi::sockets::tcp_create_socket::create_tcp_socket(family).map_err(to_io_err)?;
3434
let network = wasi::sockets::instance_network::instance_network();
3535

36-
let local_address = match addr {
37-
SocketAddr::V4(addr) => {
38-
let ip = addr.ip().octets();
39-
let address = (ip[0], ip[1], ip[2], ip[3]);
40-
let port = addr.port();
41-
IpSocketAddress::Ipv4(Ipv4SocketAddress { port, address })
42-
}
43-
SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpListener`"),
44-
};
36+
let local_address = sockaddr_to_wasi(addr);
37+
4538
socket
4639
.start_bind(&network, local_address)
4740
.map_err(to_io_err)?;
@@ -56,10 +49,11 @@ impl TcpListener {
5649
}
5750

5851
/// Returns the local socket address of this listener.
59-
// TODO: make this return an actual socket addr
60-
pub fn local_addr(&self) -> io::Result<String> {
61-
let addr = self.socket.local_address().map_err(to_io_err)?;
62-
Ok(format!("{addr:?}"))
52+
pub fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
53+
self.socket
54+
.local_address()
55+
.map_err(to_io_err)
56+
.map(sockaddr_from_wasi)
6357
}
6458

6559
/// Returns an iterator over the connections being received on this listener.
@@ -105,3 +99,51 @@ pub(super) fn to_io_err(err: ErrorCode) -> io::Error {
10599
_ => ErrorKind::Other.into(),
106100
}
107101
}
102+
103+
fn sockaddr_from_wasi(addr: IpSocketAddress) -> std::net::SocketAddr {
104+
use wasi::sockets::network::Ipv6SocketAddress;
105+
match addr {
106+
IpSocketAddress::Ipv4(Ipv4SocketAddress { address, port }) => {
107+
std::net::SocketAddr::V4(std::net::SocketAddrV4::new(
108+
std::net::Ipv4Addr::new(address.0, address.1, address.2, address.3),
109+
port,
110+
))
111+
}
112+
IpSocketAddress::Ipv6(Ipv6SocketAddress {
113+
address,
114+
port,
115+
flow_info,
116+
scope_id,
117+
}) => std::net::SocketAddr::V6(std::net::SocketAddrV6::new(
118+
std::net::Ipv6Addr::new(
119+
address.0, address.1, address.2, address.3, address.4, address.5, address.6,
120+
address.7,
121+
),
122+
port,
123+
flow_info,
124+
scope_id,
125+
)),
126+
}
127+
}
128+
129+
fn sockaddr_to_wasi(addr: std::net::SocketAddr) -> IpSocketAddress {
130+
use wasi::sockets::network::Ipv6SocketAddress;
131+
match addr {
132+
std::net::SocketAddr::V4(addr) => {
133+
let ip = addr.ip().octets();
134+
IpSocketAddress::Ipv4(Ipv4SocketAddress {
135+
address: (ip[0], ip[1], ip[2], ip[3]),
136+
port: addr.port(),
137+
})
138+
}
139+
std::net::SocketAddr::V6(addr) => {
140+
let ip = addr.ip().segments();
141+
IpSocketAddress::Ipv6(Ipv6SocketAddress {
142+
address: (ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]),
143+
port: addr.port(),
144+
flow_info: addr.flowinfo(),
145+
scope_id: addr.scope_id(),
146+
})
147+
}
148+
}
149+
}

src/runtime/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
mod block_on;
1414
mod reactor;
1515

16+
pub use ::async_task::Task;
1617
pub use block_on::block_on;
1718
pub use reactor::{AsyncPollable, Reactor, WaitFor};
1819
use std::cell::RefCell;
@@ -22,3 +23,14 @@ use std::cell::RefCell;
2223
std::thread_local! {
2324
pub(crate) static REACTOR: RefCell<Option<Reactor>> = const { RefCell::new(None) };
2425
}
26+
27+
/// Spawn a `Future` as a `Task` on the current `Reactor`.
28+
///
29+
/// Panics if called from outside `block_on`.
30+
pub fn spawn<F, T>(fut: F) -> Task<T>
31+
where
32+
F: std::future::Future<Output = T> + 'static,
33+
T: 'static,
34+
{
35+
Reactor::current().spawn(fut)
36+
}

test-programs/artifacts/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ anyhow.workspace = true
1313
test-log.workspace = true
1414
test-programs-artifacts.workspace = true
1515
ureq.workspace = true
16-
wasmtime.workspace = true
17-
wasmtime-wasi.workspace = true
18-
wasmtime-wasi-http.workspace = true
1916

2017
[build-dependencies]
2118
cargo_metadata.workspace = true
Lines changed: 94 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -1,123 +1,112 @@
1-
use anyhow::{anyhow, Context, Result};
2-
use wasmtime::{
3-
component::{Component, Linker, ResourceTable},
4-
Config, Engine, Store,
5-
};
6-
use wasmtime_wasi::{pipe::MemoryOutputPipe, WasiCtx, WasiView};
7-
use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
8-
9-
struct Ctx {
10-
table: ResourceTable,
11-
wasi: WasiCtx,
12-
http: WasiHttpCtx,
13-
}
14-
15-
impl WasiView for Ctx {
16-
fn table(&mut self) -> &mut ResourceTable {
17-
&mut self.table
18-
}
19-
fn ctx(&mut self) -> &mut WasiCtx {
20-
&mut self.wasi
21-
}
22-
}
23-
24-
impl WasiHttpView for Ctx {
25-
fn table(&mut self) -> &mut ResourceTable {
26-
&mut self.table
27-
}
28-
fn ctx(&mut self) -> &mut WasiHttpCtx {
29-
&mut self.http
30-
}
31-
}
32-
33-
fn run_in_wasmtime(wasm: &[u8], stdout: Option<MemoryOutputPipe>) -> Result<()> {
34-
let config = Config::default();
35-
let engine = Engine::new(&config).context("creating engine")?;
36-
let component = Component::new(&engine, wasm).context("loading component")?;
37-
38-
let mut linker: Linker<Ctx> = Linker::new(&engine);
39-
wasmtime_wasi::add_to_linker_sync(&mut linker).context("add wasi to linker")?;
40-
wasmtime_wasi_http::add_only_http_to_linker_sync(&mut linker)
41-
.context("add wasi-http to linker")?;
42-
43-
let mut builder = WasiCtx::builder();
44-
builder.inherit_stderr().inherit_network();
45-
let wasi = match stdout {
46-
Some(stdout) => builder.stdout(stdout).build(),
47-
None => builder.inherit_stdout().build(),
48-
};
49-
let mut store = Store::new(
50-
&engine,
51-
Ctx {
52-
table: ResourceTable::new(),
53-
wasi,
54-
http: WasiHttpCtx::new(),
55-
},
56-
);
57-
58-
let instance = linker.instantiate(&mut store, &component)?;
59-
let run_interface = instance
60-
.get_export(&mut store, None, "wasi:cli/run@0.2.0")
61-
.ok_or_else(|| anyhow!("wasi:cli/run missing?"))?;
62-
let run_func_export = instance
63-
.get_export(&mut store, Some(&run_interface), "run")
64-
.ok_or_else(|| anyhow!("run export missing?"))?;
65-
let run_func = instance
66-
.get_typed_func::<(), (Result<(), ()>,)>(&mut store, &run_func_export)
67-
.context("run as typed func")?;
68-
69-
println!("entering wasm...");
70-
let (runtime_result,) = run_func.call(&mut store, ())?;
71-
runtime_result.map_err(|()| anyhow!("run returned an error"))?;
72-
println!("done");
73-
74-
Ok(())
75-
}
1+
use anyhow::{Context, Result};
2+
use std::process::Command;
763

774
#[test_log::test]
785
fn tcp_echo_server() -> Result<()> {
796
use std::io::{Read, Write};
807
use std::net::{Shutdown, TcpStream};
81-
use std::thread::sleep;
82-
use std::time::Duration;
838

849
println!("testing {}", test_programs_artifacts::TCP_ECHO_SERVER);
85-
let wasm = std::fs::read(test_programs_artifacts::TCP_ECHO_SERVER).context("read wasm")?;
8610

87-
let pipe = wasmtime_wasi::pipe::MemoryOutputPipe::new(1024 * 1024);
88-
let write_end = pipe.clone();
89-
let wasmtime_thread = std::thread::spawn(move || run_in_wasmtime(&wasm, Some(write_end)));
11+
// Run the component in wasmtime
12+
// -Sinherit-network allows it to accept network connections
13+
let mut wasmtime_process = Command::new("wasmtime")
14+
.arg("run")
15+
.arg("-Sinherit-network")
16+
.arg(test_programs_artifacts::TCP_ECHO_SERVER)
17+
.stdout(std::process::Stdio::piped())
18+
.spawn()?;
9019

91-
'wait: loop {
92-
sleep(Duration::from_millis(100));
93-
for line in pipe.contents().split(|c| *c == b'\n') {
94-
if line.starts_with(b"Listening on") {
95-
break 'wait;
96-
}
97-
}
98-
}
20+
let addr = get_listening_address(wasmtime_process.stdout.take().expect("stdout is piped"))?;
9921

100-
let mut tcpstream =
101-
TcpStream::connect("127.0.0.1:8080").context("connect to wasm echo server")?;
102-
println!("connected to wasm echo server");
22+
println!("tcp echo server is listening on {addr:?}");
10323

104-
const MESSAGE: &[u8] = b"hello, echoserver!\n";
24+
let mut stream1 = TcpStream::connect(&addr).context("connect stream1")?;
25+
println!("stream1 connected");
10526

106-
tcpstream.write_all(MESSAGE).context("write to socket")?;
107-
println!("wrote to echo server");
27+
let mut stream2 = TcpStream::connect(&addr).context("connect stream2")?;
28+
println!("stream3 connected");
10829

109-
tcpstream.shutdown(Shutdown::Write)?;
30+
const MESSAGE1: &[u8] = b"hello, echoserver!\n";
11031

111-
let mut readback = Vec::new();
112-
tcpstream
113-
.read_to_end(&mut readback)
114-
.context("read from socket")?;
32+
stream1.write_all(MESSAGE1).context("write to stream1")?;
33+
println!("stream1 wrote to echo server");
11534

116-
println!("read from wasm server");
117-
assert_eq!(MESSAGE, readback);
35+
let mut stream3 = TcpStream::connect(&addr).context("connect stream3")?;
36+
println!("stream3 connected");
37+
38+
const MESSAGE2: &[u8] = b"hello, gussie!\n";
39+
stream2.write_all(MESSAGE2).context("write to stream1")?;
40+
println!("stream2 wrote to echo server");
41+
42+
stream1.shutdown(Shutdown::Write)?;
43+
stream2.shutdown(Shutdown::Write)?;
44+
45+
let mut readback2 = Vec::new();
46+
stream2
47+
.read_to_end(&mut readback2)
48+
.context("read from stream2")?;
49+
println!("read from stream2");
50+
51+
let mut readback1 = Vec::new();
52+
stream1
53+
.read_to_end(&mut readback1)
54+
.context("read from stream1")?;
55+
println!("read from stream1");
56+
57+
assert_eq!(MESSAGE1, readback1, "readback of stream 1");
58+
assert_eq!(MESSAGE2, readback2, "readback of stream 2");
59+
60+
const MESSAGE3: &[u8] = b"hello, willa!\n";
61+
stream3.write_all(MESSAGE3).context("write to stream1")?;
62+
println!("stream3 wrote to echo server");
63+
stream3.shutdown(Shutdown::Write)?;
64+
65+
let mut readback3 = Vec::new();
66+
stream3
67+
.read_to_end(&mut readback3)
68+
.context("read from stream3")?;
69+
println!("read from stream3");
70+
assert_eq!(MESSAGE3, readback3, "readback of stream 3");
71+
72+
wasmtime_process.kill()?;
11873

119-
if wasmtime_thread.is_finished() {
120-
wasmtime_thread.join().expect("wasmtime panicked")?;
121-
}
12274
Ok(())
12375
}
76+
77+
fn get_listening_address(
78+
mut wasmtime_stdout: std::process::ChildStdout,
79+
) -> Result<std::net::SocketAddr> {
80+
use std::io::Read;
81+
use std::thread::sleep;
82+
use std::time::Duration;
83+
84+
// Gather complete contents of stdout here
85+
let mut stdout_contents = String::new();
86+
loop {
87+
// Wait for process to print
88+
sleep(Duration::from_millis(100));
89+
90+
// Read more that the process printed, append to contents
91+
let mut buf = vec![0; 4096];
92+
let len = wasmtime_stdout
93+
.read(&mut buf)
94+
.context("reading wasmtime stdout")?;
95+
buf.truncate(len);
96+
stdout_contents
97+
.push_str(std::str::from_utf8(&buf).context("wasmtime stdout should be string")?);
98+
99+
// Parse out the line where guest program says where it is listening
100+
for line in stdout_contents.lines() {
101+
if let Some(rest) = line.strip_prefix("Listening on ") {
102+
// Forget wasmtime_stdout, rather than drop it, so that any
103+
// subsequent stdout from wasmtime doesn't panic on a broken
104+
// pipe.
105+
std::mem::forget(wasmtime_stdout);
106+
return rest
107+
.parse()
108+
.with_context(|| format!("parsing socket addr from line: {line:?}"));
109+
}
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)