Skip to content

Commit 6fe9fa0

Browse files
authored
ref: use Duration for flush_interval (#55)
Changes the flush_interval to Duration in order to allow sub-second flushing.
1 parent 82e5ff3 commit 6fe9fa0

File tree

5 files changed

+121
-26
lines changed

5 files changed

+121
-26
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

example.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ middlewares:
3737
#
3838
# aggregate_gauges: true
3939

40-
# Flush the aggregate buffer every `flush_interval` seconds.
40+
# Flush the aggregate buffer every `flush_interval` milliseconds.
4141
# Defaults to 1 second.
4242
#
43-
# flush_interval: 1
43+
# flush_interval: 1000
4444

4545
# Normally the times at which metrics are flushed are approximately aligned
4646
# with a multiple of `flush_interval`. For example, a `flush_interval` of 1

src/config.rs

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
use std::fmt::Formatter;
2+
use std::time::Duration;
3+
#[cfg(feature = "cli")]
4+
use serde::de::Visitor;
5+
#[cfg(feature = "cli")]
6+
use serde::{Deserializer};
17
#[cfg(feature = "cli")]
28
use {anyhow::Error, serde::Deserialize, std::fs::File};
39

@@ -79,8 +85,8 @@ fn default_true() -> bool {
7985
}
8086

8187
#[cfg(feature = "cli")]
82-
fn default_flush_interval() -> u64 {
83-
1
88+
fn default_flush_interval() -> Duration {
89+
Duration::from_secs(1)
8490
}
8591

8692
#[cfg(feature = "cli")]
@@ -95,8 +101,8 @@ pub struct AggregateMetricsConfig {
95101
pub aggregate_counters: bool,
96102
#[cfg_attr(feature = "cli", serde(default = "default_true"))]
97103
pub aggregate_gauges: bool,
98-
#[cfg_attr(feature = "cli", serde(default = "default_flush_interval"))]
99-
pub flush_interval: u64,
104+
#[cfg_attr(feature = "cli", serde(default = "default_flush_interval", deserialize_with="deserialize_duration"))]
105+
pub flush_interval: Duration,
100106
#[cfg_attr(feature = "cli", serde(default = "default_flush_offset"))]
101107
pub flush_offset: i64,
102108
#[cfg_attr(feature = "cli", serde(default))]
@@ -109,11 +115,58 @@ pub struct SampleConfig {
109115
pub sample_rate: f64,
110116
}
111117

118+
/// Deserializes a number or a time-string into a Duration struct.
119+
/// Numbers without unit suffixes will be treated as seconds while suffixes will be
120+
/// parsed using https://crates.io/crates/humantime
121+
#[cfg(feature = "cli")]
122+
fn deserialize_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error> where D:Deserializer<'de> {
123+
struct FlushIntervalVisitor;
124+
125+
impl Visitor<'_> for FlushIntervalVisitor {
126+
type Value = Duration;
127+
128+
fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
129+
formatter.write_str("a non negative number")
130+
}
131+
132+
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
133+
where
134+
E: serde::de::Error,
135+
{
136+
Ok(Duration::from_millis(v))
137+
}
138+
}
139+
140+
deserializer.deserialize_any(FlushIntervalVisitor)
141+
}
142+
112143
#[cfg(test)]
113144
#[cfg(feature = "cli")]
114145
mod tests {
115146
use super::*;
116147

148+
#[test]
149+
fn flush_duration_milliseconds() {
150+
let yaml = r#"
151+
middlewares:
152+
- type: aggregate-metrics
153+
flush_interval: 125
154+
"#;
155+
let config = serde_yaml::from_str::<Config>(yaml).unwrap();
156+
assert!(matches!(&config.middlewares[0], MiddlewareConfig::AggregateMetrics(c) if c.flush_interval == Duration::from_millis(125)));
157+
}
158+
159+
#[test]
160+
fn flush_duration_negative_number() {
161+
let yaml = r#"
162+
middleware:
163+
- type: aggregate-metrics
164+
flush_interval: -1000
165+
"#;
166+
let config = serde_yaml::from_str::<Config>(yaml);
167+
assert!(config.is_err());
168+
}
169+
117170
#[test]
118171
fn config() {
119172
let config = Config::new("example.yaml").unwrap();
@@ -152,7 +205,7 @@ mod tests {
152205
AggregateMetricsConfig {
153206
aggregate_counters: true,
154207
aggregate_gauges: true,
155-
flush_interval: 1,
208+
flush_interval: 1s,
156209
flush_offset: 0,
157210
max_map_size: None,
158211
},

src/middleware/aggregate.rs

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
#[cfg(test)]
22
use std::sync::Mutex;
33

4-
use std::{
5-
collections::HashMap,
6-
time::{SystemTime, UNIX_EPOCH},
7-
};
4+
use std::{collections::HashMap, time::{SystemTime, UNIX_EPOCH}};
85
use std::{fmt, str};
96

107
use crate::{config::AggregateMetricsConfig, middleware::Middleware, types::Metric};
@@ -142,16 +139,16 @@ where
142139
SystemTime::now()
143140
.duration_since(UNIX_EPOCH)
144141
.unwrap()
145-
.as_secs()
142+
.as_millis() as u64
146143
});
147144

148145
let rounded_bucket =
149-
i64::try_from((now / self.config.flush_interval) * self.config.flush_interval)
146+
i64::try_from((now / self.config.flush_interval.as_millis() as u64) * self.config.flush_interval.as_millis() as u64)
150147
.expect("overflow when calculating with flush_interval");
151148
let rounded_bucket = u64::try_from(rounded_bucket + self.config.flush_offset)
152149
.expect("overflow when calculating with flush_interval");
153150

154-
if self.last_flushed_at + self.config.flush_interval <= rounded_bucket {
151+
if self.last_flushed_at + self.config.flush_interval.as_millis() as u64 <= rounded_bucket {
155152
self.flush_metrics();
156153
self.last_flushed_at = rounded_bucket;
157154
}
@@ -173,7 +170,7 @@ where
173170
#[cfg(test)]
174171
mod tests {
175172
use std::cell::RefCell;
176-
173+
use std::time::Duration;
177174
use super::*;
178175

179176
use crate::testutils::FnStep;
@@ -183,7 +180,52 @@ mod tests {
183180
let config = AggregateMetricsConfig {
184181
aggregate_counters: true,
185182
aggregate_gauges: true,
186-
flush_interval: 10,
183+
flush_interval: Duration::from_millis(100),
184+
flush_offset: 0,
185+
max_map_size: None,
186+
};
187+
let results = RefCell::new(vec![]);
188+
let next = FnStep(|metric: &mut Metric| {
189+
results.borrow_mut().push(metric.clone());
190+
});
191+
let mut aggregator = AggregateMetrics::new(config, next);
192+
193+
*CURRENT_TIME.lock().unwrap() = Some(0);
194+
195+
aggregator.poll();
196+
197+
aggregator.submit(&mut Metric::new(
198+
b"users.online:1|c|@0.5|#country:china".to_vec(),
199+
));
200+
201+
*CURRENT_TIME.lock().unwrap() = Some(10);
202+
203+
aggregator.poll();
204+
205+
aggregator.submit(&mut Metric::new(
206+
b"users.online:1|c|@0.5|#country:china".to_vec(),
207+
));
208+
209+
assert_eq!(results.borrow_mut().len(), 0);
210+
211+
*CURRENT_TIME.lock().unwrap() = Some(110);
212+
213+
aggregator.poll();
214+
215+
assert_eq!(
216+
results.borrow_mut().as_slice(),
217+
&[Metric::new(
218+
b"users.online:2|c|@0.5|#country:china".to_vec()
219+
)]
220+
);
221+
}
222+
223+
#[test]
224+
fn counter_seconds() {
225+
let config = AggregateMetricsConfig {
226+
aggregate_counters: true,
227+
aggregate_gauges: true,
228+
flush_interval: Duration::from_secs(1),
187229
flush_offset: 0,
188230
max_map_size: None,
189231
};
@@ -201,7 +243,7 @@ mod tests {
201243
b"users.online:1|c|@0.5|#country:china".to_vec(),
202244
));
203245

204-
*CURRENT_TIME.lock().unwrap() = Some(1);
246+
*CURRENT_TIME.lock().unwrap() = Some(101);
205247

206248
aggregator.poll();
207249

@@ -211,7 +253,7 @@ mod tests {
211253

212254
assert_eq!(results.borrow_mut().len(), 0);
213255

214-
*CURRENT_TIME.lock().unwrap() = Some(11);
256+
*CURRENT_TIME.lock().unwrap() = Some(1001);
215257

216258
aggregator.poll();
217259

@@ -228,7 +270,7 @@ mod tests {
228270
let config = AggregateMetricsConfig {
229271
aggregate_counters: true,
230272
aggregate_gauges: true,
231-
flush_interval: 10,
273+
flush_interval: Duration::from_millis(100),
232274
flush_offset: 0,
233275
max_map_size: None,
234276
};
@@ -246,7 +288,7 @@ mod tests {
246288
b"users.online:3|g|@0.5|#country:china".to_vec(),
247289
));
248290

249-
*CURRENT_TIME.lock().unwrap() = Some(1);
291+
*CURRENT_TIME.lock().unwrap() = Some(10);
250292

251293
aggregator.poll();
252294

@@ -256,7 +298,7 @@ mod tests {
256298

257299
assert_eq!(results.borrow_mut().len(), 0);
258300

259-
*CURRENT_TIME.lock().unwrap() = Some(11);
301+
*CURRENT_TIME.lock().unwrap() = Some(110);
260302

261303
aggregator.poll();
262304

src/types.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub struct MetricTag<'a> {
3939
pub name_value_sep_pos: Option<usize>,
4040
}
4141

42-
impl<'a> MetricTag<'a> {
42+
impl MetricTag<'_> {
4343
pub fn new(bytes: &[u8]) -> MetricTag {
4444
MetricTag {
4545
raw: bytes,
@@ -58,7 +58,7 @@ impl<'a> MetricTag<'a> {
5858
}
5959
}
6060

61-
impl<'a> fmt::Debug for MetricTag<'a> {
61+
impl fmt::Debug for MetricTag<'_> {
6262
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6363
if self.name_value_sep_pos.is_none() {
6464
f.debug_struct("MetricTag")
@@ -85,7 +85,7 @@ impl<'a> Iterator for MetricTagIterator<'a> {
8585
let mut tag_pos_iter = remaining_tags.iter();
8686
let next_tag_sep_pos = tag_pos_iter.position(|&b| b == b',');
8787

88-
return if let Some(tag_sep_pos) = next_tag_sep_pos {
88+
if let Some(tag_sep_pos) = next_tag_sep_pos {
8989
// Got a tag and more tags remain
9090
let tag = MetricTag::new(&remaining_tags[..tag_sep_pos]);
9191
self.remaining_tags = Some(&remaining_tags[tag_sep_pos + 1..]);
@@ -96,7 +96,7 @@ impl<'a> Iterator for MetricTagIterator<'a> {
9696
let tag = MetricTag::new(remaining_tags);
9797
self.remaining_tags = None;
9898
Some(tag)
99-
};
99+
}
100100
}
101101
}
102102

0 commit comments

Comments
 (0)