From efc5c76aa82a4cba089800b46893b364a73f0354 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 19 Feb 2025 14:49:36 +0100 Subject: [PATCH 1/2] feat: controller::send now returns a future if you want to wait until the thing is actually sent --- src/api/controller.rs | 2 +- src/buffer/controller.rs | 9 +++++---- src/buffer/worker.rs | 13 ++++++++----- src/cursor/controller.rs | 8 +++++--- src/workspace.rs | 2 +- 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/api/controller.rs b/src/api/controller.rs index 853f1e5a..7d61cc3d 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -39,7 +39,7 @@ where /// Details about the receiving end are left to the implementor. pub trait AsyncSender: Sized + Send + Sync { /// Enqueue a new value to be sent to all other users without blocking - fn send(&self, x: T) -> ControllerResult<()>; + fn send(&self, x: T) -> ControllerResult>; } /// Asynchronous and thread-safe handle to receive data from a stream. diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 9f1a9d13..2e71429b 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -58,7 +58,7 @@ pub(crate) struct BufferControllerInner { pub(crate) path: String, pub(crate) latest_version: watch::Receiver, pub(crate) local_version: watch::Receiver, - pub(crate) ops_in: mpsc::UnboundedSender, + pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender)>, pub(crate) poller: mpsc::UnboundedSender>, pub(crate) content_request: mpsc::Sender>, pub(crate) delta_request: mpsc::Sender>>, @@ -71,9 +71,10 @@ pub(crate) struct BufferControllerInner { impl Controller for BufferController {} impl AsyncSender for BufferController { - fn send(&self, op: TextChange) -> ControllerResult<()> { - self.0.ops_in.send(op)?; - Ok(()) + fn send(&self, op: TextChange) -> ControllerResult> { + let (tx, rx) = oneshot::channel(); + self.0.ops_in.send((op, tx))?; + Ok(async move { rx.await.unwrap_or(false) }) } } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 9b6b434a..68d7aa60 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -23,7 +23,7 @@ struct BufferWorker { latest_version: watch::Sender, local_version: watch::Sender, ack_rx: mpsc::UnboundedReceiver, - ops_in: mpsc::UnboundedReceiver, + ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender)>, poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, @@ -133,7 +133,7 @@ impl BufferController { // received a text change from editor res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some(change) => worker.handle_editor_change(change, &tx).await, + Some((change, sent)) => worker.handle_editor_change(change, sent, &tx).await, }, // received a message from server: add to oplog and update latest version (+unlock pollers) @@ -170,7 +170,7 @@ impl BufferController { impl BufferWorker { #[tracing::instrument(skip(self, tx))] - async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender) { + async fn handle_editor_change(&mut self, change: TextChange, sent: oneshot::Sender, tx: &mpsc::Sender) { let last_ver = self.oplog.local_version(); // clip to buffer extents let clip_start = change.start_idx as usize; @@ -199,14 +199,17 @@ impl BufferWorker { tx.send(Operation { data: self.oplog.encode_from(ENCODE_PATCH, &last_ver), }) - .await - .unwrap_or_warn("failed to send change!"); + .await + .unwrap_or_warn("failed to send change!"); self.latest_version .send(self.oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); self.local_version .send(self.branch.local_version()) .unwrap_or_warn("failed to update local version!"); + let _ = sent.send(true); + } else { + let _ = sent.send(false); } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index d0c544c5..2a40c31b 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -45,7 +45,7 @@ impl Controller for CursorController {} #[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl AsyncSender for CursorController { - fn send(&self, mut cursor: Selection) -> ControllerResult<()> { + fn send(&self, mut cursor: Selection) -> ControllerResult> { if cursor.start_row > cursor.end_row || (cursor.start_row == cursor.end_row && cursor.start_col > cursor.end_col) { @@ -53,7 +53,7 @@ impl AsyncSender for CursorController { std::mem::swap(&mut cursor.start_col, &mut cursor.end_col); } - Ok(self.0.op.send(CursorPosition { + self.0.op.send(CursorPosition { buffer: BufferNode { path: cursor.buffer, }, @@ -65,7 +65,9 @@ impl AsyncSender for CursorController { row: cursor.end_row, col: cursor.end_col, }, - })?) + })?; + + Ok(std::future::ready(true)) } } diff --git a/src/workspace.rs b/src/workspace.rs index 80d2af05..68367439 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -174,7 +174,7 @@ impl Workspace { }); let credentials = worskspace_client.access_buffer(request).await?.into_inner(); - let (tx, rx) = mpsc::channel(256); + let (tx, rx) = mpsc::channel(1); let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); req.metadata_mut().insert( "buffer", From 223f764f7548214f645627dffd3754efe1e9b76e Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 19 Feb 2025 14:59:11 +0100 Subject: [PATCH 2/2] fix: put back outbound queue --- src/workspace.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workspace.rs b/src/workspace.rs index 68367439..80d2af05 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -174,7 +174,7 @@ impl Workspace { }); let credentials = worskspace_client.access_buffer(request).await?.into_inner(); - let (tx, rx) = mpsc::channel(1); + let (tx, rx) = mpsc::channel(256); let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); req.metadata_mut().insert( "buffer",