Skip to content

Commit e2eeb0e

Browse files
authored
Merge pull request #325 from streamfold/file_receiver_improvements
File receiver: fix duplicate delivery on shutdown, fsync, and perf wins
2 parents 4c4463a + 8f6b95b commit e2eeb0e

12 files changed

Lines changed: 132 additions & 211 deletions

File tree

src/receivers/file/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ pub enum Error {
1717
#[error("Configuration error: {0}")]
1818
Config(String),
1919

20+
#[error("Parse error: {0}")]
21+
Parse(String),
22+
2023
#[error("Field error: {0}")]
2124
Field(String),
2225

Lines changed: 0 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use serde::Deserialize;
2-
use std::time::Duration;
32

43
/// Where to start reading from when a file is first discovered
54
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
@@ -11,126 +10,3 @@ pub enum StartAt {
1110
#[default]
1211
End,
1312
}
14-
15-
/// Configuration for the file input operator
16-
#[derive(Debug, Clone, Deserialize)]
17-
pub struct FileInputConfig {
18-
/// Unique identifier for this operator
19-
#[serde(default = "default_id")]
20-
pub id: String,
21-
22-
/// Glob patterns for files to include
23-
pub include: Vec<String>,
24-
25-
/// Glob patterns for files to exclude
26-
#[serde(default)]
27-
pub exclude: Vec<String>,
28-
29-
/// How often to poll for file changes (in milliseconds)
30-
#[serde(default = "default_poll_interval_ms")]
31-
pub poll_interval_ms: u64,
32-
33-
/// Where to start reading new files from
34-
#[serde(default)]
35-
pub start_at: StartAt,
36-
37-
/// Maximum size of a single log entry (in bytes)
38-
#[serde(default = "default_max_log_size")]
39-
pub max_log_size: usize,
40-
41-
/// Maximum number of files to read concurrently
42-
#[serde(default = "default_max_concurrent_files")]
43-
pub max_concurrent_files: usize,
44-
45-
/// Whether to include the file name as a label
46-
#[serde(default = "default_true")]
47-
pub include_file_name: bool,
48-
49-
/// Whether to include the file path as a label
50-
#[serde(default)]
51-
pub include_file_path: bool,
52-
}
53-
54-
fn default_id() -> String {
55-
"file_input".to_string()
56-
}
57-
58-
fn default_poll_interval_ms() -> u64 {
59-
200
60-
}
61-
62-
fn default_max_log_size() -> usize {
63-
1024 * 1024 // 1MB
64-
}
65-
66-
fn default_max_concurrent_files() -> usize {
67-
512
68-
}
69-
70-
fn default_true() -> bool {
71-
true
72-
}
73-
74-
impl Default for FileInputConfig {
75-
fn default() -> Self {
76-
Self {
77-
id: default_id(),
78-
include: vec![],
79-
exclude: vec![],
80-
poll_interval_ms: default_poll_interval_ms(),
81-
start_at: StartAt::default(),
82-
max_log_size: default_max_log_size(),
83-
max_concurrent_files: default_max_concurrent_files(),
84-
include_file_name: true,
85-
include_file_path: false,
86-
}
87-
}
88-
}
89-
90-
impl FileInputConfig {
91-
/// Get the poll interval as a Duration
92-
pub fn poll_interval(&self) -> Duration {
93-
Duration::from_millis(self.poll_interval_ms)
94-
}
95-
96-
/// Validate the configuration
97-
pub fn validate(&self) -> Result<(), String> {
98-
if self.include.is_empty() {
99-
return Err("include patterns cannot be empty".to_string());
100-
}
101-
102-
if self.max_log_size == 0 {
103-
return Err("max_log_size must be positive".to_string());
104-
}
105-
106-
if self.max_concurrent_files < 2 {
107-
return Err("max_concurrent_files must be at least 2".to_string());
108-
}
109-
110-
Ok(())
111-
}
112-
}
113-
114-
#[cfg(test)]
115-
mod tests {
116-
use super::*;
117-
118-
#[test]
119-
fn test_config_defaults() {
120-
let config = FileInputConfig::default();
121-
assert_eq!(config.poll_interval_ms, 200);
122-
assert_eq!(config.max_log_size, 1024 * 1024);
123-
assert_eq!(config.start_at, StartAt::End);
124-
}
125-
126-
#[test]
127-
fn test_config_validation() {
128-
let mut config = FileInputConfig::default();
129-
config.include = vec!["/var/log/*.log".to_string()];
130-
131-
assert!(config.validate().is_ok());
132-
133-
config.include = vec![];
134-
assert!(config.validate().is_err());
135-
}
136-
}

src/receivers/file/input/file/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ mod file_id;
55
mod finder;
66
mod reader;
77

8-
pub use config::{FileInputConfig, StartAt};
8+
pub use config::StartAt;
99
pub use file_id::{FileId, get_path_from_file};
1010
#[cfg(test)]
1111
pub use finder::MockFileFinder;

src/receivers/file/input/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,4 @@ pub mod file;
44

55
#[cfg(test)]
66
pub use file::MockFileFinder;
7-
pub use file::{
8-
FileFinder, FileId, FileInputConfig, FileReader, GlobFileFinder, StartAt, get_path_from_file,
9-
};
7+
pub use file::{FileFinder, FileId, FileReader, GlobFileFinder, StartAt, get_path_from_file};

src/receivers/file/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub mod watcher;
2222

2323
pub use config::FileReceiverConfig;
2424
pub use error::{Error, Result};
25-
pub use input::{FileFinder, FileInputConfig, FileReader, StartAt};
25+
pub use input::{FileFinder, FileReader, StartAt};
2626
pub use offset_committer::{FileOffsetCommitter, OffsetCommitterConfig, TrackedFileInfo};
2727
pub use offset_tracker::{FileOffsetTracker, LineOffset};
2828
pub use parser::{JsonParser, ParsedLog, Parser, RegexParser};

src/receivers/file/parser/json.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ impl Parser for JsonParser {
4343
if self.lenient {
4444
return Ok(ParsedLog::new());
4545
}
46-
return Err(Error::Config(format!("invalid JSON: {}", e)));
46+
return Err(Error::Parse(format!("invalid JSON: {}", e)));
4747
}
4848
};
4949

@@ -61,7 +61,7 @@ impl Parser for JsonParser {
6161
if self.lenient {
6262
Ok(ParsedLog::new())
6363
} else {
64-
Err(Error::Config(
64+
Err(Error::Parse(
6565
"JSON must be an object at the top level".to_string(),
6666
))
6767
}

src/receivers/file/parser/regex.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ fn try_parse_naive_timestamp(value: &str) -> Option<u64> {
107107
impl Parser for RegexParser {
108108
fn parse(&self, line: &str) -> Result<ParsedLog> {
109109
let captures = self.regex.captures(line).ok_or_else(|| {
110-
Error::Config(format!(
110+
Error::Parse(format!(
111111
"regex pattern does not match input: {:?}",
112112
line.chars().take(100).collect::<String>()
113113
))

src/receivers/file/persistence/json_file.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,19 +261,32 @@ fn atomic_write(path: &Path, state: &DatabaseState) -> Result<()> {
261261
serde_json::to_writer_pretty(&mut writer, state)
262262
.map_err(|e| Error::Persistence(format!("failed to write database: {}", e)))?;
263263

264-
// Ensure all data is flushed to disk before rename
264+
// Flush userspace buffer, then fsync to ensure data reaches disk before rename.
265+
// Without sync_all(), a power failure after rename could leave a valid filename
266+
// pointing at a file whose data blocks haven't been persisted.
265267
use std::io::Write;
266268
writer
267269
.flush()
268270
.map_err(|e| Error::Persistence(format!("failed to flush database: {}", e)))?;
269-
270-
// Drop the writer to close the file handle before rename
271-
drop(writer);
271+
let file = writer
272+
.into_inner()
273+
.map_err(|e| Error::Persistence(format!("failed to flush buffer: {}", e)))?;
274+
file.sync_all()
275+
.map_err(|e| Error::Persistence(format!("failed to fsync database: {}", e)))?;
276+
drop(file);
272277

273278
// Rename temp to final (atomic on most filesystems)
274279
fs::rename(&temp_path, path)
275280
.map_err(|e| Error::Persistence(format!("failed to rename database file: {}", e)))?;
276281

282+
// Fsync parent directory to ensure the rename (directory entry update) is durable.
283+
// Without this, a power failure could roll back to the old directory state.
284+
if let Some(parent) = path.parent() {
285+
if let Ok(dir) = File::open(parent) {
286+
let _ = dir.sync_all();
287+
}
288+
}
289+
277290
Ok(())
278291
}
279292

0 commit comments

Comments (0)