@@ -84,30 +84,33 @@ impl_writeable_tlv_based_enum!(Event,
8484 } ;
8585) ;
8686
87- pub struct EventQueue < K : Deref >
87+ pub struct EventQueue < K : Deref , L : Deref >
8888where
8989 K :: Target : KVStore ,
90+ L :: Target : Logger ,
9091{
9192 queue : Mutex < VecDeque < Event > > ,
9293 notifier : Condvar ,
9394 kv_store : K ,
95+ logger : L ,
9496}
9597
96- impl < K : Deref > EventQueue < K >
98+ impl < K : Deref , L : Deref > EventQueue < K , L >
9799where
98100 K :: Target : KVStore ,
101+ L :: Target : Logger ,
99102{
100- pub ( crate ) fn new ( kv_store : K ) -> Self {
103+ pub ( crate ) fn new ( kv_store : K , logger : L ) -> Self {
101104 let queue: Mutex < VecDeque < Event > > = Mutex :: new ( VecDeque :: new ( ) ) ;
102105 let notifier = Condvar :: new ( ) ;
103- Self { queue, notifier, kv_store }
106+ Self { queue, notifier, kv_store, logger }
104107 }
105108
106109 pub ( crate ) fn add_event ( & self , event : Event ) -> Result < ( ) , Error > {
107110 {
108111 let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
109112 locked_queue. push_back ( event) ;
110- self . persist_queue ( & locked_queue) ?;
113+ self . write_queue_and_commit ( & locked_queue) ?;
111114 }
112115
113116 self . notifier . notify_one ( ) ;
@@ -124,37 +127,64 @@ where
124127 {
125128 let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
126129 locked_queue. pop_front ( ) ;
127- self . persist_queue ( & locked_queue) ?;
130+ self . write_queue_and_commit ( & locked_queue) ?;
128131 }
129132 self . notifier . notify_one ( ) ;
130133 Ok ( ( ) )
131134 }
132135
133- fn persist_queue ( & self , locked_queue : & VecDeque < Event > ) -> Result < ( ) , Error > {
136+ fn write_queue_and_commit ( & self , locked_queue : & VecDeque < Event > ) -> Result < ( ) , Error > {
134137 let mut writer = self
135138 . kv_store
136139 . write ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY )
137- . map_err ( |_| Error :: PersistenceFailed ) ?;
138- EventQueueSerWrapper ( locked_queue)
139- . write ( & mut writer)
140- . map_err ( |_| Error :: PersistenceFailed ) ?;
141- writer. commit ( ) . map_err ( |_| Error :: PersistenceFailed ) ?;
140+ . map_err ( |e| {
141+ log_error ! (
142+ self . logger,
143+ "Getting writer for key {}/{} failed due to: {}" ,
144+ EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
145+ EVENT_QUEUE_PERSISTENCE_KEY ,
146+ e
147+ ) ;
148+ Error :: PersistenceFailed
149+ } ) ?;
150+ EventQueueSerWrapper ( locked_queue) . write ( & mut writer) . map_err ( |e| {
151+ log_error ! (
152+ self . logger,
153+ "Writing event queue data to key {}/{} failed due to: {}" ,
154+ EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
155+ EVENT_QUEUE_PERSISTENCE_KEY ,
156+ e
157+ ) ;
158+ Error :: PersistenceFailed
159+ } ) ?;
160+ writer. commit ( ) . map_err ( |e| {
161+ log_error ! (
162+ self . logger,
163+ "Committing event queue data to key {}/{} failed due to: {}" ,
164+ EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
165+ EVENT_QUEUE_PERSISTENCE_KEY ,
166+ e
167+ ) ;
168+ Error :: PersistenceFailed
169+ } ) ?;
142170 Ok ( ( ) )
143171 }
144172}
145173
146- impl < K : Deref > ReadableArgs < K > for EventQueue < K >
174+ impl < K : Deref , L : Deref > ReadableArgs < ( K , L ) > for EventQueue < K , L >
147175where
148176 K :: Target : KVStore ,
177+ L :: Target : Logger ,
149178{
150179 #[ inline]
151180 fn read < R : lightning:: io:: Read > (
152- reader : & mut R , kv_store : K ,
181+ reader : & mut R , args : ( K , L ) ,
153182 ) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
183+ let ( kv_store, logger) = args;
154184 let read_queue: EventQueueDeserWrapper = Readable :: read ( reader) ?;
155185 let queue: Mutex < VecDeque < Event > > = Mutex :: new ( read_queue. 0 ) ;
156186 let notifier = Condvar :: new ( ) ;
157- Ok ( Self { queue, notifier, kv_store } )
187+ Ok ( Self { queue, notifier, kv_store, logger } )
158188 }
159189}
160190
@@ -191,11 +221,11 @@ where
191221 L :: Target : Logger ,
192222{
193223 wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
194- event_queue : Arc < EventQueue < K > > ,
224+ event_queue : Arc < EventQueue < K , L > > ,
195225 channel_manager : Arc < ChannelManager > ,
196226 network_graph : Arc < NetworkGraph > ,
197227 keys_manager : Arc < KeysManager > ,
198- payment_store : Arc < PaymentInfoStorage < K > > ,
228+ payment_store : Arc < PaymentInfoStorage < K , L > > ,
199229 tokio_runtime : Arc < tokio:: runtime:: Runtime > ,
200230 logger : L ,
201231 _config : Arc < Config > ,
@@ -207,9 +237,9 @@ where
207237 L :: Target : Logger ,
208238{
209239 pub fn new (
210- wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > , event_queue : Arc < EventQueue < K > > ,
240+ wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > , event_queue : Arc < EventQueue < K , L > > ,
211241 channel_manager : Arc < ChannelManager > , network_graph : Arc < NetworkGraph > ,
212- keys_manager : Arc < KeysManager > , payment_store : Arc < PaymentInfoStorage < K > > ,
242+ keys_manager : Arc < KeysManager > , payment_store : Arc < PaymentInfoStorage < K , L > > ,
213243 tokio_runtime : Arc < tokio:: runtime:: Runtime > , logger : L , _config : Arc < Config > ,
214244 ) -> Self {
215245 Self {
@@ -557,12 +587,13 @@ where
557587#[ cfg( test) ]
558588mod tests {
559589 use super :: * ;
560- use crate :: test:: utils:: TestStore ;
590+ use crate :: test:: utils:: { TestLogger , TestStore } ;
561591
562592 #[ test]
563593 fn event_queue_persistence ( ) {
564594 let store = Arc :: new ( TestStore :: new ( ) ) ;
565- let event_queue = EventQueue :: new ( Arc :: clone ( & store) ) ;
595+ let logger = Arc :: new ( TestLogger :: new ( ) ) ;
596+ let event_queue = EventQueue :: new ( Arc :: clone ( & store) , Arc :: clone ( & logger) ) ;
566597
567598 let expected_event = Event :: ChannelReady { channel_id : [ 23u8 ; 32 ] , user_channel_id : 2323 } ;
568599 event_queue. add_event ( expected_event. clone ( ) ) . unwrap ( ) ;
@@ -579,7 +610,7 @@ mod tests {
579610 . get_persisted_bytes ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY )
580611 . unwrap ( ) ;
581612 let deser_event_queue =
582- EventQueue :: read ( & mut & persisted_bytes[ ..] , Arc :: clone ( & store) ) . unwrap ( ) ;
613+ EventQueue :: read ( & mut & persisted_bytes[ ..] , ( Arc :: clone ( & store) , logger ) ) . unwrap ( ) ;
583614 assert_eq ! ( deser_event_queue. next_event( ) , expected_event) ;
584615 assert ! ( !store. get_and_clear_did_persist( ) ) ;
585616
0 commit comments