Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/receivers/file/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub enum Error {
#[error("Configuration error: {0}")]
Config(String),

#[error("Parse error: {0}")]
Parse(String),

#[error("Field error: {0}")]
Field(String),

Expand Down
124 changes: 0 additions & 124 deletions src/receivers/file/input/file/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use serde::Deserialize;
use std::time::Duration;

/// Where to start reading from when a file is first discovered
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
Expand All @@ -11,126 +10,3 @@ pub enum StartAt {
#[default]
End,
}

/// Configuration for the file input operator
///
/// Deserialized via serde; every field except `include` carries a serde
/// default (see the `default_*` helpers below), so `include` is the only
/// field a user must supply.
#[derive(Debug, Clone, Deserialize)]
pub struct FileInputConfig {
    /// Unique identifier for this operator (default: "file_input")
    #[serde(default = "default_id")]
    pub id: String,

    /// Glob patterns for files to include (required; `validate()` rejects
    /// an empty list)
    pub include: Vec<String>,

    /// Glob patterns for files to exclude (default: none)
    #[serde(default)]
    pub exclude: Vec<String>,

    /// How often to poll for file changes (in milliseconds, default: 200)
    #[serde(default = "default_poll_interval_ms")]
    pub poll_interval_ms: u64,

    /// Where to start reading new files from (default: `StartAt::End`)
    #[serde(default)]
    pub start_at: StartAt,

    /// Maximum size of a single log entry (in bytes, default: 1 MiB;
    /// `validate()` rejects 0)
    #[serde(default = "default_max_log_size")]
    pub max_log_size: usize,

    /// Maximum number of files to read concurrently (default: 512;
    /// `validate()` requires at least 2)
    #[serde(default = "default_max_concurrent_files")]
    pub max_concurrent_files: usize,

    /// Whether to include the file name as a label (default: true)
    #[serde(default = "default_true")]
    pub include_file_name: bool,

    /// Whether to include the file path as a label (default: false)
    #[serde(default)]
    pub include_file_path: bool,
}

/// Serde default for `FileInputConfig::id`.
fn default_id() -> String {
    String::from("file_input")
}

/// Serde default for `FileInputConfig::poll_interval_ms`: 200 ms.
fn default_poll_interval_ms() -> u64 {
    200
}

/// Serde default for `FileInputConfig::max_log_size`: 1 MiB.
fn default_max_log_size() -> usize {
    1024 * 1024 // 1MB
}

/// Serde default for `FileInputConfig::max_concurrent_files`: 512 files.
fn default_max_concurrent_files() -> usize {
    512
}

/// Serde default helper for boolean fields that default to `true`
/// (serde's plain `#[serde(default)]` would yield `false`).
fn default_true() -> bool {
    true
}

impl Default for FileInputConfig {
fn default() -> Self {
Self {
id: default_id(),
include: vec![],
exclude: vec![],
poll_interval_ms: default_poll_interval_ms(),
start_at: StartAt::default(),
max_log_size: default_max_log_size(),
max_concurrent_files: default_max_concurrent_files(),
include_file_name: true,
include_file_path: false,
}
}
}

impl FileInputConfig {
    /// The configured poll interval as a [`Duration`].
    pub fn poll_interval(&self) -> Duration {
        Duration::from_millis(self.poll_interval_ms)
    }

    /// Validate the configuration, reporting the first violated constraint
    /// as a human-readable message.
    pub fn validate(&self) -> Result<(), String> {
        if self.include.is_empty() {
            Err("include patterns cannot be empty".to_string())
        } else if self.max_log_size == 0 {
            Err("max_log_size must be positive".to_string())
        } else if self.max_concurrent_files < 2 {
            Err("max_concurrent_files must be at least 2".to_string())
        } else {
            Ok(())
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The in-code defaults must match the serde helper values.
    #[test]
    fn test_config_defaults() {
        let config = FileInputConfig::default();
        assert_eq!(config.poll_interval_ms, 200);
        assert_eq!(config.max_log_size, 1024 * 1024);
        assert_eq!(config.start_at, StartAt::End);
    }

    /// A non-empty include list passes validation; an empty one fails.
    #[test]
    fn test_config_validation() {
        let valid = FileInputConfig {
            include: vec!["/var/log/*.log".to_string()],
            ..FileInputConfig::default()
        };
        assert!(valid.validate().is_ok());

        let invalid = FileInputConfig {
            include: Vec::new(),
            ..FileInputConfig::default()
        };
        assert!(invalid.validate().is_err());
    }
}
2 changes: 1 addition & 1 deletion src/receivers/file/input/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod file_id;
mod finder;
mod reader;

pub use config::{FileInputConfig, StartAt};
pub use config::StartAt;
pub use file_id::{FileId, get_path_from_file};
#[cfg(test)]
pub use finder::MockFileFinder;
Expand Down
4 changes: 1 addition & 3 deletions src/receivers/file/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@ pub mod file;

#[cfg(test)]
pub use file::MockFileFinder;
pub use file::{
FileFinder, FileId, FileInputConfig, FileReader, GlobFileFinder, StartAt, get_path_from_file,
};
pub use file::{FileFinder, FileId, FileReader, GlobFileFinder, StartAt, get_path_from_file};
2 changes: 1 addition & 1 deletion src/receivers/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod watcher;

pub use config::FileReceiverConfig;
pub use error::{Error, Result};
pub use input::{FileFinder, FileInputConfig, FileReader, StartAt};
pub use input::{FileFinder, FileReader, StartAt};
pub use offset_committer::{FileOffsetCommitter, OffsetCommitterConfig, TrackedFileInfo};
pub use offset_tracker::{FileOffsetTracker, LineOffset};
pub use parser::{JsonParser, ParsedLog, Parser, RegexParser};
Expand Down
4 changes: 2 additions & 2 deletions src/receivers/file/parser/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Parser for JsonParser {
if self.lenient {
return Ok(ParsedLog::new());
}
return Err(Error::Config(format!("invalid JSON: {}", e)));
return Err(Error::Parse(format!("invalid JSON: {}", e)));
}
};

Expand All @@ -61,7 +61,7 @@ impl Parser for JsonParser {
if self.lenient {
Ok(ParsedLog::new())
} else {
Err(Error::Config(
Err(Error::Parse(
"JSON must be an object at the top level".to_string(),
))
}
Expand Down
2 changes: 1 addition & 1 deletion src/receivers/file/parser/regex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ fn try_parse_naive_timestamp(value: &str) -> Option<u64> {
impl Parser for RegexParser {
fn parse(&self, line: &str) -> Result<ParsedLog> {
let captures = self.regex.captures(line).ok_or_else(|| {
Error::Config(format!(
Error::Parse(format!(
"regex pattern does not match input: {:?}",
line.chars().take(100).collect::<String>()
))
Expand Down
21 changes: 17 additions & 4 deletions src/receivers/file/persistence/json_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,19 +261,32 @@ fn atomic_write(path: &Path, state: &DatabaseState) -> Result<()> {
serde_json::to_writer_pretty(&mut writer, state)
.map_err(|e| Error::Persistence(format!("failed to write database: {}", e)))?;

// Ensure all data is flushed to disk before rename
// Flush userspace buffer, then fsync to ensure data reaches disk before rename.
// Without sync_all(), a power failure after rename could leave a valid filename
// pointing at a file whose data blocks haven't been persisted.
use std::io::Write;
writer
.flush()
.map_err(|e| Error::Persistence(format!("failed to flush database: {}", e)))?;

// Drop the writer to close the file handle before rename
drop(writer);
let file = writer
.into_inner()
.map_err(|e| Error::Persistence(format!("failed to flush buffer: {}", e)))?;
file.sync_all()
.map_err(|e| Error::Persistence(format!("failed to fsync database: {}", e)))?;
drop(file);

// Rename temp to final (atomic on most filesystems)
fs::rename(&temp_path, path)
.map_err(|e| Error::Persistence(format!("failed to rename database file: {}", e)))?;

// Fsync parent directory to ensure the rename (directory entry update) is durable.
// Without this, a power failure could roll back to the old directory state.
if let Some(parent) = path.parent() {
if let Ok(dir) = File::open(parent) {
let _ = dir.sync_all();
}
}

Ok(())
}

Expand Down
Loading
Loading