diff --git a/src/main.rs b/src/main.rs index fa02f08..1124e83 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ mod runner; mod storage; use output::OutputFormat; -use rules::RulesFile; +use rules::{RulesFile, Severity}; use runner::run_rule; use storage::register_data; @@ -46,74 +46,238 @@ async fn main() { let args = Cli::parse(); let verbose = args.verbose; - if let Err(e) = run(args).await { - if verbose { - eprintln!("Error: {e:?}"); - } else { - eprintln!("Error: {e}"); + match run(args).await { + Ok(code) => std::process::exit(code), + Err(e) => { + // Check if the error carries a specific exit code + let code = if let Some(exit_err) = e.downcast_ref::<ExitCodeError>() { + exit_err.code + } else { + 1 + }; + if verbose { + eprintln!("Error: {e:?}"); + } else { + eprintln!("Error: {e}"); + } + std::process::exit(code); } - std::process::exit(1); } } -async fn run(args: Cli) -> anyhow::Result<()> { - let content = std::fs::read_to_string(&args.rules).context("Could not read rules file")?; - let rules: RulesFile = - serde_yaml::from_str(&content).context("Could not parse the rules YAML")?; +/// Compute the granular exit code from the collected rule results.
+/// +/// - `0` — all rules passed +/// - `1` — at least one `error`-severity rule failed +/// - `2` — only `warning`-severity rules triggered (no errors failed) +pub fn compute_exit_code(results: &[RuleResult]) -> i32 { + let has_error_fail = results + .iter() + .any(|r| matches!(r.status, RuleStatus::Fail) && matches!(r.severity, Severity::Error)); + if has_error_fail { + return 1; + } + let has_warning_fail = results + .iter() + .any(|r| matches!(r.status, RuleStatus::Fail) && matches!(r.severity, Severity::Warning)); + if has_warning_fail { + return 2; + } + 0 +} + +async fn run(args: Cli) -> anyhow::Result<i32> { + // Exit code 3: invalid rules file or schema mismatch + let content = std::fs::read_to_string(&args.rules) + .map_err(|e| anyhow::anyhow!("Could not read rules file: {e}")) + .map_err(|e| ExitCodeError::new(3, e))?; + + let rules: RulesFile = serde_yaml::from_str(&content) + .map_err(|e| anyhow::anyhow!("Could not parse the rules YAML: {e}")) + .map_err(|e| ExitCodeError::new(3, e))?; + let format: OutputFormat = args .format - .context("Could not parse output format. Valid options are json or table")?; + .context("Could not parse output format. Valid options are json or table") + .map_err(|e| ExitCodeError::new(3, e))?; + let ctx = SessionContext::new(); - register_data(&ctx, &args.file).await?; + + // Exit code 4: data file not found or unreadable + register_data(&ctx, &args.file) + .await + .map_err(|e| ExitCodeError::new(4, e))?; + let schema_cols: Vec<String> = ctx .table("data") .await - .context("Could not read the table schema")? + .context("Could not read the table schema") + .map_err(|e| ExitCodeError::new(4, e))?
.schema() .fields() .iter() .map(|c| c.name().clone()) .collect(); + let missing_cols: Vec<String> = rules .rules .iter() .map(|c| c.column.clone()) .filter(|c| !schema_cols.contains(c)) .collect(); + if !missing_cols.is_empty() { - anyhow::bail!("Invalid columns in rules: {}", missing_cols.join(", ")); + return Err(ExitCodeError::new( + 3, + anyhow::anyhow!("Invalid columns in rules: {}", missing_cols.join(", ")), + ) + .into()); } - runner::validate_threshold(&rules.rules)?; + + runner::validate_threshold(&rules.rules).map_err(|e| ExitCodeError::new(3, e))?; + if args.dry_run { for rule in &rules.rules { runner::validate_rule(rule) - .with_context(|| format!("Rule '{}' is invalid", rule.name))?; + .with_context(|| format!("Rule '{}' is invalid", rule.name)) + .map_err(|e| ExitCodeError::new(3, e))?; } println!( "Rules file is valid. {} rules ready to run.", rules.rules.len() ); - return Ok(()); + return Ok(0); } - let mut any_failed = false; - let total_rows = run_sql(&ctx, "SELECT COUNT(*) FROM data".into()).await?; + + let total_rows = run_sql(&ctx, "SELECT COUNT(*) FROM data".into()) + .await + .map_err(|e| ExitCodeError::new(4, e))?; + if total_rows == 0 { - anyhow::bail!("Input file is empty"); + return Err(ExitCodeError::new(4, anyhow::anyhow!("Input file is empty")).into()); } + let mut results: Vec<RuleResult> = Vec::new(); for rule in &rules.rules { let result = run_rule(&ctx, rule, total_rows) .await .with_context(|| format!("Rule '{}' failed to execute", rule.name))?; - if matches!(result.status, RuleStatus::Fail) { - any_failed = true; - } results.push(result); } + let out = format_results(&results, &format); println!("{}", out); - if any_failed { - std::process::exit(1); + + Ok(compute_exit_code(&results)) +} + +/// A wrapper error that carries a desired process exit code alongside the anyhow error chain.
+#[derive(Debug)] +struct ExitCodeError { + code: i32, + inner: anyhow::Error, +} + +impl ExitCodeError { + fn new(code: i32, inner: anyhow::Error) -> Self { + ExitCodeError { code, inner } + } +} + +impl std::fmt::Display for ExitCodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.inner) + } +} + +impl std::error::Error for ExitCodeError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.inner.source() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rules::{Check, Rule, Severity}; + use crate::runner::{RuleResult, RuleStatus}; + + fn make_result(status: RuleStatus, severity: Severity) -> RuleResult { + RuleResult { + name: "test_rule".to_string(), + status, + severity, + violations: 0, + total_rows: 10, + violation_rate: 0.0, + } + } + + #[test] + fn test_all_pass_gives_exit_code_0() { + let results = vec![ + make_result(RuleStatus::Pass, Severity::Error), + make_result(RuleStatus::Pass, Severity::Warning), + ]; + assert_eq!(compute_exit_code(&results), 0); + } + + #[test] + fn test_error_fail_gives_exit_code_1() { + let results = vec![ + make_result(RuleStatus::Fail, Severity::Error), + make_result(RuleStatus::Pass, Severity::Warning), + ]; + assert_eq!(compute_exit_code(&results), 1); + } + + #[test] + fn test_error_and_warning_fail_gives_exit_code_1() { + let results = vec![ + make_result(RuleStatus::Fail, Severity::Error), + make_result(RuleStatus::Fail, Severity::Warning), + ]; + assert_eq!(compute_exit_code(&results), 1); + } + + #[test] + fn test_warning_only_fail_gives_exit_code_2() { + let results = vec![ + make_result(RuleStatus::Pass, Severity::Error), + make_result(RuleStatus::Fail, Severity::Warning), + ]; + assert_eq!(compute_exit_code(&results), 2); + } + + #[test] + fn test_empty_results_gives_exit_code_0() { + let results: Vec<RuleResult> = vec![]; + assert_eq!(compute_exit_code(&results), 0); + } + + #[test] + fn test_multiple_warnings_fail_gives_exit_code_2() { +
let results = vec![ + make_result(RuleStatus::Fail, Severity::Warning), + make_result(RuleStatus::Fail, Severity::Warning), + ]; + assert_eq!(compute_exit_code(&results), 2); + } + + #[test] + fn test_make_rule_helper_builds_valid_rule() { + // Verify Rule struct fields are accessible (compile-time check for struct completeness) + let rule = Rule { + name: "test".to_string(), + column: "id".to_string(), + check: Check::NotNull, + min: None, + max: None, + pattern: None, + threshold: None, + sql: None, + severity: Severity::Warning, + }; + assert_eq!(rule.severity, Severity::Warning); } - Ok(()) } diff --git a/src/output.rs b/src/output.rs index a0fff82..6bb505a 100644 --- a/src/output.rs +++ b/src/output.rs @@ -46,11 +46,12 @@ pub fn build_json(results: &[RuleResult]) -> String { pub fn build_table(results: &[RuleResult]) -> String { let mut table = Table::new(); - table.set_header(["RULE", "STATUS", "VIOLATIONS", "TOTAL", "RATE"]); + table.set_header(["RULE", "STATUS", "SEVERITY", "VIOLATIONS", "TOTAL", "RATE"]); results.iter().for_each(|res| { table.add_row([ res.name.clone(), format!("{}", res.status), + format!("{}", res.severity), res.violations.to_string(), res.total_rows.to_string(), format!("{:.1}%", res.violation_rate * 100.0), diff --git a/src/rules.rs b/src/rules.rs index 745b778..72bc0c1 100644 --- a/src/rules.rs +++ b/src/rules.rs @@ -1,4 +1,25 @@ -use serde::Deserialize; +use serde::{Deserialize, Serialize}; + +/// Severity level for a rule. +/// +/// `Error` (default) causes the pipeline to fail with exit code 1. +/// `Warning` prints a notice but does not affect the exit code beyond code 2. 
+#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Severity { + #[default] + Error, + Warning, +} + +impl std::fmt::Display for Severity { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Severity::Error => write!(f, "error"), + Severity::Warning => write!(f, "warning"), + } + } +} /// Top-level structure of a rules YAML file. #[derive(Debug, Deserialize)] @@ -7,7 +28,7 @@ pub struct RulesFile { } /// A single data-quality rule targeting one column. -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct Rule { /// Human-readable name shown in output. pub name: String, @@ -24,10 +45,13 @@ pub struct Rule { pub threshold: Option<f64>, /// Full SQL expression used by the `custom` check. pub sql: Option<String>, + /// Severity of this rule. Defaults to `error`. + #[serde(default)] + pub severity: Severity, } /// The type of check to perform on a column. -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "snake_case")] pub enum Check { /// Fails if any value in the column is NULL. @@ -47,3 +71,61 @@ pub enum Check { /// Executes `rule.sql` directly; the query must return a single violation count.
Custom, } + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_rule(yaml: &str) -> Rule { + let rules_file: RulesFile = serde_yaml::from_str(yaml).expect("valid YAML"); + rules_file + .rules + .into_iter() + .next() + .expect("at least one rule") + } + + #[test] + fn test_severity_defaults_to_error_when_omitted() { + let yaml = r#" +rules: + - name: no_nulls + column: id + check: not_null +"#; + let rule = parse_rule(yaml); + assert_eq!(rule.severity, Severity::Error); + } + + #[test] + fn test_severity_warning_parses_correctly() { + let yaml = r#" +rules: + - name: phone_format + column: phone + check: not_null + severity: warning +"#; + let rule = parse_rule(yaml); + assert_eq!(rule.severity, Severity::Warning); + } + + #[test] + fn test_severity_error_parses_correctly() { + let yaml = r#" +rules: + - name: no_nulls + column: id + check: not_null + severity: error +"#; + let rule = parse_rule(yaml); + assert_eq!(rule.severity, Severity::Error); + } + + #[test] + fn test_severity_display() { + assert_eq!(Severity::Error.to_string(), "error"); + assert_eq!(Severity::Warning.to_string(), "warning"); + } +} diff --git a/src/runner.rs b/src/runner.rs index 2e5ffcc..d16fe17 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -1,4 +1,4 @@ -use crate::rules::{Check, Rule}; +use crate::rules::{Check, Rule, Severity}; use anyhow::Context; use datafusion::arrow::array::Int64Array; use datafusion::prelude::*; @@ -24,6 +24,7 @@ impl std::fmt::Display for RuleStatus { pub struct RuleResult { pub name: String, pub status: RuleStatus, + pub severity: Severity, pub violations: u64, pub total_rows: u64, pub violation_rate: f64, @@ -133,6 +134,7 @@ pub async fn run_rule( Ok(RuleResult { name: rule.name.clone(), status, + severity: rule.severity.clone(), violations, total_rows, violation_rate, @@ -159,6 +161,7 @@ mod test { pattern: None, threshold: None, sql: None, + severity: Severity::Error, } } @@ -411,6 +414,7 @@ mod test { pattern: None, threshold: Some(1.5), sql: None, + 
severity: Severity::Error, }]; assert!(validate_threshold(&rules).is_err()); } @@ -427,6 +431,7 @@ mod test { pattern: None, threshold: Some(0.0), sql: None, + severity: Severity::Error, }, Rule { name: "b".to_string(), @@ -437,8 +442,52 @@ mod test { pattern: None, threshold: Some(1.0), sql: None, + severity: Severity::Error, }, ]; assert!(validate_threshold(&rules).is_ok()); } + + #[tokio::test] + async fn test_warning_rule_with_violations_has_fail_status_and_warning_severity() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES (1), (2), (NULL)) AS t(age)") + .await; + let rule = Rule { + severity: Severity::Warning, + ..make_rule("age_not_null", "age", Check::NotNull) + }; + let res = run_rule(&ctx, &rule, 3).await.unwrap(); + assert!(matches!(res.status, RuleStatus::Fail)); + assert_eq!(res.severity, Severity::Warning); + assert_eq!(res.violations, 1); + } + + #[tokio::test] + async fn test_error_rule_with_violations_has_fail_status_and_error_severity() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES (1), (2), (NULL)) AS t(age)") + .await; + let rule = Rule { + severity: Severity::Error, + ..make_rule("age_not_null", "age", Check::NotNull) + }; + let res = run_rule(&ctx, &rule, 3).await.unwrap(); + assert!(matches!(res.status, RuleStatus::Fail)); + assert_eq!(res.severity, Severity::Error); + assert_eq!(res.violations, 1); + } + + #[tokio::test] + async fn test_severity_propagates_to_result_on_pass() { + let ctx = + make_ctx("CREATE TABLE data AS SELECT * FROM (VALUES (1), (2), (3)) AS t(age)").await; + let rule = Rule { + severity: Severity::Warning, + ..make_rule("age_not_null", "age", Check::NotNull) + }; + let res = run_rule(&ctx, &rule, 3).await.unwrap(); + assert!(matches!(res.status, RuleStatus::Pass)); + assert_eq!(res.severity, Severity::Warning); + } } diff --git a/src/storage.rs b/src/storage.rs index 8054052..78be096 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -99,7 +99,7 @@ mod test { .unwrap(); // 4. 
run a rule and assert - use crate::rules::{Check, Rule}; + use crate::rules::{Check, Rule, Severity}; use crate::runner::{run_rule, RuleStatus}; let rule = Rule { @@ -111,6 +111,7 @@ mod test { pattern: None, threshold: None, sql: None, + severity: Severity::Error, }; let result = run_rule(&ctx, &rule, 3).await.unwrap(); assert!(matches!(result.status, RuleStatus::Fail)); @@ -157,7 +158,7 @@ mod test { .unwrap(); // 4. Run a rule and assert - use crate::rules::{Check, Rule}; + use crate::rules::{Check, Rule, Severity}; use crate::runner::{run_rule, RuleStatus}; let rule = Rule { @@ -169,6 +170,7 @@ mod test { pattern: None, threshold: None, sql: None, + severity: Severity::Error, }; let result = run_rule(&ctx, &rule, 3).await.unwrap(); assert!(matches!(result.status, RuleStatus::Fail));