@@ -31,7 +31,7 @@ use crate::{
3131 error:: { Error as _, RuntimeError } ,
3232 event:: IntoTags ,
3333 keymanager:: { KeyManagerClient , KeyManagerError } ,
34- module:: { self , BlockHandler , MethodHandler , TransactionHandler } ,
34+ module:: { self , BlockHandler , InMsgHandler , InMsgResult , MethodHandler , TransactionHandler } ,
3535 modules,
3636 modules:: core:: API as _,
3737 runtime:: Runtime ,
@@ -550,7 +550,7 @@ impl<R: Runtime> Dispatcher<R> {
550550 messages,
551551 block_tags : block_tags. into_tags ( ) ,
552552 tx_reject_hashes : vec ! [ ] ,
553- in_msgs_count : 0 , // TODO: Support processing incoming messages.
553+ in_msgs_count : 0 ,
554554 } )
555555 }
556556}
@@ -560,17 +560,63 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
560560 & self ,
561561 rt_ctx : transaction:: Context < ' _ > ,
562562 batch : & TxnBatch ,
563- _in_msgs : & [ roothash:: IncomingMessage ] ,
563+ in_msgs : & [ roothash:: IncomingMessage ] ,
564564 ) -> Result < ExecuteBatchResult , RuntimeError > {
565- self . execute_batch_common (
565+ let mut in_msgs_count = 0 ;
566+
567+ let mut result = self . execute_batch_common (
566568 rt_ctx,
567569 |ctx| -> Result < Vec < ExecuteTxResult > , RuntimeError > {
568570 // If prefetch limit is set enable prefetch.
569571 let prefetch_enabled = R :: PREFETCH_LIMIT > 0 ;
572+ let mut results = Vec :: with_capacity ( batch. len ( ) ) ;
573+
574+ // Process incoming messages first.
575+ let mut batch_it = batch. iter ( ) ;
576+ ' inmsg: for in_msg in in_msgs {
577+ match R :: IncomingMessagesHandler :: process_in_msg ( ctx, & in_msg) {
578+ InMsgResult :: Skip => {
579+ // Skip, but treat as processed.
580+ in_msgs_count += 1 ;
581+ }
582+ InMsgResult :: Execute ( raw_tx, tx) => {
583+ // Verify that the transaction has been included in the batch.
584+ match batch_it. next ( ) {
585+ None => {
586+ // Nothing in the batch when there should be an incoming message.
587+ return Err ( Error :: MalformedTransactionInBatch ( anyhow ! (
588+ "missing incoming message"
589+ ) )
590+ . into ( ) ) ;
591+ }
592+ Some ( batch_tx) if batch_tx != raw_tx => {
593+ // Incoming message does not match what is in the batch.
594+ return Err ( Error :: MalformedTransactionInBatch ( anyhow ! (
595+ "mismatched incoming message"
596+ ) )
597+ . into ( ) ) ;
598+ }
599+ _ => {
600+ // Everything is ok.
601+ }
602+ }
603+
604+ // Further execute the inner transaction. The transaction has already
605+ // passed checks so it is ok to include in a block.
606+ let tx_size = raw_tx. len ( ) . try_into ( ) . unwrap ( ) ;
607+ let index = results. len ( ) ;
608+ results. push ( Self :: execute_tx ( ctx, tx_size, tx, index) ?) ;
609+
610+ in_msgs_count += 1 ;
611+ }
612+ InMsgResult :: Stop => break ' inmsg,
613+ }
614+ }
570615
616+ let inmsg_txs = results. len ( ) ;
571617 let mut txs = Vec :: with_capacity ( batch. len ( ) ) ;
572618 let mut prefixes: BTreeSet < Prefix > = BTreeSet :: new ( ) ;
573- for tx in batch. iter ( ) {
619+ for tx in batch. iter ( ) . skip ( inmsg_txs ) {
574620 let tx_size = tx. len ( ) . try_into ( ) . map_err ( |_| {
575621 Error :: MalformedTransactionInBatch ( anyhow ! ( "transaction too large" ) )
576622 } ) ?;
@@ -593,24 +639,29 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
593639 }
594640
595641 // Execute the batch.
596- let mut results = Vec :: with_capacity ( batch. len ( ) ) ;
597- for ( index, ( tx_size, tx) ) in txs. into_iter ( ) . enumerate ( ) {
642+ for ( index, ( tx_size, tx) ) in txs. into_iter ( ) . skip ( inmsg_txs) . enumerate ( ) {
598643 results. push ( Self :: execute_tx ( ctx, tx_size, tx, index) ?) ;
599644 }
600645
601646 Ok ( results)
602647 } ,
603- )
648+ ) ?;
649+
650+ // Include number of processed incoming messages in the final result.
651+ result. in_msgs_count = in_msgs_count;
652+
653+ Ok ( result)
604654 }
605655
606656 fn schedule_and_execute_batch (
607657 & self ,
608658 rt_ctx : transaction:: Context < ' _ > ,
609659 batch : & mut TxnBatch ,
610- _in_msgs : & [ roothash:: IncomingMessage ] ,
660+ in_msgs : & [ roothash:: IncomingMessage ] ,
611661 ) -> Result < ExecuteBatchResult , RuntimeError > {
612662 let cfg = R :: SCHEDULE_CONTROL ;
613663 let mut tx_reject_hashes = Vec :: new ( ) ;
664+ let mut in_msgs_count = 0 ;
614665
615666 let mut result = self . execute_batch_common (
616667 rt_ctx,
@@ -620,13 +671,35 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
620671 // The idea is to keep scheduling transactions as long as we have some space
621672 // available in the block as determined by gas use.
622673 let mut new_batch = Vec :: new ( ) ;
623- let mut results = Vec :: with_capacity ( batch. len ( ) ) ;
674+ let mut results = Vec :: with_capacity ( in_msgs . len ( ) + batch. len ( ) ) ;
624675 let mut requested_batch_len = cfg. initial_batch_size ;
676+
677+ // Process incoming messages first.
678+ ' inmsg: for in_msg in in_msgs {
679+ match R :: IncomingMessagesHandler :: process_in_msg ( ctx, & in_msg) {
680+ InMsgResult :: Skip => {
681+ // Skip, but treat as processed.
682+ in_msgs_count += 1 ;
683+ }
684+ InMsgResult :: Execute ( raw_tx, tx) => {
685+ // Further execute the inner transaction. The transaction has already
686+ // passed checks so it is ok to include in a block.
687+ let tx_size = raw_tx. len ( ) . try_into ( ) . unwrap ( ) ;
688+ let index = new_batch. len ( ) ;
689+ new_batch. push ( raw_tx. to_owned ( ) ) ;
690+ results. push ( Self :: execute_tx ( ctx, tx_size, tx, index) ?) ;
691+
692+ in_msgs_count += 1 ;
693+ }
694+ InMsgResult :: Stop => break ' inmsg,
695+ }
696+ }
697+
698+ // Process regular transactions.
625699 ' batch: loop {
626700 // Remember length of last batch.
627701 let last_batch_len = batch. len ( ) ;
628702 let last_batch_tx_hash = batch. last ( ) . map ( |raw_tx| Hash :: digest_bytes ( raw_tx) ) ;
629-
630703 for raw_tx in batch. drain ( ..) {
631704 // If we don't have enough gas for processing even the cheapest transaction
632705 // we are done. Same if we reached the runtime-imposed maximum tx count.
@@ -731,8 +804,10 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
731804 } ,
732805 ) ?;
733806
734- // Include rejected transaction hashes in the final result.
807+ // Include rejected transaction hashes and number of processed incoming messages in the
808+ // final result.
735809 result. tx_reject_hashes = tx_reject_hashes;
810+ result. in_msgs_count = in_msgs_count;
736811
737812 Ok ( result)
738813 }
@@ -919,6 +994,7 @@ mod test {
919994 core:: Genesis {
920995 parameters : core:: Parameters {
921996 max_batch_gas : u64:: MAX ,
997+ max_inmsg_gas : 0 ,
922998 max_tx_size : 32 * 1024 ,
923999 max_tx_signers : 1 ,
9241000 max_multisig_signers : 8 ,
@@ -927,6 +1003,7 @@ mod test {
9271003 auth_signature : 0 ,
9281004 auth_multisig_signer : 0 ,
9291005 callformat_x25519_deoxysii : 0 ,
1006+ inmsg_base : 0 ,
9301007 } ,
9311008 min_gas_price : BTreeMap :: from ( [ ( token:: Denomination :: NATIVE , 0 ) ] ) ,
9321009 } ,
0 commit comments