@@ -113,7 +113,7 @@ const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
113113const NETWORK_PRUNE_TIMER : u64 = 60 * 60 ;
114114
115115#[ cfg( not( test) ) ]
116- const SCORER_PERSIST_TIMER : u64 = 60 * 60 ;
116+ const SCORER_PERSIST_TIMER : u64 = 60 * 5 ;
117117#[ cfg( test) ]
118118const SCORER_PERSIST_TIMER : u64 = 1 ;
119119
@@ -244,30 +244,30 @@ fn handle_network_graph_update<L: Deref>(
244244/// Updates scorer based on event and returns whether an update occurred so we can decide whether
245245/// to persist.
246246fn update_scorer < ' a , S : ' static + Deref < Target = SC > + Send + Sync , SC : ' a + WriteableScore < ' a > > (
247- scorer : & ' a S , event : & Event
247+ scorer : & ' a S , event : & Event , duration_since_epoch : Duration ,
248248) -> bool {
249249 match event {
250250 Event :: PaymentPathFailed { ref path, short_channel_id : Some ( scid) , .. } => {
251251 let mut score = scorer. write_lock ( ) ;
252- score. payment_path_failed ( path, * scid) ;
252+ score. payment_path_failed ( path, * scid, duration_since_epoch ) ;
253253 } ,
254254 Event :: PaymentPathFailed { ref path, payment_failed_permanently : true , .. } => {
255255 // Reached if the destination explicitly failed it back. We treat this as a successful probe
256256 // because the payment made it all the way to the destination with sufficient liquidity.
257257 let mut score = scorer. write_lock ( ) ;
258- score. probe_successful ( path) ;
258+ score. probe_successful ( path, duration_since_epoch ) ;
259259 } ,
260260 Event :: PaymentPathSuccessful { path, .. } => {
261261 let mut score = scorer. write_lock ( ) ;
262- score. payment_path_successful ( path) ;
262+ score. payment_path_successful ( path, duration_since_epoch ) ;
263263 } ,
264264 Event :: ProbeSuccessful { path, .. } => {
265265 let mut score = scorer. write_lock ( ) ;
266- score. probe_successful ( path) ;
266+ score. probe_successful ( path, duration_since_epoch ) ;
267267 } ,
268268 Event :: ProbeFailed { path, short_channel_id : Some ( scid) , .. } => {
269269 let mut score = scorer. write_lock ( ) ;
270- score. probe_failed ( path, * scid) ;
270+ score. probe_failed ( path, * scid, duration_since_epoch ) ;
271271 } ,
272272 _ => return false ,
273273 }
@@ -280,7 +280,7 @@ macro_rules! define_run_body {
280280 $channel_manager: ident, $process_channel_manager_events: expr,
281281 $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
282282 $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
283- $timer_elapsed: expr, $check_slow_await: expr
283+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch : expr ,
284284 ) => { {
285285 log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
286286 $channel_manager. timer_tick_occurred( ) ;
@@ -294,6 +294,7 @@ macro_rules! define_run_body {
294294 let mut last_scorer_persist_call = $get_timer( SCORER_PERSIST_TIMER ) ;
295295 let mut last_rebroadcast_call = $get_timer( REBROADCAST_TIMER ) ;
296296 let mut have_pruned = false ;
297+ let mut have_decayed_scorer = false ;
297298
298299 loop {
299300 $process_channel_manager_events;
@@ -383,11 +384,10 @@ macro_rules! define_run_body {
383384 if should_prune {
384385 // The network graph must not be pruned while rapid sync completion is pending
385386 if let Some ( network_graph) = $gossip_sync. prunable_network_graph( ) {
386- # [ cfg ( feature = "std" ) ] {
387+ if let Some ( duration_since_epoch ) = $time_fetch ( ) {
387388 log_trace!( $logger, "Pruning and persisting network graph." ) ;
388- network_graph. remove_stale_channels_and_tracking( ) ;
389- }
390- #[ cfg( not( feature = "std" ) ) ] {
389+ network_graph. remove_stale_channels_and_tracking_with_time( duration_since_epoch. as_secs( ) ) ;
390+ } else {
391391 log_warn!( $logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time." ) ;
392392 log_trace!( $logger, "Persisting network graph." ) ;
393393 }
@@ -402,9 +402,24 @@ macro_rules! define_run_body {
402402 last_prune_call = $get_timer( prune_timer) ;
403403 }
404404
405+ if !have_decayed_scorer {
406+ if let Some ( ref scorer) = $scorer {
407+ if let Some ( duration_since_epoch) = $time_fetch( ) {
408+ log_trace!( $logger, "Calling time_passed on scorer at startup" ) ;
409+ scorer. write_lock( ) . time_passed( duration_since_epoch) ;
410+ }
411+ }
412+ have_decayed_scorer = true ;
413+ }
414+
405415 if $timer_elapsed( & mut last_scorer_persist_call, SCORER_PERSIST_TIMER ) {
406416 if let Some ( ref scorer) = $scorer {
407- log_trace!( $logger, "Persisting scorer" ) ;
417+ if let Some ( duration_since_epoch) = $time_fetch( ) {
418+ log_trace!( $logger, "Calling time_passed and persisting scorer" ) ;
419+ scorer. write_lock( ) . time_passed( duration_since_epoch) ;
420+ } else {
421+ log_trace!( $logger, "Persisting scorer" ) ;
422+ }
408423 if let Err ( e) = $persister. persist_scorer( & scorer) {
409424 log_error!( $logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
410425 }
@@ -510,12 +525,16 @@ use core::task;
510525/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
511526/// are hundreds or thousands of simultaneous process calls running.
512527///
528+ /// The `fetch_time` parameter should return the current wall clock time, if one is available. If
529+ /// no time is available, some features may be disabled, however the node will still operate fine.
530+ ///
513531/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
514532/// could setup `process_events_async` like this:
515533/// ```
516534/// # use lightning::io;
517535/// # use std::sync::{Arc, RwLock};
518536/// # use std::sync::atomic::{AtomicBool, Ordering};
537+ /// # use std::time::SystemTime;
519538/// # use lightning_background_processor::{process_events_async, GossipSync};
520539/// # struct MyStore {}
521540/// # impl lightning::util::persist::KVStore for MyStore {
@@ -584,6 +603,7 @@ use core::task;
584603/// Some(background_scorer),
585604/// sleeper,
586605/// mobile_interruptable_platform,
606+ /// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
587607/// )
588608/// .await
589609/// .expect("Failed to process events");
@@ -620,11 +640,12 @@ pub async fn process_events_async<
620640 S : ' static + Deref < Target = SC > + Send + Sync ,
621641 SC : for < ' b > WriteableScore < ' b > ,
622642 SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
623- Sleeper : Fn ( Duration ) -> SleepFuture
643+ Sleeper : Fn ( Duration ) -> SleepFuture ,
644+ FetchTime : Fn ( ) -> Option < Duration > ,
624645> (
625646 persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
626647 gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM , logger : L , scorer : Option < S > ,
627- sleeper : Sleeper , mobile_interruptable_platform : bool ,
648+ sleeper : Sleeper , mobile_interruptable_platform : bool , fetch_time : FetchTime ,
628649) -> Result < ( ) , lightning:: io:: Error >
629650where
630651 UL :: Target : ' static + UtxoLookup ,
@@ -648,15 +669,18 @@ where
648669 let scorer = & scorer;
649670 let logger = & logger;
650671 let persister = & persister;
672+ let fetch_time = & fetch_time;
651673 async move {
652674 if let Some ( network_graph) = network_graph {
653675 handle_network_graph_update ( network_graph, & event)
654676 }
655677 if let Some ( ref scorer) = scorer {
656- if update_scorer ( scorer, & event) {
657- log_trace ! ( logger, "Persisting scorer after update" ) ;
658- if let Err ( e) = persister. persist_scorer ( & scorer) {
659- log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
678+ if let Some ( duration_since_epoch) = fetch_time ( ) {
679+ if update_scorer ( scorer, & event, duration_since_epoch) {
680+ log_trace ! ( logger, "Persisting scorer after update" ) ;
681+ if let Err ( e) = persister. persist_scorer ( & scorer) {
682+ log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
683+ }
660684 }
661685 }
662686 }
@@ -688,7 +712,7 @@ where
688712 task:: Poll :: Ready ( exit) => { should_break = exit; true } ,
689713 task:: Poll :: Pending => false ,
690714 }
691- } , mobile_interruptable_platform
715+ } , mobile_interruptable_platform, fetch_time ,
692716 )
693717}
694718
@@ -810,7 +834,10 @@ impl BackgroundProcessor {
810834 handle_network_graph_update ( network_graph, & event)
811835 }
812836 if let Some ( ref scorer) = scorer {
813- if update_scorer ( scorer, & event) {
837+ use std:: time:: SystemTime ;
838+ let duration_since_epoch = SystemTime :: now ( ) . duration_since ( SystemTime :: UNIX_EPOCH )
839+ . expect ( "Time should be sometime after 1970" ) ;
840+ if update_scorer ( scorer, & event, duration_since_epoch) {
814841 log_trace ! ( logger, "Persisting scorer after update" ) ;
815842 if let Err ( e) = persister. persist_scorer ( & scorer) {
816843 log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
@@ -829,7 +856,12 @@ impl BackgroundProcessor {
829856 channel_manager. get_event_or_persistence_needed_future( ) ,
830857 chain_monitor. get_update_future( )
831858 ) . wait_timeout( Duration :: from_millis( 100 ) ) ; } ,
832- |_| Instant :: now( ) , |time: & Instant , dur| time. elapsed( ) . as_secs( ) > dur, false
859+ |_| Instant :: now( ) , |time: & Instant , dur| time. elapsed( ) . as_secs( ) > dur, false ,
860+ || {
861+ use std:: time:: SystemTime ;
862+ Some ( SystemTime :: now( ) . duration_since( SystemTime :: UNIX_EPOCH )
863+ . expect( "Time should be sometime after 1970" ) )
864+ } ,
833865 )
834866 } ) ;
835867 Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
@@ -1117,7 +1149,7 @@ mod tests {
11171149 }
11181150
11191151 impl ScoreUpdate for TestScorer {
1120- fn payment_path_failed ( & mut self , actual_path : & Path , actual_short_channel_id : u64 ) {
1152+ fn payment_path_failed ( & mut self , actual_path : & Path , actual_short_channel_id : u64 , _ : Duration ) {
11211153 if let Some ( expectations) = & mut self . event_expectations {
11221154 match expectations. pop_front ( ) . unwrap ( ) {
11231155 TestResult :: PaymentFailure { path, short_channel_id } => {
@@ -1137,7 +1169,7 @@ mod tests {
11371169 }
11381170 }
11391171
1140- fn payment_path_successful ( & mut self , actual_path : & Path ) {
1172+ fn payment_path_successful ( & mut self , actual_path : & Path , _ : Duration ) {
11411173 if let Some ( expectations) = & mut self . event_expectations {
11421174 match expectations. pop_front ( ) . unwrap ( ) {
11431175 TestResult :: PaymentFailure { path, .. } => {
@@ -1156,7 +1188,7 @@ mod tests {
11561188 }
11571189 }
11581190
1159- fn probe_failed ( & mut self , actual_path : & Path , _: u64 ) {
1191+ fn probe_failed ( & mut self , actual_path : & Path , _: u64 , _ : Duration ) {
11601192 if let Some ( expectations) = & mut self . event_expectations {
11611193 match expectations. pop_front ( ) . unwrap ( ) {
11621194 TestResult :: PaymentFailure { path, .. } => {
@@ -1174,7 +1206,7 @@ mod tests {
11741206 }
11751207 }
11761208 }
1177- fn probe_successful ( & mut self , actual_path : & Path ) {
1209+ fn probe_successful ( & mut self , actual_path : & Path , _ : Duration ) {
11781210 if let Some ( expectations) = & mut self . event_expectations {
11791211 match expectations. pop_front ( ) . unwrap ( ) {
11801212 TestResult :: PaymentFailure { path, .. } => {
@@ -1192,6 +1224,7 @@ mod tests {
11921224 }
11931225 }
11941226 }
1227+ fn time_passed ( & mut self , _: Duration ) { }
11951228 }
11961229
11971230 #[ cfg( c_bindings) ]
@@ -1469,7 +1502,7 @@ mod tests {
14691502 tokio:: time:: sleep ( dur) . await ;
14701503 false // Never exit
14711504 } )
1472- } , false ,
1505+ } , false , || Some ( Duration :: ZERO ) ,
14731506 ) ;
14741507 match bp_future. await {
14751508 Ok ( _) => panic ! ( "Expected error persisting manager" ) ,
@@ -1600,7 +1633,7 @@ mod tests {
16001633
16011634 loop {
16021635 let log_entries = nodes[ 0 ] . logger . lines . lock ( ) . unwrap ( ) ;
1603- let expected_log = "Persisting scorer" . to_string ( ) ;
1636+ let expected_log = "Calling time_passed and persisting scorer" . to_string ( ) ;
16041637 if log_entries. get ( & ( "lightning_background_processor" , expected_log) ) . is_some ( ) {
16051638 break
16061639 }
@@ -1699,7 +1732,7 @@ mod tests {
16991732 _ = exit_receiver. changed( ) => true ,
17001733 }
17011734 } )
1702- } , false ,
1735+ } , false , || Some ( Duration :: from_secs ( 1696300000 ) ) ,
17031736 ) ;
17041737
17051738 let t1 = tokio:: spawn ( bp_future) ;
@@ -1874,7 +1907,7 @@ mod tests {
18741907 _ = exit_receiver. changed( ) => true ,
18751908 }
18761909 } )
1877- } , false ,
1910+ } , false , || Some ( Duration :: ZERO ) ,
18781911 ) ;
18791912 let t1 = tokio:: spawn ( bp_future) ;
18801913 let t2 = tokio:: spawn ( async move {
0 commit comments