Skip to content

Commit ec447ca

Browse files
committed
feat(aggregator): add delay on error in signature processor
1 parent fdd8a4b commit ec447ca

File tree

1 file changed

+28
-2
lines changed

1 file changed

+28
-2
lines changed

mithril-aggregator/src/services/signature_processor.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

3+
use anyhow::{Context, anyhow};
34
use slog::{Logger, error, trace, warn};
45

56
use mithril_common::{
@@ -34,6 +35,8 @@ pub struct SequentialSignatureProcessor {
3435
}
3536

3637
impl SequentialSignatureProcessor {
38+
const ERROR_DELAY_IN_SECONDS: Duration = Duration::from_secs(10);
39+
3740
/// Creates a new `SignatureProcessor` instance.
3841
pub fn new(
3942
consumer: Arc<dyn SignatureConsumer>,
@@ -64,6 +67,7 @@ impl SignatureProcessor for SequentialSignatureProcessor {
6467
async fn process_signatures(&self) -> StdResult<()> {
6568
let origin_network = self.consumer.get_origin_tag();
6669

70+
let mut total_import_errors = 0;
6771
match self.consumer.get_signatures().await {
6872
Ok(signatures) => {
6973
let number_of_signatures = signatures.len() as u32;
@@ -86,6 +90,7 @@ impl SignatureProcessor for SequentialSignatureProcessor {
8690
.increment(&[&origin_network]);
8791
}
8892
Err(e) => {
93+
total_import_errors += 1;
8994
error!(
9095
self.logger, "Error dispatching single signature";
9196
"full_payload" => #?signature, "error" => ?e
@@ -96,9 +101,24 @@ impl SignatureProcessor for SequentialSignatureProcessor {
96101
}
97102
Err(e) => {
98103
error!(self.logger, "Error consuming single signatures"; "error" => ?e);
104+
total_import_errors += 1;
99105
}
100106
}
101107

108+
if total_import_errors > 0 {
109+
error!(
110+
self.logger,
111+
"Total import errors while processing signatures: {}", total_import_errors
112+
);
113+
return Err(anyhow!(
114+
"Total import errors while processing signatures: {}",
115+
total_import_errors
116+
))
117+
.with_context(
118+
|| "SequentialSignatureProcessor encountered errors while processing signatures",
119+
);
120+
}
121+
102122
Ok(())
103123
}
104124

@@ -111,7 +131,13 @@ impl SignatureProcessor for SequentialSignatureProcessor {
111131

112132
return Ok(());
113133
}
114-
_ = self.process_signatures() => {}
134+
res = self.process_signatures() => {
135+
if let Err(e) = res {
136+
error!(self.logger, "Error processing signatures"; "error" => ?e);
137+
error!(self.logger, "Sleep for {} seconds", Self::ERROR_DELAY_IN_SECONDS.as_secs());
138+
tokio::time::sleep(Self::ERROR_DELAY_IN_SECONDS).await;
139+
}
140+
}
115141
}
116142
}
117143
}

0 commit comments

Comments
 (0)