From 1af5a3f9f49ef6d0d7ec29488d6fbf2395f68ace Mon Sep 17 00:00:00 2001 From: John Rinehart Date: Fri, 19 May 2023 23:46:57 +0100 Subject: [PATCH] feat: introduce a memory for retry::Policy The idea introduced, here, is to add a "memory" parameter for the retry policy. If a long-running retryable operation (like chainsync) fails sporadically then the downstream oura user may not actually want the retry counter to increment. By default, the memory should be longer than the longest possible delay. If this isn't done then max_retries will never be encountered and retry_operation will never return. This case (never returning) may be desirable in some cases (like for a daemon application). But, it should not be the default case. --- src/sources/common.rs | 8 ++ src/sources/n2c/run.rs | 6 ++ src/sources/n2n/run.rs | 6 ++ src/utils/retry.rs | 162 ++++++++++++++++++++++++++++++++++++++--- 4 files changed, 171 insertions(+), 11 deletions(-) diff --git a/src/sources/common.rs b/src/sources/common.rs index 4687561e..5c228f15 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -170,6 +170,9 @@ pub struct RetryPolicy { #[serde(default = "RetryPolicy::default_max_backoff")] pub connection_max_backoff: u32, + + #[serde(default = "RetryPolicy::default_memory")] + pub memory: u64, } impl RetryPolicy { @@ -180,6 +183,10 @@ impl RetryPolicy { fn default_max_backoff() -> u32 { 60 } + + fn default_memory() -> u64 { + 10 + } } pub fn setup_multiplexer_attempt(bearer: &BearerKind, address: &str) -> Result { @@ -209,6 +216,7 @@ pub fn setup_multiplexer( backoff_unit: Duration::from_secs(1), backoff_factor: 2, max_backoff: Duration::from_secs(policy.connection_max_backoff as u64), + memory: Duration::from_secs(policy.memory), }, ), None => setup_multiplexer_attempt(bearer, address), diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index efa696ee..f03e62e8 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -270,6 +270,12 @@ pub fn do_chainsync( .map(|x| x.chainsync_max_backoff as u64) .map(Duration::from_secs) .unwrap_or_else(|| Duration::from_secs(60)), + memory: config + .retry_policy + .as_ref() + .map(|x| x.memory) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(10)), }, ) } diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index a3956d8e..a0a9884e 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -294,6 +294,12 @@ pub fn do_chainsync( .map(|x| x.chainsync_max_backoff as u64) .map(Duration::from_secs) .unwrap_or_else(|| Duration::from_secs(60)), + memory: config + .retry_policy + .as_ref() + .map(|x| x.memory) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(10)), }, ) } diff --git a/src/utils/retry.rs b/src/utils/retry.rs index e664ab93..f7ec406d 100644 --- a/src/utils/retry.rs +++ b/src/utils/retry.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, ops::Mul, time::Duration}; +use std::{fmt::Debug, ops::Mul, time::{Duration, Instant}}; use serde::{Deserialize, Deserializer}; @@ -10,6 +10,8 @@ pub struct Policy { pub backoff_factor: u32, #[serde(deserialize_with = "deserialize_duration")] pub max_backoff: Duration, + #[serde(deserialize_with = "deserialize_duration")] + pub memory: Duration, // how long to remember a failure } fn deserialize_duration<'de, D>(deserializer: D) -> Result @@ -21,16 +23,38 @@ where Ok(Duration::from_millis(millis)) } +const DEFAULT_BACKOFF_FACTOR: u32 = 2; const DEFAULT_MAX_RETRIES: u32 = 20; -const DEFAULT_BACKOFF_DELAY: u64 = 5_000; +const DEFAULT_BACKOFF_DELAY: Duration = Duration::from_millis(5_000); impl Default for Policy { fn default() -> Self { + let backoff_unit = DEFAULT_BACKOFF_DELAY; + + // default memory should be greater than both + // 1. max_backoff + // 2. maximum duration based on backoff_unit, backoff_factor, and max_retries + // in order that retry failure is not impossible, by default. + + // 1. max_backoff + let default_max_backoff_ms = backoff_unit.checked_mul(20).unwrap_or(Duration::MAX); + + // 2. maximum duration based on backoff_unit, backoff_factor, and max_retries + let max_retries = DEFAULT_MAX_RETRIES; + let backoff_factor = DEFAULT_BACKOFF_FACTOR; + + // default memory + let default_memory = max_cumulative_retry_duration(backoff_unit, backoff_factor, max_retries) + .max(default_max_backoff_ms) + .checked_add(backoff_unit) // a little bit longer than the max. possible + .unwrap_or(Duration::MAX); + Self { - max_retries: DEFAULT_MAX_RETRIES, - backoff_unit: Duration::from_millis(DEFAULT_BACKOFF_DELAY), - backoff_factor: 2, - max_backoff: Duration::from_millis(20 * DEFAULT_BACKOFF_DELAY), + max_retries: max_retries, + backoff_unit: backoff_unit , + backoff_factor: backoff_factor, + max_backoff: default_max_backoff_ms, + memory: default_memory, } } } @@ -41,24 +65,60 @@ fn compute_backoff_delay(policy: &Policy, retry: u32) -> Duration { core::cmp::min(backoff, policy.max_backoff) } +// Determine how much time will be spent only sleeping/waiting for max_retries (the worst case). +fn max_cumulative_retry_duration(backoff_unit: Duration, backoff_factor: u32, max_retries: u32) -> Duration { + // https://www.wolframalpha.com/input?i=sum+of+a%5Ek+from+k%3D0+to+k%3Dj + let (num, den) = if backoff_factor < 1 { + let num = Some(1 - backoff_factor.pow(max_retries+1)); + let den = 1 - backoff_factor; + (num, den) + } else { + let num = backoff_factor + .checked_pow(max_retries + 1) + .map(|x| x-1); + + let den = backoff_factor - 1; + + (num, den) + }; + + let max_retryable = match (num, den) { + (Some(v), den) => v / den, + (None, _) => u32::MAX, + }; + + backoff_unit.checked_mul(max_retryable).unwrap_or(Duration::MAX) +} + pub fn retry_operation(op: impl Fn() -> Result, policy: &Policy) -> Result where E: Debug, { let mut retry = 0; + let mut last: Option = None; + + loop { let result = op(); + // reset the counter if the failure hasn't occurred for a while + let now = std::time::Instant::now(); + if retry != 0 && now.duration_since(last.unwrap_or(now)) > policy.memory { + retry = 0; + } + + last = Some(std::time::Instant::now()); + match result { Ok(x) => break Ok(x), Err(err) if retry < policy.max_retries => { log::warn!("retryable operation error: {:?}", err); - retry += 1; - let backoff = compute_backoff_delay(policy, retry); + retry += 1; + log::debug!( "backoff for {}s until next retry #{}", backoff.as_secs(), @@ -96,6 +156,7 @@ mod tests { backoff_unit: Duration::from_secs(1), backoff_factor: 0, max_backoff: Duration::from_secs(100), + memory: Duration::from_secs(5), }; assert!(retry_operation(op, &policy).is_err()); @@ -112,6 +173,7 @@ mod tests { backoff_unit: Duration::from_millis(1), backoff_factor: 2, max_backoff: Duration::MAX, + memory: Duration::from_secs(5), }; let start = std::time::Instant::now(); @@ -120,8 +182,86 @@ mod tests { assert!(result.is_err()); - // not an exact science, should be 2046, adding +/- 10% - assert!(elapsed.as_millis() >= 1842); - assert!(elapsed.as_millis() <= 2250); + // not an exact science, should be 1024, adding +/- 10% + assert!(elapsed.as_millis() >= 1024*9/10); + assert!(elapsed.as_millis() <= 1024*11/10); + } + + #[test] + fn honors_memory() { + // For all cases the backoff factor is 2 and the backoff unit is 10ms. So, for each case + // the delays will look like [10ms, 20ms, 40ms, 80ms, 160ms, ... ]. + struct Case { + name: &'static str, + num_failures: u32, // number of op() failures + max_retries: u32, // retry policy + memory: Duration, // forget prior failures after this duration + expect_err: bool, // if retry should fail + expect_runs: u32, // number of expected iterations + expect_dur: Duration, + } + + let cases: Vec = vec![ + Case { + name: "max_fails occurs before memory is reached", + num_failures: 6, + max_retries: 5, + memory: Duration::from_secs(std::u64::MAX), + expect_err: true, + expect_runs: 6, + expect_dur: Duration::from_millis(10*((1<<5)-1)), + } + , Case { + name: "forget the 1st 4 failures", + num_failures: 5, // 10ms, 20ms, 40ms, 80ms, 160ms + max_retries: 3, + memory: Duration::from_millis(30), + expect_err: false, + expect_runs: 5, + expect_dur: Duration::from_millis(10*((1<<3)-1) + 10 + 20), + } + , Case { + name: "forget all failures", + num_failures: 11, + max_retries: 10, + memory: Duration::from_millis(0), + expect_err: false, + expect_runs: 11, + expect_dur: Duration::from_millis(10*11), + } + ]; + + cases.iter().for_each(|x| { + let start = Instant::now(); + let counter = Rc::new(RefCell::new(0)); + + let failure_counter = counter.clone(); + let op = move || -> Result<(), String> { + if *failure_counter.borrow() < x.num_failures { + *failure_counter.borrow_mut() += 1; + Err("very bad stuff happened".to_string()) + } + else { + Ok(()) + } + }; + + let policy = Policy { + max_retries: x.max_retries, + backoff_unit: Duration::from_millis(10), + backoff_factor: 2, + max_backoff: Duration::from_millis(1024), + memory: x.memory, + }; + + let failed = retry_operation(op, &policy).is_err(); + + assert!(failed == x.expect_err, "case '{}' failed in error check - {} vs. {}", x.name, x.expect_err, failed); + assert_eq!(*counter.borrow(), x.expect_runs, "case '{}' failed in run count check - {} vs. {}", x.name, x.expect_runs, counter.borrow()); + + let elapsed = start.elapsed(); + assert!(elapsed < x.expect_dur*11/10 && elapsed > x.expect_dur*9/10, "case '{}' failed in duration check - {} vs. {}", x.name, x.expect_dur.as_millis(), elapsed.as_millis()); + }); + } }