diff --git a/Cargo.lock b/Cargo.lock index b4a9b6d86ef49..338fd114ae855 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2193,9 +2193,9 @@ dependencies = [ [[package]] name = "bon" -version = "3.8.1" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebeb9aaf9329dff6ceb65c689ca3db33dbf15f324909c60e4e5eef5701ce31b1" +checksum = "f47dbe92550676ee653353c310dfb9cf6ba17ee70396e1f7cf0a2020ad49b2fe" dependencies = [ "bon-macros", "rustversion", @@ -2203,11 +2203,11 @@ dependencies = [ [[package]] name = "bon-macros" -version = "3.8.1" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77e9d642a7e3a318e37c2c9427b5a6a48aa1ad55dcd986f3034ab2239045a645" +checksum = "519bd3116aeeb42d5372c29d982d16d0170d3d4a5ed85fc7dd91642ffff3c67c" dependencies = [ - "darling 0.21.3", + "darling 0.23.0", "ident_case", "prettyplease", "proc-macro2 1.0.106", @@ -3334,16 +3334,6 @@ dependencies = [ "darling_macro 0.20.11", ] -[[package]] -name = "darling" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" -dependencies = [ - "darling_core 0.21.3", - "darling_macro 0.21.3", -] - [[package]] name = "darling" version = "0.23.0" @@ -3368,20 +3358,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "darling_core" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2 1.0.106", - "quote 1.0.45", - "strsim", - "syn 2.0.117", -] - [[package]] name = "darling_core" version = "0.23.0" @@ -3406,17 +3382,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "darling_macro" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" -dependencies = [ - "darling_core 0.21.3", - "quote 1.0.45", - "syn 2.0.117", -] - [[package]] name = "darling_macro" version = "0.23.0" @@ -3851,6 +3816,17 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "drain-log" +version = "0.1.0" +dependencies = [ + "bon", + "fastrand", + "smallvec", + "snafu 0.8.9", + "string-interner", +] + [[package]] name = "duct" version = "0.13.6" @@ -4224,9 +4200,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.3.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" [[package]] name = "ff" @@ -11103,6 +11079,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "string-interner" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07f9fdfdd31a0ff38b59deb401be81b73913d76c9cc5b1aed4e1330a223420b9" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "serde", +] + [[package]] name = "string_cache" version = "0.8.7" @@ -12890,6 +12877,7 @@ dependencies = [ "dirs-next", "dnsmsg-parser", "dnstap-parser", + "drain-log", "dyn-clone", "encoding_rs", "enum_dispatch", diff --git a/Cargo.toml b/Cargo.toml index a0baa2a76a980..fe3cb8a85a060 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ members = [ "lib/codecs", "lib/dnsmsg-parser", "lib/docs-renderer", + "lib/drain-log", "lib/fakedata", "lib/file-source", "lib/file-source-common", @@ -379,6 +380,7 @@ csv = { version = "1.3", default-features = false } databend-client = { version = "0.28.0", default-features = false, features = ["rustls"], optional = true } derivative.workspace = true dirs-next = { version = "2.0.0", 
default-features = false, optional = true } +drain-log = { path = "lib/drain-log", optional = true } dyn-clone = { version = "1.0.20", default-features = false } encoding_rs = { version = "0.8.35", default-features = false, features = ["serde"] } enum_dispatch = { version = "0.3.13", default-features = false } @@ -791,6 +793,7 @@ transforms-logs = [ "transforms-aws_ec2_metadata", "transforms-dedupe", "transforms-delay", + "transforms-drain", "transforms-filter", "transforms-window", "transforms-log_to_metric", @@ -821,6 +824,7 @@ transforms-aggregate = [] transforms-aws_ec2_metadata = ["dep:arc-swap"] transforms-dedupe = ["transforms-impl-dedupe"] transforms-delay = [] +transforms-drain = ["dep:drain-log"] transforms-filter = [] transforms-incremental_to_absolute = [] transforms-window = [] diff --git a/changelog.d/drain_transform.feature.md b/changelog.d/drain_transform.feature.md new file mode 100644 index 0000000000000..384ea4294a015 --- /dev/null +++ b/changelog.d/drain_transform.feature.md @@ -0,0 +1,9 @@ +Added a new `drain` transform that clusters log lines using the Drain log +parsing algorithm and annotates each event with a derived template string +(e.g. `user <*> logged in from <*>`). Mirrors the OpenTelemetry Collector +`drain` processor, including `seed_templates`, `seed_logs`, and +`warmup_min_clusters` for stable templates across deployments. Use the +emitted template field as input to a downstream `filter`/`route` to act on +classes of log patterns. + +authors: srstrickland diff --git a/lib/drain-log/Cargo.toml b/lib/drain-log/Cargo.toml new file mode 100644 index 0000000000000..b9375b8a9e39c --- /dev/null +++ b/lib/drain-log/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "drain-log" +version = "0.1.0" +edition = "2021" +authors = ["Vector Contributors "] +description = "Log template extraction via the Drain algorithm with LRU cluster eviction. Adapted from drain3 (akshatagarwl)." 
+license = "Apache-2.0" +publish = false + +[dependencies] +bon = "3.9.1" +fastrand = "2.4.1" +snafu = "0.8" +string-interner = { version = "0.15", features = ["backends"] } +smallvec = "1.13" diff --git a/lib/drain-log/LICENSE b/lib/drain-log/LICENSE new file mode 100644 index 0000000000000..c63d8ac95f8d6 --- /dev/null +++ b/lib/drain-log/LICENSE @@ -0,0 +1,17 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + Copyright 2026 Akshat Agarwal + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/lib/drain-log/NOTICE b/lib/drain-log/NOTICE new file mode 100644 index 0000000000000..4b3bfff2bb954 --- /dev/null +++ b/lib/drain-log/NOTICE @@ -0,0 +1,24 @@ +drain-log +Copyright 2026 Vector Contributors + +This product includes software derived from drain3 (Apache License 2.0): + + drain3 — Fast log template extraction via fixed-depth prefix trees + Copyright 2026 Akshat Agarwal + https://github.com/akshatagarwl/drain3 + +drain3 is itself a Rust port of logpai/Drain3: + + Drain3 — Streaming log template miner with persistence and masking + https://github.com/logpai/Drain3 + Released under the MIT License. + +Local additions on top of the upstream drain3 sources: + * True LRU eviction of clusters once `max_clusters` is reached, so the + matcher can adapt to drifting log vocabularies on long-running streams + without unbounded memory growth. 
The LRU is implemented as an intrusive + doubly-linked list threaded through `Cluster`, giving O(1) touch on + match and O(1) eviction on cap; freed cluster ids are recycled so the + `clusters` slot vector stays bounded. + * A `cluster_count` accessor on `Matcher` exposing the live tracked + cluster count after eviction. diff --git a/lib/drain-log/src/lib.rs b/lib/drain-log/src/lib.rs new file mode 100644 index 0000000000000..bccfafdb74896 --- /dev/null +++ b/lib/drain-log/src/lib.rs @@ -0,0 +1,1149 @@ +#![forbid(unsafe_code)] + +//! drain-log — fast log template extraction via fixed-depth prefix trees. +//! +//! Rust port of [logpai/Drain3](https://github.com/logpai/Drain3). Splits log lines into tokens, +//! clusters them by a prefix tree keyed on token count, and replaces +//! variable tokens with a param placeholder (`<*>` by default). +//! +//! # Example +//! ``` +//! use drain_log::Config; +//! +//! # fn main() -> Result<(), drain_log::Error> { +//! let samples: Vec<String> = vec![ +//! "connection from 10.0.0.1 timeout after 5000ms".into(), +//! "connection from 10.0.0.2 timeout after 3000ms".into(), +//! "connection from 10.0.0.3 timeout after 1000ms".into(), +//! ]; +//! let matcher = drain_log::train(&samples, Config::default())?; +//! let (id, args, ok) = matcher.match_line("connection from 192.168.1.1 timeout after 42ms"); +//! assert!(ok); +//! assert_eq!(args, vec!["192.168.1.1", "42ms"]); +//! # Ok(()) +//! # } +//! ``` +use smallvec::SmallVec; +use snafu::Snafu; +use std::sync::{Arc, Mutex}; +use string_interner::backend::BucketBackend; +use string_interner::StringInterner; + +mod prefilter; +mod render; +mod tokenizer; +mod tree; + +pub use render::RenderPlan; +pub(crate) use tree::{Cluster, Node}; + +/// Errors that can occur during training or template reconstruction. +#[derive(Debug, Snafu)] +pub enum Error { + /// Tree depth is below the minimum of 3. 
+ #[snafu(display("depth must be >= 3, got {got}"))] + InvalidDepth { got: usize }, + /// Similarity threshold is outside [0, 1]. + #[snafu(display("similarity threshold must be in [0, 1], got {got}"))] + InvalidSimilarityThreshold { got: f64 }, + /// Match threshold is outside [0, 1]. + #[snafu(display("match threshold must be in [0, 1], got {got}"))] + InvalidMatchThreshold { got: f64 }, + /// Max children is below the minimum of 2. + #[snafu(display("max children must be >= 2, got {got}"))] + InvalidMaxChildren { got: usize }, + /// Max tokens must be >= 1. + #[snafu(display("max tokens must be >= 1, got {got}"))] + InvalidMaxTokens { got: usize }, + /// Max bytes must be >= 1. + #[snafu(display("max bytes must be >= 1, got {got}"))] + InvalidMaxBytes { got: usize }, + /// Param string was empty. + #[snafu(display("param string must not be empty"))] + EmptyParamString, + /// Template id must be > 0. + #[snafu(display("template id must be > 0, got {id}"))] + InvalidTemplateId { id: usize }, + /// Duplicate template id encountered. + #[snafu(display("duplicate template id {id}"))] + DuplicateTemplateId { id: usize }, + /// Template count must be > 0. + #[snafu(display("template {id} count must be > 0"))] + ZeroCountTemplate { id: usize }, + /// Internal error: cluster not found (programming bug). + #[snafu(display("internal error: cluster {id} not found"))] + ClusterNotFound { id: usize }, + /// Internal error: root node not initialized for token count. + #[snafu(display("internal error: root not initialized for token count {token_count}"))] + InternalRootNotInitialized { token_count: usize }, + /// Max clusters reached during training. + #[snafu(display("max clusters {limit} reached"))] + MaxClustersReached { limit: usize }, + /// Line exceeds max_bytes configuration. 
/// Newtype around the `u64` value that identifies a token.
///
/// Keeping token ids in their own type prevents them from being mixed up
/// with other integer ids at compile time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct TokenId(pub(crate) u64);

impl From<usize> for TokenId {
    /// Wraps a `usize` as a [`TokenId`].
    fn from(s: usize) -> Self {
        // Widening cast: lossless wherever `usize` is no wider than 64 bits.
        TokenId(s as u64)
    }
}

// NOTE(review): the lint is silenced here presumably because not every build
// of the crate uses this direction of the conversion — confirm and tighten.
#[allow(dead_code)]
impl From<TokenId> for usize {
    /// Unwraps a [`TokenId`] back into a `usize`.
    fn from(id: TokenId) -> Self {
        // Narrowing cast: on targets where `usize` is narrower than 64 bits
        // this truncates ids above `usize::MAX`.
        id.0 as usize
    }
}
/// Newtype around the `usize` value that identifies a cluster.
///
/// A distinct type keeps cluster ids from being confused with plain counts
/// or other integer ids.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct ClusterId(pub(crate) usize);

impl From<ClusterId> for usize {
    /// Unwraps a [`ClusterId`] into the `usize` it carries.
    fn from(id: ClusterId) -> Self {
        id.0
    }
}

impl From<usize> for ClusterId {
    /// Wraps a `usize` as a [`ClusterId`].
    fn from(s: usize) -> Self {
        ClusterId(s)
    }
}
/// A trained log template.
///
/// Literal tokens are stored densely in `tokens`; `params` flags which
/// positions of the template are param placeholders.
#[derive(Debug, Clone, PartialEq)]
pub struct Template {
    /// Cluster id.
    id: usize,
    /// Non-param tokens only, in order.
    tokens: Vec<Arc<str>>,
    /// `true` where the position is a param placeholder.
    params: Vec<bool>,
    /// Total number of positions (literal tokens plus params).
    token_count: usize,
    /// Number of matching log lines.
    count: usize,
}

impl Template {
    /// Cluster id.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Dense token list: only non-param tokens, in order.
    pub fn tokens(&self) -> &[Arc<str>] {
        &self.tokens
    }

    /// Whether position `idx` is a param placeholder.
    ///
    /// # Panics
    /// Panics if `idx` is out of bounds (>= `token_count`).
    pub fn is_param(&self, idx: usize) -> bool {
        self.params[idx]
    }

    /// Total number of positions (len(tokens) + param_count).
    pub fn token_count(&self) -> usize {
        self.token_count
    }

    /// Number of matching log lines.
    pub fn count(&self) -> usize {
        self.count
    }
}