@@ -563,6 +563,9 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
563563
564564 peer_counter : AtomicCounter ,
565565
566+ gossip_processing_backlogged : AtomicBool ,
567+ gossip_processing_backlog_lifted : AtomicBool ,
568+
566569 node_signer : NS ,
567570
568571 logger : L ,
@@ -721,6 +724,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
721724 blocked_event_processors : AtomicBool :: new ( false ) ,
722725 ephemeral_key_midstate,
723726 peer_counter : AtomicCounter :: new ( ) ,
727+ gossip_processing_backlogged : AtomicBool :: new ( false ) ,
728+ gossip_processing_backlog_lifted : AtomicBool :: new ( false ) ,
724729 last_node_announcement_serial : AtomicU32 :: new ( current_time) ,
725730 logger,
726731 custom_message_handler,
@@ -847,7 +852,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
847852 Ok ( ( ) )
848853 }
849854
850- fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
855+ fn peer_should_read ( & self , peer : & Peer ) -> bool {
856+ !self . gossip_processing_backlogged . load ( Ordering :: Relaxed ) && peer. should_read ( )
857+ }
858+
859+ fn update_gossip_backlogged ( & self ) {
860+ let new_state = self . message_handler . route_handler . processing_queue_high ( ) ;
861+ let prev_state = self . gossip_processing_backlogged . swap ( new_state, Ordering :: Relaxed ) ;
862+ if prev_state && !new_state {
863+ self . gossip_processing_backlog_lifted . store ( true , Ordering :: Relaxed ) ;
864+ }
865+ }
866+
867+ fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer , force_one_write : bool ) {
868+ let mut have_written = false ;
851869 while !peer. awaiting_write_event {
852870 if peer. should_buffer_onion_message ( ) {
853871 if let Some ( peer_node_id) = peer. their_node_id {
@@ -905,12 +923,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
905923 }
906924
907925 let next_buff = match peer. pending_outbound_buffer . front ( ) {
908- None => return ,
926+ None => {
927+ if force_one_write && !have_written {
928+ let should_read = self . peer_should_read ( & peer) ;
929+ if should_read {
930+ let data_sent = descriptor. send_data ( & [ ] , should_read) ;
931+ debug_assert_eq ! ( data_sent, 0 , "Can't write more than no data" ) ;
932+ }
933+ }
934+ return
935+ } ,
909936 Some ( buff) => buff,
910937 } ;
911938
912939 let pending = & next_buff[ peer. pending_outbound_buffer_first_msg_offset ..] ;
913- let data_sent = descriptor. send_data ( pending, peer. should_read ( ) ) ;
940+ let data_sent = descriptor. send_data ( pending, self . peer_should_read ( & peer) ) ;
941+ have_written = true ;
914942 peer. pending_outbound_buffer_first_msg_offset += data_sent;
915943 if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) {
916944 peer. pending_outbound_buffer_first_msg_offset = 0 ;
@@ -945,7 +973,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
945973 Some ( peer_mutex) => {
946974 let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
947975 peer. awaiting_write_event = false ;
948- self . do_attempt_write_data ( descriptor, & mut peer) ;
976+ self . do_attempt_write_data ( descriptor, & mut peer, false ) ;
949977 }
950978 } ;
951979 Ok ( ( ) )
@@ -1192,7 +1220,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11921220 }
11931221 }
11941222 }
1195- pause_read = !peer . should_read ( ) ;
1223+ pause_read = !self . peer_should_read ( & peer ) ;
11961224
11971225 if let Some ( message) = msg_to_handle {
11981226 match self . handle_message ( & peer_mutex, peer_lock, message) {
@@ -1404,19 +1432,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
14041432 . map_err ( |e| -> MessageHandlingError { e. into ( ) } ) ? {
14051433 should_forward = Some ( wire:: Message :: ChannelAnnouncement ( msg) ) ;
14061434 }
1435+ self . update_gossip_backlogged ( ) ;
14071436 } ,
14081437 wire:: Message :: NodeAnnouncement ( msg) => {
14091438 if self . message_handler . route_handler . handle_node_announcement ( & msg)
14101439 . map_err ( |e| -> MessageHandlingError { e. into ( ) } ) ? {
14111440 should_forward = Some ( wire:: Message :: NodeAnnouncement ( msg) ) ;
14121441 }
1442+ self . update_gossip_backlogged ( ) ;
14131443 } ,
14141444 wire:: Message :: ChannelUpdate ( msg) => {
14151445 self . message_handler . chan_handler . handle_channel_update ( & their_node_id, & msg) ;
14161446 if self . message_handler . route_handler . handle_channel_update ( & msg)
14171447 . map_err ( |e| -> MessageHandlingError { e. into ( ) } ) ? {
14181448 should_forward = Some ( wire:: Message :: ChannelUpdate ( msg) ) ;
14191449 }
1450+ self . update_gossip_backlogged ( ) ;
14201451 } ,
14211452 wire:: Message :: QueryShortChannelIds ( msg) => {
14221453 self . message_handler . route_handler . handle_query_short_channel_ids ( & their_node_id, msg) ?;
@@ -1568,6 +1599,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
15681599 }
15691600 }
15701601
1602+ self . update_gossip_backlogged ( ) ;
1603+ let flush_read_disabled = self . gossip_processing_backlog_lifted . swap ( false , Ordering :: Relaxed ) ;
1604+
15711605 let mut peers_to_disconnect = HashMap :: new ( ) ;
15721606 let mut events_generated = self . message_handler . chan_handler . get_and_clear_pending_msg_events ( ) ;
15731607 events_generated. append ( & mut self . message_handler . route_handler . get_and_clear_pending_msg_events ( ) ) ;
@@ -1797,7 +1831,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17971831 }
17981832
17991833 for ( descriptor, peer_mutex) in peers. iter ( ) {
1800- self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , & mut * peer_mutex. lock ( ) . unwrap ( ) ) ;
1834+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , & mut * peer_mutex. lock ( ) . unwrap ( ) , flush_read_disabled ) ;
18011835 }
18021836 }
18031837 if !peers_to_disconnect. is_empty ( ) {
@@ -1819,7 +1853,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18191853 self . enqueue_message ( & mut * peer, & msg) ;
18201854 // This isn't guaranteed to work, but if there is enough free
18211855 // room in the send buffer, put the error message there...
1822- self . do_attempt_write_data ( & mut descriptor, & mut * peer) ;
1856+ self . do_attempt_write_data ( & mut descriptor, & mut * peer, false ) ;
18231857 } else {
18241858 log_trace ! ( self . logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message" , log_pubkey!( node_id) ) ;
18251859 }
@@ -1927,6 +1961,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19271961 {
19281962 let peers_lock = self . peers . read ( ) . unwrap ( ) ;
19291963
1964+ self . update_gossip_backlogged ( ) ;
1965+ let flush_read_disabled = self . gossip_processing_backlog_lifted . swap ( false , Ordering :: Relaxed ) ;
1966+
19301967 for ( descriptor, peer_mutex) in peers_lock. iter ( ) {
19311968 let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
19321969 if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_node_id . is_none ( ) {
@@ -1942,34 +1979,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19421979 continue ;
19431980 }
19441981
1945- if peer. awaiting_pong_timer_tick_intervals == -1 {
1946- // Magic value set in `maybe_send_extra_ping`.
1947- peer. awaiting_pong_timer_tick_intervals = 1 ;
1982+ loop { // Used as a `goto` to skip writing a Ping message.
1983+ if peer. awaiting_pong_timer_tick_intervals == -1 {
1984+ // Magic value set in `maybe_send_extra_ping`.
1985+ peer. awaiting_pong_timer_tick_intervals = 1 ;
1986+ peer. received_message_since_timer_tick = false ;
1987+ break ;
1988+ }
1989+
1990+ if ( peer. awaiting_pong_timer_tick_intervals > 0 && !peer. received_message_since_timer_tick )
1991+ || peer. awaiting_pong_timer_tick_intervals as u64 >
1992+ MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock. len ( ) as u64
1993+ {
1994+ descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1995+ break ;
1996+ }
19481997 peer. received_message_since_timer_tick = false ;
1949- continue ;
1950- }
19511998
1952- if ( peer. awaiting_pong_timer_tick_intervals > 0 && !peer. received_message_since_timer_tick )
1953- || peer. awaiting_pong_timer_tick_intervals as u64 >
1954- MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock. len ( ) as u64
1955- {
1956- descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1957- continue ;
1958- }
1959- peer. received_message_since_timer_tick = false ;
1999+ if peer. awaiting_pong_timer_tick_intervals > 0 {
2000+ peer. awaiting_pong_timer_tick_intervals += 1 ;
2001+ break ;
2002+ }
19602003
1961- if peer. awaiting_pong_timer_tick_intervals > 0 {
1962- peer. awaiting_pong_timer_tick_intervals += 1 ;
1963- continue ;
2004+ peer. awaiting_pong_timer_tick_intervals = 1 ;
2005+ let ping = msgs:: Ping {
2006+ ponglen : 0 ,
2007+ byteslen : 64 ,
2008+ } ;
2009+ self . enqueue_message ( & mut * peer, & ping) ;
2010+ break ;
19642011 }
1965-
1966- peer. awaiting_pong_timer_tick_intervals = 1 ;
1967- let ping = msgs:: Ping {
1968- ponglen : 0 ,
1969- byteslen : 64 ,
1970- } ;
1971- self . enqueue_message ( & mut * peer, & ping) ;
1972- self . do_attempt_write_data ( & mut ( descriptor. clone ( ) ) , & mut * peer) ;
2012+ self . do_attempt_write_data ( & mut ( descriptor. clone ( ) ) , & mut * peer, flush_read_disabled) ;
19732013 }
19742014 }
19752015
0 commit comments