Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,39 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- feat(core): Add comprehensive type aliases support (str→string, int→integer, bool→boolean)
- feat(tests): Comprehensive test coverage for type parser with unit and integration tests
- feat(tests): Native type integration testing for enhanced schema validation
- **feat(architecture): Implement two-phase execution framework in CLI with skip semantics**
- feat(schema): Add SchemaPhaseExecutor class for coordinated Phase 1 execution (schema rules only)
- feat(schema): Add DesiredTypePhaseExecutor class for coordinated Phase 2 execution (additional rules with filtering)
- feat(schema): Add ResultMerger class for combining phase results while maintaining output format consistency
- feat(schema): Comprehensive logging system for debugging two-phase execution with timing and rule counts
- feat(schema): Intelligent rule separation - automatically separate SCHEMA rules from other rule types for phased execution

### Changed
- enhance(cli): Updated schema command to support both syntactic sugar and detailed JSON type definitions
- enhance(core): Improved schema executor to handle parsed type definitions with metadata
- enhance(validation): Maintain backward compatibility with existing detailed JSON schema format
- **refactor(schema): Enhanced `_decompose_schema_payload()` to return tuple of (schema_rules, other_rules) for two-phase execution**
- refactor(schema): Added `_decompose_schema_payload_atomic()` for backward compatibility with single-list return format
- refactor(tests): Updated all schema-related test mocks to handle new tuple return format from rule decomposition
- improve(architecture): All validation maintains identical output format and behavior - no user-visible changes

### Fixed
- None
- **fix(async): Resolved RuntimeError event loop management issue in two-phase execution**
- fix(async): Consolidated both validation phases into single event loop to prevent database connection pool conflicts
- fix(async): Eliminated multiple `asyncio.run()` calls that caused "Event loop is closed" errors in production
- fix(tests): Updated test contracts and mocks to work with new two-phase execution architecture

### Removed
- None

### Architecture Notes
- **Two-Phase Execution Framework**: Implemented foundation for future desired_type compatibility analysis
- **Phase 1**: Schema rules execute first to collect native type information and validate table/column existence
- **Phase 2**: Additional rules execute with intelligent filtering based on schema analysis results (skip semantics)
- **Skip Logic**: Rules targeting missing tables/columns are automatically skipped to prevent cascading failures
- **Result Merging**: Synthetic results created for skipped rules to maintain consistent output format
- **Performance**: Current implementation optimizes for stability over concurrency - both phases execute serially within single event loop

## [0.4.3] - 2025-09-06

### Added
Expand Down
315 changes: 301 additions & 14 deletions cli/commands/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,27 @@ def _create_rule_schema(

def _decompose_schema_payload(
payload: Dict[str, Any], source_config: ConnectionSchema
) -> Tuple[List[RuleSchema], List[RuleSchema]]:
"""Decompose a schema payload into atomic RuleSchema objects, separated by phase.

This function handles both single-table and multi-table formats in a
source-agnostic way. Returns schema rules and non-schema rules separately
to support two-phase execution.

Returns:
Tuple of (schema_rules, other_rules) for two-phase execution
"""
all_atomic_rules = _decompose_schema_payload_atomic(payload, source_config)

# Separate rules by type for two-phase execution
schema_rules = [rule for rule in all_atomic_rules if rule.type == RuleType.SCHEMA]
other_rules = [rule for rule in all_atomic_rules if rule.type != RuleType.SCHEMA]

return schema_rules, other_rules


def _decompose_schema_payload_atomic(
payload: Dict[str, Any], source_config: ConnectionSchema
) -> List[RuleSchema]:
"""Decompose a schema payload into atomic RuleSchema objects.

Expand Down Expand Up @@ -894,6 +915,193 @@ def _ensure_check(entry: Dict[str, Any], name: str) -> Dict[str, Any]:
_safe_echo(json.dumps(payload, default=str))


class SchemaPhaseExecutor:
"""Executor for Phase 1: Schema rules only with native type collection."""

def __init__(self, *, source_config: Any, core_config: Any, cli_config: Any):
"""Init SchemaPhaseExecutor object"""
self.source_config = source_config
self.core_config = core_config
self.cli_config = cli_config

async def execute_schema_phase(
self, schema_rules: List[RuleSchema]
) -> Tuple[List[Any], float, List[Dict[str, Any]]]:
"""Execute schema rules and collect native type information.

Returns:
Tuple of (results, execution_seconds, schema_results)
"""
logger.debug(f"Phase 1: Executing {len(schema_rules)} schema rules")

if not schema_rules:
return [], 0.0, []

validator = _create_validator(
source_config=self.source_config,
atomic_rules=schema_rules,
core_config=self.core_config,
cli_config=self.cli_config,
)

results, exec_seconds = _run_validation(validator)
schema_results = _extract_schema_results(
atomic_rules=schema_rules, results=results
)

logger.debug(
f"Phase 1: Completed in {exec_seconds:.3f}s with {len(schema_results)} "
"schema results"
)
return results, exec_seconds, schema_results


class DesiredTypePhaseExecutor:
"""
Executor for Phase 2: Additional rules based on schema analysis
(currently with skip semantics).
"""

def __init__(
self, *, source_config: Any, core_config: Any, cli_config: Any
) -> None:
"""Init DesiredTypePhaseExecutor object"""
self.source_config = source_config
self.core_config = core_config
self.cli_config = cli_config

async def execute_additional_rules_phase(
self,
other_rules: List[RuleSchema],
schema_results: List[Dict[str, Any]],
skip_map: Dict[str, Dict[str, str]],
) -> Tuple[List[Any], float]:
"""Execute additional rules with filtering based on schema results.

Currently implements skip semantics for testing the two-phase framework.
Future versions will implement desired_type compatibility analysis.

Args:
other_rules: Non-schema rules to execute
schema_results: Results from schema phase for analysis
skip_map: Pre-computed skip decisions based on schema results

Returns:
Tuple of (results, execution_seconds)
"""
logger.debug(
f"Phase 2: Executing {len(other_rules)} additional rules "
"with skip semantics"
)

if not other_rules:
return [], 0.0

# Filter out rules that should be skipped based on schema results
filtered_rules = []
skipped_count = 0

for rule in other_rules:
rule_id = str(rule.id)
if rule_id in skip_map:
skipped_count += 1
logger.debug(
f"Phase 2: Skipping rule {rule.name} - "
f"{skip_map[rule_id]['skip_reason']}"
)
continue
filtered_rules.append(rule)

logger.debug(
f"Phase 2: Executing {len(filtered_rules)} rules, skipping {skipped_count}"
)

if not filtered_rules:
return [], 0.0

validator = _create_validator(
source_config=self.source_config,
atomic_rules=filtered_rules,
core_config=self.core_config,
cli_config=self.cli_config,
)

results, exec_seconds = _run_validation(validator)
logger.debug(f"Phase 2: Completed in {exec_seconds:.3f}s")

return results, exec_seconds


class ResultMerger:
"""Merges results from two-phase execution to maintain existing output format."""

@staticmethod
def merge_results(
schema_results_list: List[Any],
additional_results_list: List[Any],
schema_rules: List[RuleSchema],
other_rules: List[RuleSchema],
skip_map: Dict[str, Dict[str, str]],
) -> Tuple[List[Any], List[RuleSchema]]:
"""Merge results from both phases and reconstruct skipped results.

Args:
schema_results_list: Results from schema phase
additional_results_list: Results from additional rules phase
schema_rules: Schema rules that were executed
other_rules: Other rules (some may have been skipped)
skip_map: Information about skipped rules

Returns:
Tuple of (combined_results, all_atomic_rules)
"""
logger.debug("Merging results from two-phase execution")

# Combine all rules for consistent processing
all_atomic_rules = schema_rules + other_rules

# Start with executed results
combined_results = list(schema_results_list) + list(additional_results_list)

# Create synthetic results for skipped rules to maintain output consistency
executed_rule_ids = set()
for result in combined_results:
if hasattr(result, "rule_id"):
executed_rule_ids.add(str(result.rule_id))
elif isinstance(result, dict):
executed_rule_ids.add(str(result.get("rule_id", "")))

# Create placeholder results for skipped rules
for rule in other_rules:
rule_id = str(rule.id)
if rule_id in skip_map and rule_id not in executed_rule_ids:
# Create a synthetic result for skipped rule
synthetic_result = {
"rule_id": rule.id,
"status": "SKIPPED",
"skip_reason": skip_map[rule_id]["skip_reason"],
"dataset_metrics": [],
"execution_time": 0.0,
"execution_message": "Skipped due to "
f"{skip_map[rule_id]['skip_reason']}",
"error_message": None,
"sample_data": None,
"cross_db_metrics": None,
"execution_plan": {},
"started_at": None,
"ended_at": None,
}
combined_results.append(synthetic_result)

logger.debug(
f"Merged {len(schema_results_list)} schema + "
f"{len(additional_results_list)} additional + {len(skip_map)} "
f"skipped = {len(combined_results)} total results"
)

return combined_results, all_atomic_rules


def _emit_table_output(
*,
source: str,
Expand Down Expand Up @@ -1261,9 +1469,13 @@ def schema_command(
warnings, rules_count = _validate_rules_payload(rules_payload)
_emit_warnings(warnings, output)

atomic_rules = _decompose_schema_payload(rules_payload, source_config)
# Two-phase execution: separate schema and other rules
schema_rules, other_rules = _decompose_schema_payload(
rules_payload, source_config
)
all_atomic_rules = schema_rules + other_rules

if not atomic_rules:
if not all_atomic_rules:
_early_exit_when_no_rules(
source=connection_string,
rules_file=rules_file,
Expand All @@ -1274,21 +1486,96 @@ def schema_command(

core_config = get_core_config()
cli_config = get_cli_config()
validator = _create_validator(
source_config=source_config,
atomic_rules=atomic_rules,
core_config=core_config,
cli_config=cli_config,
)
results, exec_seconds = _run_validation(validator)

schema_results = _extract_schema_results(
atomic_rules=atomic_rules, results=results
)
skip_map = _compute_skip_map(
atomic_rules=atomic_rules, schema_results=schema_results
# Phase 1: Execute schema rules only
# schema_executor = SchemaPhaseExecutor(
# source_config=source_config, core_config=core_config,
# cli_config=cli_config
# )

# Execute two-phase validation in a single event loop to avoid
# connection issues
async def execute_two_phase_validation() -> tuple:
# start_time = _now()

# Phase 1: Execute schema rules only
if schema_rules:
schema_validator = _create_validator(
source_config=source_config,
atomic_rules=schema_rules,
core_config=core_config,
cli_config=cli_config,
)
schema_start = _now()
schema_results_list = await schema_validator.validate()
schema_exec_seconds = (_now() - schema_start).total_seconds()
schema_results = _extract_schema_results(
atomic_rules=schema_rules, results=schema_results_list
)
else:
schema_results_list, schema_exec_seconds, schema_results = [], 0.0, []

# Compute skip logic based on schema results
skip_map = _compute_skip_map(
atomic_rules=all_atomic_rules, schema_results=schema_results
)

# Phase 2: Execute additional rules with skip semantics
if other_rules:
# Filter out rules that should be skipped based on schema results
filtered_rules = [
rule for rule in other_rules if str(rule.id) not in skip_map
]

if filtered_rules:
additional_validator = _create_validator(
source_config=source_config,
atomic_rules=filtered_rules,
core_config=core_config,
cli_config=cli_config,
)
additional_start = _now()
additional_results_list = await additional_validator.validate()
additional_exec_seconds = (
_now() - additional_start
).total_seconds()
else:
additional_results_list, additional_exec_seconds = [], 0.0
else:
additional_results_list, additional_exec_seconds = [], 0.0

return (
schema_results_list,
schema_exec_seconds,
schema_results,
additional_results_list,
additional_exec_seconds,
skip_map,
)

import asyncio

(
schema_results_list,
schema_exec_seconds,
schema_results,
additional_results_list,
additional_exec_seconds,
skip_map,
) = asyncio.run(execute_two_phase_validation())

# Merge results to maintain existing output format
results, atomic_rules = ResultMerger.merge_results(
schema_results_list,
additional_results_list,
schema_rules,
other_rules,
skip_map,
)

# Total execution time
exec_seconds = schema_exec_seconds + additional_exec_seconds

if output.lower() == "json":
_emit_json_output(
source=connection_string,
Expand Down
Loading