@@ -770,6 +770,57 @@ fn op_retry_backoff(attempt: usize) -> Duration {
770770 Duration :: from_millis ( ( 5u64 << attempt. min ( 8 ) ) . min ( 1_000 ) )
771771}
772772
773+ /// If `op_result` indicates the operation completed and a `pending_op_result`
774+ /// callback is wired, forward `reply` to the awaiting caller of
775+ /// `OpManager::notify_op_execution`.
776+ ///
777+ /// This is the "round-trip primitive" used by the (currently dead) async
778+ /// sub-transaction scaffolding: `notify_op_execution` inserts a one-shot
779+ /// bounded sender into `p2p_protoc::pending_op_results`, and each branch of
780+ /// `handle_pure_network_message_v1` calls this helper after
781+ /// `handle_op_request` so the caller can `await` a single reply keyed by the
782+ /// same `Transaction`.
783+ ///
784+ /// Wired for PUT and GET historically; extended to SUBSCRIBE, CONNECT, and
785+ /// UPDATE in Phase 1 of the async-transaction refactor (#1454) so every op
786+ /// kind can terminate a `notify_op_execution` round-trip without hanging.
787+ ///
788+ /// # Channel safety
789+ ///
790+ /// Uses `try_send` rather than `.send().await` on the bounded capacity-1
791+ /// mpsc channel created inside `notify_op_execution`. This is sound
792+ /// because the callback is fired **at most once per transaction**: the
793+ /// `is_operation_completed` guard above combined with the `completed` /
794+ /// `under_progress` dedup sets in `OpManager` ensures that subsequent
795+ /// messages for the same tx short-circuit before reaching this code. So
796+ /// `try_send` on an empty capacity-1 channel cannot fail with `Full` —
797+ /// it can only fail with `Closed` when the caller of `notify_op_execution`
798+ /// has dropped its receiver. Using `try_send` eliminates any risk of
799+ /// blocking the pure-network-message handler if a future consumer ever
800+ /// ends up unable to drain the reply, satisfying the preference for
801+ /// non-blocking sends in `.claude/rules/channel-safety.md`.
802+ ///
803+ /// In Phase 1 (#1454) this path is dormant: `pending_op_result` is always
804+ /// `None` because no production caller of `notify_op_execution` exists.
805+ /// Phase 2 will introduce the first real caller; the `try_send` choice
806+ /// here means Phase 2 does not need to touch this function.
807+ fn forward_pending_op_result_if_completed (
808+ op_result : & Result < Option < OpEnum > , OpError > ,
809+ pending_op_result : Option < & tokio:: sync:: mpsc:: Sender < NetMessage > > ,
810+ reply : NetMessage ,
811+ ) {
812+ if !is_operation_completed ( op_result) {
813+ return ;
814+ }
815+ let Some ( callback) = pending_op_result else {
816+ return ;
817+ } ;
818+ let tx_id = * reply. id ( ) ;
819+ if let Err ( err) = callback. try_send ( reply) {
820+ tracing:: error!( %err, %tx_id, "Failed to send message to executor" ) ;
821+ }
822+ }
823+
773824/// Pure network message processing for V1 messages (no client concerns)
774825#[ allow( clippy:: too_many_arguments) ]
775826async fn handle_pure_network_message_v1 < CB > (
@@ -815,6 +866,13 @@ where
815866 . instrument ( span)
816867 . await ;
817868
869+ // Handle pending operation results (network concern)
870+ forward_pending_op_result_if_completed (
871+ & op_result,
872+ pending_op_result. as_ref ( ) ,
873+ NetMessage :: V1 ( NetMessageV1 :: Connect ( ( * op) . clone ( ) ) ) ,
874+ ) ;
875+
818876 if let Err ( OpError :: OpNotAvailable ( state) ) = & op_result {
819877 match state {
820878 OpNotAvailable :: Running => {
@@ -865,17 +923,11 @@ where
865923 ) ;
866924
867925 // Handle pending operation results (network concern)
868- if is_operation_completed ( & op_result) {
869- if let Some ( ref op_execution_callback) = pending_op_result {
870- let tx_id = * op. id ( ) ;
871- if let Err ( err) = op_execution_callback
872- . send ( NetMessage :: V1 ( NetMessageV1 :: Put ( ( * op) . clone ( ) ) ) )
873- . await
874- {
875- tracing:: error!( %err, %tx_id, "Failed to send message to executor" ) ;
876- }
877- }
878- }
926+ forward_pending_op_result_if_completed (
927+ & op_result,
928+ pending_op_result. as_ref ( ) ,
929+ NetMessage :: V1 ( NetMessageV1 :: Put ( ( * op) . clone ( ) ) ) ,
930+ ) ;
879931
880932 if let Err ( OpError :: OpNotAvailable ( state) ) = & op_result {
881933 match state {
@@ -914,17 +966,11 @@ where
914966 . await ;
915967
916968 // Handle pending operation results (network concern)
917- if is_operation_completed ( & op_result) {
918- if let Some ( ref op_execution_callback) = pending_op_result {
919- let tx_id = * op. id ( ) ;
920- if let Err ( err) = op_execution_callback
921- . send ( NetMessage :: V1 ( NetMessageV1 :: Get ( ( * op) . clone ( ) ) ) )
922- . await
923- {
924- tracing:: error!( %err, %tx_id, "Failed to send message to executor" ) ;
925- }
926- }
927- }
969+ forward_pending_op_result_if_completed (
970+ & op_result,
971+ pending_op_result. as_ref ( ) ,
972+ NetMessage :: V1 ( NetMessageV1 :: Get ( ( * op) . clone ( ) ) ) ,
973+ ) ;
928974
929975 if let Err ( OpError :: OpNotAvailable ( state) ) = & op_result {
930976 match state {
@@ -962,6 +1008,13 @@ where
9621008 )
9631009 . await ;
9641010
1011+ // Handle pending operation results (network concern)
1012+ forward_pending_op_result_if_completed (
1013+ & op_result,
1014+ pending_op_result. as_ref ( ) ,
1015+ NetMessage :: V1 ( NetMessageV1 :: Update ( ( * op) . clone ( ) ) ) ,
1016+ ) ;
1017+
9651018 if let Err ( OpError :: OpNotAvailable ( state) ) = & op_result {
9661019 match state {
9671020 OpNotAvailable :: Running => {
@@ -998,6 +1051,22 @@ where
9981051 )
9991052 . await ;
10001053
1054+ // Handle pending operation results (network concern).
1055+ //
1056+ // Phase 2 deferred: `subscribe::complete_local_subscription`
1057+ // (operations/subscribe.rs) finishes a subscription without
1058+ // any network round-trip, so a `notify_op_execution` caller
1059+ // targeting a locally-completed SUBSCRIBE would still hang
1060+ // because no `NetMessage` ever reaches this branch. Handling
1061+ // the local-completion path requires either a synthetic
1062+ // "locally completed" reply or a change to the
1063+ // `notify_op_execution` contract. See #1454 §5 Phase 2.
1064+ forward_pending_op_result_if_completed (
1065+ & op_result,
1066+ pending_op_result. as_ref ( ) ,
1067+ NetMessage :: V1 ( NetMessageV1 :: Subscribe ( ( * op) . clone ( ) ) ) ,
1068+ ) ;
1069+
10011070 if let Err ( OpError :: OpNotAvailable ( state) ) = & op_result {
10021071 match state {
10031072 OpNotAvailable :: Running => {
@@ -2462,4 +2531,153 @@ mod tests {
24622531 assert ! ( !success) ;
24632532 }
24642533 }
2534+
2535+ // Phase 1 (#1454) tests for forward_pending_op_result_if_completed.
2536+ //
2537+ // These exercise the callback-forwarding helper used by every branch of
2538+ // `handle_pure_network_message_v1`. The helper is the only place that
2539+ // drives the capacity-1 `pending_op_result` mpsc channel from a completed op
2540+ // result back to a caller of `OpManager::notify_op_execution`. Phase 1
2541+ // extended the hook from PUT/GET only to cover SUBSCRIBE/CONNECT/UPDATE
2542+ // as well, so these tests verify the helper forwards correctly for every
2543+ // op variant and short-circuits in the negative cases.
2544+ mod callback_forward_tests {
2545+ use super :: super :: { OpError , OpNotAvailable , forward_pending_op_result_if_completed} ;
2546+ use crate :: message:: { MessageStats , NetMessage , NetMessageV1 , Transaction } ;
2547+ use crate :: operations:: OpEnum ;
2548+ use crate :: operations:: connect:: { ConnectMsg , ConnectOp , ConnectState } ;
2549+
2550+ fn completed_connect_op ( ) -> ConnectOp {
2551+ ConnectOp :: with_state ( ConnectState :: Completed )
2552+ }
2553+
2554+ fn dummy_reply ( ) -> NetMessage {
2555+ // We don't care about the payload — the helper only looks at
2556+ // `NetMessage::id()` for logging. Use the tx-only `Aborted`
2557+ // variant to avoid building an entire ConnectMsg payload.
2558+ NetMessage :: V1 ( NetMessageV1 :: Aborted ( Transaction :: new :: < ConnectMsg > ( ) ) )
2559+ }
2560+
2561+ #[ tokio:: test]
2562+ async fn forwards_reply_when_completed_and_sender_present ( ) {
2563+ let op = completed_connect_op ( ) ;
2564+ let op_result = Ok ( Some ( OpEnum :: Connect ( Box :: new ( op) ) ) ) ;
2565+
2566+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel :: < NetMessage > ( 1 ) ;
2567+ let reply = dummy_reply ( ) ;
2568+ let expected_id = * reply. id ( ) ;
2569+
2570+ forward_pending_op_result_if_completed ( & op_result, Some ( & tx) , reply) ;
2571+
2572+ let received = rx. try_recv ( ) . expect ( "helper should forward the reply" ) ;
2573+ assert_eq ! ( * received. id( ) , expected_id) ;
2574+ }
2575+
2576+ #[ tokio:: test]
2577+ async fn no_forward_when_sender_absent ( ) {
2578+ // Helper must not panic / block when no pending_op_result sender is wired.
2579+ let op = completed_connect_op ( ) ;
2580+ let op_result = Ok ( Some ( OpEnum :: Connect ( Box :: new ( op) ) ) ) ;
2581+
2582+ forward_pending_op_result_if_completed ( & op_result, None , dummy_reply ( ) ) ;
2583+ // Nothing to assert beyond "did not panic".
2584+ }
2585+
2586+ #[ tokio:: test]
2587+ async fn no_forward_when_op_not_completed ( ) {
2588+ // `Ok(None)` and OpError variants should not trigger a send even if
2589+ // a sender is present. This is the guard that keeps in-progress
2590+ // ops (e.g. `SendAndContinue`) from prematurely firing the callback.
2591+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel :: < NetMessage > ( 1 ) ;
2592+
2593+ let ok_none: Result < Option < OpEnum > , OpError > = Ok ( None ) ;
2594+ forward_pending_op_result_if_completed ( & ok_none, Some ( & tx) , dummy_reply ( ) ) ;
2595+ assert ! ( rx. try_recv( ) . is_err( ) , "Ok(None) must not forward" ) ;
2596+
2597+ let err_running: Result < Option < OpEnum > , OpError > =
2598+ Err ( OpError :: OpNotAvailable ( OpNotAvailable :: Running ) ) ;
2599+ forward_pending_op_result_if_completed ( & err_running, Some ( & tx) , dummy_reply ( ) ) ;
2600+ assert ! (
2601+ rx. try_recv( ) . is_err( ) ,
2602+ "OpNotAvailable::Running must not forward"
2603+ ) ;
2604+
2605+ let err_completed: Result < Option < OpEnum > , OpError > =
2606+ Err ( OpError :: OpNotAvailable ( OpNotAvailable :: Completed ) ) ;
2607+ forward_pending_op_result_if_completed ( & err_completed, Some ( & tx) , dummy_reply ( ) ) ;
2608+ assert ! (
2609+ rx. try_recv( ) . is_err( ) ,
2610+ "OpNotAvailable::Completed must not forward (no OpEnum payload)"
2611+ ) ;
2612+ }
2613+
2614+ #[ tokio:: test]
2615+ async fn no_forward_when_op_in_progress ( ) {
2616+ // A non-completed op state (WaitingForResponses) must not trigger
2617+ // the callback even though the op exists — this is the core guard
2618+ // that keeps mid-flight operations from prematurely terminating a
2619+ // `notify_op_execution` round-trip.
2620+ use crate :: operations:: connect:: JoinerState ;
2621+ use std:: collections:: HashSet ;
2622+ use tokio:: time:: Instant ;
2623+
2624+ let waiting = ConnectState :: WaitingForResponses ( JoinerState {
2625+ target_connections : 1 ,
2626+ observed_address : None ,
2627+ accepted : HashSet :: new ( ) ,
2628+ last_progress : Instant :: now ( ) ,
2629+ started_without_address : true ,
2630+ } ) ;
2631+ let op = ConnectOp :: with_state ( waiting) ;
2632+ assert ! (
2633+ !op. is_completed( ) ,
2634+ "precondition: WaitingForResponses must not be completed"
2635+ ) ;
2636+ let op_result = Ok ( Some ( OpEnum :: Connect ( Box :: new ( op) ) ) ) ;
2637+
2638+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel :: < NetMessage > ( 1 ) ;
2639+ forward_pending_op_result_if_completed ( & op_result, Some ( & tx) , dummy_reply ( ) ) ;
2640+ assert ! (
2641+ rx. try_recv( ) . is_err( ) ,
2642+ "in-progress op must not forward to pending_op_result"
2643+ ) ;
2644+ }
2645+
2646+ #[ tokio:: test]
2647+ async fn no_hang_when_receiver_dropped ( ) {
2648+ // Regression guard for the `try_send` channel-safety choice:
2649+ // if the caller of `notify_op_execution` drops its receiver
2650+ // (e.g. cancelled, timed out) before the op completes, the
2651+ // reply side must not block the pure-network-message handler.
2652+ // With `try_send` the send fails with `Closed` and we log;
2653+ // with `.send().await` it would have succeeded but stranded
2654+ // the message. Either way the handler must make progress —
2655+ // the test asserts the helper returns promptly (the
2656+ // `#[tokio::test]` runtime would hang the whole test process
2657+ // on regression).
2658+ let op = completed_connect_op ( ) ;
2659+ let op_result = Ok ( Some ( OpEnum :: Connect ( Box :: new ( op) ) ) ) ;
2660+
2661+ let ( tx, rx) = tokio:: sync:: mpsc:: channel :: < NetMessage > ( 1 ) ;
2662+ drop ( rx) ;
2663+
2664+ forward_pending_op_result_if_completed ( & op_result, Some ( & tx) , dummy_reply ( ) ) ;
2665+ // Returning at all is the assertion.
2666+ }
2667+
2668+ // Note on per-variant coverage: Phase 1's point is that every op
2669+ // variant of `handle_pure_network_message_v1` can terminate a
2670+ // `notify_op_execution` round-trip. The helper tested above is
2671+ // variant-agnostic once the `is_operation_completed` guard passes,
2672+ // and each op's own `is_completed` impl is covered by unit tests in
2673+ // `crates/core/src/operations/{connect,put,get,subscribe,update}.rs`.
2674+ // The remaining "do the five branches of `handle_pure_network_message_v1`
2675+ // actually invoke the helper with the matching reply variant?"
2676+ // question is enforced by the compiler — each branch binds `ref op`
2677+ // for the concrete op type and reconstructs the same variant before
2678+ // handing it to `forward_pending_op_result_if_completed`. An
2679+ // end-to-end integration test that spins up a node and exercises
2680+ // `notify_op_execution` for each op kind belongs in Phase 2, where
2681+ // the first real production caller is added.
2682+ }
24652683}
0 commit comments