@@ -8,18 +8,23 @@ use crate::wallet::Wallet;
88use crate :: { Error , KeysManager } ;
99
1010use lightning:: chain:: chaininterface:: { BroadcasterInterface , ConfirmationTarget , FeeEstimator } ;
11- use lightning:: chain:: BestBlock ;
11+ use lightning:: chain:: { self , BestBlock , Confirm , Filter , Listen } ;
1212use lightning:: impl_writeable_tlv_based;
1313use lightning:: sign:: { EntropySource , SpendableOutputDescriptor } ;
1414use lightning:: util:: persist:: KVStore ;
1515use lightning:: util:: ser:: Writeable ;
1616
1717use bitcoin:: secp256k1:: Secp256k1 ;
18- use bitcoin:: { BlockHash , LockTime , PackedLockTime , Transaction } ;
18+ use bitcoin:: { BlockHash , BlockHeader , LockTime , PackedLockTime , Transaction , Txid } ;
1919
20+ use std:: collections:: HashSet ;
2021use std:: ops:: Deref ;
2122use std:: sync:: { Arc , Mutex } ;
2223
24+ const CONSIDERED_SPENT_THRESHOLD_CONF : u32 = 6 ;
25+
26+ const REGENERATE_SPEND_THRESHOLD : u32 = 144 ;
27+
2328#[ derive( Clone , Debug , PartialEq , Eq ) ]
2429pub ( crate ) struct SpendableOutputInfo {
2530 id : [ u8 ; 32 ] ,
@@ -37,29 +42,43 @@ impl_writeable_tlv_based!(SpendableOutputInfo, {
3742 ( 8 , confirmed_in_block, option) ,
3843} ) ;
3944
40- pub ( crate ) struct OutputSweeper < K : KVStore + Sync + Send , L : Deref >
45+ pub ( crate ) struct OutputSweeper < K : KVStore + Sync + Send , F : Deref , L : Deref >
4146where
47+ F :: Target : Filter ,
4248 L :: Target : Logger ,
4349{
4450 outputs : Mutex < Vec < SpendableOutputInfo > > ,
4551 wallet : Arc < Wallet < bdk:: database:: SqliteDatabase , L > > ,
4652 keys_manager : Arc < KeysManager > ,
4753 kv_store : Arc < K > ,
4854 best_block : Mutex < BestBlock > ,
55+ chain_source : Option < F > ,
4956 logger : L ,
5057}
5158
52- impl < K : KVStore + Sync + Send , L : Deref > OutputSweeper < K , L >
59+ impl < K : KVStore + Sync + Send , F : Deref , L : Deref > OutputSweeper < K , F , L >
5360where
61+ F :: Target : Filter ,
5462 L :: Target : Logger ,
5563{
5664 pub ( crate ) fn new (
5765 outputs : Vec < SpendableOutputInfo > , wallet : Arc < Wallet < bdk:: database:: SqliteDatabase , L > > ,
58- keys_manager : Arc < KeysManager > , kv_store : Arc < K > , best_block : BestBlock , logger : L ,
66+ keys_manager : Arc < KeysManager > , kv_store : Arc < K > , best_block : BestBlock ,
67+ chain_source : Option < F > , logger : L ,
5968 ) -> Self {
69+ if let Some ( filter) = chain_source. as_ref ( ) {
70+ for o in & outputs {
71+ if let Some ( tx) = o. spending_tx . as_ref ( ) {
72+ if let Some ( tx_out) = tx. output . first ( ) {
73+ filter. register_tx ( & tx. txid ( ) , & tx_out. script_pubkey ) ;
74+ }
75+ }
76+ }
77+ }
78+
6079 let outputs = Mutex :: new ( outputs) ;
6180 let best_block = Mutex :: new ( best_block) ;
62- Self { outputs, wallet, keys_manager, kv_store, best_block, logger }
81+ Self { outputs, wallet, keys_manager, kv_store, best_block, chain_source , logger }
6382 }
6483
6584 pub ( crate ) fn add_outputs ( & self , mut output_descriptors : Vec < SpendableOutputDescriptor > ) {
7695 match self . get_spending_tx ( & non_static_outputs, cur_height) {
7796 Ok ( spending_tx) => {
7897 self . wallet . broadcast_transactions ( & [ & spending_tx] ) ;
98+ if let Some ( filter) = self . chain_source . as_ref ( ) {
99+ if let Some ( tx_out) = spending_tx. output . first ( ) {
100+ filter. register_tx ( & spending_tx. txid ( ) , & tx_out. script_pubkey ) ;
101+ }
102+ }
79103 ( Some ( spending_tx) , Some ( cur_height) )
80104 }
81105 Err ( e) => {
@@ -150,3 +174,199 @@ where
150174 } )
151175 }
152176}
177+
178+ impl < K : KVStore + Sync + Send , F : Deref , L : Deref > Listen for OutputSweeper < K , F , L >
179+ where
180+ F :: Target : Filter ,
181+ L :: Target : Logger ,
182+ {
183+ fn filtered_block_connected (
184+ & self , header : & BlockHeader , txdata : & chain:: transaction:: TransactionData , height : u32 ,
185+ ) {
186+ {
187+ let best_block = self . best_block . lock ( ) . unwrap ( ) ;
188+ assert_eq ! ( best_block. block_hash( ) , header. prev_blockhash,
189+ "Blocks must be connected in chain-order - the connected header must build on the last connected header" ) ;
190+ assert_eq ! ( best_block. height( ) , height - 1 ,
191+ "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height" ) ;
192+ }
193+
194+ self . transactions_confirmed ( header, txdata, height) ;
195+ self . best_block_updated ( header, height) ;
196+ }
197+
198+ fn block_disconnected ( & self , header : & BlockHeader , height : u32 ) {
199+ let new_height = height - 1 ;
200+ {
201+ let mut best_block = self . best_block . lock ( ) . unwrap ( ) ;
202+ assert_eq ! ( best_block. block_hash( ) , header. block_hash( ) ,
203+ "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header" ) ;
204+ assert_eq ! ( best_block. height( ) , height,
205+ "Blocks must be disconnected in chain-order - the disconnected block must have the correct height" ) ;
206+ * best_block = BestBlock :: new ( header. prev_blockhash , new_height)
207+ }
208+
209+ let mut locked_outputs = self . outputs . lock ( ) . unwrap ( ) ;
210+ for output_info in locked_outputs. iter_mut ( ) {
211+ if output_info. confirmed_in_block == Some ( ( height, header. block_hash ( ) ) ) {
212+ output_info. confirmed_in_block = None ;
213+ match self . persist_info ( output_info) {
214+ Ok ( ( ) ) => { }
215+ Err ( e) => {
216+ log_error ! ( self . logger, "Error persisting spendable output info: {:?}" , e)
217+ }
218+ }
219+ }
220+ }
221+ }
222+ }
223+
224+ impl < K : KVStore + Sync + Send , F : Deref , L : Deref > Confirm for OutputSweeper < K , F , L >
225+ where
226+ F :: Target : Filter ,
227+ L :: Target : Logger ,
228+ {
229+ fn transactions_confirmed (
230+ & self , header : & BlockHeader , txdata : & chain:: transaction:: TransactionData , height : u32 ,
231+ ) {
232+ let mut locked_outputs = self . outputs . lock ( ) . unwrap ( ) ;
233+ for ( _, tx) in txdata {
234+ locked_outputs
235+ . iter_mut ( )
236+ . filter ( |o| o. spending_tx . as_ref ( ) . map ( |t| t. txid ( ) ) == Some ( tx. txid ( ) ) )
237+ . for_each ( |o| {
238+ o. confirmed_in_block = Some ( ( height, header. block_hash ( ) ) ) ;
239+ match self . persist_info ( o) {
240+ Ok ( ( ) ) => { }
241+ Err ( e) => {
242+ log_error ! (
243+ self . logger,
244+ "Error persisting spendable output info: {:?}" ,
245+ e
246+ )
247+ }
248+ }
249+ } ) ;
250+ }
251+ }
252+
253+ fn transaction_unconfirmed ( & self , txid : & Txid ) {
254+ let mut locked_outputs = self . outputs . lock ( ) . unwrap ( ) ;
255+
256+ // Get what height was unconfirmed.
257+ let unconf_height = locked_outputs
258+ . iter ( )
259+ . find ( |o| o. spending_tx . as_ref ( ) . map ( |t| t. txid ( ) ) == Some ( * txid) )
260+ . and_then ( |o| o. confirmed_in_block )
261+ . map ( |t| t. 0 ) ;
262+
263+ // Unconfirm all >= this height.
264+ locked_outputs
265+ . iter_mut ( )
266+ . filter ( |o| o. confirmed_in_block . map ( |t| t. 0 ) >= unconf_height)
267+ . for_each ( |o| {
268+ o. confirmed_in_block = None ;
269+ match self . persist_info ( o) {
270+ Ok ( ( ) ) => { }
271+ Err ( e) => {
272+ log_error ! ( self . logger, "Error persisting spendable output info: {:?}" , e)
273+ }
274+ }
275+ } ) ;
276+ }
277+
278+ fn best_block_updated ( & self , header : & BlockHeader , height : u32 ) {
279+ * self . best_block . lock ( ) . unwrap ( ) = BestBlock :: new ( header. block_hash ( ) , height) ;
280+
281+ let mut locked_outputs = self . outputs . lock ( ) . unwrap ( ) ;
282+
283+ // Regenerate spending tx and fee bump all outputs that didn't get confirmed by now.
284+ for output_info in locked_outputs. iter_mut ( ) . filter ( |o| o. confirmed_in_block . is_none ( ) ) {
285+ let bcast_height = output_info. broadcast_height . unwrap_or ( 0 ) ;
286+ if height >= bcast_height + REGENERATE_SPEND_THRESHOLD {
287+ let output_descriptors = vec ! [ output_info. descriptor. clone( ) ] ;
288+ match self . get_spending_tx ( & output_descriptors, height) {
289+ Ok ( Some ( spending_tx) ) => {
290+ if let Some ( filter) = self . chain_source . as_ref ( ) {
291+ if let Some ( tx_out) = spending_tx. output . first ( ) {
292+ filter. register_tx ( & spending_tx. txid ( ) , & tx_out. script_pubkey ) ;
293+ }
294+ }
295+ output_info. spending_tx = Some ( spending_tx) ;
296+ output_info. broadcast_height = Some ( height) ;
297+ match self . persist_info ( output_info) {
298+ Ok ( ( ) ) => { }
299+ Err ( e) => {
300+ log_error ! (
301+ self . logger,
302+ "Error persisting spendable output info: {:?}" ,
303+ e
304+ )
305+ }
306+ }
307+ }
308+ Ok ( None ) => {
309+ log_debug ! (
310+ self . logger,
311+ "Omitted spending static outputs: {:?}" ,
312+ output_descriptors
313+ ) ;
314+ }
315+ Err ( err) => {
316+ log_error ! ( self . logger, "Error spending outputs: {:?}" , err) ;
317+ }
318+ } ;
319+ }
320+ }
321+
322+ // Prune all outputs that have sufficient depth by now.
323+ locked_outputs. retain ( |o| {
324+ if let Some ( ( conf_height, _) ) = o. confirmed_in_block {
325+ if height >= conf_height + CONSIDERED_SPENT_THRESHOLD_CONF - 1 {
326+ let key = hex_utils:: to_string ( & o. id ) ;
327+ match self . kv_store . remove (
328+ SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE ,
329+ SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE ,
330+ & key,
331+ false ,
332+ ) {
333+ Ok ( _) => return false ,
334+ Err ( e) => {
335+ log_error ! (
336+ self . logger,
337+ "Removal of key {}/{}/{} failed due to: {}" ,
338+ SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE ,
339+ SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE ,
340+ key,
341+ e
342+ ) ;
343+ return true ;
344+ }
345+ }
346+ }
347+ }
348+ true
349+ } ) ;
350+
351+ // Rebroadcast all pending spending txs
352+ let mut txs = locked_outputs
353+ . iter ( )
354+ . filter_map ( |o| o. spending_tx . as_ref ( ) )
355+ . collect :: < HashSet < & Transaction > > ( ) ;
356+ self . wallet . broadcast_transactions ( & txs. drain ( ) . collect :: < Vec < _ > > ( ) ) ;
357+ }
358+
359+ fn get_relevant_txids ( & self ) -> Vec < ( Txid , Option < BlockHash > ) > {
360+ let locked_outputs = self . outputs . lock ( ) . unwrap ( ) ;
361+ locked_outputs
362+ . iter ( )
363+ . filter_map ( |o| {
364+ if let Some ( tx) = o. spending_tx . as_ref ( ) {
365+ Some ( ( tx. txid ( ) , o. confirmed_in_block . map ( |c| c. 1 ) ) )
366+ } else {
367+ None
368+ }
369+ } )
370+ . collect :: < Vec < _ > > ( )
371+ }
372+ }
0 commit comments