1- use std:: { collections :: HashSet , convert:: Infallible , sync:: Arc , time:: Duration } ;
1+ use std:: { convert:: Infallible , sync:: Arc , time:: Duration } ;
22
33use futures:: { future:: BoxFuture , FutureExt } ;
44use tokio:: task:: JoinHandle ;
@@ -21,12 +21,9 @@ use crate::{
2121 self , ContractHandler , ContractHandlerChannel , ExecutorToEventLoopChannel ,
2222 NetworkEventListenerHalve , WaitingResolution ,
2323 } ,
24- message:: { NetMessage , NetMessageV1 , NodeEvent } ,
24+ message:: NodeEvent ,
2525 node:: NodeConfig ,
26- operations:: {
27- connect:: { self , ConnectOp } ,
28- OpEnum ,
29- } ,
26+ operations:: connect,
3027} ;
3128
3229use super :: OpManager ;
@@ -49,10 +46,14 @@ pub(crate) struct NodeP2P {
4946}
5047
5148impl NodeP2P {
52- /// Aggressively establish connections during startup to avoid on-demand delays
53- async fn aggressive_initial_connections ( & self ) {
54- let min_connections = self . op_manager . ring . connection_manager . min_connections ;
55-
49+ /// Aggressively wait for connections during startup to avoid on-demand delays.
50+ /// This is an associated function that can be spawned as a task to run concurrently
51+ /// with the event listener. Without the event listener running, connection
52+ /// handshakes won't be processed.
53+ async fn aggressive_initial_connections_impl (
54+ op_manager : & Arc < OpManager > ,
55+ min_connections : usize ,
56+ ) {
5657 tracing:: info!(
5758 "Starting aggressive connection acquisition phase (target: {} connections)" ,
5859 min_connections
@@ -63,14 +64,13 @@ impl NodeP2P {
6364 let start = std:: time:: Instant :: now ( ) ;
6465 let max_duration = Duration :: from_secs ( 10 ) ;
6566 let mut last_connection_count = 0 ;
66- let mut stable_rounds = 0 ;
6767
6868 while start. elapsed ( ) < max_duration {
6969 // Cooperative yielding for CI environments with limited CPU cores
70- // Research shows CI (2 cores) needs explicit yields to prevent task starvation
70+ // This is critical - the event listener needs CPU time to process handshakes
7171 tokio:: task:: yield_now ( ) . await ;
7272
73- let current_connections = self . op_manager . ring . open_connections ( ) ;
73+ let current_connections = op_manager. ring . open_connections ( ) ;
7474
7575 // If we've reached our target, we're done
7676 if current_connections >= min_connections {
@@ -82,50 +82,34 @@ impl NodeP2P {
8282 break ;
8383 }
8484
85- // If connection count is stable for 3 rounds, actively trigger more connections
86- if current_connections == last_connection_count {
87- stable_rounds += 1 ;
88- if stable_rounds >= 3 && current_connections > 0 {
89- tracing:: info!(
90- "Connection count stable at {}, triggering active peer discovery" ,
91- current_connections
92- ) ;
93-
94- // Trigger the connection maintenance task to actively look for more peers
95- // In small networks, we want to be more aggressive
96- for _ in 0 ..3 {
97- // Yield before each connection attempt to prevent blocking other tasks
98- tokio:: task:: yield_now ( ) . await ;
99-
100- if let Err ( e) = self . trigger_connection_maintenance ( ) . await {
101- tracing:: warn!( "Failed to trigger connection maintenance: {}" , e) ;
102- }
103- tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
104- }
105- stable_rounds = 0 ;
106- }
107- } else {
108- stable_rounds = 0 ;
85+ // Log progress when connection count changes
86+ if current_connections != last_connection_count {
87+ tracing:: info!(
88+ "Connection progress: {}/{} (elapsed: {}s)" ,
89+ current_connections,
90+ min_connections,
91+ start. elapsed( ) . as_secs( )
92+ ) ;
10993 last_connection_count = current_connections;
94+ } else {
95+ tracing:: debug!(
96+ "Current connections: {}/{}, waiting for more peers (elapsed: {}s)" ,
97+ current_connections,
98+ min_connections,
99+ start. elapsed( ) . as_secs( )
100+ ) ;
110101 }
111102
112- tracing:: debug!(
113- "Current connections: {}/{}, waiting for more peers (elapsed: {}s)" ,
114- current_connections,
115- min_connections,
116- start. elapsed( ) . as_secs( )
117- ) ;
118-
119- // Check more frequently at the beginning
103+ // Check more frequently at the beginning to detect quick connections
120104 let sleep_duration = if start. elapsed ( ) < Duration :: from_secs ( 3 ) {
121- Duration :: from_millis ( 500 )
105+ Duration :: from_millis ( 250 )
122106 } else {
123- Duration :: from_secs ( 1 )
107+ Duration :: from_millis ( 500 )
124108 } ;
125109 tokio:: time:: sleep ( sleep_duration) . await ;
126110 }
127111
128- let final_connections = self . op_manager . ring . open_connections ( ) ;
112+ let final_connections = op_manager. ring . open_connections ( ) ;
129113 tracing:: info!(
130114 "Aggressive connection phase complete. Final connections: {}/{} (took {}s)" ,
131115 final_connections,
@@ -134,56 +118,6 @@ impl NodeP2P {
134118 ) ;
135119 }
136120
137- /// Trigger the connection maintenance task to actively look for more peers
138- async fn trigger_connection_maintenance ( & self ) -> anyhow:: Result < ( ) > {
139- let ideal_location = Location :: random ( ) ;
140-
141- // Find a connected peer to query
142- let query_target = {
143- let router = self . op_manager . ring . router . read ( ) ;
144- self . op_manager . ring . connection_manager . routing (
145- ideal_location,
146- None ,
147- & HashSet :: < std:: net:: SocketAddr > :: new ( ) ,
148- & router,
149- )
150- } ;
151-
152- if let Some ( query_target) = query_target {
153- let joiner = self . op_manager . ring . connection_manager . own_location ( ) ;
154- let ttl = self
155- . op_manager
156- . ring
157- . max_hops_to_live
158- . max ( 1 )
159- . min ( u8:: MAX as usize ) as u8 ;
160- let target_connections = self . op_manager . ring . connection_manager . min_connections ;
161-
162- let ( tx, op, msg) = ConnectOp :: initiate_join_request (
163- joiner,
164- query_target. clone ( ) ,
165- ideal_location,
166- ttl,
167- target_connections,
168- self . op_manager . connect_forward_estimator . clone ( ) ,
169- ) ;
170-
171- tracing:: debug!(
172- %tx,
173- query_peer = %query_target,
174- %ideal_location,
175- "Triggering connection maintenance connect request"
176- ) ;
177- self . op_manager
178- . notify_op_change (
179- NetMessage :: V1 ( NetMessageV1 :: Connect ( msg) ) ,
180- OpEnum :: Connect ( Box :: new ( op) ) ,
181- )
182- . await ?;
183- }
184-
185- Ok ( ( ) )
186- }
187121 pub ( super ) async fn run_node ( mut self ) -> anyhow:: Result < Infallible > {
188122 if self . should_try_connect {
189123 let join_handle = connect:: initial_join_procedure (
@@ -193,11 +127,26 @@ impl NodeP2P {
193127 . await ?;
194128 self . initial_join_task = Some ( join_handle) ;
195129
196- // After connecting to gateways, aggressively try to reach min_connections
197- // This is important for fast startup and avoiding on-demand connection delays
198- self . aggressive_initial_connections ( ) . await ;
130+ // Note: We don't run aggressive_initial_connections_impl here because
131+ // the event listener hasn't started yet. The connect requests from
132+ // initial_join_procedure are queued but won't be processed until
133+ // the event listener runs. Instead, we'll run the aggressive
134+ // connection phase concurrently with the event listener below.
199135 }
200136
137+ // Spawn aggressive connection task to run concurrently with event listener.
138+ // This is needed because connection handshakes are processed by the event
139+ // listener, so we can't block waiting for connections before it starts.
140+ let aggressive_conn_task = if self . should_try_connect {
141+ let op_manager = self . op_manager . clone ( ) ;
142+ let min_connections = op_manager. ring . connection_manager . min_connections ;
143+ Some ( tokio:: spawn ( async move {
144+ Self :: aggressive_initial_connections_impl ( & op_manager, min_connections) . await ;
145+ } ) )
146+ } else {
147+ None
148+ } ;
149+
201150 let f = self . conn_manager . run_event_listener (
202151 self . op_manager . clone ( ) ,
203152 self . client_wait_for_transaction ,
@@ -226,6 +175,9 @@ impl NodeP2P {
226175 if let Some ( handle) = join_task {
227176 handle. abort ( ) ;
228177 }
178+ if let Some ( handle) = aggressive_conn_task {
179+ handle. abort ( ) ;
180+ }
229181
230182 result
231183 }
0 commit comments