diff --git a/src/sources/common.rs b/src/sources/common.rs index 4687561e..5c228f15 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -170,6 +170,9 @@ pub struct RetryPolicy { #[serde(default = "RetryPolicy::default_max_backoff")] pub connection_max_backoff: u32, + + #[serde(default = "RetryPolicy::default_memory")] + pub memory: u64, } impl RetryPolicy { @@ -180,6 +183,10 @@ impl RetryPolicy { fn default_max_backoff() -> u32 { 60 } + + fn default_memory() -> u64 { + 10 + } } pub fn setup_multiplexer_attempt(bearer: &BearerKind, address: &str) -> Result { @@ -209,6 +216,7 @@ pub fn setup_multiplexer( backoff_unit: Duration::from_secs(1), backoff_factor: 2, max_backoff: Duration::from_secs(policy.connection_max_backoff as u64), + memory: Duration::from_secs(policy.memory), }, ), None => setup_multiplexer_attempt(bearer, address), diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index efa696ee..f03e62e8 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -270,6 +270,12 @@ pub fn do_chainsync( .map(|x| x.chainsync_max_backoff as u64) .map(Duration::from_secs) .unwrap_or_else(|| Duration::from_secs(60)), + memory: config + .retry_policy + .as_ref() + .map(|x| x.memory) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(10)), }, ) } diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index a3956d8e..a0a9884e 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -294,6 +294,12 @@ pub fn do_chainsync( .map(|x| x.chainsync_max_backoff as u64) .map(Duration::from_secs) .unwrap_or_else(|| Duration::from_secs(60)), + memory: config + .retry_policy + .as_ref() + .map(|x| x.memory) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(10)), }, ) } diff --git a/src/utils/retry.rs b/src/utils/retry.rs index e664ab93..f7ec406d 100644 --- a/src/utils/retry.rs +++ b/src/utils/retry.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, ops::Mul, time::Duration}; +use std::{fmt::Debug, ops::Mul, time::{Duration, Instant}}; use serde::{Deserialize, Deserializer}; @@ -10,6 +10,8 @@ pub struct Policy { pub backoff_factor: u32, #[serde(deserialize_with = "deserialize_duration")] pub max_backoff: Duration, + #[serde(deserialize_with = "deserialize_duration")] + pub memory: Duration, // how long to remember a failure } fn deserialize_duration<'de, D>(deserializer: D) -> Result @@ -21,16 +23,38 @@ where Ok(Duration::from_millis(millis)) } +const DEFAULT_BACKOFF_FACTOR: u32 = 2; const DEFAULT_MAX_RETRIES: u32 = 20; -const DEFAULT_BACKOFF_DELAY: u64 = 5_000; +const DEFAULT_BACKOFF_DELAY: Duration = Duration::from_millis(5_000); impl Default for Policy { fn default() -> Self { + let backoff_unit = DEFAULT_BACKOFF_DELAY; + + // default memory should be greater than both + // 1. max_backoff + // 2. maximum duration based on backoff_unit, backoff_factor, and max_retries + // in order that retry failure is not impossible, by default. + + // 1. max_backoff + let default_max_backoff_ms = backoff_unit.checked_mul(20).unwrap_or(Duration::MAX); + + // 2. maximum duration based on backoff_unit, backoff_factor, and max_retries + let max_retries = DEFAULT_MAX_RETRIES; + let backoff_factor = DEFAULT_BACKOFF_FACTOR; + + // default memory + let default_memory = max_cumulative_retry_duration(backoff_unit, backoff_factor, max_retries) + .max(default_max_backoff_ms) + .checked_add(backoff_unit) // a little bit longer than the max. possible + .unwrap_or(Duration::MAX); + Self { - max_retries: DEFAULT_MAX_RETRIES, - backoff_unit: Duration::from_millis(DEFAULT_BACKOFF_DELAY), - backoff_factor: 2, - max_backoff: Duration::from_millis(20 * DEFAULT_BACKOFF_DELAY), + max_retries: max_retries, + backoff_unit: backoff_unit , + backoff_factor: backoff_factor, + max_backoff: default_max_backoff_ms, + memory: default_memory, } } } @@ -41,24 +65,60 @@ fn compute_backoff_delay(policy: &Policy, retry: u32) -> Duration { core::cmp::min(backoff, policy.max_backoff) } +// Determine how much time will be spent only sleeping/waiting for max_retries (the worst case). +fn max_cumulative_retry_duration(backoff_unit: Duration, backoff_factor: u32, max_retries: u32) -> Duration { + // https://www.wolframalpha.com/input?i=sum+of+a%5Ek+from+k%3D0+to+k%3Dj + let (num, den) = if backoff_factor < 1 { + let num = Some(1 - backoff_factor.pow(max_retries+1)); + let den = 1 - backoff_factor; + (num, den) + } else { + let num = backoff_factor + .checked_pow(max_retries + 1) + .map(|x| x-1); + + let den = backoff_factor - 1; + + (num, den) + }; + + let max_retryable = match (num, den) { + (Some(v), den) => v / den, + (None, _) => u32::MAX, + }; + + backoff_unit.checked_mul(max_retryable).unwrap_or(Duration::MAX) +} + pub fn retry_operation(op: impl Fn() -> Result, policy: &Policy) -> Result where E: Debug, { let mut retry = 0; + let mut last: Option = None; + + loop { let result = op(); + // reset the counter if the failure hasn't occurred for a while + let now = std::time::Instant::now(); + if retry != 0 && now.duration_since(last.unwrap_or(now)) > policy.memory { + retry = 0; + } + + last = Some(std::time::Instant::now()); + match result { Ok(x) => break Ok(x), Err(err) if retry < policy.max_retries => { log::warn!("retryable operation error: {:?}", err); - retry += 1; - let backoff = compute_backoff_delay(policy, retry); + retry += 1; + log::debug!( "backoff for {}s until next retry #{}", backoff.as_secs(), @@ -96,6 +156,7 @@ mod tests { backoff_unit: Duration::from_secs(1), backoff_factor: 0, max_backoff: Duration::from_secs(100), + memory: Duration::from_secs(5), }; assert!(retry_operation(op, &policy).is_err()); @@ -112,6 +173,7 @@ mod tests { backoff_unit: Duration::from_millis(1), backoff_factor: 2, max_backoff: Duration::MAX, + memory: Duration::from_secs(5), }; let start = std::time::Instant::now(); @@ -120,8 +182,86 @@ mod tests { assert!(result.is_err()); - // not an exact science, should be 2046, adding +/- 10% - assert!(elapsed.as_millis() >= 1842); - assert!(elapsed.as_millis() <= 2250); + // not an exact science, should be 1024, adding +/- 10% + assert!(elapsed.as_millis() >= 1024*9/10); + assert!(elapsed.as_millis() <= 1024*11/10); + } + + #[test] + fn honors_memory() { + // For all cases the backoff factor is 2 and the backoff unit is 10ms. So, for each case + // the delays will look like [10ms, 20ms, 40ms, 80ms, 160ms, ... ]. + struct Case { + name: &'static str, + num_failures: u32, // number of op() failures + max_retries: u32, // retry policy + memory: Duration, // forget prior failures after this duration + expect_err: bool, // if retry should fail + expect_runs: u32, // number of expected iterations + expect_dur: Duration, + } + + let cases: Vec = vec![ + Case { + name: "max_fails occurs before memory is reached", + num_failures: 6, + max_retries: 5, + memory: Duration::from_secs(std::u64::MAX), + expect_err: true, + expect_runs: 6, + expect_dur: Duration::from_millis(10*((1<<5)-1)), + } + , Case { + name: "forget the 1st 4 failures", + num_failures: 5, // 10ms, 20ms, 40ms, 80ms, 160ms + max_retries: 3, + memory: Duration::from_millis(30), + expect_err: false, + expect_runs: 5, + expect_dur: Duration::from_millis(10*((1<<3)-1) + 10 + 20), + } + , Case { + name: "forget all failures", + num_failures: 11, + max_retries: 10, + memory: Duration::from_millis(0), + expect_err: false, + expect_runs: 11, + expect_dur: Duration::from_millis(10*11), + } + ]; + + cases.iter().for_each(|x| { + let start = Instant::now(); + let counter = Rc::new(RefCell::new(0)); + + let failure_counter = counter.clone(); + let op = move || -> Result<(), String> { + if *failure_counter.borrow() < x.num_failures { + *failure_counter.borrow_mut() += 1; + Err("very bad stuff happened".to_string()) + } + else { + Ok(()) + } + }; + + let policy = Policy { + max_retries: x.max_retries, + backoff_unit: Duration::from_millis(10), + backoff_factor: 2, + max_backoff: Duration::from_millis(1024), + memory: x.memory, + }; + + let failed = retry_operation(op, &policy).is_err(); + + assert!(failed == x.expect_err, "case '{}' failed in error check - {} vs. {}", x.name, x.expect_err, failed); + assert_eq!(*counter.borrow(), x.expect_runs, "case '{}' failed in run count check - {} vs. {}", x.name, x.expect_runs, counter.borrow()); + + let elapsed = start.elapsed(); + assert!(elapsed < x.expect_dur*11/10 && elapsed > x.expect_dur*9/10, "case '{}' failed in duration check - {} vs. {}", x.name, x.expect_dur.as_millis(), elapsed.as_millis()); + }); + } }