@@ -19,7 +19,7 @@ use crate::{
1919 cache:: Cache ,
2020 config:: { Config , ELECTRS_VERSION } ,
2121 daemon:: { self , extract_bitcoind_error, Daemon } ,
22- mempool:: MempoolSyncUpdate ,
22+ mempool:: { self , MempoolSyncUpdate } ,
2323 merkle:: Proof ,
2424 metrics:: { self , Histogram , Metrics } ,
2525 signals:: Signal ,
@@ -49,6 +49,20 @@ struct Request {
4949 params : Value ,
5050}
5151
52+ struct CallResult {
53+ response : Value ,
54+ mempool_update : MempoolSyncUpdate ,
55+ }
56+
57+ impl CallResult {
58+ fn new < T : serde:: Serialize > ( response : T ) -> CallResult {
59+ CallResult {
60+ response : json ! ( response) ,
61+ mempool_update : MempoolSyncUpdate :: default ( ) ,
62+ }
63+ }
64+ }
65+
5266#[ derive( Deserialize ) ]
5367#[ serde( untagged) ]
5468enum Requests {
@@ -372,11 +386,20 @@ impl Rpc {
372386 Ok ( status)
373387 }
374388
375- fn transaction_broadcast ( & self , ( tx_hex, ) : & ( String , ) ) -> Result < Value > {
389+ fn transaction_broadcast ( & self , ( tx_hex, ) : & ( String , ) ) -> Result < ( Value , MempoolSyncUpdate ) > {
376390 let tx_bytes = Vec :: from_hex ( tx_hex) . context ( "non-hex transaction" ) ?;
377391 let tx = deserialize ( & tx_bytes) . context ( "invalid transaction" ) ?;
378392 let txid = self . daemon . broadcast ( & tx) ?;
379- Ok ( json ! ( txid) )
393+
394+ // Try to fetch the mempool entry immediately, so we can return an update
395+ // to be applied to the mempool.
396+ let mut mempool_update = MempoolSyncUpdate :: default ( ) ;
397+ if let Ok ( rpc_entry) = self . daemon . get_mempool_entry ( & txid) {
398+ let entry = mempool:: Entry :: new ( txid, tx, rpc_entry) ;
399+ mempool_update. new_entries . push ( entry) ;
400+ }
401+
402+ Ok ( ( json ! ( txid) , mempool_update) )
380403 }
381404
382405 fn transaction_get ( & self , args : & TxGetArgs ) -> Result < Value > {
@@ -475,7 +498,7 @@ impl Rpc {
475498 } ) )
476499 }
477500
478- pub fn handle_requests ( & self , client : & mut Client , lines : & [ String ] ) -> Vec < String > {
501+ pub fn handle_requests ( & mut self , client : & mut Client , lines : & [ String ] ) -> Vec < String > {
479502 lines
480503 . iter ( )
481504 . map ( |line| {
@@ -487,7 +510,7 @@ impl Rpc {
487510 . collect ( )
488511 }
489512
490- fn handle_calls ( & self , client : & mut Client , calls : Result < Calls , Value > ) -> Value {
513+ fn handle_calls ( & mut self , client : & mut Client , calls : Result < Calls , Value > ) -> Value {
491514 let calls: Calls = match calls {
492515 Ok ( calls) => calls,
493516 Err ( response) => return response, // JSON parsing failed - the response does not contain request id
@@ -498,12 +521,33 @@ impl Rpc {
498521 if let Some ( result) = self . try_multi_call ( client, & batch) {
499522 return json ! ( result) ;
500523 }
501- json ! ( batch
524+ let responses = batch
502525 . into_iter ( )
503- . map( |result| self . single_call( client, result) )
504- . collect:: <Vec <Value >>( ) )
526+ . map ( |call| {
527+ let CallResult {
528+ response,
529+ mempool_update,
530+ } = self . single_call ( client, call) ;
531+
532+ // Apply the mempool update immediately, so that the next
533+ // response will reflect the updated mempool state.
534+ self . mempool_apply ( mempool_update) ;
535+
536+ response
537+ } )
538+ . collect :: < Vec < Value > > ( ) ;
539+ json ! ( responses)
540+ }
541+
542+ Calls :: Single ( call) => {
543+ let CallResult {
544+ response,
545+ mempool_update,
546+ } = self . single_call ( client, call) ;
547+
548+ self . mempool_apply ( mempool_update) ;
549+ response
505550 }
506- Calls :: Single ( result) => self . single_call ( client, result) ,
507551 }
508552 }
509553
@@ -538,10 +582,10 @@ impl Rpc {
538582 )
539583 }
540584
541- fn single_call ( & self , client : & mut Client , call : Result < Call , Value > ) -> Value {
585+ fn single_call ( & self , client : & mut Client , call : Result < Call , Value > ) -> CallResult {
542586 let call = match call {
543587 Ok ( call) => call,
544- Err ( response) => return response, // params parsing may fail - the response contains request id
588+ Err ( response) => return CallResult :: new ( response) , // params parsing may fail - the response contains request id
545589 } ;
546590 self . rpc_duration . observe_duration ( & call. method , || {
547591 if self . tracker . status ( ) . is_err ( ) {
@@ -551,9 +595,11 @@ impl Rpc {
551595 | Params :: BlockHeaders ( _)
552596 | Params :: HeadersSubscribe
553597 | Params :: Version ( _) => ( ) ,
554- _ => return error_msg ( & call. id , RpcError :: UnavailableIndex ) ,
598+ _ => return CallResult :: new ( error_msg ( & call. id , RpcError :: UnavailableIndex ) ) ,
555599 } ;
556600 }
601+
602+ let mut mempool_update = MempoolSyncUpdate :: default ( ) ;
557603 let result = match & call. params {
558604 Params :: Banner => Ok ( json ! ( self . banner) ) ,
559605 Params :: BlockHeader ( args) => self . block_header ( * args) ,
@@ -571,13 +617,23 @@ impl Rpc {
571617 Params :: ScriptHashListUnspent ( args) => self . scripthash_list_unspent ( client, args) ,
572618 Params :: ScriptHashSubscribe ( args) => self . scripthash_subscribe ( client, args) ,
573619 Params :: ScriptHashUnsubscribe ( args) => self . scripthash_unsubscribe ( client, args) ,
574- Params :: TransactionBroadcast ( args) => self . transaction_broadcast ( args) ,
620+ Params :: TransactionBroadcast ( args) => {
621+ self . transaction_broadcast ( args)
622+ . map ( |( result, sync_update) | {
623+ mempool_update = sync_update; // extract the mempool sync update
624+ result
625+ } )
626+ }
575627 Params :: TransactionGet ( args) => self . transaction_get ( args) ,
576628 Params :: TransactionGetMerkle ( args) => self . transaction_get_merkle ( args) ,
577629 Params :: TransactionFromPosition ( args) => self . transaction_from_pos ( * args) ,
578630 Params :: Version ( args) => self . version ( args) ,
579631 } ;
580- call. response ( result)
632+
633+ CallResult {
634+ response : call. response ( result) ,
635+ mempool_update,
636+ }
581637 } )
582638 }
583639}
0 commit comments