Skip to content

Commit e9ff83e

Browse files
committed
second revision of channels section
1 parent 85e9ae2 commit e9ff83e

1 file changed

Lines changed: 78 additions & 5 deletions

File tree

content/sharing-io-resources.md

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -524,11 +524,15 @@ We can instead use `try_send`, which isn't blocking, and manage our own bufferin
524524
}
525525
```
526526

527-
This solves most of the problems; at some point back-pressure needs to be handled and having the task that sends messages to the `client_dispatcher` be blocked seems good enough. However, one might want to stop reading packets as channels get filled; that way the TCP queues get full and new packets don't get acknowledge and clients will automatically retransmit packets. Notice, however, this would require a channel back from the anonymous tasks that send the bytes to the `client_dispatcher` and then back from the `central_dispatcher` to `handle_connection`.
527+
This solves most of the problems; at some point back-pressure needs to be handled, and having the task that sends messages to the `client_dispatcher` be blocked seems good enough. However, one might want to stop reading packets as channels get filled; that way the TCP queues get full and new packets get dropped, and clients will automatically retransmit packets. Notice, however, this would require a channel back from the anonymous tasks that send the bytes to the `client_dispatcher` and then back from the `central_dispatcher` to `handle_connection`.
528528

529-
Another way we will require more channels is if we wanted to start handling errors on writing to the socket. Instead of being able to handle errors idiomatically in-place in `central_dispatcher`, `client_dispatcher` would need a way to send the error back, if that error would kill the connection.
529+
Another way we will require more channels is if we want to start handling errors on writing to the socket. Instead of being able to handle errors idiomatically in place in `central_dispatcher`, `client_dispatcher` would need a way to send the error back if that error would kill the connection.
530530

531-
Furthermore, each additional IO requires its own task, additionally, each piece of state requires either its own task or to be added to the `central_dispatch`. The first way would also require more channels for cross-comunication between state owners, and the problem with all these channels is that they are decoupled from the tasks generating messages. This is what we want from channels, but it potentially makes code harder to follow. Look at `central_dispatcher` by itself.
531+
Furthermore, each additional IO requires its own task; additionally, each piece of state requires either its own task or to be added to the `central_dispatch`. The first way would also require more channels for cross-communication between state owners.
532+
533+
All these channels and tasks add complexity. For one, tasks need to be managed and dropped eventually[^3]. But there's another complication: we lose co-location of IO and state by creating these channels. Channels do move around, so now when you see the place where state is managed, you have no idea where the events are produced.
534+
535+
Take a look again at our `central_dispatcher`.
532536

533537
```rs
534538
async fn central_dispatcher(mut rx: mpsc::Receiver<Message>) {
@@ -560,9 +564,76 @@ async fn central_dispatcher(mut rx: mpsc::Receiver<Message>) {
560564

561565
```
562566

563-
There's no way to tell how `Message` was created, if for example, we wanted to know if the bytes somehow correlate to a valid string, we would need to hunt down where `Message::Send` is created. That could potentially be anyhwere. Considering the producers can be sent back and forth between tasks, cloned, it can become very complicated.
567+
There's no way to know where `rx` comes from other than looking for the place where the channel is created, which can be very far up the stack. Then, to know where `Message` can be produced, you need to look at all the places where the corresponding `tx` moves to, which again can be very far down the stack.
568+
569+
Not having mutexes does simplify things a lot, but there's still complexity related to having multiple tasks. There's, however, a clue on how we could further simplify things. Remember that I said that if we had additional state, we could manage it in the `central_dispatcher` and have it act as a multiplexer between IO events. For example, if we had a new source of events, such as another socket, it can also use `tx` to inform the `central_dispatcher` of the events with a new `Message` variant. If that also required a new state, `central_dispatcher` can also manage that.
570+
571+
Let's say, for example, a new administration socket that can subscribe multiple clients to a "topic" that can look something like this.
572+
573+
```rs
574+
async fn handle_administration_connection(&self, socket: TcpStream) {
575+
let (mut read, write) = socket.into_split();
576+
self.tx.send(Message::New(write)).await.unwrap();
577+
578+
let mut buffer = BytesMut::new();
579+
580+
loop {
581+
let n = read.read_buf(&mut buffer).await.unwrap();
582+
assert!(n != 0);
583+
584+
let Ok((topic, clients)) = parse_admin_message(&mut buffer) else {
585+
continue;
586+
};
587+
588+
self.tx.send(Message::NewTopic(topic, clients)).await.unwrap();
589+
}
590+
}
591+
592+
async fn central_dispatcher(mut rx: mpsc::Receiver<Message>) {
593+
let mut writers = Vec::new();
594+
let mut topics = HashMap::new();
595+
while let Some(msg) = rx.recv().await {
596+
match msg {
597+
Message::New(connection) => {
598+
let (tx, rx) = mpsc::channel(100);
599+
600+
let w = tx.clone();
601+
let id = writers.len();
602+
tokio::spawn(client_dispatcher(connection, rx));
603+
tokio::spawn(async move {
604+
w.send(Bytes::from_owner((id as u32).to_be_bytes()))
605+
.await
606+
.unwrap();
607+
});
608+
writers.push(tx);
609+
}
610+
Message::Send(id, items) => {
611+
let w = writers[id as usize].clone();
612+
tokio::spawn(async move {
613+
w.send(items).await.unwrap();
614+
});
615+
}
616+
Message::NewTopic(topic, clients) => {
617+
topics.insert(topic, clients.iter().map(|c| writers[c].clone()).collect_vec());
618+
}
619+
Message::SendTopic(topic, message) => {
620+
for w in topics.get(topic).unwrap() {
621+
let w = w.clone();
622+
tokio::spawn(async move {
623+
w.send(items).await.unwrap();
624+
});
625+
}
626+
}
627+
}
628+
}
629+
}
630+
631+
```
632+
564633

565-
All of this to say, while it's very nice no longer requiring mutexes, by decoupling IO and state it becomes harder to communicate errors back to the state and correlate data that alters the state with the IO that generates the data. I mentioned that there is another way to handle additional state, other than having new tasks handling it. We can move it into `central_dispatcher` and have it multiplex messages from multiple channels. This doesn't solve the complexity of data and IO lack of co-location, but it tells us that we could potentially handle all IO-events in a single task concurrently. So let's look into that.
634+
{{ note(body="This code is merely illustrative, it's not written to be compiled.") }}
635+
636+
Well, we still have multiple incoming and outgoing channels, but this shows that we can have a single task managing state while listening concurrently to multiple IO events. So perhaps, we can move the IO inside that same task and simplify things further.
566637

567638
### A single task to rule them all
568639

@@ -688,6 +759,7 @@ impl Router {
688759
Now we're modeling I/O as a stream of events. We suspend execution while awaiting any of those events, each time one of the events happen we're signaled about it.
689760

690761
In response we handle the event synchroneously, in a context where we have mutable access to the state.
762+
691763
#### Hand-rolled future
692764

693765
There's another option that's way less discussed, if you think about it, what we want to do is:
@@ -879,3 +951,4 @@ What I hope you take away from this article, is that, particularly in Rust, you
879951

880952
[^1]: https://draft.ryhl.io/blog/actors-with-tokio/
881953
[^2]: In fairness, deadlocks are still possible, if 2 tasks are waiting from one another preventing from making any other work.
954+
[^3]: https://draft.ryhl.io/blog/actors-with-tokio/ has a very good explanation on how this can be done!

0 commit comments

Comments
 (0)