diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d2f1ac..38ddb7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - feat(cli): refactor check command interface from positional arguments to `--conn` and `--table` options - feat(cli): add comprehensive test coverage for new CLI interface functionality - feat(cli): support explicit table name specification independent of database URL +- feat(schema): add comprehensive multi-table support for schema validation +- feat(schema): support multi-table rules format with table-level configuration options +- feat(schema): add Excel multi-sheet file support as data source +- feat(schema): implement table-grouped output display for multi-table validation results +- feat(schema): add table-level options support (strict_mode, case_insensitive) +- feat(tests): add comprehensive multi-table functionality test coverage +- feat(tests): add multi-table Excel file validation test scenarios ### Changed - **BREAKING CHANGE**: CLI interface changed from `vlite-cli check ` to `vlite-cli check --conn --table ` @@ -18,12 +25,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - refactor(cli): modify check command to pass table_name to SourceParser.parse_source() - refactor(tests): update all existing CLI tests to use new interface format - refactor(tests): add new test cases specifically for table name parameter validation +- refactor(schema): enhance schema command to support both single-table and multi-table formats +- refactor(schema): improve output formatting with table-grouped results display +- refactor(schema): enhance rule decomposition logic for multi-table support +- refactor(data-validator): improve multi-table detection and processing capabilities +- refactor(schema): preserve field order from initial JSON definition instead of alphabetical sorting +- refactor(schema): consolidate field validation information display to single line per field ### Fixed - fix(cli): resolve issue where `--table` parameter was not correctly passed to backend - fix(cli): ensure table name from `--table` option takes precedence over table name in database URL - fix(tests): update regression tests to use new CLI interface format - fix(tests): resolve test failures caused by interface changes +- fix(schema): resolve multi-table rules validation and type checking issues +- fix(schema): improve table name detection and validation in multi-table scenarios +- fix(schema): enhance error handling for multi-table validation workflows +- fix(schema): ensure schema-only rule fields are not omitted from validation results +- fix(schema): properly display skip conventions for non-existent columns (FIELD_MISSING/TYPE_MISMATCH) ### Removed - **BREAKING CHANGE**: remove backward compatibility for old positional argument interface diff --git a/README.md b/README.md index 2fa8a6e..51062e7 100644 --- a/README.md +++ b/README.md @@ -162,11 +162,17 @@ The project includes comprehensive tests to ensure reliability. If you encounter # Run all tests with coverage pytest -vv --cov +# Run tests quietly (suppress debug messages) +python scripts/run_tests_quiet.py --cov + # Run specific test categories pytest tests/unit/ -v # Unit tests only pytest tests/integration/ -v # Integration tests pytest tests/e2e/ -v # End-to-end tests +# Run specific tests quietly +python scripts/run_tests_quiet.py tests/unit/ -v + # Code quality checks pre-commit run --all-files diff --git a/cli/commands/schema.py b/cli/commands/schema.py index dd52bc7..a216f6e 100644 --- a/cli/commands/schema.py +++ b/cli/commands/schema.py @@ -2,7 +2,7 @@ Schema Command Adds `vlite-cli schema` command that parses parameters, performs minimal rules -file validation (single-table only, no jsonschema), and prints placeholder +file validation (supports both single-table and multi-table formats), and prints output aligned with the existing CLI style. """ @@ -19,6 +19,7 @@ from shared.enums import RuleAction, RuleCategory, RuleType, SeverityLevel from shared.enums.data_types import DataType from shared.schema.base import RuleTarget, TargetEntity +from shared.schema.connection_schema import ConnectionSchema from shared.schema.rule_schema import RuleSchema from shared.utils.console import safe_echo from shared.utils.datetime_utils import now as _now @@ -37,88 +38,135 @@ } -def _validate_rules_payload(payload: Any) -> Tuple[List[str], int]: - """Validate the minimal structure of the schema rules file. +def _validate_multi_table_rules_payload(payload: Any) -> Tuple[List[str], int]: + """Validate the structure of multi-table schema rules file. - This performs non-jsonschema checks: - - Top-level must be an object with a `rules` array - - Warn and ignore top-level `table` if present - - Validate each rule item fields and types: - - field: required str - - type: optional str in allowed set - - required: optional bool - - enum: optional list - - min/max: optional numeric (int or float) + Multi-table format: + { + "table1": { + "rules": [...], + "strict_mode": true + }, + "table2": { + "rules": [...] + } + } Returns: - warnings, rules_count - - Raises: - click.UsageError: if structure or types are invalid + warnings, total_rules_count """ warnings: List[str] = [] + total_rules = 0 if not isinstance(payload, dict): - raise click.UsageError("Rules file must be a JSON object with a 'rules' array") + raise click.UsageError("Rules file must be a JSON object") - if "table" in payload: - warnings.append( - "Top-level 'table' is ignored; table is derived from data-source" - ) + # Check if this is a multi-table format (has table names as keys) + table_names = [key for key in payload.keys() if key != "rules"] - if "tables" in payload: - # Explicitly reject multi-table format in v1 - raise click.UsageError( - "'tables' is not supported in v1; use single-table 'rules' only" - ) + if table_names: + # Multi-table format + for table_name in table_names: + table_schema = payload[table_name] + if not isinstance(table_schema, dict): + raise click.UsageError(f"Table '{table_name}' schema must be an object") - rules = payload.get("rules") - if not isinstance(rules, list): - raise click.UsageError("'rules' must be an array") + table_rules = table_schema.get("rules") + if not isinstance(table_rules, list): + raise click.UsageError( + f"Table '{table_name}' must have a 'rules' array" + ) - for idx, item in enumerate(rules): - if not isinstance(item, dict): - raise click.UsageError(f"rules[{idx}] must be an object") + # Validate each rule in this table + for idx, item in enumerate(table_rules): + if not isinstance(item, dict): + raise click.UsageError( + f"Table '{table_name}' rules[{idx}] must be an object" + ) - # field - field_name = item.get("field") - if not isinstance(field_name, str) or not field_name: - raise click.UsageError(f"rules[{idx}].field must be a non-empty string") + # Validate rule fields + _validate_single_rule_item(item, f"Table '{table_name}' rules[{idx}]") - # type - if "type" in item: - type_name = item["type"] - if not isinstance(type_name, str): + total_rules += len(table_rules) + + # Validate optional table-level switches + if "strict_mode" in table_schema and not isinstance( + table_schema["strict_mode"], bool + ): raise click.UsageError( - f"rules[{idx}].type must be a string when provided" + f"Table '{table_name}' strict_mode must be a boolean" ) - if type_name.lower() not in _ALLOWED_TYPE_NAMES: - allowed = ", ".join(sorted(_ALLOWED_TYPE_NAMES)) + if "case_insensitive" in table_schema and not isinstance( + table_schema["case_insensitive"], bool + ): raise click.UsageError( - f"rules[{idx}].type '{type_name}' is not supported. " - f"Allowed: {allowed}" + f"Table '{table_name}' case_insensitive must be a boolean" ) + else: + # Single-table format (backward compatibility) + warnings.append( + "Single-table format detected; consider using multi-table format for " + "better organization" + ) + if "rules" not in payload: + raise click.UsageError("Single-table format must have a 'rules' array") + + rules = payload["rules"] + if not isinstance(rules, list): + raise click.UsageError("'rules' must be an array") + + for idx, item in enumerate(rules): + if not isinstance(item, dict): + raise click.UsageError(f"rules[{idx}] must be an object") + _validate_single_rule_item(item, f"rules[{idx}]") + + total_rules = len(rules) - # required - if "required" in item and not isinstance(item["required"], bool): + return warnings, total_rules + + +def _validate_single_rule_item(item: Dict[str, Any], context: str) -> None: + """Validate a single rule item from the rules array.""" + # field + field_name = item.get("field") + if not isinstance(field_name, str) or not field_name: + raise click.UsageError(f"{context}.field must be a non-empty string") + + # type + if "type" in item: + type_name = item["type"] + if not isinstance(type_name, str): + raise click.UsageError(f"{context}.type must be a string when provided") + if type_name.lower() not in _ALLOWED_TYPE_NAMES: + allowed = ", ".join(sorted(_ALLOWED_TYPE_NAMES)) raise click.UsageError( - f"rules[{idx}].required must be a boolean when provided" + f"{context}.type '{type_name}' is not supported. " f"Allowed: {allowed}" ) - # enum - if "enum" in item and not isinstance(item["enum"], list): - raise click.UsageError(f"rules[{idx}].enum must be an array when provided") + # required + if "required" in item and not isinstance(item["required"], bool): + raise click.UsageError(f"{context}.required must be a boolean when provided") - # min/max - for bound_key in ("min", "max"): - if bound_key in item: - value = item[bound_key] - if not isinstance(value, (int, float)): - raise click.UsageError( - f"rules[{idx}].{bound_key} must be numeric when provided" - ) + # enum + if "enum" in item and not isinstance(item["enum"], list): + raise click.UsageError(f"{context}.enum must be an array when provided") + + # min/max + for bound_key in ("min", "max"): + if bound_key in item: + value = item[bound_key] + if not isinstance(value, (int, float)): + raise click.UsageError( + f"{context}.{bound_key} must be numeric when provided" + ) + + +def _validate_rules_payload(payload: Any) -> Tuple[List[str], int]: + """Validate the minimal structure of the schema rules file. - return warnings, len(rules) + This performs non-jsonschema checks for both single-table and multi-table formats. + """ + return _validate_multi_table_rules_payload(payload) def _map_type_name_to_datatype(type_name: str) -> DataType: @@ -200,16 +248,73 @@ def _create_rule_schema( ) -def _decompose_to_atomic_rules(payload: Dict[str, Any]) -> List[RuleSchema]: - """Decompose schema JSON payload into atomic RuleSchema objects. +def _decompose_schema_payload( + payload: Dict[str, Any], source_config: ConnectionSchema +) -> List[RuleSchema]: + """Decompose a schema payload into atomic RuleSchema objects. + + This function handles both single-table and multi-table formats in a + source-agnostic way. + """ + all_atomic_rules: List[RuleSchema] = [] + source_db = source_config.db_name or "unknown" + + is_multi_table_format = "rules" not in payload + + if is_multi_table_format: + tables_in_rules = list(payload.keys()) + available_tables_from_source = set(source_config.available_tables or []) + + for table_name in tables_in_rules: + if ( + available_tables_from_source + and table_name not in available_tables_from_source + ): + logger.warning( + f"Skipping rules for table '{table_name}' as it is not available " + "in the source." + ) + continue + + table_schema = payload[table_name] + if not isinstance(table_schema, dict): + logger.warning( + f"Definition for table '{table_name}' is not a valid object, " + "skipping." + ) + continue + + table_rules = _decompose_single_table_schema( + table_schema, source_db, table_name + ) + all_atomic_rules.extend(table_rules) + else: + table_name = "unknown" + if source_config.available_tables: + table_name = source_config.available_tables[0] + else: + logger.warning( + "Could not determine table name for single-table schema. " + "Consider using multi-table format for database sources." + ) + + table_rules = _decompose_single_table_schema(payload, source_db, table_name) + all_atomic_rules.extend(table_rules) - Rules per item: - - type -> contributes to table-level SCHEMA columns mapping - - required -> NOT_NULL(column) - - min/max -> RANGE(column, min_value/max_value) - - enum -> ENUM(column, allowed_values) + return all_atomic_rules + + +def _decompose_single_table_schema( + table_schema: Dict[str, Any], source_db: str, table_name: str +) -> List[RuleSchema]: + """Decompose a single table's schema definition into atomic RuleSchema objects. + + Args: + table_schema: The schema definition for a single table + source_db: Database name from source + table_name: Name of the table being validated """ - rules_arr = payload.get("rules", []) + rules_arr = table_schema.get("rules", []) # Build SCHEMA columns mapping first columns_map: Dict[str, Dict[str, Any]] = {} @@ -275,65 +380,80 @@ def _decompose_to_atomic_rules(payload: Dict[str, Any]) -> List[RuleSchema]: # Create one table-level SCHEMA rule if any columns were declared if columns_map: schema_params: Dict[str, Any] = {"columns": columns_map} - # Optional switches at top-level - if isinstance(payload.get("strict_mode"), bool): - schema_params["strict_mode"] = payload["strict_mode"] - if isinstance(payload.get("case_insensitive"), bool): - schema_params["case_insensitive"] = payload["case_insensitive"] + # Optional switches at table level + if isinstance(table_schema.get("strict_mode"), bool): + schema_params["strict_mode"] = table_schema["strict_mode"] + if isinstance(table_schema.get("case_insensitive"), bool): + schema_params["case_insensitive"] = table_schema["case_insensitive"] atomic_rules.insert( 0, _create_rule_schema( - name="schema", + name=f"schema_{table_name}", rule_type=RuleType.SCHEMA, column=None, parameters=schema_params, - description="CLI: table schema existence+type", + description=f"CLI: table schema existence+type for {table_name}", ), ) + # Set the target table and database for all rules + for rule in atomic_rules: + if rule.target and rule.target.entities: + rule.target.entities[0].database = source_db + rule.target.entities[0].table = table_name + return atomic_rules def _build_prioritized_atomic_status( *, - schema_result: Dict[str, Any] | None, + schema_results: List[Dict[str, Any]], atomic_rules: List[RuleSchema], ) -> Dict[str, Dict[str, str]]: - """Return a mapping rule_id -> {status, skip_reason} applying prioritization. + """Return a mapping rule_id -> {status, skip_reason} applying prioritization.""" + mapping: Dict[str, Dict[str, str]] = {} + schema_failures: Dict[str, str] = ( + {} + ) # Key: f"{table}.{column}", Value: failure_code - Prioritization per column: - 1) If field missing → mark SCHEMA for that field as FAILED (implicit) and all - dependent rules (NOT_NULL/RANGE/ENUM) as SKIPPED (reason FIELD_MISSING). - 2) If type mismatch → mark dependent rules as SKIPPED (reason TYPE_MISMATCH). - 3) Otherwise, leave dependent rules to their engine-evaluated status. + schema_rules_map = { + str(rule.id): rule for rule in atomic_rules if rule.type == RuleType.SCHEMA + } - We infer per-column status from schema_result.execution_plan.schema_details. - """ - mapping: Dict[str, Dict[str, str]] = {} + for res in schema_results: + rule_id = str(res.get("rule_id", "")) + rule = schema_rules_map.get(rule_id) + if not rule: + continue + + table = rule.get_target_info().get("table", "") + details = ( + res.get("execution_plan", {}) + .get("schema_details", {}) + .get("field_results", []) + ) - # Build per-column guard from SCHEMA details - column_guard: Dict[str, str] = {} # column -> NONE|FIELD_MISSING|TYPE_MISMATCH - if schema_result: - # Safely access nested dictionaries, checking for None at each level. - execution_plan = schema_result.get("execution_plan") or {} - schema_details = execution_plan.get("schema_details") or {} - details = schema_details.get("field_results") or [] for item in details: - col = str(item.get("column")) - code = str(item.get("failure_code", "NONE")) - column_guard[col] = code + code = item.get("failure_code") + if code in ("FIELD_MISSING", "TYPE_MISMATCH"): + col = item.get("column") + if col: + schema_failures[f"{table}.{col}"] = code - # Apply skip to dependent rules - for r in atomic_rules: - if r.type == RuleType.SCHEMA: + if not schema_failures: + return {} + + for rule in atomic_rules: + if rule.type == RuleType.SCHEMA: continue - column = r.get_target_column() or "" - guard = column_guard.get(column, "NONE") - if guard == "FIELD_MISSING": - mapping[r.id] = {"status": "SKIPPED", "skip_reason": "FIELD_MISSING"} - elif guard == "TYPE_MISMATCH": - mapping[r.id] = {"status": "SKIPPED", "skip_reason": "TYPE_MISMATCH"} + + col = rule.get_target_column() + table = rule.get_target_info().get("table", "") + + if col and f"{table}.{col}" in schema_failures: + reason = schema_failures[f"{table}.{col}"] + mapping[str(rule.id)] = {"status": "SKIPPED", "skip_reason": reason} return mapping @@ -369,9 +489,11 @@ def _read_rules_payload(rules_file: str) -> Dict[str, Any]: return cast(Dict[str, Any], payload) -def _emit_warnings(warnings: List[str]) -> None: - for msg in warnings: - _safe_echo(f"⚠️ Warning: {msg}", err=True) +def _emit_warnings(warnings: List[str], output: str = "table") -> None: + """Emit warnings only for non-JSON output to avoid polluting JSON output.""" + if output.lower() != "json": + for msg in warnings: + _safe_echo(f"⚠️ Warning: {msg}", err=True) def _early_exit_when_no_rules( @@ -436,43 +558,42 @@ def _run_validation(validator: Any) -> Tuple[List[Any], float]: return results, exec_seconds -def _extract_schema_result_dict( +def _extract_schema_results( *, atomic_rules: List[RuleSchema], results: List[Any] -) -> Dict[str, Any] | None: - try: - schema_rule = next( - (rule for rule in atomic_rules if rule.type == RuleType.SCHEMA), None - ) - if not schema_rule: - return None - for r in results: - if r is None: - continue - rid = "" - if hasattr(r, "rule_id"): - try: - rid = str(getattr(r, "rule_id")) - except Exception: - rid = "" - elif isinstance(r, dict): - rid = str(r.get("rule_id", "")) - if rid == str(schema_rule.id): - return ( - r.model_dump() - if hasattr(r, "model_dump") - else cast(Dict[str, Any], r) - ) - return None - except Exception: - return None +) -> List[Dict[str, Any]]: + """Extract all SCHEMA rule results from the list of validation results.""" + schema_results = [] + schema_rule_ids = { + str(rule.id) for rule in atomic_rules if rule.type == RuleType.SCHEMA + } + if not schema_rule_ids: + return [] + + for r in results: + if r is None: + continue + rid = "" + if hasattr(r, "rule_id"): + try: + rid = str(getattr(r, "rule_id")) + except Exception: + rid = "" + elif isinstance(r, dict): + rid = str(r.get("rule_id", "")) + + if rid in schema_rule_ids: + schema_results.append( + r.model_dump() if hasattr(r, "model_dump") else cast(Dict[str, Any], r) + ) + return schema_results def _compute_skip_map( - *, atomic_rules: List[RuleSchema], schema_result_dict: Dict[str, Any] | None + *, atomic_rules: List[RuleSchema], schema_results: List[Dict[str, Any]] ) -> Dict[str, Dict[str, str]]: try: return _build_prioritized_atomic_status( - schema_result=schema_result_dict, atomic_rules=atomic_rules + schema_results=schema_results, atomic_rules=atomic_rules ) except Exception: return {} @@ -485,7 +606,7 @@ def _emit_json_output( atomic_rules: List[RuleSchema], results: List[Any], skip_map: Dict[str, Dict[str, str]], - schema_result_dict: Dict[str, Any] | None, + schema_results: List[Dict[str, Any]], exec_seconds: float, ) -> None: enriched_results: List[Dict[str, Any]] = [] @@ -523,14 +644,24 @@ def _failed_records_of(res: Dict[str, Any]) -> int: fields: List[Dict[str, Any]] = [] schema_fields_index: Dict[str, Dict[str, Any]] = {} - if schema_result_dict: - schema_plan = (schema_result_dict or {}).get("execution_plan", {}) or {} + schema_rules_map = { + str(rule.id): rule for rule in atomic_rules if rule.type == RuleType.SCHEMA + } + + for schema_result in schema_results: + schema_plan = (schema_result or {}).get("execution_plan", {}) or {} schema_details = schema_plan.get("schema_details", {}) or {} field_results = schema_details.get("field_results", []) or [] + + rule_id = str(schema_result.get("rule_id", "")) + rule = schema_rules_map.get(rule_id) + table_name = rule.get_target_info().get("table") if rule else "unknown" + for item in field_results: col_name = str(item.get("column")) entry: Dict[str, Any] = { "column": col_name, + "table": table_name, "checks": { "existence": { "status": item.get("existence", "UNKNOWN"), @@ -543,25 +674,25 @@ def _failed_records_of(res: Dict[str, Any]) -> int: }, } fields.append(entry) - schema_fields_index[col_name] = entry - - schema_rule = next( - (rule for rule in atomic_rules if rule.type == RuleType.SCHEMA), None - ) - if schema_rule: - params = schema_rule.parameters or {} - declared_cols = (params.get("columns") or {}).keys() - for col in declared_cols: - if str(col) not in schema_fields_index: - entry = { - "column": str(col), - "checks": { - "existence": {"status": "UNKNOWN", "failure_code": "NONE"}, - "type": {"status": "UNKNOWN", "failure_code": "NONE"}, - }, - } - fields.append(entry) - schema_fields_index[str(col)] = entry + schema_fields_index[f"{table_name}.{col_name}"] = entry + + for rule in atomic_rules: + if rule.type == RuleType.SCHEMA: + params = rule.parameters or {} + declared_cols = (params.get("columns") or {}).keys() + table_name = rule.get_target_info().get("table") + for col in declared_cols: + if f"{table_name}.{str(col)}" not in schema_fields_index: + entry = { + "column": str(col), + "table": table_name, + "checks": { + "existence": {"status": "UNKNOWN", "failure_code": "NONE"}, + "type": {"status": "UNKNOWN", "failure_code": "NONE"}, + }, + } + fields.append(entry) + schema_fields_index[f"{table_name}.{str(col)}"] = entry def _ensure_check(entry: Dict[str, Any], name: str) -> Dict[str, Any]: checks: Dict[str, Dict[str, Any]] = entry.setdefault("checks", {}) @@ -580,14 +711,23 @@ def _ensure_check(entry: Dict[str, Any], name: str) -> Dict[str, Any]: rule = rule_map.get(rule_id) if not rule or rule.type == RuleType.SCHEMA: continue + column_name = rule.get_target_column() or "" if not column_name: continue - l_entry = schema_fields_index.get(column_name) + + table_name = "unknown" + if rule.target and rule.target.entities: + table_name = rule.target.entities[0].table + + l_entry = schema_fields_index.get(f"{table_name}.{column_name}") if not l_entry: - l_entry = {"column": column_name, "checks": {}} + l_entry = {"column": column_name, "table": table_name, "checks": {}} fields.append(l_entry) - schema_fields_index[column_name] = l_entry + schema_fields_index[f"{table_name}.{column_name}"] = l_entry + else: + l_entry["table"] = table_name + t = rule.type if t == RuleType.NOT_NULL: key = "not_null" @@ -601,11 +741,13 @@ def _ensure_check(entry: Dict[str, Any], name: str) -> Dict[str, Any]: key = "date_format" else: key = t.value.lower() + check = _ensure_check(l_entry, key) check["status"] = str(rd.get("status", "UNKNOWN")) if rule_id in skip_map: check["status"] = skip_map[rule_id]["status"] check["skip_reason"] = skip_map[rule_id]["skip_reason"] + fr = _failed_records_of(rd) if fr: check["failed_records"] = fr @@ -623,18 +765,18 @@ def _ensure_check(entry: Dict[str, Any], name: str) -> Dict[str, Any]: total_failed_records = sum(_failed_records_of(r) for r in enriched_results) schema_extras: List[str] = [] - if schema_result_dict: + for schema_result in schema_results: try: extras = ( - (schema_result_dict or {}) + (schema_result or {}) .get("execution_plan", {}) .get("schema_details", {}) .get("extras", []) ) if isinstance(extras, list): - schema_extras = [str(x) for x in extras] + schema_extras.extend([str(x) for x in extras]) except Exception: - schema_extras = [] + pass payload: Dict[str, Any] = { "status": "ok", @@ -653,7 +795,7 @@ def _ensure_check(entry: Dict[str, Any], name: str) -> Dict[str, Any]: "fields": fields, } if schema_extras: - payload["schema_extras"] = sorted(schema_extras) + payload["schema_extras"] = sorted(list(set(schema_extras))) _safe_echo(json.dumps(payload, default=str)) @@ -663,7 +805,7 @@ def _emit_table_output( atomic_rules: List[RuleSchema], results: List[Any], skip_map: Dict[str, Dict[str, str]], - schema_result_dict: Dict[str, Any] | None, + schema_results: List[Dict[str, Any]], exec_seconds: float, ) -> None: rule_map = {str(rule.id): rule for rule in atomic_rules} @@ -699,14 +841,21 @@ def _dataset_total(res: Dict[str, Any]) -> int: rd["rule_type"] = rule.type.value rd["column_name"] = rule.get_target_column() rd.setdefault("rule_name", rule.name) + if rule.target and rule.target.entities: + rd["table_name"] = rule.target.entities[0].table if rid in skip_map: rd["status"] = skip_map[rid]["status"] rd["skip_reason"] = skip_map[rid]["skip_reason"] table_results.append(rd) - header_total_records = 0 + table_records: Dict[str, int] = {} for rd in table_results: - header_total_records = max(header_total_records, _dataset_total(rd)) + table_name = rd.get("table_name", "unknown") + total = _dataset_total(rd) + if total > 0: + table_records[table_name] = max(table_records.get(table_name, 0), total) + + header_total_records = sum(table_records.values()) def _calc_failed(res: Dict[str, Any]) -> int: if isinstance(res.get("failed_records"), int): @@ -726,101 +875,182 @@ def _calc_failed(res: Dict[str, Any]) -> int: if "total_records" not in rd: rd["total_records"] = _dataset_total(rd) - column_guard: Dict[str, str] = {} - if schema_result_dict: - execution_plan = schema_result_dict.get("execution_plan") or {} - schema_details = execution_plan.get("schema_details") or {} - details = schema_details.get("field_results") or [] - for item in details: - col = str(item.get("column")) - column_guard[col] = str(item.get("failure_code", "NONE")) - - grouped: Dict[str, Dict[str, Any]] = {} - schema_rule = next((r for r in atomic_rules if r.type == RuleType.SCHEMA), None) - declared_cols: List[str] = [] - if schema_rule: - params = schema_rule.parameters or {} - declared_cols = list((params.get("columns") or {}).keys()) - for col in declared_cols: - grouped[str(col)] = {"column": str(col), "issues": []} + tables_grouped: Dict[str, Dict[str, Dict[str, Any]]] = {} for rd in table_results: - rid = str(rd.get("rule_id", "")) - rule = rule_map.get(rid) - if not rule or rule.type == RuleType.SCHEMA: - continue - col = rule.get_target_column() or "" - if not col: + if rd.get("rule_type") == RuleType.SCHEMA.value: continue - entry = grouped.setdefault(col, {"column": col, "issues": []}) - status = str(rd.get("status", "UNKNOWN")) - if rule.type == RuleType.NOT_NULL: - key = "not_null" - elif rule.type == RuleType.RANGE: - key = "range" - elif rule.type == RuleType.ENUM: - key = "enum" - elif rule.type == RuleType.REGEX: - key = "regex" - elif rule.type == RuleType.DATE_FORMAT: - key = "date_format" - else: - key = rule.type.value.lower() - if column_guard.get(col) == "FIELD_MISSING": + table_name = rd.get("table_name", "unknown") + if table_name not in tables_grouped: + tables_grouped[table_name] = {} + + col = rd.get("column_name", "") + if col: + if col not in tables_grouped[table_name]: + tables_grouped[table_name][col] = {"column": col, "issues": []} + + status: Any = str(rd.get("status", "UNKNOWN")) + if rd.get("rule_type") == RuleType.NOT_NULL.value: + key = "not_null" + elif rd.get("rule_type") == RuleType.RANGE.value: + key = "range" + elif rd.get("rule_type") == RuleType.ENUM.value: + key = "enum" + else: + key = rd.get("rule_type", "unknown").lower() + + if status in {"FAILED", "ERROR", "SKIPPED"}: + tables_grouped[table_name][col]["issues"].append( + { + "check": key, + "status": status, + "failed_records": int(rd.get("failed_records", 0) or 0), + "skip_reason": rd.get("skip_reason"), + } + ) + + all_columns_by_table: Dict[str, List[str]] = {} + for rule in atomic_rules: + if rule.target and rule.target.entities: + table_name = rule.target.entities[0].table + if table_name not in all_columns_by_table: + all_columns_by_table[table_name] = [] + + if rule.type == RuleType.SCHEMA: + if rule.parameters: + declared_cols = (rule.parameters.get("columns") or {}).keys() + for col in declared_cols: + if str(col) not in all_columns_by_table[table_name]: + all_columns_by_table[table_name].append(str(col)) + else: + column_name = rule.get_target_column() + if column_name and column_name not in all_columns_by_table[table_name]: + all_columns_by_table[table_name].append(column_name) + + for table_name, columns in all_columns_by_table.items(): + if table_name not in tables_grouped: + tables_grouped[table_name] = {} + for column_name in columns: + if column_name not in tables_grouped[table_name]: + tables_grouped[table_name][column_name] = { + "column": column_name, + "issues": [], + } + + schema_rules_map = { + str(rule.id): rule for rule in atomic_rules if rule.type == RuleType.SCHEMA + } + for schema_result in schema_results: + rule_id = str(schema_result.get("rule_id", "")) + rule = schema_rules_map.get(rule_id) + if not rule: continue - if column_guard.get(col) == "TYPE_MISMATCH" and key in { - "not_null", - "range", - "enum", - "regex", - "date_format", - }: + + table_name = rule.get_target_info().get("table") + if not table_name or table_name not in tables_grouped: continue - if status in {"FAILED", "ERROR", "SKIPPED"}: - entry["issues"].append( - { - "check": key, - "status": status, - "failed_records": int(rd.get("failed_records", 0) or 0), - "skip_reason": skip_map.get(rid, {}).get("skip_reason"), - } - ) + + execution_plan = schema_result.get("execution_plan") or {} + schema_details = execution_plan.get("schema_details", {}) or {} + details = schema_details.get("field_results", []) or [] + for item in details: + col = str(item.get("column")) + if col not in tables_grouped[table_name]: + continue + if item.get("failure_code") == "FIELD_MISSING": + tables_grouped[table_name][col]["issues"].append( + {"check": "missing", "status": "FAILED"} + ) + elif item.get("failure_code") == "TYPE_MISMATCH": + tables_grouped[table_name][col]["issues"].append( + {"check": "type", "status": "FAILED"} + ) lines: List[str] = [] - lines.append(f"✓ Checking {source} ({header_total_records:,} records)") + lines.append(f"✓ Checking {source}") total_failed_records = sum( int(r.get("failed_records", 0) or 0) for r in table_results ) - for col in sorted(grouped.keys()): - guard = column_guard.get(col, "NONE") - if guard == "FIELD_MISSING": - lines.append(f"✗ {col}: missing (skipped dependent checks)") - continue - if guard == "TYPE_MISMATCH": - lines.append(f"✗ {col}: type mismatch (skipped dependent checks)") - continue - issues = grouped[col]["issues"] - critical = [i for i in issues if i["status"] in {"FAILED", "ERROR"}] - if not critical: - lines.append(f"✓ {col}: OK") - else: - for i in critical: - fr = i.get("failed_records") or 0 - if i["status"] == "ERROR": - lines.append(f"✗ {col}: {i['check']} error") - else: - lines.append(f"✗ {col}: {i['check']} failed ({fr} failures)") - - total_columns = len(grouped) + sorted_tables = sorted(tables_grouped.keys()) + + for table_name in sorted_tables: + records = table_records.get(table_name, 0) + lines.append(f"\n📋 Table: {table_name} ({records:,} records)") + + table_grouped = tables_grouped[table_name] + ordered_columns = all_columns_by_table.get(table_name, []) + + # Fallback for columns that might appear in results but not in rules + # (e.g., from a different source) + result_columns = sorted(table_grouped.keys()) + for col in result_columns: + if col not in ordered_columns: + ordered_columns.append(col) + + for col in ordered_columns: + if col not in table_grouped: + lines.append(f"✓ {col}: OK") + continue + + issues = table_grouped[col]["issues"] + + if not issues: + lines.append(f"✓ {col}: OK") + continue + + is_missing = any( + i.get("check") == "missing" or i.get("skip_reason") == "FIELD_MISSING" + for i in issues + ) + + if is_missing: + lines.append(f"✗ {col}: missing (skipped dependent checks)") + continue + + unique_issues: Dict[Tuple[str, str], Dict[str, Any]] = {} + for issue in issues: + key_ = (str(issue.get("status")), str(issue.get("check"))) + if key_ not in unique_issues: + unique_issues[key_] = issue + + final_issues = sorted( + unique_issues.values(), key=lambda x: str(x.get("check")) + ) + + issue_descs: List[str] = [] + for i in final_issues: + status = i.get("status") + check = i.get("check", "unknown") + + if status in {"FAILED", "ERROR"}: + fr = i.get("failed_records", 0) + if status == "ERROR": + issue_descs.append(f"{check} error") + else: + issue_descs.append(f"{check} failed ({fr} failures)") + elif status == "SKIPPED": + skip_reason = i.get("skip_reason") + if skip_reason == "TYPE_MISMATCH": + issue_descs.append("type mismatch (skipped dependent checks)") + else: + reason_text = skip_reason or "unknown reason" + issue_descs.append(f"{check} skipped ({reason_text})") + + if not issue_descs: + lines.append(f"✓ {col}: OK") + else: + lines.append(f"✗ {col}: { ', '.join(issue_descs)}") + + total_columns = sum(len(all_columns_by_table.get(t, [])) for t in sorted_tables) passed_columns = sum( - 1 - for col in grouped - if column_guard.get(col, "NONE") == "NONE" - and not [ - i for i in grouped[col]["issues"] if i["status"] in {"FAILED", "ERROR"} - ] + sum( + 1 + for c in all_columns_by_table.get(t, []) + if not tables_grouped.get(t, {}).get(c, {}).get("issues", []) + ) + for t in sorted_tables ) failed_columns = total_columns - passed_columns overall_error_rate = ( @@ -828,6 +1058,22 @@ def _calc_failed(res: Dict[str, Any]) -> int: if header_total_records == 0 else (total_failed_records / max(header_total_records, 1)) * 100 ) + + if len(tables_grouped) > 1: + lines.append("\n📊 Multi-table Summary:") + for table_name in sorted_tables: + table_cols = all_columns_by_table.get(table_name, []) + table_columns_count = len(table_cols) + table_passed = sum( + 1 + for c in table_cols + if not tables_grouped[table_name].get(c, {}).get("issues") + ) + table_failed = table_columns_count - table_passed + lines.append( + f" {table_name}: {table_passed} passed, {table_failed} failed" + ) + lines.append( f"\nSummary: {passed_columns} passed, {failed_columns} failed" f" ({overall_error_rate:.2f}% overall error rate)" @@ -844,13 +1090,13 @@ def _calc_failed(res: Dict[str, Any]) -> int: required=True, help="Database connection string or file path", ) -@click.option("--table", "table_name", required=True, help="Table name to validate") @click.option( "--rules", "rules_file", type=click.Path(exists=True, readable=True), required=True, - help="Path to schema rules file (JSON)", + help="Path to schema rules file (JSON) - supports both single-table " + "and multi-table formats", ) @click.option( "--output", @@ -863,88 +1109,49 @@ def _calc_failed(res: Dict[str, Any]) -> int: "--fail-on-error", is_flag=True, default=False, - help="Return exit code 1 if any error occurs during skeleton execution", -) -@click.option( - "--max-errors", - type=int, - default=100, - show_default=True, - help="Maximum number of errors to collect (reserved; not used in skeleton)", + help="Return exit code 1 if any error occurs during execution", ) @click.option("--verbose", is_flag=True, default=False, help="Enable verbose output") def schema_command( connection_string: str, - table_name: str, rules_file: str, output: str, fail_on_error: bool, - max_errors: int, verbose: bool, ) -> None: - """Schema validation command with minimal rules file validation. - - NEW FORMAT: - vlite-cli schema --conn --table \ - --rules [options] - - SOURCE can be: - - File path: users.csv, data.xlsx, records.json - - Database URL: mysql://user:pass@host/db - - SQLite file: sqlite:///path/to/file.db - - Examples: - vlite-cli schema --conn users.csv --table users --rules schema.json - vlite-cli schema --conn mysql://user:pass@host/db --table users \ - --rules schema.json + """ + Schema validation command with support for both single-table + and multi-table validation. """ from cli.core.config import get_cli_config from core.config import get_core_config - # start_time = now() try: _maybe_echo_analyzing(connection_string, output) _guard_empty_source_file(connection_string) source_config = SourceParser().parse_source(connection_string) - rules_payload = _read_rules_payload(rules_file) - warnings, rules_count = _validate_rules_payload(rules_payload) - _emit_warnings(warnings) + is_multi_table_rules = "rules" not in rules_payload + if is_multi_table_rules: + source_config.parameters["is_multi_table"] = True - # Decompose into atomic rules per design - atomic_rules = _decompose_to_atomic_rules(rules_payload) + warnings, rules_count = _validate_rules_payload(rules_payload) + _emit_warnings(warnings, output) - # FIX: Manually populate the target table and database from CLI args - # The source_config object is a class instance, not a dict. - # Use attribute access. - source_db = source_config.db_name - if not source_db: - source_db = "unknown" + atomic_rules = _decompose_schema_payload(rules_payload, source_config) - for rule in atomic_rules: - if rule.target and rule.target.entities: - rule.target.entities[0].database = source_db - rule.target.entities[0].table = table_name - - # get database name from SourceParser results - # source_db = source_config.get('database') - # for rule in atomic_rules: - # if rule.target and rule.target.entities: - # rule.target.entities[0].database = source_db - # rule.target.entities[0].table = table_name - # Fast-path: no rules -> emit minimal payload and exit cleanly - if len(atomic_rules) == 0: + if not atomic_rules: _early_exit_when_no_rules( source=connection_string, rules_file=rules_file, output=output, fail_on_error=fail_on_error, ) + return - # Execute via core engine using DataValidator core_config = get_core_config() cli_config = get_cli_config() validator = _create_validator( @@ -955,15 +1162,13 @@ def schema_command( ) results, exec_seconds = _run_validation(validator) - # Aggregation and prioritization - schema_result_dict: Dict[str, Any] | None = _extract_schema_result_dict( + schema_results = _extract_schema_results( atomic_rules=atomic_rules, results=results ) skip_map = _compute_skip_map( - atomic_rules=atomic_rules, schema_result_dict=schema_result_dict + atomic_rules=atomic_rules, schema_results=schema_results ) - # Apply skip map to JSON output only; table mode stays concise by design if output.lower() == "json": _emit_json_output( source=connection_string, @@ -971,7 +1176,7 @@ def schema_command( atomic_rules=atomic_rules, results=results, skip_map=skip_map, - schema_result_dict=schema_result_dict, + schema_results=schema_results, exec_seconds=exec_seconds, ) else: @@ -980,11 +1185,10 @@ def schema_command( atomic_rules=atomic_rules, results=results, skip_map=skip_map, - schema_result_dict=schema_result_dict, + schema_results=schema_results, exec_seconds=exec_seconds, ) - # Exit code: fail if any rule failed (support both model objects and dicts) def _status_of(item: Any) -> str: if hasattr(item, "status"): try: @@ -996,19 +1200,13 @@ def _status_of(item: Any) -> str: return "" any_failed = any(_status_of(r) == "FAILED" for r in results) - import click as _click - - raise _click.exceptions.Exit(1 if any_failed or fail_on_error else 0) + raise click.exceptions.Exit(1 if any_failed or fail_on_error else 0) except click.UsageError: - # Propagate Click usage errors for standard exit code (typically 2) raise except click.exceptions.Exit: - # Allow Click's explicit Exit (with code) to propagate unchanged raise - except Exception as e: # Fallback: print concise error and return generic failure + except Exception as e: logger.error(f"Schema command error: {str(e)}") _safe_echo(f"❌ Error: {str(e)}", err=True) - import click as _click - - raise _click.exceptions.Exit(1) + raise click.exceptions.Exit(1) diff --git a/cli/core/data_validator.py b/cli/core/data_validator.py index fb73021..2415f34 100644 --- a/cli/core/data_validator.py +++ b/cli/core/data_validator.py @@ -111,6 +111,13 @@ def _complete_target_info(self) -> None: This replaces the old _update_rule_connections method. """ + # If the source is multi-table, targets are already set. Do not overwrite. + if self.source_config.parameters.get("is_multi_table"): + self.logger.debug( + "Multi-table source detected, skipping target info completion." + ) + return + if not self.rules: return @@ -184,15 +191,32 @@ async def _validate_file(self) -> List[ExecutionResultSchema]: """Validate file-based data source""" self.logger.info(f"Validating file: {self.source_config.file_path}") - # Load file data - try: - df = self._load_file_data() - self.logger.info(f"Loaded {len(df)} records from file") - except Exception as e: - raise ValueError(f"Failed to load file data: {str(e)}") + # Check if this is a multi-table Excel file + is_multi_table = self.source_config.parameters.get("is_multi_table", False) + self.logger.info( + f"Multi-table detection: is_multi_table={is_multi_table}, " + f"connection_type={self.source_config.connection_type}" + ) + self.logger.info(f"Source config parameters: {self.source_config.parameters}") + + if ( + is_multi_table + and self.source_config.connection_type == ConnectionType.EXCEL + ): + # Handle multi-table Excel file + self.logger.info("Processing multi-table Excel file") + sqlite_config = await self._convert_multi_table_excel_to_sqlite() + else: + # Handle single-table file (existing logic) + self.logger.info("Processing single-table file") + try: + df = self._load_file_data() + self.logger.info(f"Loaded {len(df)} records from file") + except Exception as e: + raise ValueError(f"Failed to load file data: {str(e)}") - # Convert to SQLite for rule engine processing - sqlite_config = await self._convert_file_to_sqlite(df) + # Convert to SQLite for rule engine processing + sqlite_config = await self._convert_file_to_sqlite(df) # Execute rules using rule engine with new interface rule_engine = RuleEngine(connection=sqlite_config, core_config=self.core_config) @@ -310,6 +334,143 @@ def _load_file_data(self) -> pd.DataFrame: except Exception as e: raise ValueError(f"Failed to parse file: {str(e)}") + async def _convert_multi_table_excel_to_sqlite(self) -> ConnectionSchema: + """ + Convert multi-table Excel file to SQLite database. + + Returns: + ConnectionSchema: SQLite connection configuration + """ + import os + import tempfile + import time + + from sqlalchemy import create_engine + + temp_db_file = None + temp_db_path = None + start_time = time.time() + + try: + # Create a temporary SQLite file + temp_db_file = tempfile.NamedTemporaryFile(suffix=".db", delete=False) + temp_db_path = temp_db_file.name + temp_db_file.close() + + # Create SQLite engine + engine = create_engine(f"sqlite:///{temp_db_path}") + + # Load all sheets into SQLite + await self._load_multi_table_excel_to_sqlite(engine, temp_db_path) + + # Get table mapping for connection config + table_mapping = self.source_config.parameters.get("table_mapping", {}) + + # Create connection config with multi-table information + sqlite_config = ConnectionSchema( + name="temp_sqlite_multi_table", + description="Temporary SQLite for multi-table Excel validation", + connection_type=ConnectionType.SQLITE, + file_path=temp_db_path, + parameters={ + "is_multi_table": True, + "table_mapping": table_mapping, + "temp_file": True, # Mark as temporary file for cleanup + }, + ) + + # Log performance metrics + elapsed_time = time.time() - start_time + self.logger.info( + f"Created temporary SQLite database at {temp_db_path} with " + f"{len(table_mapping)} tables in {elapsed_time:.2f} seconds" + ) + + return sqlite_config + + except Exception as e: + # Clean up temporary file if it exists + if temp_db_path and os.path.exists(temp_db_path): + try: + os.unlink(temp_db_path) + except Exception as cleanup_error: + self.logger.warning( + f"Failed to cleanup temporary file {temp_db_path}: " + f"{cleanup_error}" + ) + raise ValueError(f"Failed to create multi-table SQLite database: {str(e)}") + + async def _load_multi_table_excel_to_sqlite( + self, engine: Any, temp_db_path: str + ) -> None: + """ + Load multiple sheets from Excel file into SQLite database. + + Args: + engine: SQLAlchemy engine for SQLite + temp_db_path: Path to temporary SQLite database + """ + import pandas as pd + + file_path = self.source_config.file_path + sheets_info = self.source_config.parameters.get("sheets", {}) + + if not sheets_info: + raise ValueError( + "Multi-table Excel file but no sheets information available" + ) + + self.logger.info( + f"Loading {len(sheets_info)} sheets into SQLite: {list(sheets_info.keys())}" + ) + + # Store table name mapping for later use + table_mapping = {} + + # Load each sheet into a separate table + for sheet_name, columns in sheets_info.items(): + try: + # Read the specific sheet + df = pd.read_excel(file_path, sheet_name=sheet_name, engine="openpyxl") + + # Validate that the sheet has the expected columns + expected_columns = set(columns) + actual_columns = set(df.columns) + + if not expected_columns.issubset(actual_columns): + missing_columns = expected_columns - actual_columns + self.logger.warning( + f"Sheet '{sheet_name}' missing expected columns: " + f"{missing_columns}" + ) + + # Write to SQLite with sheet name as table name + # Clean table name for SQLite (remove special characters) + clean_table_name = "".join( + c for c in sheet_name if c.isalnum() or c == "_" + ) + if not clean_table_name or clean_table_name[0].isdigit(): + clean_table_name = f"sheet_{clean_table_name}" + + # Store the mapping from original sheet name to clean table name + table_mapping[sheet_name] = clean_table_name + + df.to_sql(clean_table_name, engine, if_exists="replace", index=False) + self.logger.info( + f"Loaded sheet '{sheet_name}' as table '{clean_table_name}' " + f"with {len(df)} rows" + ) + + except Exception as e: + self.logger.error(f"Failed to load sheet '{sheet_name}': {str(e)}") + # Continue with other sheets + continue + + # Store the table mapping in the source config for later use + if hasattr(self, "source_config") and hasattr(self.source_config, "parameters"): + self.source_config.parameters["table_mapping"] = table_mapping + self.logger.info(f"Stored table mapping: {table_mapping}") + async def _convert_file_to_sqlite(self, df: pd.DataFrame) -> ConnectionSchema: """ Convert pandas DataFrame to SQLite in-memory database diff --git a/cli/core/source_parser.py b/cli/core/source_parser.py index e2bf3a7..7dadc59 100644 --- a/cli/core/source_parser.py +++ b/cli/core/source_parser.py @@ -8,7 +8,7 @@ import re import urllib.parse from pathlib import Path -from typing import Optional, Tuple +from typing import Dict, List, Optional, Tuple from uuid import uuid4 from cli.exceptions import ValidationError @@ -95,6 +95,62 @@ def parse_source( self.logger.error(f"{str(e)}") raise + def get_excel_sheets(self, file_path: str) -> Dict[str, List[str]]: + """ + Get sheet names from Excel file. + + Args: + file_path: Path to Excel file + + Returns: + Dict with sheet names as keys and column lists as values + + Raises: + ImportError: If pandas/openpyxl not available + FileNotFoundError: If file not found + """ + try: + import pandas as pd + except ImportError: + raise ImportError("pandas is required to read Excel files") + + try: + excel_file = pd.ExcelFile(file_path) + sheets_info = {} + + for sheet_name in excel_file.sheet_names: + # Read first few rows to get column names + df = pd.read_excel(file_path, sheet_name=sheet_name, nrows=0) + sheets_info[str(sheet_name)] = list(df.columns) + + return sheets_info + except Exception as e: + self.logger.error(f"Error reading Excel file {file_path}: {str(e)}") + raise + + def is_multi_table_excel(self, file_path: str) -> bool: + """ + Check if Excel file contains multiple sheets that could represent + multiple tables. + + Args: + file_path: Path to Excel file + + Returns: + True if file has multiple sheets, False otherwise + """ + try: + import pandas as pd + + excel_file = pd.ExcelFile(file_path) + return len(excel_file.sheet_names) > 1 + except ImportError: + # If pandas not available, assume single table + return False + except Exception: + # If any error occurs, assume single table + return False + def _is_database_url(self, source: str) -> bool: """Check if source is a database URL""" for patterns in self.db_url_patterns.values(): @@ -182,47 +238,66 @@ def _parse_file_path(self, file_path: str) -> ConnectionSchema: path = Path(file_path) - # Check if file exists if not path.exists(): raise FileNotFoundError(f"File not found: {file_path}") if not path.is_file(): raise ValidationError(f"Path is not a file: {file_path}") - # Determine file type file_ext = path.suffix.lower() conn_type = self.file_extensions.get(file_ext) if not conn_type: - # Try to infer from content or use CSV as default conn_type = ConnectionType.CSV self.logger.warning( f"Unknown file extension {file_ext}, assuming CSV format" ) + is_multi_table = False + sheets_info = {} + if conn_type == ConnectionType.EXCEL: + try: + sheets_info = self.get_excel_sheets(file_path) + if len(sheets_info) > 1: + is_multi_table = True + self.logger.info( + f"Multi-table Excel file detected with {len(sheets_info)} " + "sheets: {list(sheets_info.keys())}" + ) + except Exception as e: + self.logger.warning( + f"Could not read Excel sheets, treating as single-table: {str(e)}" + ) + is_multi_table = False + + parameters = { + "filename": path.name, + "file_size": path.stat().st_size, + "encoding": "utf-8", + } + + if is_multi_table and sheets_info: + parameters["is_multi_table"] = True + parameters["sheets"] = sheets_info + available_tables = list(sheets_info.keys()) + else: + parameters["is_multi_table"] = False + available_tables = [path.stem] + return ConnectionSchema( name=f"file_connection_{uuid4().hex[:8]}", - description=f"File connection: {path.name}", + description=f"File connection: {path.name}" + + (" (multi-table)" if is_multi_table else ""), connection_type=conn_type, - host=None, - port=None, - db_name=None, - username=None, - password=None, - db_schema=None, file_path=str(path.absolute()), - parameters={ - "filename": path.name, - "file_size": path.stat().st_size, - "encoding": "utf-8", # Default encoding - }, + parameters=parameters, + available_tables=available_tables, capabilities=DataSourceCapability( supports_sql=False, supports_batch_export=True, - max_export_rows=100000, - estimated_throughput=5000, + max_export_rows=100000 if not is_multi_table else 50000, + estimated_throughput=5000 if not is_multi_table else 2000, ), - cross_db_settings=None, ) def _detect_database_type(self, url: str) -> ConnectionType: @@ -298,14 +373,9 @@ def _create_sqlite_connection( name=f"sqlite_connection_{uuid4().hex[:8]}", description=f"SQLite connection: {Path(file_path).name}", connection_type=ConnectionType.SQLITE, - host=None, - port=None, - db_name=None, - username=None, - password=None, - db_schema=None, file_path=file_path, parameters=parameters, + available_tables=[table] if table else [], capabilities=DataSourceCapability( supports_sql=True, supports_batch_export=True, diff --git a/config/logging.test.toml b/config/logging.test.toml new file mode 100644 index 0000000..2ce2ddc --- /dev/null +++ b/config/logging.test.toml @@ -0,0 +1,37 @@ +# Test Environment Logging Configuration + +# Global log level: Set to WARNING to suppress DEBUG and INFO messages +level = "WARNING" + +# Log message format +format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + +# Enable logging to file (disabled for tests to keep output clean) +to_file = false + +# Module-specific log levels for testing +[module_levels] +# Core modules - set to WARNING to reduce noise +"shared.database.connection" = "WARNING" +"shared.database.query_executor" = "WARNING" +"cli.commands.check" = "WARNING" +"cli.core.data_validator" = "WARNING" +"cli.core.source_parser" = "WARNING" +"cli.core.rule_parser" = "WARNING" +"rule_engine" = "WARNING" +"core.engine.rule_engine" = "WARNING" + +# Third-party modules - set to ERROR to suppress all debug info +"aiosqlite" = "ERROR" +"sqlalchemy" = "ERROR" +"sqlalchemy.engine" = "ERROR" +"sqlalchemy.pool" = "ERROR" +"sqlalchemy.dialects" = "ERROR" +"pydantic" = "WARNING" +"toml" = "WARNING" +"werkzeug" = "WARNING" +"urllib3.connectionpool" = "WARNING" + +# Keep only critical errors visible +"asyncio" = "WARNING" +"pytest" = "WARNING" diff --git a/config/logging.toml b/config/logging.toml index 9630a91..b32cc7b 100644 --- a/config/logging.toml +++ b/config/logging.toml @@ -1,7 +1,7 @@ # Logging Configuration # Global log level: DEBUG, INFO, WARNING, ERROR, CRITICAL -level = "ERROR" +level = "WARNING" # Log message format format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" diff --git a/pytest.ini b/pytest.ini index 9a063be..5fcbd1d 100644 --- a/pytest.ini +++ b/pytest.ini @@ -9,6 +9,7 @@ addopts = --cov-report=term-missing --cov-report=html:htmlcov --strict-markers + --log-cli-level=ERROR python_files = test_*.py *_test.py python_classes = Test* python_functions = test_* diff --git a/scripts/run_tests_quiet.py b/scripts/run_tests_quiet.py new file mode 100644 index 0000000..31f95f4 --- /dev/null +++ b/scripts/run_tests_quiet.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +""" +Quiet test runner script that suppresses debug and info messages. + +Usage: + python scripts/run_tests_quiet.py [pytest_options...] + +Examples: + python scripts/run_tests_quiet.py + python scripts/run_tests_quiet.py -k "test_data_validator" + python scripts/run_tests_quiet.py --cov=core --cov-report=html +""" + +import os +import subprocess +import sys +from pathlib import Path + + +def main() -> None: + """Run tests with quiet logging configuration.""" + # Get the project root directory + project_root = Path(__file__).parent.parent + os.chdir(project_root) + + # Set environment variables for quiet logging + env = os.environ.copy() + env["PYTHONPATH"] = str(project_root) + + # Build pytest command with quiet options + cmd = [ + sys.executable, + "-m", + "pytest", + "--log-cli-level=WARNING", + "--tb=short", + "-v", + ] + + # Add any additional arguments passed to the script + cmd.extend(sys.argv[1:]) + + # Run pytest + try: + result = subprocess.run(cmd, env=env, cwd=project_root) + sys.exit(result.returncode) + except KeyboardInterrupt: + print("\nTest run interrupted by user") + sys.exit(1) + except Exception as e: + print(f"Error running tests: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/shared/database/query_executor.py b/shared/database/query_executor.py index eedfd83..0cd11e6 100644 --- a/shared/database/query_executor.py +++ b/shared/database/query_executor.py @@ -788,21 +788,23 @@ async def get_column_list( standardized_result = [] for col in result: # Different database dialects may use different key names - name = col.get("Field", col.get("name", col.get("column_name"))) - if name is None: - # If column name not found, try to use the first value as column - # name - if col and isinstance(col, dict) and len(col) > 0: - name = next(iter(col.values())) - else: - name = str(col) + name = col.get("Field") or col.get("name") or col.get("column_name") + type_ = col.get("Type") or col.get("data_type") or col.get("type") + + if not name: + # If column name not found, skip this column with a warning + self.logger.warning( + f"Could not determine column name from result: {col}" + ) + continue + + if not type_: + type_ = "unknown" # Create standardized column info std_col = { - "name": name, # Standardized column name key - "type": col.get( - "Type", col.get("data_type", col.get("type", "unknown")) - ), + "name": name, + "type": type_, "nullable": ( col.get("Null", col.get("is_nullable", "YES")).upper() == "YES" ), diff --git a/shared/schema/connection_schema.py b/shared/schema/connection_schema.py index 5c71258..3cad596 100644 --- a/shared/schema/connection_schema.py +++ b/shared/schema/connection_schema.py @@ -7,7 +7,7 @@ cross-database features. """ -from typing import Any, Dict +from typing import Any, Dict, List, Optional from uuid import UUID, uuid4 from pydantic import Field, model_validator @@ -34,6 +34,9 @@ class ConnectionSchema(ConnectionBase): id: UUID = Field( default_factory=uuid4, description="Unique identifier for the connection" ) + available_tables: Optional[List[str]] = Field( + default=None, description="List of available tables for file-based sources" + ) # ==================== Convenient methods ==================== diff --git a/test_data/multi_table_data.xlsx b/test_data/multi_table_data.xlsx new file mode 100644 index 0000000..f53dfd1 Binary files /dev/null and b/test_data/multi_table_data.xlsx differ diff --git a/test_data/multi_table_schema.json b/test_data/multi_table_schema.json new file mode 100644 index 0000000..088e22f --- /dev/null +++ b/test_data/multi_table_schema.json @@ -0,0 +1,31 @@ +{ + "users": { + "rules": [ + { "field": "id", "type": "integer", "required": true }, + { "field": "name", "type": "string", "required": true }, + { "field": "email", "type": "string", "required": true }, + { "field": "age", "type": "integer", "min": 0, "max": 120 }, + { "field": "status", "type": "string", "enum": ["active", "inactive", "pending"] } + ], + "strict_mode": true + }, + "products": { + "rules": [ + { "field": "product_id", "type": "integer", "required": true }, + { "field": "product_name", "type": "string", "required": true }, + { "field": "price", "type": "float", "min": 0.0 }, + { "field": "category", "type": "string", "enum": ["electronics", "clothing", "books"] }, + { "field": "in_stock", "type": "boolean" } + ] + }, + "orders": { + "rules": [ + { "field": "order_id", "type": "integer", "required": true }, + { "field": "user_id", "type": "integer", "required": true }, + { "field": "order_date", "type": "datetime", "required": true }, + { "field": "total_amount", "type": "float", "min": 0.0 }, + { "field": "order_status", "type": "string", "enum": ["pending", "confirmed", "shipped", "delivered"] } + ], + "case_insensitive": true + } +} diff --git a/test_data/schema.json b/test_data/schema.json index a0c590c..1770dc6 100644 --- a/test_data/schema.json +++ b/test_data/schema.json @@ -1,4 +1,5 @@ { + "customers": { "rules": [ { "field": "id", "type": "integer", "required": true }, { "field": "age", "type": "integer", "required": true, "min": 0, "max": 120 }, @@ -7,4 +8,5 @@ { "field": "invalid_col", "type": "string", "required": true }, { "field": "email", "type": "string" } ] + } } diff --git a/tests/conftest.py b/tests/conftest.py index e428610..87469f6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,9 +15,23 @@ # Add the project root directory to the Python path. sys.path.insert(0, str(Path(__file__).parent.parent)) +from shared.config.loader import load_config +from shared.config.logging_config import LoggingConfig + # Import the database connection management module. from shared.database.connection import close_all_engines +# Load test-specific logging configuration +try: + test_logging_config: LoggingConfig = load_config("logging.test.toml", LoggingConfig) + if test_logging_config and test_logging_config.module_levels: + # Apply test logging configuration + for module, level in test_logging_config.module_levels.items(): + _logging.getLogger(module).setLevel(getattr(_logging, level.upper())) +except Exception: + # Fallback to default configuration if test config not found + pass + # --------------------------------------------------------------------------- # Hypothesis global configuration – suppress HealthCheck for function-scoped # fixtures used in property-based tests (see OutputFormatter tests). diff --git a/tests/e2e/cli_scenarios/test_schema_command_e2e.py b/tests/e2e/cli_scenarios/test_schema_command_e2e.py new file mode 100644 index 0000000..eed2bd1 --- /dev/null +++ b/tests/e2e/cli_scenarios/test_schema_command_e2e.py @@ -0,0 +1,214 @@ +""" +E2E: vlite-cli schema on databases and table/json outputs + +Scenarios derived from notes/测试方案-数据库SchemaDrift与CLI-Schema命令.md: +- Happy path on DB URL with table/json outputs +- Drift: missing column (FIELD_MISSING), type mismatch (TYPE_MISMATCH), strict extras +- Exit codes and minimal payload when empty rules +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from tests.shared.utils.database_utils import ( + get_available_databases, + get_mysql_test_url, + get_postgresql_test_url, +) +from tests.shared.utils.e2e_test_utils import E2ETestUtils + +pytestmark = pytest.mark.e2e + + +def _db_urls() -> list[str]: + urls: list[str] = [] + available = set(get_available_databases()) + if "mysql" in available: + urls.append(get_mysql_test_url()) + if "postgresql" in available: + urls.append(get_postgresql_test_url()) + return urls + + +def _write_rules(tmp_dir: Path, payload: dict) -> str: + p = tmp_dir / "rules.json" + p.write_text(json.dumps(payload), encoding="utf-8") + return str(p) + + +def _param_db_urls() -> list[object]: + """Mypy-friendly parameter provider for pytest.mark.parametrize. + + Returns list[object] so we can mix str and pytest.param when DB not configured. + """ + out: list[object] = [] + urls = _db_urls() + if urls: + out.extend(urls) + else: + out.append(pytest.param("", marks=pytest.mark.skip(reason="No DB configured"))) + return out + + +@pytest.mark.parametrize("db_url", _param_db_urls()) +def test_happy_path_table_and_json(tmp_path: Path, db_url: str) -> None: + # Schema baseline + a couple atomic rules + rules = { + "customers": { + "rules": [ + {"field": "id", "type": "integer", "required": True}, + {"field": "email", "type": "string"}, + {"field": "age", "type": "integer", "min": 0, "max": 150}, + ], + "strict_mode": False, + "case_insensitive": True, + } + } + rules_file = _write_rules(tmp_path, rules) + + # table output + r1 = E2ETestUtils.run_cli_command( + [ + "schema", + "--conn", + db_url, + "--rules", + rules_file, + "--output", + "table", + ] + ) + assert r1.returncode in {0, 1} + assert "Checking" in r1.stdout + + # json output + r2 = E2ETestUtils.run_cli_command( + [ + "schema", + "--conn", + db_url, + "--rules", + rules_file, + "--output", + "json", + ] + ) + assert r2.returncode in {0, 1} + try: + payload = json.loads(r2.stdout) + except Exception as e: + assert False, ( + "Expected JSON output from CLI but failed to parse. " + f"Error: {e}\nSTDOUT:\n{r2.stdout}\nSTDERR:\n{r2.stderr}" + ) + assert payload["status"] == "ok" + assert payload["rules_count"] >= 1 + assert "summary" in payload and "results" in payload and "fields" in payload + + +@pytest.mark.parametrize("db_url", _param_db_urls()) +def test_drift_missing_and_type_mismatch(tmp_path: Path, db_url: str) -> None: + # Declare a missing column and mismatched type to trigger SKIPPED in JSON for dependent rules + rules = { + "customers": { + "rules": [ + {"field": "email", "type": "integer", "required": True}, # mismatch + { + "field": "status", + "type": "string", + "enum": ["active", "inactive"], + }, # missing + ], + "strict_mode": False, + "case_insensitive": True, + } + } + rules_file = _write_rules(tmp_path, rules) + + r = E2ETestUtils.run_cli_command( + [ + "schema", + "--conn", + db_url, + "--rules", + rules_file, + "--output", + "json", + ] + ) + assert r.returncode in {1, 0} + try: + payload = json.loads(r.stdout) + except Exception as e: + assert False, ( + "Expected JSON output from CLI but failed to parse. " + f"Error: {e}\nSTDOUT:\n{r.stdout}\nSTDERR:\n{r.stderr}" + ) + # Ensure field-level failure codes surface + fields = {f["column"]: f for f in payload.get("fields", [])} + assert "email" in fields and "status" in fields + + # Any dependent checks (not_null/range/enum) may be present; ensure skip reasons appear when applicable + # We accept either PASS/FAIL depending on data, but presence of checks map is required when emitted + + +@pytest.mark.parametrize("db_url", _param_db_urls()) +def test_strict_mode_extras_json(tmp_path: Path, db_url: str) -> None: + rules = { + "customers": { + "rules": [ + {"field": "id", "type": "integer"}, + ], + "strict_mode": True, + "case_insensitive": True, + } + } + rules_file = _write_rules(tmp_path, rules) + + r = E2ETestUtils.run_cli_command( + [ + "schema", + "--conn", + db_url, + "--rules", + rules_file, + "--output", + "json", + ] + ) + try: + payload = json.loads(r.stdout) + except Exception as e: + assert False, ( + "Expected JSON output from CLI but failed to parse. " + f"Error: {e}\nSTDOUT:\n{r.stdout}\nSTDERR:\n{r.stderr}" + ) + # schema_extras should appear and be an array + assert isinstance(payload.get("schema_extras", []), list) + + +def test_empty_rules_minimal_payload(tmp_path: Path) -> None: + # Use a simple CSV source to exercise early-exit path + data_file = tmp_path / "data.csv" + data_file.write_text("id\n1\n", encoding="utf-8") + rules_file = _write_rules(tmp_path, {"rules": []}) + + r = E2ETestUtils.run_cli_command( + [ + "schema", + "--conn", + str(data_file), + "--rules", + rules_file, + "--output", + "json", + ] + ) + assert r.returncode == 0 + payload = json.loads(r.stdout) + assert payload["rules_count"] == 0 diff --git a/tests/shared/builders/test_builders.py b/tests/shared/builders/test_builders.py index 2a02f8a..5a82b22 100644 --- a/tests/shared/builders/test_builders.py +++ b/tests/shared/builders/test_builders.py @@ -211,6 +211,7 @@ def __init__(self) -> None: self._username = "test_user" self._password = "test_pass" self._db_schema = "test_schema" + self._available_tables: Optional[List[str]] = None self._file_path: Optional[str] = None self._parameters: Dict[str, Any] = {} @@ -236,6 +237,12 @@ def with_database(self, db_name: str) -> "TestDataBuilder.ConnectionBuilder": self._db_name = db_name return self + def with_available_tables( + self, table_name: str + ) -> "TestDataBuilder.ConnectionBuilder": + self._available_tables = [table_name] + return self + def with_credentials( self, username: str, password: str ) -> "TestDataBuilder.ConnectionBuilder": @@ -275,6 +282,7 @@ def build(self) -> ConnectionSchema: db_schema=self._db_schema, file_path=self._file_path, parameters=self._parameters, + available_tables=self._available_tables, capabilities=DataSourceCapability(supports_sql=True), cross_db_settings=None, ) diff --git a/tests/unit/cli/commands/test_schema_command.py b/tests/unit/cli/commands/test_schema_command.py new file mode 100644 index 0000000..dc94e91 --- /dev/null +++ b/tests/unit/cli/commands/test_schema_command.py @@ -0,0 +1,262 @@ +"""Unit tests for schema command skeleton.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import pytest +from click.testing import CliRunner + +from cli.app import cli_app +from cli.core.data_validator import ExecutionResultSchema +from shared.enums.connection_types import ConnectionType + + +def _write_tmp_file(tmp_path: Path, name: str, content: str) -> str: + file_path = tmp_path / name + file_path.write_text(content, encoding="utf-8") + return str(file_path) + + +class TestSchemaCommandSkeleton: + def test_schema_command_help_registered(self) -> None: + runner = CliRunner() + result = runner.invoke(cli_app, ["--help"]) + assert result.exit_code == 0 + assert "schema" in result.output + + def test_schema_requires_source_and_rules(self, tmp_path: Path) -> None: + runner = CliRunner() + + # Missing args -> Click usage error (exit code >= 2) + result = runner.invoke(cli_app, ["schema"]) + assert result.exit_code >= 2 + + # Provide a minimal CSV and rules file + data_path = _write_tmp_file(tmp_path, "sample.csv", "id\n1\n") + rules_obj: dict[str, list[dict[str, Any]]] = {"rules": []} + rules_path = _write_tmp_file(tmp_path, "schema.json", json.dumps(rules_obj)) + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code == 0 + assert "Checking" in result.output + + def test_output_json_mode(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules_path = _write_tmp_file( + tmp_path, "schema.json", json.dumps({"user": {"rules": []}}) + ) + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + assert result.exit_code == 0 + payload = json.loads(result.output) + assert payload["status"] == "ok" + assert payload["rules_count"] == 0 + + def test_output_json_declared_columns_always_listed( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Patch decomposition to include a SCHEMA rule that declares a column not in results + from shared.enums import RuleType + from shared.schema.rule_schema import RuleSchema + from tests.shared.builders import test_builders + + schema_rule: RuleSchema = ( + test_builders.TestDataBuilder.rule() + .with_name("schema") + .with_type(RuleType.SCHEMA) + .with_target("", "", "id") + .with_parameter("columns", {"id": {"expected_type": "INTEGER"}}) + .build() + ) + + # Create a mock ConnectionSchema for testing + mock_source_config = ( + test_builders.TestDataBuilder.connection() + .with_type(ConnectionType.CSV) + .with_database("test_db") + .with_available_tables("test_table") + .with_parameters({}) + .build() + ) + + monkeypatch.setattr( + "cli.commands.schema._decompose_schema_payload", + lambda payload, source_config: [schema_rule], + ) + + class DummyValidator: + def __init__( + self, source_config: Any, rules: Any, core_config: Any, cli_config: Any + ) -> None: + # Accept constructor arguments but ignore them + pass + + async def validate(self) -> list[ExecutionResultSchema]: + # Return no results to simulate missing schema details + return [] + + monkeypatch.setattr("cli.commands.schema.DataValidator", DummyValidator) + + runner = CliRunner() + data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules_path = _write_tmp_file( + tmp_path, + "schema.json", + json.dumps({"data": {"rules": [{"field": "id", "type": "integer"}]}}), + ) + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + # No failures but explicit -- in this setup lack of results implies exit 0 + assert result.exit_code == 0 + payload = json.loads(result.output) + # Declared column should still appear with UNKNOWN statuses + fields = {f["column"]: f for f in payload["fields"]} + assert "id" in fields + assert fields["id"]["checks"]["existence"]["status"] in { + "UNKNOWN", + "PASSED", + "FAILED", + } + + def test_fail_on_error_sets_exit_code_1(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules_path = _write_tmp_file(tmp_path, "schema.json", json.dumps({"rules": []})) + + result = runner.invoke( + cli_app, + [ + "schema", + "--conn", + data_path, + "--rules", + rules_path, + "--fail-on-error", + ], + ) + assert result.exit_code == 1 + + def test_invalid_rules_json_yields_usage_error(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + # invalid content + bad_rules_path = _write_tmp_file(tmp_path, "bad.json", "{invalid json}") + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", bad_rules_path] + ) + + # Click usage error exit code is >= 2 + assert result.exit_code >= 2 + assert "Invalid JSON" in result.output + + +class TestSchemaCommandValidation: + def _write_tmp_file(self, tmp_path: Path, name: str, content: str) -> str: + file_path = tmp_path / name + file_path.write_text(content, encoding="utf-8") + return str(file_path) + + def test_warn_on_top_level_table_ignored(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = self._write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules = { + "users": { + "rules": [ + {"field": "id", "type": "integer", "required": True}, + ] + } + } + rules_path = self._write_tmp_file(tmp_path, "schema.json", json.dumps(rules)) + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + # exit code from skeleton remains success + assert result.exit_code == 0 + # Since multi-table has been supported,so no warning emitted to stderr + # assert "table' is ignored" in (result.stderr or "") + + def test_rules_must_be_array(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = self._write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules_path = self._write_tmp_file(tmp_path, "schema.json", json.dumps({})) + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code >= 2 + assert "must have a 'rules' array" in result.output + + def test_rules_item_requires_field(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = self._write_tmp_file(tmp_path, "data.csv", "id\n1\n") + bad = {"rules": [{"type": "integer"}]} + rules_path = self._write_tmp_file(tmp_path, "schema.json", json.dumps(bad)) + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code >= 2 + assert "field must be a non-empty string" in result.output + + def test_type_must_be_supported_string(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = self._write_tmp_file(tmp_path, "data.csv", "id\n1\n") + bad = {"rules": [{"field": "id", "type": "number"}]} + rules_path = self._write_tmp_file(tmp_path, "schema.json", json.dumps(bad)) + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code >= 2 + assert "type 'number' is not supported" in result.output + + def test_required_must_be_boolean(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = self._write_tmp_file(tmp_path, "data.csv", "id\n1\n") + bad = {"rules": [{"field": "id", "required": "yes"}]} + rules_path = self._write_tmp_file(tmp_path, "schema.json", json.dumps(bad)) + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code >= 2 + assert "required must be a boolean" in result.output + + def test_enum_must_be_array(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = self._write_tmp_file(tmp_path, "data.csv", "id\n1\n") + bad = {"rules": [{"field": "flag", "enum": "01"}]} + rules_path = self._write_tmp_file(tmp_path, "schema.json", json.dumps(bad)) + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code >= 2 + assert "enum must be an array" in result.output + + def test_min_max_must_be_numeric(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = self._write_tmp_file(tmp_path, "data.csv", "id\n1\n") + bad = {"rules": [{"field": "age", "type": "integer", "min": "0"}]} + rules_path = self._write_tmp_file(tmp_path, "schema.json", json.dumps(bad)) + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code >= 2 + assert "min must be numeric" in result.output diff --git a/tests/unit/cli/commands/test_schema_command_extended.py b/tests/unit/cli/commands/test_schema_command_extended.py new file mode 100644 index 0000000..57ded12 --- /dev/null +++ b/tests/unit/cli/commands/test_schema_command_extended.py @@ -0,0 +1,451 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, List + +import pytest +from click.testing import CliRunner + +from cli.app import cli_app +from shared.enums import ( + ConnectionType, + RuleAction, + RuleCategory, + RuleType, + SeverityLevel, +) +from shared.schema.base import RuleTarget, TargetEntity +from shared.schema.rule_schema import RuleSchema +from tests.shared.builders import test_builders + + +def _write_tmp_file(tmp_path: Path, name: str, content: str) -> str: + file_path = tmp_path / name + file_path.write_text(content, encoding="utf-8") + return str(file_path) + + +def _make_rule( + *, + name: str, + rule_type: RuleType, + column: str | None, + parameters: Dict[str, Any], + description: str | None = None, +) -> RuleSchema: + target = RuleTarget( + entities=[ + TargetEntity( + database="", table="", column=column, connection_id=None, alias=None + ) + ], + relationship_type="single_table", + ) + return RuleSchema( + name=name, + description=description, + type=rule_type, + target=target, + parameters=parameters, + cross_db_config=None, + threshold=0.0, + category=( + RuleCategory.VALIDITY + if rule_type in {RuleType.SCHEMA, RuleType.RANGE, RuleType.ENUM} + else RuleCategory.COMPLETENESS + ), + severity=SeverityLevel.MEDIUM, + action=RuleAction.ALERT, + is_active=True, + tags=[], + template_id=None, + validation_error=None, + ) + + +class TestSchemaDecompositionAndMapping: + def test_map_type_names_are_case_insensitive_and_validated( + self, tmp_path: Path + ) -> None: + from cli.commands.schema import _map_type_name_to_datatype + + assert _map_type_name_to_datatype("STRING").value == "STRING" + assert _map_type_name_to_datatype("integer").value == "INTEGER" + assert _map_type_name_to_datatype("DateTime").value == "DATETIME" + + with pytest.raises(Exception): + _map_type_name_to_datatype("number") + + def test_decompose_to_atomic_rules_structure(self, tmp_path: Path) -> None: + from cli.commands.schema import _decompose_schema_payload + + payload = { + "strict_mode": True, + "case_insensitive": True, + "rules": [ + {"field": "id", "type": "integer", "required": True}, + {"field": "age", "min": 0, "max": 100}, + {"field": "status", "enum": ["A", "B"]}, + ], + } + # Create a mock ConnectionSchema for testing + mock_source_config = ( + test_builders.TestDataBuilder.connection() + .with_type(ConnectionType.CSV) + .with_database("test_db") + .with_available_tables("test_table") + .with_parameters({}) + .build() + ) + rules = _decompose_schema_payload(payload, mock_source_config) + + # First rule should be SCHEMA when any columns declared + assert rules[0].type == RuleType.SCHEMA + schema_params = rules[0].parameters or {} + assert schema_params["columns"]["id"]["expected_type"] == "INTEGER" + assert schema_params["strict_mode"] is True + assert schema_params["case_insensitive"] is True + + types = [r.type for r in rules] + # NOT_NULL created for required + assert RuleType.NOT_NULL in types + # RANGE created for min/max + assert RuleType.RANGE in types + # ENUM created when enum declared + assert RuleType.ENUM in types + + +class TestSchemaPrioritizationAndOutputs: + def test_prioritization_skip_map(self) -> None: + from cli.commands.schema import _build_prioritized_atomic_status + + # Build atomic rules manually + schema = _make_rule( + name="schema", + rule_type=RuleType.SCHEMA, + column=None, + parameters={ + "columns": { + "id": {"expected_type": "INTEGER"}, + "email": {"expected_type": "STRING"}, + "age": {"expected_type": "INTEGER"}, + } + }, + ) + not_null_email = _make_rule( + name="not_null_email", + rule_type=RuleType.NOT_NULL, + column="email", + parameters={}, + ) + range_age = _make_rule( + name="range_age", + rule_type=RuleType.RANGE, + column="age", + parameters={"min_value": 0, "max_value": 120}, + ) + + atomic_rules = [schema, not_null_email, range_age] + + # Simulate SCHEMA execution details + schema_results = [ + { + "rule_id": str(schema.id), + "execution_plan": { + "schema_details": { + "field_results": [ + {"column": "email", "failure_code": "TYPE_MISMATCH"}, + {"column": "age", "failure_code": "FIELD_MISSING"}, + {"column": "id", "failure_code": "NONE"}, + ] + } + }, + } + ] + + skip_map = _build_prioritized_atomic_status( + schema_results=schema_results, atomic_rules=atomic_rules + ) + + # email dependent rules should be skipped for TYPE_MISMATCH + assert skip_map[str(not_null_email.id)]["status"] == "SKIPPED" + assert skip_map[str(not_null_email.id)]["skip_reason"] == "TYPE_MISMATCH" + # age dependent rules should be skipped for FIELD_MISSING + assert skip_map[str(range_age.id)]["status"] == "SKIPPED" + assert skip_map[str(range_age.id)]["skip_reason"] == "FIELD_MISSING" + + def test_json_output_aggregation_and_skip_semantics( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Prepare known atomic rules and patch decomposition to return them + schema = _make_rule( + name="schema", + rule_type=RuleType.SCHEMA, + column=None, + parameters={ + "columns": { + "email": {"expected_type": "STRING"}, + "age": {"expected_type": "INTEGER"}, + } + }, + ) + not_null_email = _make_rule( + name="not_null_email", + rule_type=RuleType.NOT_NULL, + column="email", + parameters={}, + ) + range_age = _make_rule( + name="range_age", + rule_type=RuleType.RANGE, + column="age", + parameters={"min_value": 0, "max_value": 150}, + ) + atomic_rules = [schema, not_null_email, range_age] + + # Patch decomposition + monkeypatch.setattr( + "cli.commands.schema._decompose_schema_payload", + lambda payload, source_config: atomic_rules, + ) + + # Build SCHEMA and dependent rule results. Dependent rules are PASSED in raw + # and should be overridden to SKIPPED in JSON when schema marks issues. + schema_result = { + "rule_id": str(schema.id), + "status": "FAILED", + "dataset_metrics": [ + {"entity_name": "x", "total_records": 2, "failed_records": 2} + ], + "execution_plan": { + "schema_details": { + "field_results": [ + { + "column": "age", + "existence": "FAILED", + "type": "SKIPPED", + "failure_code": "FIELD_MISSING", + }, + { + "column": "email", + "existence": "PASSED", + "type": "FAILED", + "failure_code": "TYPE_MISMATCH", + }, + ], + "extras": [], + } + }, + } + not_null_email_result = { + "rule_id": str(not_null_email.id), + "status": "PASSED", + "dataset_metrics": [ + {"entity_name": "x", "total_records": 10, "failed_records": 0} + ], + } + range_age_result = { + "rule_id": str(range_age.id), + "status": "PASSED", + "dataset_metrics": [ + {"entity_name": "x", "total_records": 10, "failed_records": 0} + ], + } + + # Patch DataValidator.validate to return our results + class DummyValidator: + def __init__( + self, source_config: Any, rules: Any, core_config: Any, cli_config: Any + ): + # Accept all required parameters but don't use them + pass + + async def validate(self) -> List[Dict[str, Any]]: # type: ignore[override] + return [schema_result, not_null_email_result, range_age_result] + + monkeypatch.setattr("cli.commands.schema.DataValidator", DummyValidator) + + # Prepare inputs and run CLI in JSON output mode + runner = CliRunner() + data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules_path = _write_tmp_file( + tmp_path, + "schema.json", + json.dumps( + { + "rules": [ + {"field": "email", "type": "string"}, + {"field": "age", "type": "integer"}, + ] + } + ), + ) + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + + assert result.exit_code == 1 # schema failed -> non-zero + payload = json.loads(result.output) + assert payload["status"] == "ok" + assert payload["rules_count"] == len(atomic_rules) + # Results should contain SKIPPED overrides for dependent rules + results_map = {r["rule_id"]: r for r in payload["results"]} + assert results_map[str(not_null_email.id)]["status"] == "SKIPPED" + assert results_map[str(not_null_email.id)]["skip_reason"] == "TYPE_MISMATCH" + assert results_map[str(range_age.id)]["status"] == "SKIPPED" + assert results_map[str(range_age.id)]["skip_reason"] == "FIELD_MISSING" + + # Fields aggregate should include existence/type and dependent checks + fields = {f["column"]: f for f in payload["fields"]} + assert fields["age"]["checks"]["existence"]["status"] == "FAILED" + assert fields["email"]["checks"]["type"]["status"] == "FAILED" + assert fields["email"]["checks"]["not_null"]["status"] == "SKIPPED" + assert fields["age"]["checks"]["range"]["status"] == "SKIPPED" + + def test_table_output_grouping_and_skips( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Prepare known atomic rules and patch decomposition to return them + schema = _make_rule( + name="schema", + rule_type=RuleType.SCHEMA, + column=None, + parameters={ + "columns": { + "email": {"expected_type": "STRING"}, + "age": {"expected_type": "INTEGER"}, + } + }, + ) + not_null_email = _make_rule( + name="not_null_email", + rule_type=RuleType.NOT_NULL, + column="email", + parameters={}, + ) + range_age = _make_rule( + name="range_age", + rule_type=RuleType.RANGE, + column="age", + parameters={"min_value": 0, "max_value": 150}, + ) + atomic_rules = [schema, not_null_email, range_age] + + monkeypatch.setattr( + "cli.commands.schema._decompose_schema_payload", + lambda payload, source_config: atomic_rules, + ) + + schema_result = { + "rule_id": str(schema.id), + "status": "FAILED", + "dataset_metrics": [ + {"entity_name": "x", "total_records": 2, "failed_records": 2} + ], + "execution_plan": { + "schema_details": { + "field_results": [ + { + "column": "age", + "existence": "FAILED", + "type": "SKIPPED", + "failure_code": "FIELD_MISSING", + }, + { + "column": "email", + "existence": "PASSED", + "type": "FAILED", + "failure_code": "TYPE_MISMATCH", + }, + ], + "extras": [], + } + }, + } + # Dependent rule raw statuses set to PASSED; should be skipped for display grouping + not_null_email_result = { + "rule_id": str(not_null_email.id), + "status": "SKIPPED", + "dataset_metrics": [ + {"entity_name": "x", "total_records": 10, "failed_records": 0} + ], + "skip_reason": "TYPE_MISMATCH", + } + range_age_result = { + "rule_id": str(range_age.id), + "status": "SKIPPED", + "dataset_metrics": [ + {"entity_name": "x", "total_records": 10, "failed_records": 0} + ], + "skip_reason": "FIELD_MISSING", + } + + class DummyValidator: + def __init__(self, *args: Any, **kwargs: Any) -> None: # noqa: D401 + pass + + async def validate(self) -> List[Dict[str, Any]]: # type: ignore[override] + return [schema_result, not_null_email_result, range_age_result] + + monkeypatch.setattr("cli.commands.schema.DataValidator", DummyValidator) + + runner = CliRunner() + data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules_path = _write_tmp_file( + tmp_path, + "schema.json", + json.dumps( + { + "rules": [ + {"field": "email", "type": "string"}, + {"field": "age", "type": "integer"}, + ] + } + ), + ) + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code == 1 + output = result.output + + # Should show concise messages per column with skip semantics + assert "✗ age: missing (skipped dependent checks)" in output + assert "✗ email: type mismatch (skipped dependent checks)" in output + # Should not render separate dependent issues since they are skipped + assert "not_null" not in output + assert "range" not in output + + +class TestSchemaValidationErrorsExtended: + # def test_reject_tables_top_level(self, tmp_path: Path) -> None: + # runner = CliRunner() + # data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + # rules_path = _write_tmp_file( + # tmp_path, + # "schema.json", + # json.dumps({"tables": {"users": []}, "rules": []}), + # ) + + # result = runner.invoke(cli_app, ["schema", "--conn", data_path, "--rules", rules_path]) + # assert result.exit_code >= 2 + # assert "not supported in v1" in result.output + + def test_enum_must_be_non_empty_array(self, tmp_path: Path) -> None: + runner = CliRunner() + data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules_path = _write_tmp_file( + tmp_path, + "schema.json", + json.dumps({"rules": [{"field": "status", "enum": []}]}), + ) + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code >= 2 + assert "enum' must be a non-empty" in result.output diff --git a/tests/unit/cli/commands/test_schema_command_file_sources.py b/tests/unit/cli/commands/test_schema_command_file_sources.py new file mode 100644 index 0000000..8b8ee95 --- /dev/null +++ b/tests/unit/cli/commands/test_schema_command_file_sources.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, List + +import pytest +from click.testing import CliRunner + +from cli.app import cli_app +from shared.enums import RuleType +from shared.schema.rule_schema import RuleSchema +from tests.shared.builders import test_builders + + +def _write_tmp_file(tmp_path: Path, name: str, content: str) -> str: + file_path = tmp_path / name + file_path.write_text(content, encoding="utf-8") + return str(file_path) + + +def _schema_rule_with(columns: Dict[str, Dict[str, str]]) -> RuleSchema: + return ( + test_builders.TestDataBuilder.rule() + .with_name("schema") + .with_type(RuleType.SCHEMA) + .with_target("main", "data", "id") + .with_parameter("columns", columns) + .build() + ) + + +class TestSchemaCommandForFileSources: + def test_csv_excel_to_sqlite_type_implications( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Declare DATE/DATETIME expectations; SQLite columns will be TEXT post-conversion + schema_rule = _schema_rule_with( + {"reg_date": {"expected_type": "DATE"}, "ts": {"expected_type": "DATETIME"}} + ) + monkeypatch.setattr( + "cli.commands.schema._decompose_schema_payload", + lambda payload, source_config: [schema_rule], + ) + + # Build SCHEMA result indicating SQLite TEXT types cause TYPE_MISMATCH + schema_result = { + "rule_id": str(schema_rule.id), + "status": "FAILED", + "dataset_metrics": [ + {"entity_name": "main.data", "total_records": 2, "failed_records": 2} + ], + "execution_plan": { + "schema_details": { + "field_results": [ + { + "column": "reg_date", + "existence": "PASSED", + "type": "FAILED", + "failure_code": "TYPE_MISMATCH", + }, + { + "column": "ts", + "existence": "PASSED", + "type": "FAILED", + "failure_code": "TYPE_MISMATCH", + }, + ], + "extras": [], + } + }, + } + + class DummyValidator: + def __init__( + self, source_config: Any, rules: Any, core_config: Any, cli_config: Any + ) -> None: + # Accept all required parameters but don't use them + pass + + async def validate(self) -> List[Dict[str, Any]]: # type: ignore[override] + return [schema_result] + + monkeypatch.setattr("cli.commands.schema.DataValidator", DummyValidator) + + # Prepare CSV file path as source (will be converted to SQLite inside command) + data_path = _write_tmp_file( + tmp_path, + "data.csv", + "reg_date,ts\n2023-01-01,2023-01-01T10:00:00Z\n2023-01-02,2023-01-02T11:00:00Z\n", + ) + rules_path = _write_tmp_file( + tmp_path, + "schema.json", + json.dumps( + { + "rules": [ + {"field": "reg_date", "type": "date"}, + {"field": "ts", "type": "datetime"}, + ] + } + ), + ) + + runner = CliRunner() + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + + assert result.exit_code == 1 + payload = json.loads(result.output) + + # The JSON `fields` section should reflect type mismatches from SQLite TEXT + fields = {f["column"]: f for f in payload["fields"]} + assert fields["reg_date"]["checks"]["type"]["status"] == "FAILED" + assert fields["ts"]["checks"]["type"]["status"] == "FAILED" diff --git a/tests/unit/cli/commands/test_schema_command_json_extras.py b/tests/unit/cli/commands/test_schema_command_json_extras.py new file mode 100644 index 0000000..d2f7100 --- /dev/null +++ b/tests/unit/cli/commands/test_schema_command_json_extras.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, List + +import pytest +from click.testing import CliRunner + +from cli.app import cli_app +from shared.enums import RuleType +from shared.schema.rule_schema import RuleSchema +from tests.shared.builders import test_builders + + +def _write_tmp_file(tmp_path: Path, name: str, content: str) -> str: + file_path = tmp_path / name + file_path.write_text(content, encoding="utf-8") + return str(file_path) + + +def _schema_rule_with(columns: Dict[str, Dict[str, str]]) -> RuleSchema: + return ( + test_builders.TestDataBuilder.rule() + .with_name("schema") + .with_type(RuleType.SCHEMA) + .with_target("", "", "id") + .with_parameter("columns", columns) + .with_parameter("strict_mode", True) + .build() + ) + + +class TestSchemaJsonExtrasAndSummary: + def test_json_includes_schema_extras_and_summary_counts( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Decomposition yields one SCHEMA rule for columns id/email + schema_rule = _schema_rule_with( + { + "id": {"expected_type": "INTEGER"}, + "email": {"expected_type": "STRING"}, + } + ) + monkeypatch.setattr( + "cli.commands.schema._decompose_schema_payload", + lambda payload, source_config: [schema_rule], + ) + + # Results: SCHEMA failed with 1 type mismatch, 0 existence failures, extras present + schema_result = { + "rule_id": str(schema_rule.id), + "status": "FAILED", + "dataset_metrics": [ + {"entity_name": "t", "total_records": 2, "failed_records": 1} + ], + "execution_plan": { + "schema_details": { + "field_results": [ + { + "column": "id", + "existence": "PASSED", + "type": "PASSED", + "failure_code": "NONE", + }, + { + "column": "email", + "existence": "PASSED", + "type": "FAILED", + "failure_code": "TYPE_MISMATCH", + }, + ], + "extras": ["zzz_extra", "aaa_extra"], + } + }, + } + + class DummyValidator: + def __init__( + self, source_config: Any, rules: Any, core_config: Any, cli_config: Any + ) -> None: + # Accept all required parameters but don't use them + pass + + async def validate(self) -> List[Dict[str, Any]]: # type: ignore[override] + return [schema_result] + + monkeypatch.setattr("cli.commands.schema.DataValidator", DummyValidator) + + runner = CliRunner() + data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules_path = _write_tmp_file( + tmp_path, + "schema.json", + json.dumps( + { + "rules": [ + {"field": "id", "type": "integer"}, + {"field": "email", "type": "string"}, + ] + } + ), + ) + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + assert result.exit_code == 1 + + # Extract JSON part from output (skip warning messages) + output_lines = result.output.strip().split("\n") + json_line = None + for line in output_lines: + if line.strip().startswith("{"): + json_line = line.strip() + break + + if not json_line: + raise ValueError("No JSON output found in result") + + payload = json.loads(json_line) + + # schema_extras must present, sorted by CLI before emission + assert payload.get("schema_extras") == ["aaa_extra", "zzz_extra"] + # summary counts + assert payload["summary"]["total_rules"] == 1 + assert payload["summary"]["failed_rules"] == 1 + assert payload["summary"]["skipped_rules"] >= 0 + assert payload["summary"]["total_failed_records"] >= 1 + + def test_table_output_does_not_emit_schema_extras_key( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + schema_rule = _schema_rule_with({"id": {"expected_type": "INTEGER"}}) + monkeypatch.setattr( + "cli.commands.schema._decompose_schema_payload", + lambda payload, source_config: [schema_rule], + ) + + schema_result = { + "rule_id": str(schema_rule.id), + "status": "PASSED", + "dataset_metrics": [ + {"entity_name": "t", "total_records": 1, "failed_records": 0} + ], + "execution_plan": { + "schema_details": {"field_results": [], "extras": ["x"]} + }, + } + + class DummyValidator: + def __init__( + self, source_config: Any, rules: Any, core_config: Any, cli_config: Any + ) -> None: + # Accept all required parameters but don't use them + pass + + async def validate(self) -> List[Dict[str, Any]]: # type: ignore[override] + return [schema_result] + + monkeypatch.setattr("cli.commands.schema.DataValidator", DummyValidator) + + runner = CliRunner() + data_path = _write_tmp_file(tmp_path, "data.csv", "id\n1\n") + rules_path = _write_tmp_file( + tmp_path, + "schema.json", + json.dumps({"rules": [{"field": "id", "type": "integer"}]}), + ) + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + assert result.exit_code == 0 + # Plain text output should not dump JSON key name + assert "schema_extras" not in result.output diff --git a/tests/unit/cli/commands/test_schema_command_multi_table.py b/tests/unit/cli/commands/test_schema_command_multi_table.py new file mode 100644 index 0000000..0c5ecd8 --- /dev/null +++ b/tests/unit/cli/commands/test_schema_command_multi_table.py @@ -0,0 +1,387 @@ +"""Unit tests for schema command multi-table functionality.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import pytest +from click.testing import CliRunner + +from cli.app import cli_app + + +def _write_tmp_file(tmp_path: Path, name: str, content: str) -> str: + file_path = tmp_path / name + file_path.write_text(content, encoding="utf-8") + return str(file_path) + + +class TestSchemaCommandMultiTable: + def test_multi_table_rules_format_parsing(self, tmp_path: Path) -> None: + """Test that multi-table rules format is correctly parsed.""" + runner = CliRunner() + + # Create multi-table rules file + # Use the existing multi-table schema file + rules_path = "test_data/multi_table_schema.json" + # Use the new multi-table Excel file instead of CSV + data_path = "test_data/multi_table_data.xlsx" + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + + assert result.exit_code == 0 + payload = json.loads(result.output) + assert payload["status"] == "ok" + assert payload["rules_count"] == 17 + + # Check that fields have table information + fields = payload["fields"] + assert len(fields) > 0 + for field in fields: + assert "table" in field + assert field["table"] in ["users", "products", "orders"] + + def test_multi_table_excel_sheets_detection(self, tmp_path: Path) -> None: + """Test that Excel file sheets are correctly detected and used as tables.""" + runner = CliRunner() + + # Create a simple multi-table rules file + multi_table_rules = { + "users": { + "rules": [ + {"field": "id", "type": "integer", "required": True}, + {"field": "name", "type": "string", "required": True}, + ] + }, + "products": { + "rules": [ + {"field": "product_id", "type": "integer", "required": True}, + {"field": "product_name", "type": "string", "required": True}, + ] + }, + } + + rules_path = _write_tmp_file( + tmp_path, "multi_table_rules.json", json.dumps(multi_table_rules) + ) + data_path = "test_data/multi_table_data.xlsx" + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + + assert result.exit_code == 0 + payload = json.loads(result.output) + assert payload["status"] == "ok" + + # Check that both tables are processed + fields = payload["fields"] + user_fields = [f for f in fields if f.get("table") == "users"] + product_fields = [f for f in fields if f.get("table") == "products"] + + assert len(user_fields) > 0 + assert len(product_fields) > 0 + + def test_multi_table_with_table_level_options(self, tmp_path: Path) -> None: + """Test multi-table format with table-level options like strict_mode.""" + runner = CliRunner() + + multi_table_rules = { + "users": { + "rules": [{"field": "id", "type": "integer", "required": True}], + "strict_mode": True, + }, + "products": { + "rules": [ + {"field": "product_name", "type": "string", "required": True} + ], + "case_insensitive": True, + }, + } + + rules_path = _write_tmp_file( + tmp_path, "multi_table_options.json", json.dumps(multi_table_rules) + ) + data_path = "test_data/multi_table_data.xlsx" + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + + # With strict_mode=True, extra columns will cause SCHEMA validation to fail + assert result.exit_code == 1 + payload = json.loads(result.output) + assert payload["status"] == "ok" # Overall status is ok + assert ( + payload["summary"]["failed_rules"] == 1 + ) # One rule failed due to strict mode + assert payload["summary"]["passed_rules"] == 3 # Three rules passed + + def test_multi_table_backward_compatibility(self, tmp_path: Path) -> None: + """Test that single-table format still works for backward compatibility.""" + runner = CliRunner() + + # Single-table format (legacy) + single_table_rules = { + "rules": [ + {"field": "id", "type": "integer", "required": True}, + {"field": "name", "type": "string", "required": True}, + ] + } + + rules_path = _write_tmp_file( + tmp_path, "single_table.json", json.dumps(single_table_rules) + ) + # Use only the users sheet for single table test + data_path = "test_data/multi_table_data.xlsx" + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + + assert result.exit_code == 0 + + # Handle mixed output (warning + JSON) + output_lines = result.output.strip().split("\n") + json_line = None + for line in output_lines: + if line.strip().startswith("{"): + json_line = line.strip() + break + + assert json_line is not None, f"No JSON found in output: {result.output}" + + payload = json.loads(json_line) + assert payload["status"] == "ok" + assert payload["rules_count"] == 3 + + def test_multi_table_validation_errors(self, tmp_path: Path) -> None: + """Test validation errors for invalid multi-table format.""" + runner = CliRunner() + + # Invalid: table schema is not an object + invalid_rules = {"users": "not_an_object"} + + rules_path = _write_tmp_file( + tmp_path, "invalid.json", json.dumps(invalid_rules) + ) + data_path = "test_data/multi_table_data.xlsx" + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + + assert result.exit_code >= 2 # Usage error + assert "must be an object" in result.output + + def test_multi_table_missing_rules_array(self, tmp_path: Path) -> None: + """Test validation error when table is missing rules array.""" + runner = CliRunner() + + invalid_rules = { + "users": { + "strict_mode": True + # Missing rules array + } + } + + rules_path = _write_tmp_file( + tmp_path, "missing_rules.json", json.dumps(invalid_rules) + ) + data_path = "test_data/multi_table_data.xlsx" + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + + assert result.exit_code >= 2 # Usage error + assert "must have a 'rules' array" in result.output + + def test_multi_table_invalid_table_level_options(self, tmp_path: Path) -> None: + """Test validation error for invalid table-level options.""" + runner = CliRunner() + + invalid_rules = { + "users": { + "rules": [{"field": "id", "type": "integer", "required": True}], + "strict_mode": "not_a_boolean", # Should be boolean + } + } + + rules_path = _write_tmp_file( + tmp_path, "invalid_options.json", json.dumps(invalid_rules) + ) + data_path = "test_data/multi_table_data.xlsx" + + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + + assert result.exit_code >= 2 # Usage error + assert "must be a boolean" in result.output + + def test_multi_table_output_formatting(self, tmp_path: Path) -> None: + """Test that multi-table output is properly formatted and grouped.""" + runner = CliRunner() + + multi_table_rules = { + "users": { + "rules": [ + {"field": "id", "type": "integer", "required": True}, + {"field": "name", "type": "string", "required": True}, + ] + }, + "products": { + "rules": [{"field": "product_id", "type": "integer", "required": True}] + }, + } + + rules_path = _write_tmp_file( + tmp_path, "multi_table.json", json.dumps(multi_table_rules) + ) + data_path = "test_data/multi_table_data.xlsx" + + # Test table output format + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "table"], + ) + + assert result.exit_code == 0 + output = result.output + + # Should show table headers for multi-table + assert "📋 Table: users" in output + assert "📋 Table: products" in output + assert "📊 Multi-table Summary:" in output + + def test_multi_table_json_output_structure(self, tmp_path: Path) -> None: + """Test that JSON output includes table information for multi-table.""" + runner = CliRunner() + + multi_table_rules = { + "users": {"rules": [{"field": "id", "type": "integer", "required": True}]}, + "products": { + "rules": [{"field": "product_name", "type": "string", "required": True}] + }, + } + + rules_path = _write_tmp_file( + tmp_path, "multi_table.json", json.dumps(multi_table_rules) + ) + data_path = "test_data/multi_table_data.xlsx" + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + + assert result.exit_code == 0 + payload = json.loads(result.output) + + # Check that fields have table information + fields = payload["fields"] + assert len(fields) >= 2 + + # Find fields for each table + user_fields = [f for f in fields if f.get("table") == "users"] + product_fields = [f for f in fields if f.get("table") == "products"] + + assert len(user_fields) > 0 + assert len(product_fields) > 0 + + # Check that each field has table info + for field in fields: + assert "table" in field + assert field["table"] in ["users", "products"] + + def test_multi_table_no_table_option_required(self, tmp_path: Path) -> None: + """Test that --table option is no longer required.""" + runner = CliRunner() + + multi_table_rules = { + "users": {"rules": [{"field": "id", "type": "integer", "required": True}]} + } + + rules_path = _write_tmp_file( + tmp_path, "multi_table.json", json.dumps(multi_table_rules) + ) + data_path = "test_data/multi_table_data.xlsx" + + # Should work without --table option + result = runner.invoke( + cli_app, ["schema", "--conn", data_path, "--rules", rules_path] + ) + + assert result.exit_code == 0 + # Command should execute successfully without --table option + + def test_multi_table_excel_specific_functionality(self, tmp_path: Path) -> None: + """Test specific Excel multi-table functionality.""" + runner = CliRunner() + + # Test with all three tables from the Excel file + multi_table_rules = { + "users": { + "rules": [ + {"field": "id", "type": "integer", "required": True}, + {"field": "name", "type": "string", "required": True}, + {"field": "email", "type": "string", "required": True}, + ] + }, + "products": { + "rules": [ + {"field": "product_id", "type": "integer", "required": True}, + {"field": "product_name", "type": "string", "required": True}, + {"field": "price", "type": "float", "min": 0.0}, + ] + }, + "orders": { + "rules": [ + {"field": "order_id", "type": "integer", "required": True}, + {"field": "user_id", "type": "integer", "required": True}, + {"field": "total_amount", "type": "float", "min": 0.0}, + ] + }, + } + + rules_path = _write_tmp_file( + tmp_path, "excel_multi_table.json", json.dumps(multi_table_rules) + ) + data_path = "test_data/multi_table_data.xlsx" + + result = runner.invoke( + cli_app, + ["schema", "--conn", data_path, "--rules", rules_path, "--output", "json"], + ) + + assert result.exit_code == 0 + payload = json.loads(result.output) + assert payload["status"] == "ok" + + # Check that all three tables are processed + fields = payload["fields"] + table_names = set(field.get("table") for field in fields) + assert "users" in table_names + assert "products" in table_names + assert "orders" in table_names + + def test_multi_table_help_text_updated(self, tmp_path: Path) -> None: + """Test that help text reflects multi-table support.""" + runner = CliRunner() + + result = runner.invoke(cli_app, ["schema", "--help"]) + assert result.exit_code == 0 + + # Should mention multi-table support + assert "multi-table" in result.output.lower() + # Should not mention --table option + assert "--table" not in result.output