@@ -1007,6 +1007,20 @@ where
10071007 }
10081008 msgs
10091009 }
1010+
1011+ fn enqueue_event ( & self , event : Event ) {
1012+ const MAX_EVENTS_BUFFER_SIZE : usize = ( 1 << 10 ) * 256 ;
1013+ let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
1014+ let total_buffered_bytes: usize = pending_events
1015+ . iter ( )
1016+ . map ( |ev| ev. serialized_length ( ) )
1017+ . sum ( ) ;
1018+ if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
1019+ log_trace ! ( self . logger, "Dropping event {:?}: buffer full" , event) ;
1020+ return
1021+ }
1022+ pending_events. push ( event) ;
1023+ }
10101024}
10111025
10121026fn outbound_buffer_full ( peer_node_id : & PublicKey , buffer : & HashMap < PublicKey , OnionMessageRecipient > ) -> bool {
@@ -1134,7 +1148,7 @@ where
11341148 log_trace ! ( logger, "Forwarding an onion message to peer {}" , next_node_id) ;
11351149 } ,
11361150 _ if self . intercept_messages_for_offline_peers => {
1137- self . pending_events . lock ( ) . unwrap ( ) . push (
1151+ self . enqueue_event (
11381152 Event :: OnionMessageIntercepted {
11391153 peer_node_id : next_node_id, message : onion_message
11401154 }
@@ -1162,7 +1176,7 @@ where
11621176 . or_insert_with ( || OnionMessageRecipient :: ConnectedPeer ( VecDeque :: new ( ) ) )
11631177 . mark_connected ( ) ;
11641178 if self . intercept_messages_for_offline_peers {
1165- self . pending_events . lock ( ) . unwrap ( ) . push (
1179+ self . enqueue_event (
11661180 Event :: OnionMessagePeerConnected { peer_node_id : * their_node_id }
11671181 ) ;
11681182 }
0 commit comments