Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 177 additions & 12 deletions src/domain/relayer/evm/evm_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::{
EvmNetwork, HealthCheckFailure, JsonRpcRequest, JsonRpcResponse, NetworkRepoModel,
NetworkRpcRequest, NetworkRpcResult, NetworkTransactionRequest, NetworkType,
PaginationQuery, RelayerRepoModel, RelayerStatus, RepositoryError, RpcErrorCodes,
TransactionRepoModel, TransactionStatus,
TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
},
repositories::{NetworkRepository, RelayerRepository, Repository, TransactionRepository},
services::{
Expand All @@ -57,7 +57,7 @@ use crate::{
};
use async_trait::async_trait;
use eyre::Result;
use tracing::{debug, info, instrument, warn};
use tracing::{debug, error, info, instrument, warn};

use super::{create_error_response, create_success_response, EvmTransactionValidator};
use crate::utils::{map_provider_error, sanitize_error_description};
Expand Down Expand Up @@ -292,16 +292,11 @@ where
.await
.map_err(|e| RepositoryError::TransactionFailure(e.to_string()))?;

// Queue preparation job (immediate)
self.job_producer
.produce_transaction_request_job(
TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
None,
)
.await?;

// Queue status check job (with initial delay)
self.job_producer
// Status check FIRST - this is our safety net for monitoring.
// If this fails, mark transaction as failed and don't proceed.
// This ensures we never have an unmonitored transaction.
if let Err(e) = self
.job_producer
.produce_check_transaction_status_job(
TransactionStatusCheck::new(
transaction.id.clone(),
Expand All @@ -312,6 +307,44 @@ where
EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
)),
)
.await
{
// Status queue failed - mark transaction as failed to prevent orphaned tx
error!(
relayer_id = %self.relayer.id,
transaction_id = %transaction.id,
error = %e,
"Status check queue push failed - marking transaction as failed"
);
if let Err(update_err) = self
.transaction_repository
.partial_update(
transaction.id.clone(),
TransactionUpdateRequest {
status: Some(TransactionStatus::Failed),
status_reason: Some("Queue unavailable".to_string()),
..Default::default()
},
)
.await
{
warn!(
relayer_id = %self.relayer.id,
transaction_id = %transaction.id,
error = %update_err,
"Failed to mark transaction as failed after queue push failure"
);
}
return Err(e.into());
}

// Now safe to push transaction request.
// Even if this fails, status check will monitor and detect the stuck transaction.
self.job_producer
.produce_transaction_request_job(
TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
None,
)
.await?;

Ok(transaction)
Expand Down Expand Up @@ -961,6 +994,138 @@ mod tests {
assert!(result.is_ok());
}

#[tokio::test]
async fn test_process_transaction_request_status_check_failure_returns_error() {
let (
provider,
relayer_repo,
mut network_repo,
mut tx_repo,
mut job_producer,
signer,
counter,
) = setup_mocks();
let relayer_model = create_test_relayer();

let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
to: Some("0xRecipient".to_string()),
value: U256::from(1000000000000000000u64),
data: Some("0xData".to_string()),
gas_limit: Some(21000),
gas_price: Some(20000000000),
max_fee_per_gas: None,
max_priority_fee_per_gas: None,
speed: None,
valid_until: None,
});

network_repo
.expect_get_by_name()
.with(eq(NetworkType::Evm), eq("mainnet"))
.returning(|_, _| Ok(Some(create_test_network_repo_model())));

tx_repo.expect_create().returning(Ok);
// When status check fails, transaction is marked as failed
tx_repo
.expect_partial_update()
.returning(|_, _| Ok(TransactionRepoModel::default()));

// Status check fails
job_producer
.expect_produce_check_transaction_status_job()
.returning(|_, _| {
Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
"Failed to queue job".to_string(),
))))
});

// Transaction request should NOT be called when status check fails
// (no expectation set = test fails if called)

let relayer = EvmRelayer::new(
relayer_model,
signer,
provider,
create_test_evm_network(),
Arc::new(relayer_repo),
Arc::new(network_repo),
Arc::new(tx_repo),
Arc::new(counter),
Arc::new(job_producer),
)
.unwrap();

let result = relayer.process_transaction_request(network_tx).await;
assert!(result.is_err());
}

#[tokio::test]
async fn test_process_transaction_request_status_check_failure_marks_tx_failed() {
let (
provider,
relayer_repo,
mut network_repo,
mut tx_repo,
mut job_producer,
signer,
counter,
) = setup_mocks();
let relayer_model = create_test_relayer();

let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
to: Some("0xRecipient".to_string()),
value: U256::from(1000000000000000000u64),
data: Some("0xData".to_string()),
gas_limit: Some(21000),
gas_price: Some(20000000000),
max_fee_per_gas: None,
max_priority_fee_per_gas: None,
speed: None,
valid_until: None,
});

network_repo
.expect_get_by_name()
.with(eq(NetworkType::Evm), eq("mainnet"))
.returning(|_, _| Ok(Some(create_test_network_repo_model())));

tx_repo.expect_create().returning(Ok);

// Verify partial_update is called with correct status and reason
tx_repo
.expect_partial_update()
.withf(|_tx_id, update| {
update.status == Some(TransactionStatus::Failed)
&& update.status_reason == Some("Queue unavailable".to_string())
})
.returning(|_, _| Ok(TransactionRepoModel::default()));

job_producer
.expect_produce_check_transaction_status_job()
.returning(|_, _| {
Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
"Redis timeout".to_string(),
))))
});

let relayer = EvmRelayer::new(
relayer_model,
signer,
provider,
create_test_evm_network(),
Arc::new(relayer_repo),
Arc::new(network_repo),
Arc::new(tx_repo),
Arc::new(counter),
Arc::new(job_producer),
)
.unwrap();

let result = relayer.process_transaction_request(network_tx).await;
assert!(result.is_err());
// The mock verification (withf) ensures partial_update was called correctly
}

#[tokio::test]
async fn test_validate_min_balance_sufficient() {
let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
Expand Down
Loading
Loading