Skip to content

Commit 34128c0

Browse files
committed
feat(aggregator): add delay between calls to signature processing on error
1 parent fdd8a4b commit 34128c0

File tree

2 files changed

+82
-12
lines changed

2 files changed

+82
-12
lines changed

mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,9 @@ impl DependenciesBuilder {
105105
self.build_signature_consumer().await?,
106106
self.get_certifier_service().await?,
107107
stop_rx,
108-
self.root_logger(),
109108
self.get_metrics_service().await?,
109+
SequentialSignatureProcessor::DEFAULT_WAIT_DELAY_ON_ERROR_IN_SECONDS,
110+
self.root_logger(),
110111
);
111112

112113
Ok(Arc::new(signature_processor))

mithril-aggregator/src/services/signature_processor.rs

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

3+
use anyhow::{Context, anyhow};
34
use slog::{Logger, error, trace, warn};
45

56
use mithril_common::{
@@ -29,25 +30,31 @@ pub struct SequentialSignatureProcessor {
2930
consumer: Arc<dyn SignatureConsumer>,
3031
certifier: Arc<dyn CertifierService>,
3132
stop_rx: Receiver<()>,
32-
logger: Logger,
3333
metrics_service: Arc<MetricsService>,
34+
wait_delay_on_error_in_seconds: Duration,
35+
logger: Logger,
3436
}
3537

3638
impl SequentialSignatureProcessor {
39+
/// Delay to wait after an error before retrying
40+
pub const DEFAULT_WAIT_DELAY_ON_ERROR_IN_SECONDS: Duration = Duration::from_secs(10);
41+
3742
/// Creates a new `SignatureProcessor` instance.
3843
pub fn new(
3944
consumer: Arc<dyn SignatureConsumer>,
4045
certifier: Arc<dyn CertifierService>,
4146
stop_rx: Receiver<()>,
42-
logger: Logger,
4347
metrics_service: Arc<MetricsService>,
48+
wait_delay_on_error_in_seconds: Duration,
49+
logger: Logger,
4450
) -> Self {
4551
Self {
4652
consumer,
4753
certifier,
4854
stop_rx,
49-
logger: logger.new_with_component_name::<Self>(),
5055
metrics_service,
56+
wait_delay_on_error_in_seconds,
57+
logger: logger.new_with_component_name::<Self>(),
5158
}
5259
}
5360

@@ -64,6 +71,7 @@ impl SignatureProcessor for SequentialSignatureProcessor {
6471
async fn process_signatures(&self) -> StdResult<()> {
6572
let origin_network = self.consumer.get_origin_tag();
6673

74+
let mut total_import_errors = 0;
6775
match self.consumer.get_signatures().await {
6876
Ok(signatures) => {
6977
let number_of_signatures = signatures.len() as u32;
@@ -86,6 +94,7 @@ impl SignatureProcessor for SequentialSignatureProcessor {
8694
.increment(&[&origin_network]);
8795
}
8896
Err(e) => {
97+
total_import_errors += 1;
8998
error!(
9099
self.logger, "Error dispatching single signature";
91100
"full_payload" => #?signature, "error" => ?e
@@ -96,9 +105,24 @@ impl SignatureProcessor for SequentialSignatureProcessor {
96105
}
97106
Err(e) => {
98107
error!(self.logger, "Error consuming single signatures"; "error" => ?e);
108+
total_import_errors += 1;
99109
}
100110
}
101111

112+
if total_import_errors > 0 {
113+
error!(
114+
self.logger,
115+
"Total import errors while processing signatures: {}", total_import_errors
116+
);
117+
return Err(anyhow!(
118+
"Total import errors while processing signatures: {}",
119+
total_import_errors
120+
))
121+
.with_context(
122+
|| "SequentialSignatureProcessor encountered errors while processing signatures",
123+
);
124+
}
125+
102126
Ok(())
103127
}
104128

@@ -111,14 +135,22 @@ impl SignatureProcessor for SequentialSignatureProcessor {
111135

112136
return Ok(());
113137
}
114-
_ = self.process_signatures() => {}
138+
res = self.process_signatures() => {
139+
if let Err(e) = res {
140+
error!(self.logger, "Error processing signatures"; "error" => ?e);
141+
error!(self.logger, "Sleep for {} seconds", self.wait_delay_on_error_in_seconds.as_secs());
142+
tokio::time::sleep(self.wait_delay_on_error_in_seconds).await;
143+
}
144+
}
115145
}
116146
}
117147
}
118148
}
119149

120150
#[cfg(test)]
121151
mod tests {
152+
use core::panic;
153+
122154
use anyhow::anyhow;
123155
use mockall::predicate::eq;
124156
use tokio::{
@@ -193,8 +225,9 @@ mod tests {
193225
mock_consumer,
194226
mock_certifier,
195227
stop_rx,
196-
logger,
197228
Arc::new(MetricsService::new(TestLogger::stdout()).unwrap()),
229+
Duration::from_millis(10),
230+
logger,
198231
);
199232

200233
processor
@@ -223,8 +256,9 @@ mod tests {
223256
Arc::new(fake_consumer),
224257
mock_certifier,
225258
stop_rx,
226-
logger,
227259
metrics_service.clone(),
260+
Duration::from_millis(10),
261+
logger,
228262
);
229263

230264
let initial_received_counter_value = metrics_service
@@ -269,8 +303,9 @@ mod tests {
269303
Arc::new(fake_consumer),
270304
mock_certifier,
271305
stop_rx,
272-
logger,
273306
metrics_service.clone(),
307+
Duration::from_millis(10),
308+
logger,
274309
);
275310

276311
let initial_received_counter_value = metrics_service
@@ -280,7 +315,7 @@ mod tests {
280315
.get_signature_registration_total_successful_since_startup()
281316
.get(&[&network_origin]);
282317

283-
processor.process_signatures().await.unwrap();
318+
processor.process_signatures().await.expect_err("Should have failed");
284319

285320
assert_eq!(
286321
initial_received_counter_value + 1,
@@ -297,7 +332,7 @@ mod tests {
297332
}
298333

299334
#[tokio::test]
300-
async fn processor_run_succeeds() {
335+
async fn processor_run_succeeds_even_if_processing_signatures_fails() {
301336
let logger = TestLogger::stdout();
302337
let fake_consumer = FakeSignatureConsumer::new(vec![
303338
Err(anyhow!("Error consuming signatures")),
@@ -324,13 +359,47 @@ mod tests {
324359
Arc::new(fake_consumer),
325360
mock_certifier,
326361
stop_rx,
362+
Arc::new(metrics_service),
363+
Duration::from_millis(1),
327364
logger,
365+
);
366+
367+
select!(
368+
_res = processor.run() => {},
369+
_res = sleep(Duration::from_millis(500)) => {
370+
println!("Stopping signature processor...");
371+
stop_tx.send(()).unwrap();
372+
},
373+
);
374+
}
375+
376+
#[tokio::test]
377+
async fn processor_run_waits_before_resuming_if_processing_signatures_fails() {
378+
let logger = TestLogger::stdout();
379+
let fake_consumer = FakeSignatureConsumer::new(vec![
380+
Err(anyhow!("Error consuming signatures")),
381+
Ok(vec![(
382+
fake_data::single_signature(vec![1, 2, 3]),
383+
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
384+
)]),
385+
]);
386+
let mock_certifier = MockBuilder::<MockCertifierService>::configure(|mock| {
387+
mock.expect_register_single_signature().never();
388+
});
389+
let (stop_tx, stop_rx) = channel(());
390+
let metrics_service = MetricsService::new(logger.clone()).unwrap();
391+
let processor = SequentialSignatureProcessor::new(
392+
Arc::new(fake_consumer),
393+
mock_certifier,
394+
stop_rx,
328395
Arc::new(metrics_service),
396+
Duration::from_millis(1000),
397+
logger,
329398
);
330399

331400
select!(
332401
_res = processor.run() => {},
333-
_res = sleep(Duration::from_millis(10)) => {
402+
_res = sleep(Duration::from_millis(500)) => {
334403
println!("Stopping signature processor...");
335404
stop_tx.send(()).unwrap();
336405
},

0 commit comments

Comments
 (0)