Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ wstd-macro.workspace = true
anyhow.workspace = true
futures-lite.workspace = true
serde_json.workspace = true
futures-concurrency.workspace = true

[workspace]
members = [
Expand All @@ -50,6 +51,7 @@ anyhow = "1"
cargo_metadata = "0.18.1"
futures-core = "0.3.19"
futures-lite = "1.12.0"
futures-concurrency = "7.6.2"
heck = "0.5"
http = "1.1"
pin-project-lite = "0.2.8"
Expand Down
55 changes: 55 additions & 0 deletions examples/tcp_echo_server_non_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use futures_concurrency::future::FutureGroup;
use futures_lite::{FutureExt, StreamExt};
use std::{
cell::RefCell,
future::Future,
pin::{pin, Pin},
rc::Rc,
task::Poll,
};
use wstd::io;
use wstd::iter::AsyncIterator;
use wstd::net::TcpListener;

type StreamTasks = Rc<RefCell<FutureGroup<Pin<Box<dyn Future<Output = io::Result<()>>>>>>>;

#[wstd::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);
println!("type `nc localhost 8080` to create a TCP client");

let stream_tasks: StreamTasks = StreamTasks::default();
let mut listening_task = pin!(start_listening(listener, stream_tasks.clone()));

futures_lite::future::poll_fn(|cx| {
if let Poll::Ready(_) = listening_task.as_mut().poll(cx) {
return Poll::Ready(());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to return as Ready until both the Listener is ready and the stream tasks are all complete.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't quite understand what you meant. Could you please tell me your thoughts directly?

};

let mut stream_tasks_ref = stream_tasks.borrow_mut();
if let Poll::Ready(Some(res)) = stream_tasks_ref.poll_next(cx) {
println!("Task finished: {:?}", res);
println!("Tasks len: {}", stream_tasks_ref.len());
}

Poll::Pending
})
.await;
Ok(())
}

async fn start_listening(listener: TcpListener, stream_tasks: StreamTasks) -> io::Result<()> {
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
println!("Accepted from: {}", stream.peer_addr()?);

let stream_task = async move { io::copy(&stream, &stream).await }.boxed_local();

stream_tasks.borrow_mut().insert(stream_task);
println!("Task added");
println!("Tasks len: {}", stream_tasks.borrow().len());
}
Ok(())
}
1 change: 1 addition & 0 deletions test-programs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ publish = false
futures-lite.workspace = true
serde_json.workspace = true
wstd.workspace = true
futures-concurrency.workspace = true