From 217676c7acf02d553ba1ec31ad5256d996178b9b Mon Sep 17 00:00:00 2001 From: litedatum Date: Thu, 11 Sep 2025 22:50:09 -0400 Subject: [PATCH 1/2] feat: Implement syntactic sugar for type definitions and enhance schema executor with native type reporting --- cli/commands/schema.py | 102 ++--- core/executors/schema_executor.py | 21 + shared/utils/type_parser.py | 233 +++++++++++ test_data/schema.json | 2 +- .../executors/test_native_type_integration.py | 371 +++++++++++++++++ .../test_schema_executor_native_types.py | 392 ++++++++++++++++++ .../test_simple_native_type_reporting.py | 139 +++++++ tests/unit/shared/utils/test_type_parser.py | 294 +++++++++++++ 8 files changed, 1506 insertions(+), 48 deletions(-) create mode 100644 shared/utils/type_parser.py create mode 100644 tests/integration/core/executors/test_native_type_integration.py create mode 100644 tests/integration/core/executors/test_schema_executor_native_types.py create mode 100644 tests/integration/core/executors/test_simple_native_type_reporting.py create mode 100644 tests/unit/shared/utils/test_type_parser.py diff --git a/cli/commands/schema.py b/cli/commands/schema.py index f0d304f..6ff434a 100644 --- a/cli/commands/schema.py +++ b/cli/commands/schema.py @@ -132,15 +132,21 @@ def _validate_single_rule_item(item: Dict[str, Any], context: str) -> None: if not isinstance(field_name, str) or not field_name: raise click.UsageError(f"{context}.field must be a non-empty string") - # type + # type - validate using TypeParser to support syntactic sugar if "type" in item: type_name = item["type"] if not isinstance(type_name, str): raise click.UsageError(f"{context}.type must be a string when provided") - if type_name.lower() not in _ALLOWED_TYPE_NAMES: + + # Use TypeParser to validate the type definition + from shared.utils.type_parser import TypeParser, TypeParseError + try: + TypeParser.parse_type_definition(type_name) + except TypeParseError as e: allowed = ", ".join(sorted(_ALLOWED_TYPE_NAMES)) raise click.UsageError( - f"{context}.type '{type_name}' is not supported. " f"Allowed: {allowed}" + f"{context}.type '{type_name}' is not supported. Error: {str(e)}. " + f"Supported formats: {allowed} or syntactic sugar like string(50), float(12,2), datetime('format')" ) # required @@ -160,58 +166,29 @@ def _validate_single_rule_item(item: Dict[str, Any], context: str) -> None: f"{context}.{bound_key} must be numeric when provided" ) - # max_length + # max_length - basic validation, TypeParser will handle type consistency if "max_length" in item: value = item["max_length"] if not isinstance(value, int) or value < 0: raise click.UsageError( f"{context}.max_length must be a non-negative integer when provided" ) - # Validate max_length is only for string types - type_name = item.get("type", "").lower() if item.get("type") else None - if type_name and type_name != "string": - raise click.UsageError( - f"{context}.max_length can only be specified for 'string' type " - f"fields, not '{type_name}'" - ) - # precision + # precision - basic validation, TypeParser will handle type consistency if "precision" in item: value = item["precision"] if not isinstance(value, int) or value < 0: raise click.UsageError( f"{context}.precision must be a non-negative integer when provided" ) - # Validate precision is only for float types - type_name = item.get("type", "").lower() if item.get("type") else None - if type_name and type_name != "float": - raise click.UsageError( - f"{context}.precision can only be specified for 'float' type " - f"fields, not '{type_name}'" - ) - # scale + # scale - basic validation, TypeParser will handle type consistency if "scale" in item: value = item["scale"] if not isinstance(value, int) or value < 0: raise click.UsageError( f"{context}.scale must be a non-negative integer when provided" ) - # Validate scale is only for float types - type_name = item.get("type", "").lower() if item.get("type") else None - if type_name and type_name != "float": - raise click.UsageError( - f"{context}.scale can only be specified for 'float' type " - f"fields, not '{type_name}'" - ) - # Validate scale <= precision when both are specified - if "precision" in item: - precision_val = item["precision"] - if isinstance(precision_val, int) and value > precision_val: - raise click.UsageError( - f"{context}.scale ({value}) cannot be greater than precision " - f"({precision_val})" - ) def _validate_rules_payload(payload: Any) -> Tuple[List[str], int]: @@ -379,21 +356,52 @@ def _decompose_single_table_schema( # Should have been validated earlier; keep defensive check raise click.UsageError("Each rule item must have a non-empty 'field'") - # SCHEMA: collect column metadata + # SCHEMA: collect column metadata using new TypeParser column_metadata = {} - # Add expected_type if type is specified + # Handle type definition using TypeParser (supports syntactic sugar) if "type" in item and item["type"] is not None: - dt = _map_type_name_to_datatype(str(item["type"])) - column_metadata["expected_type"] = dt.value - - # Add metadata fields if present - if "max_length" in item: - column_metadata["max_length"] = item["max_length"] - if "precision" in item: - column_metadata["precision"] = item["precision"] - if "scale" in item: - column_metadata["scale"] = item["scale"] + from shared.utils.type_parser import TypeParser, TypeParseError + + try: + # Create a type definition dict for the parser + type_def = {"type": item["type"]} + + # Add metadata fields if present in the item + for metadata_field in ["max_length", "precision", "scale", "format"]: + if metadata_field in item: + type_def[metadata_field] = item[metadata_field] + + # Parse using TypeParser (handles both syntactic sugar and detailed format) + parsed_type = TypeParser.parse_type_definition(item["type"]) + + # Add expected_type for schema validation + column_metadata["expected_type"] = parsed_type["type"] + + # Add any parsed metadata + for metadata_field in ["max_length", "precision", "scale", "format"]: + if metadata_field in parsed_type: + column_metadata[metadata_field] = parsed_type[metadata_field] + + # Also add any explicit metadata from the item (overrides parsed values) + for metadata_field in ["max_length", "precision", "scale", "format"]: + if metadata_field in item: + column_metadata[metadata_field] = item[metadata_field] + + except TypeParseError as e: + raise click.UsageError(f"Invalid type definition for field '{field_name}': {str(e)}") + except Exception as e: + # Fallback to original parsing for backward compatibility + dt = _map_type_name_to_datatype(str(item["type"])) + column_metadata["expected_type"] = dt.value + + # Add metadata fields if present + if "max_length" in item: + column_metadata["max_length"] = item["max_length"] + if "precision" in item: + column_metadata["precision"] = item["precision"] + if "scale" in item: + column_metadata["scale"] = item["scale"] # Only add to columns_map if we have any metadata to store if column_metadata: diff --git a/core/executors/schema_executor.py b/core/executors/schema_executor.py index 62a3b31..d3f54bc 100644 --- a/core/executors/schema_executor.py +++ b/core/executors/schema_executor.py @@ -331,6 +331,9 @@ def compare_metadata( "existence": "FAILED", "type": "SKIPPED", "failure_code": "FIELD_MISSING", + "native_type": None, + "canonical_type": None, + "native_metadata": {} } ) continue @@ -357,6 +360,12 @@ def compare_metadata( "type": "FAILED", "failure_code": "TYPE_MISMATCH", "failure_details": comparison_result["failure_details"], + "native_type": actual_meta.get("type"), + "canonical_type": actual_meta.get("canonical_type"), + "native_metadata": { + k: v for k, v in actual_meta.items() + if k in ["max_length", "precision", "scale"] and v is not None + } } ) elif comparison_result["metadata_status"] == "FAILED": @@ -368,6 +377,12 @@ def compare_metadata( "type": "PASSED", "failure_code": "METADATA_MISMATCH", "failure_details": comparison_result["failure_details"], + "native_type": actual_meta.get("type"), + "canonical_type": actual_meta.get("canonical_type"), + "native_metadata": { + k: v for k, v in actual_meta.items() + if k in ["max_length", "precision", "scale"] and v is not None + } } ) else: @@ -377,6 +392,12 @@ def compare_metadata( "existence": "PASSED", "type": "PASSED", "failure_code": "NONE", + "native_type": actual_meta.get("type"), + "canonical_type": actual_meta.get("canonical_type"), + "native_metadata": { + k: v for k, v in actual_meta.items() + if k in ["max_length", "precision", "scale"] and v is not None + } } ) diff --git a/shared/utils/type_parser.py b/shared/utils/type_parser.py new file mode 100644 index 0000000..46ef670 --- /dev/null +++ b/shared/utils/type_parser.py @@ -0,0 +1,233 @@ +""" +Type Definition Parser + +Provides reusable parsing logic for syntactic sugar type definitions +while maintaining backward compatibility with detailed JSON format. + +Supports formats like: +- string(50) → {"type": "string", "max_length": 50} +- float(12,2) → {"type": "float", "precision": 12, "scale": 2} +- datetime('yyyymmdd') → {"type": "datetime", "format": "yyyymmdd"} +""" + +import re +from typing import Any, Dict, Union + +from shared.enums.data_types import DataType + + +class TypeParseError(Exception): + """Raised when type definition parsing fails.""" + pass + + +class TypeParser: + """Parser for type definitions supporting both syntactic sugar and detailed JSON formats.""" + + # Supported base types + _SUPPORTED_TYPES = { + "string": DataType.STRING, + "str": DataType.STRING, # Allow str as alias for string + "integer": DataType.INTEGER, + "int": DataType.INTEGER, # Allow int as alias for integer + "float": DataType.FLOAT, + "boolean": DataType.BOOLEAN, + "bool": DataType.BOOLEAN, # Allow bool as alias for boolean + "date": DataType.DATE, + "datetime": DataType.DATETIME, + } + + # Regex patterns for syntactic sugar parsing + _STRING_PATTERN = re.compile(r'^(string|str)\s*\(\s*(-?\d+)\s*\)$', re.IGNORECASE) + _FLOAT_PATTERN = re.compile(r'^float\s*\(\s*(-?\d+)\s*,\s*(-?\d+)\s*\)$', re.IGNORECASE) + _DATETIME_PATTERN = re.compile(r'^datetime\s*\(\s*[\'"](.+?)[\'"]\s*\)$', re.IGNORECASE) + _SIMPLE_TYPE_PATTERN = re.compile(r'^(string|str|integer|int|float|boolean|bool|date|datetime)$', re.IGNORECASE) + + @classmethod + def parse_type_definition(cls, type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: + """ + Parse a type definition that can be either: + 1. A string with syntactic sugar (e.g., "string(50)", "float(12,2)") + 2. A detailed JSON object (backward compatibility) + + Args: + type_def: Type definition as string or dict + + Returns: + Dict containing parsed type information with keys: + - type: Canonical type name (STRING, INTEGER, etc.) + - Additional metadata keys based on type (max_length, precision, scale, format) + + Raises: + TypeParseError: If parsing fails or type is unsupported + """ + if isinstance(type_def, dict): + return cls._parse_detailed_format(type_def) + elif isinstance(type_def, str): + return cls._parse_syntactic_sugar(type_def.strip()) + else: + raise TypeParseError(f"Type definition must be string or dict, got {type(type_def)}") + + @classmethod + def _parse_detailed_format(cls, type_def: Dict[str, Any]) -> Dict[str, Any]: + """Parse detailed JSON format (backward compatibility).""" + if "type" not in type_def: + raise TypeParseError("Detailed format must include 'type' field") + + type_name = str(type_def["type"]).lower() + if type_name not in cls._SUPPORTED_TYPES: + raise TypeParseError(f"Unsupported type '{type_name}' in detailed format") + + result = { + "type": cls._SUPPORTED_TYPES[type_name].value + } + + # Copy over additional metadata + metadata_fields = ["max_length", "precision", "scale", "format"] + for field in metadata_fields: + if field in type_def: + result[field] = type_def[field] + + # Validate metadata consistency + cls._validate_metadata(result) + + return result + + @classmethod + def _parse_syntactic_sugar(cls, type_str: str) -> Dict[str, Any]: + """Parse syntactic sugar format.""" + # Try string(length) pattern + match = cls._STRING_PATTERN.match(type_str) + if match: + length = int(match.group(2)) + if length <= 0: + raise TypeParseError("String length must be positive") + return { + "type": DataType.STRING.value, + "max_length": length + } + + # Try float(precision,scale) pattern + match = cls._FLOAT_PATTERN.match(type_str) + if match: + precision = int(match.group(1)) + scale = int(match.group(2)) + if precision <= 0: + raise TypeParseError("Float precision must be positive") + if scale < 0: + raise TypeParseError("Float scale cannot be negative") + if scale > precision: + raise TypeParseError("Float scale cannot be greater than precision") + return { + "type": DataType.FLOAT.value, + "precision": precision, + "scale": scale + } + + # Try datetime('format') pattern + match = cls._DATETIME_PATTERN.match(type_str) + if match: + format_str = match.group(1) + return { + "type": DataType.DATETIME.value, + "format": format_str + } + + # Try simple type names + match = cls._SIMPLE_TYPE_PATTERN.match(type_str) + if match: + type_name = match.group(1).lower() + return { + "type": cls._SUPPORTED_TYPES[type_name].value + } + + raise TypeParseError(f"Cannot parse type definition '{type_str}'") + + @classmethod + def _validate_metadata(cls, parsed_type: Dict[str, Any]) -> None: + """Validate that metadata is consistent with type.""" + type_value = parsed_type.get("type") + + # Validate max_length is only for strings + if "max_length" in parsed_type: + if type_value != DataType.STRING.value: + raise TypeParseError( + f"max_length can only be specified for STRING type, not {type_value}" + ) + if not isinstance(parsed_type["max_length"], int) or parsed_type["max_length"] <= 0: + raise TypeParseError("max_length must be a positive integer") + + # Validate precision/scale are only for floats + if "precision" in parsed_type or "scale" in parsed_type: + if type_value != DataType.FLOAT.value: + raise TypeParseError( + f"precision/scale can only be specified for FLOAT type, not {type_value}" + ) + + if "precision" in parsed_type: + if not isinstance(parsed_type["precision"], int) or parsed_type["precision"] <= 0: + raise TypeParseError("precision must be a positive integer") + + if "scale" in parsed_type: + if not isinstance(parsed_type["scale"], int) or parsed_type["scale"] < 0: + raise TypeParseError("scale must be a non-negative integer") + if "precision" in parsed_type and parsed_type["scale"] > parsed_type["precision"]: + raise TypeParseError("scale cannot be greater than precision") + + # Validate format is only for datetime + if "format" in parsed_type: + if type_value != DataType.DATETIME.value: + raise TypeParseError( + f"format can only be specified for DATETIME type, not {type_value}" + ) + + @classmethod + def is_syntactic_sugar(cls, type_def: Union[str, Dict[str, Any]]) -> bool: + """Check if a type definition uses syntactic sugar format.""" + if not isinstance(type_def, str): + return False + + type_str = type_def.strip() + return bool( + cls._STRING_PATTERN.match(type_str) or + cls._FLOAT_PATTERN.match(type_str) or + cls._DATETIME_PATTERN.match(type_str) or + cls._SIMPLE_TYPE_PATTERN.match(type_str) + ) + + @classmethod + def normalize_to_detailed_format(cls, type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: + """ + Normalize any type definition to detailed format for backward compatibility. + + Args: + type_def: Type definition in any supported format + + Returns: + Dict in detailed format that existing code can use + """ + parsed = cls.parse_type_definition(type_def) + + # Convert canonical type back to lowercase for existing code compatibility + if "type" in parsed: + # Keep the canonical uppercase form for new code, but also provide lowercase + parsed["expected_type"] = parsed["type"] # For schema executor + parsed["type"] = parsed["type"].lower() # For backward compatibility + + return parsed + + +# Convenience functions for common usage patterns +def parse_type(type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: + """Convenience function to parse a type definition.""" + return TypeParser.parse_type_definition(type_def) + + +def is_syntactic_sugar(type_def: Union[str, Dict[str, Any]]) -> bool: + """Convenience function to check if type definition uses syntactic sugar.""" + return TypeParser.is_syntactic_sugar(type_def) + + +def normalize_type(type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: + """Convenience function to normalize type definition to detailed format.""" + return TypeParser.normalize_to_detailed_format(type_def) \ No newline at end of file diff --git a/test_data/schema.json b/test_data/schema.json index 5ce4404..d557a38 100644 --- a/test_data/schema.json +++ b/test_data/schema.json @@ -15,7 +15,7 @@ { "field": "customer_id", "type": "integer", "required": true }, { "field": "product_name", "type": "string", "max_length": 155, "required": true }, { "field": "quantity", "type": "integer", "required": true }, - { "field": "price", "type": "float", "precision": 8, "scale": 2, "required": true}, + { "field": "price", "type": "float(10,2)", "required": true}, { "field": "status", "type": "string", "max_length": 50, "required": true }, { "field": "order_date", "type": "date", "required": true } ], diff --git a/tests/integration/core/executors/test_native_type_integration.py b/tests/integration/core/executors/test_native_type_integration.py new file mode 100644 index 0000000..a6265c3 --- /dev/null +++ b/tests/integration/core/executors/test_native_type_integration.py @@ -0,0 +1,371 @@ +""" +Integration test for native type reporting functionality using MySQL. + +Based on the established pattern from test_mysql_integration.py. +Tests the enhanced SchemaExecutor that includes native_type, canonical_type, +and native_metadata in field_results. +""" + +import pytest + +from core.executors.schema_executor import SchemaExecutor +from shared.database.query_executor import QueryExecutor +from shared.enums import RuleAction, RuleCategory, RuleType, SeverityLevel +from shared.enums.connection_types import ConnectionType +from shared.enums.data_types import DataType +from shared.schema.base import RuleTarget, TargetEntity +from shared.schema.connection_schema import ConnectionSchema +from shared.schema.rule_schema import RuleSchema +from shared.utils.logger import get_logger +from tests.shared.utils.database_utils import ( + get_available_databases, + get_mysql_connection_params, +) + +pytestmark = pytest.mark.asyncio + +logger = get_logger(__name__) + + +def _skip_if_mysql_unavailable() -> None: + if "mysql" not in get_available_databases(): + pytest.skip("MySQL not configured; skipping integration tests") + + +@pytest.mark.integration +@pytest.mark.database +class TestNativeTypeIntegration: + """Test native type reporting functionality with real MySQL database.""" + + async def _prepare_test_environment(self, mysql_connection_params): + """Prepare MySQL test environment with test table.""" + from shared.database.connection import get_db_url, get_engine + from typing import cast + + # Create engine for setup + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + engine = await get_engine(db_url, pool_size=1, echo=False) + executor = QueryExecutor(engine) + + # Clean up and create test table + await executor.execute_query( + "DROP TABLE IF EXISTS native_type_test", fetch=False + ) + + await executor.execute_query( + """ + CREATE TABLE native_type_test ( + id INT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(50) NOT NULL, + email VARCHAR(100), + age SMALLINT, + score DECIMAL(5,2), + is_active BOOLEAN DEFAULT TRUE, + birth_date DATE, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + description TEXT + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """, + fetch=False, + ) + + # Insert test data + await executor.execute_query( + """ + INSERT INTO native_type_test + (name, email, age, score, is_active, birth_date) VALUES + ('Alice', 'alice@example.com', 25, 85.50, TRUE, '1998-05-15'), + ('Bob', 'bob@example.com', 30, 92.75, FALSE, '1993-08-20') + """, + fetch=False, + ) + + await engine.dispose() + return executor + + async def test_native_type_reporting_comprehensive(self, mysql_connection_params): + """Test that native type information is correctly reported for various MySQL types.""" + _skip_if_mysql_unavailable() + + # Prepare test environment + await self._prepare_test_environment(mysql_connection_params) + + # Create connection schema + connection = ConnectionSchema( + name="native_type_test_connection", + description="Connection for testing native type reporting", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + executor = SchemaExecutor(connection, test_mode=True) + + # Define schema rule with expected types + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value, "max_length": 50}, + "email": {"expected_type": DataType.STRING.value, "max_length": 100}, + "age": {"expected_type": DataType.INTEGER.value}, + "score": {"expected_type": DataType.FLOAT.value, "precision": 5, "scale": 2}, + "is_active": {"expected_type": DataType.INTEGER.value}, # MySQL BOOLEAN -> TINYINT(1) -> INTEGER + "birth_date": {"expected_type": DataType.DATE.value}, + "created_at": {"expected_type": DataType.DATETIME.value}, + "description": {"expected_type": DataType.STRING.value}, + } + + rule = RuleSchema( + id="native_type_test_rule", + name="Native Type Reporting Test", + description="Test rule for native type reporting", + type=RuleType.SCHEMA, + category=RuleCategory.VALIDITY, + severity=SeverityLevel.MEDIUM, + action=RuleAction.LOG, + target=RuleTarget( + entities=[TargetEntity( + database=mysql_connection_params["database"], + table="native_type_test", + column=None + )], + relationship_type="single_table", + ), + parameters={"columns": columns}, + ) + + try: + # Execute the schema rule + result = await executor.execute_rule(rule) + + logger.info(f"Schema rule execution status: {result.status}") + logger.info(f"Execution message: {result.execution_message}") + + # Debug: print detailed information + execution_plan = result.execution_plan + if "schema_details" in execution_plan: + schema_details = execution_plan["schema_details"] + if "field_results" in schema_details: + field_results = schema_details["field_results"] + logger.info(f"Number of field results: {len(field_results)}") + for fr in field_results: + logger.info(f"Field {fr.get('column')}: existence={fr.get('existence')}, type={fr.get('type')}, failure_code={fr.get('failure_code')}") + if fr.get('failure_code') != 'NONE': + logger.info(f" Failure details: {fr.get('failure_details')}") + + # Verify basic execution - should pass now with corrected type expectations + assert result.status == "PASSED", f"Expected PASSED, got {result.status}: {result.execution_message}" + + # Verify execution plan contains schema details + assert "schema_details" in execution_plan + + schema_details = execution_plan["schema_details"] + assert "field_results" in schema_details + assert schema_details["table_exists"] is True + + field_results = schema_details["field_results"] + assert len(field_results) == len(columns), f"Expected {len(columns)} field results, got {len(field_results)}" + + # Test native type information for each field + field_map = {fr["column"]: fr for fr in field_results} + + # Test INTEGER type (id, age) + for col in ["id", "age"]: + field_result = field_map[col] + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result + + assert field_result["canonical_type"] == DataType.INTEGER.value + assert field_result["native_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + logger.info(f"{col}: native_type={field_result['native_type']}, " + f"canonical_type={field_result['canonical_type']}") + + # Test STRING type with length (name, email) + name_result = field_map["name"] + assert name_result["canonical_type"] == DataType.STRING.value + assert name_result["native_metadata"].get("max_length") == 50 + + email_result = field_map["email"] + assert email_result["canonical_type"] == DataType.STRING.value + assert email_result["native_metadata"].get("max_length") == 100 + + # Test FLOAT type with precision/scale (score) + score_result = field_map["score"] + assert score_result["canonical_type"] == DataType.FLOAT.value + # Note: MySQL may return precision/scale info in native_metadata + logger.info(f"score native_metadata: {score_result['native_metadata']}") + + # Test BOOLEAN type (is_active) - Note: MySQL maps BOOLEAN to TINYINT(1) -> INTEGER + boolean_result = field_map["is_active"] + # In MySQL, BOOLEAN is actually stored as TINYINT(1) which maps to INTEGER + assert boolean_result["canonical_type"] == DataType.INTEGER.value + logger.info(f"is_active correctly identified as INTEGER (MySQL BOOLEAN -> TINYINT mapping)") + + # Test DATE type (birth_date) + date_result = field_map["birth_date"] + assert date_result["canonical_type"] == DataType.DATE.value + + # Test DATETIME type (created_at) + datetime_result = field_map["created_at"] + assert datetime_result["canonical_type"] == DataType.DATETIME.value + + # Test TEXT type (description) - should map to STRING + desc_result = field_map["description"] + assert desc_result["canonical_type"] == DataType.STRING.value + + # Verify all fields have the required enhanced information + for field_result in field_results: + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "PASSED" + assert field_result["failure_code"] == "NONE" + + # Verify enhanced fields exist and have meaningful values + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + logger.info(f"✓ {field_result['column']}: " + f"native='{field_result['native_type']}', " + f"canonical='{field_result['canonical_type']}', " + f"metadata={field_result['native_metadata']}") + + logger.info("✅ Native type reporting test completed successfully") + + finally: + # Cleanup + from shared.database.connection import get_db_url, get_engine + from typing import cast + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS native_type_test", fetch=False + ) + await cleanup_engine.dispose() + + async def test_native_type_reporting_with_type_mismatch(self, mysql_connection_params): + """Test native type information is included even for TYPE_MISMATCH cases.""" + _skip_if_mysql_unavailable() + + # Prepare test environment + await self._prepare_test_environment(mysql_connection_params) + + # Create connection schema + connection = ConnectionSchema( + name="type_mismatch_test_connection", + description="Connection for testing type mismatch scenarios", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + executor = SchemaExecutor(connection, test_mode=True) + + # Define schema rule with intentional type mismatches + columns = { + "id": {"expected_type": DataType.STRING.value}, # Mismatch: actual is INT + "name": {"expected_type": DataType.INTEGER.value}, # Mismatch: actual is VARCHAR + "age": {"expected_type": DataType.FLOAT.value}, # Mismatch: actual is SMALLINT + } + + rule = RuleSchema( + id="type_mismatch_test_rule", + name="Type Mismatch Test", + description="Test rule for type mismatch scenarios", + type=RuleType.SCHEMA, + category=RuleCategory.VALIDITY, + severity=SeverityLevel.MEDIUM, + action=RuleAction.LOG, + target=RuleTarget( + entities=[TargetEntity( + database=mysql_connection_params["database"], + table="native_type_test", + column=None + )], + relationship_type="single_table", + ), + parameters={"columns": columns}, + ) + + try: + # Execute the schema rule + result = await executor.execute_rule(rule) + + logger.info(f"Type mismatch test status: {result.status}") + logger.info(f"Execution message: {result.execution_message}") + + # Should fail due to type mismatches + assert result.status == "FAILED" + + # Verify schema details + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + assert len(field_results) == 3 + + # Verify that native type information is provided even for failed cases + for field_result in field_results: + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "FAILED" + assert field_result["failure_code"] == "TYPE_MISMATCH" + + # Critical: native type info should still be present for failed validations + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result + + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + logger.info(f"❌ {field_result['column']}: TYPE_MISMATCH but still has " + f"native='{field_result['native_type']}', " + f"canonical='{field_result['canonical_type']}'") + + logger.info("✅ Type mismatch native type reporting test completed") + + finally: + # Cleanup + from shared.database.connection import get_db_url, get_engine + from typing import cast + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS native_type_test", fetch=False + ) + await cleanup_engine.dispose() \ No newline at end of file diff --git a/tests/integration/core/executors/test_schema_executor_native_types.py b/tests/integration/core/executors/test_schema_executor_native_types.py new file mode 100644 index 0000000..889b49a --- /dev/null +++ b/tests/integration/core/executors/test_schema_executor_native_types.py @@ -0,0 +1,392 @@ +""" +Integration tests for SchemaExecutor native type reporting enhancements + +Tests the new functionality that includes native_type, canonical_type, +and native_metadata in field_results for all scenarios including TYPE_MISMATCH. +""" + +import pytest + +from core.executors.schema_executor import SchemaExecutor +from shared.enums import DataType, RuleType +from shared.schema.connection_schema import ConnectionSchema +from shared.schema.rule_schema import RuleSchema +from tests.shared.builders.test_builders import TestDataBuilder +from tests.shared.utils.database_utils import ( + get_available_databases, + get_mysql_connection_params, +) + +pytestmark = pytest.mark.asyncio + + +def _skip_if_mysql_unavailable() -> None: + if "mysql" not in get_available_databases(): + pytest.skip("MySQL not configured; skipping integration tests") + + +@pytest.fixture +def mysql_connection(): + """Create MySQL connection for testing.""" + _skip_if_mysql_unavailable() + params = get_mysql_connection_params() + from shared.enums.connection_types import ConnectionType + from typing import cast + + return ConnectionSchema( + name="mysql_native_type_test", + description="MySQL connection for native type testing", + connection_type=ConnectionType.MYSQL, + host=str(params["host"]), + port=cast(int, params["port"]), + db_name=str(params["database"]), + username=str(params["username"]), + password=str(params["password"]), + ) + + +@pytest.fixture +async def schema_executor(mysql_connection): + """Create SchemaExecutor with MySQL connection.""" + return SchemaExecutor(mysql_connection, test_mode=True) + + +def build_schema_rule_with_native_reporting( + columns: dict, + table_name: str = "test_table", + strict_mode: bool = False, + case_insensitive: bool = False +) -> RuleSchema: + """Build a SCHEMA rule for testing native type reporting.""" + builder = TestDataBuilder.rule() + rule = ( + builder.with_name(f"schema_{table_name}") + .with_target("test_db", table_name, None) # Table-level rule + .with_type(RuleType.SCHEMA) + .with_parameter("columns", columns) + .with_parameter("strict_mode", strict_mode) + .with_parameter("case_insensitive", case_insensitive) + .build() + ) + return rule + + +@pytest.mark.integration +@pytest.mark.database +class TestSchemaExecutorNativeTypeReporting: + """Test native type reporting enhancements in SchemaExecutor.""" + + async def test_native_type_reporting_successful_case(self, schema_executor): + """Test that native type information is included in successful validation.""" + # Create test table with known types + from sqlalchemy import text + engine = await schema_executor.get_engine() + async with engine.begin() as conn: + await conn.execute(text("DROP TABLE IF EXISTS test_users")) + await conn.execute(text( + "CREATE TABLE test_users (id INT, name VARCHAR(50), active BOOLEAN)" + )) + + # Define schema rule that should pass + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value, "max_length": 50}, + "active": {"expected_type": DataType.BOOLEAN.value}, + } + rule = build_schema_rule_with_native_reporting(columns, "test_users") + + # Execute rule + result = await schema_executor.execute_rule(rule) + + # Verify result structure + assert result.status == "PASSED" + + # Verify enhanced field_results include native type information + schema_details = result.execution_plan.get("schema_details", {}) + field_results = schema_details.get("field_results", []) + + assert len(field_results) == 3 + + for field_result in field_results: + # Each field result should have native type information + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result + + # Native type should be the database-specific type + assert field_result["native_type"] is not None + assert isinstance(field_result["native_type"], str) + + # Canonical type should be the standardized type + assert field_result["canonical_type"] in [dt.value for dt in DataType] + + # Native metadata should be a dict + assert isinstance(field_result["native_metadata"], dict) + + # Verify specific field expectations + if field_result["column"] == "id": + assert field_result["canonical_type"] == DataType.INTEGER.value + assert field_result["failure_code"] == "NONE" + elif field_result["column"] == "name": + assert field_result["canonical_type"] == DataType.STRING.value + # Should include max_length in native_metadata for VARCHAR(50) + assert "max_length" in field_result["native_metadata"] + assert field_result["native_metadata"]["max_length"] == 50 + elif field_result["column"] == "active": + assert field_result["canonical_type"] == DataType.BOOLEAN.value + + async def test_native_type_reporting_type_mismatch(self, schema_executor): + """Test that native type information is included even for TYPE_MISMATCH cases.""" + # Create test table + from sqlalchemy import text + engine = await schema_executor.get_engine() + async with engine.begin() as conn: + await conn.execute(text("DROP TABLE IF EXISTS test_mismatch")) + await conn.execute(text( + "CREATE TABLE test_mismatch (id INT, name VARCHAR(100))" + )) + + # Define schema rule with type mismatches + columns = { + "id": {"expected_type": DataType.STRING.value}, # Mismatch: expecting string, actual is integer + "name": {"expected_type": DataType.INTEGER.value}, # Mismatch: expecting integer, actual is string + } + rule = build_schema_rule_with_native_reporting(columns, "test_mismatch") + + # Execute rule + result = await schema_executor.execute_rule(rule) + + # Should fail due to type mismatches + assert result.status == "FAILED" + + # Verify enhanced field_results include native type information even for failures + schema_details = result.execution_plan.get("schema_details", {}) + field_results = schema_details.get("field_results", []) + + assert len(field_results) == 2 + + for field_result in field_results: + # Even with type mismatches, native type information should be present + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result + + # Should have failed type validation but passed existence + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "FAILED" + assert field_result["failure_code"] == "TYPE_MISMATCH" + + # Native type information should still be accurate + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + + # Verify the actual vs expected mismatch + if field_result["column"] == "id": + # Actual type is INTEGER, but expected STRING + assert field_result["canonical_type"] == DataType.INTEGER.value + elif field_result["column"] == "name": + # Actual type is STRING, but expected INTEGER + assert field_result["canonical_type"] == DataType.STRING.value + # Should include max_length from VARCHAR(100) + assert "max_length" in field_result["native_metadata"] + assert field_result["native_metadata"]["max_length"] == 100 + + async def test_native_type_reporting_field_missing(self, schema_executor): + """Test native type information handling for missing fields.""" + # Create test table with only some of the expected fields + from sqlalchemy import text + engine = await schema_executor.get_engine() + async with engine.begin() as conn: + await conn.execute(text("DROP TABLE IF EXISTS test_partial")) + await conn.execute(text("CREATE TABLE test_partial (id INT)")) + + # Define schema rule expecting more fields than exist + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "missing_field": {"expected_type": DataType.STRING.value}, + } + rule = build_schema_rule_with_native_reporting(columns, "test_partial") + + # Execute rule + result = await schema_executor.execute_rule(rule) + + # Should fail due to missing field + assert result.status == "FAILED" + + # Verify field_results + schema_details = result.execution_plan.get("schema_details", {}) + field_results = schema_details.get("field_results", []) + + assert len(field_results) == 2 + + # Find results for each field + id_result = next(fr for fr in field_results if fr["column"] == "id") + missing_result = next(fr for fr in field_results if fr["column"] == "missing_field") + + # Existing field should have native type information + assert id_result["existence"] == "PASSED" + assert id_result["type"] == "PASSED" + assert id_result["native_type"] is not None + assert id_result["canonical_type"] == DataType.INTEGER.value + assert isinstance(id_result["native_metadata"], dict) + + # Missing field should have null native type information + assert missing_result["existence"] == "FAILED" + assert missing_result["type"] == "SKIPPED" + assert missing_result["failure_code"] == "FIELD_MISSING" + assert missing_result["native_type"] is None + assert missing_result["canonical_type"] is None + assert missing_result["native_metadata"] == {} + + async def test_native_metadata_precision_scale(self, schema_executor): + """Test native metadata reporting for float types with precision/scale.""" + # Create test table with decimal/numeric types + from sqlalchemy import text + engine = await schema_executor.get_engine() + async with engine.begin() as conn: + await conn.execute(text("DROP TABLE IF EXISTS test_decimal")) + # MySQL supports DECIMAL with precision/scale + await conn.execute(text("CREATE TABLE test_decimal (price DECIMAL(10,2), amount NUMERIC(8,3))")) + + # Define schema rule for decimal types + columns = { + "price": {"expected_type": DataType.FLOAT.value, "precision": 10, "scale": 2}, + "amount": {"expected_type": DataType.FLOAT.value, "precision": 8, "scale": 3}, + } + rule = build_schema_rule_with_native_reporting(columns, "test_decimal") + + # Execute rule + result = await schema_executor.execute_rule(rule) + + # Verify field_results include precision/scale metadata + schema_details = result.execution_plan.get("schema_details", {}) + field_results = schema_details.get("field_results", []) + + for field_result in field_results: + assert "native_metadata" in field_result + native_metadata = field_result["native_metadata"] + + # Verify the native type is captured + assert field_result["native_type"] is not None + assert field_result["canonical_type"] == DataType.FLOAT.value + + # Note: SQLite might not preserve exact precision/scale, but the structure should be correct + assert isinstance(native_metadata, dict) + + async def test_comprehensive_native_type_coverage(self, schema_executor): + """Test native type reporting across various database type scenarios.""" + # Create table with various data types + from sqlalchemy import text + engine = await schema_executor.get_engine() + async with engine.begin() as conn: + await conn.execute(text("DROP TABLE IF EXISTS test_comprehensive")) + await conn.execute(text(""" + CREATE TABLE test_comprehensive ( + id INT, + name TEXT, + email VARCHAR(255), + age SMALLINT, + salary DOUBLE, + is_active BOOLEAN, + birth_date DATE, + created_at DATETIME + ) + """)) + + # Define schema rule covering all types + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value}, + "email": {"expected_type": DataType.STRING.value, "max_length": 255}, + "age": {"expected_type": DataType.INTEGER.value}, + "salary": {"expected_type": DataType.FLOAT.value}, + "is_active": {"expected_type": DataType.BOOLEAN.value}, + "birth_date": {"expected_type": DataType.DATE.value}, + "created_at": {"expected_type": DataType.DATETIME.value}, + } + rule = build_schema_rule_with_native_reporting(columns, "test_comprehensive") + + # Execute rule + result = await schema_executor.execute_rule(rule) + + # Verify all fields have complete native type information + schema_details = result.execution_plan.get("schema_details", {}) + field_results = schema_details.get("field_results", []) + + assert len(field_results) == 8 + + for field_result in field_results: + # Every field should have complete native type information + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + # Verify canonical type mapping is correct + column_name = field_result["column"] + canonical_type = field_result["canonical_type"] + + type_expectations = { + "id": DataType.INTEGER.value, + "name": DataType.STRING.value, + "email": DataType.STRING.value, + "age": DataType.INTEGER.value, + "salary": DataType.FLOAT.value, + "is_active": DataType.BOOLEAN.value, + "birth_date": DataType.DATE.value, + "created_at": DataType.DATETIME.value, + } + + assert canonical_type == type_expectations[column_name] + + +@pytest.mark.integration +@pytest.mark.database +class TestSchemaExecutorBackwardCompatibility: + """Test that enhancements maintain backward compatibility.""" + + async def test_existing_functionality_unchanged(self, schema_executor): + """Test that existing schema validation functionality is unchanged.""" + # Create test table + from sqlalchemy import text + engine = await schema_executor.get_engine() + async with engine.begin() as conn: + await conn.execute(text("DROP TABLE IF EXISTS test_compat")) + await conn.execute(text("CREATE TABLE test_compat (id INT, name VARCHAR(50))")) + + # Use existing schema rule format + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value}, + } + rule = build_schema_rule_with_native_reporting(columns, "test_compat") + + # Execute rule + result = await schema_executor.execute_rule(rule) + + # Verify existing fields are still present and working + assert result.status == "PASSED" + assert result.rule_id == rule.id + assert len(result.dataset_metrics) == 1 + + # Verify execution_plan structure is maintained + execution_plan = result.execution_plan + assert "execution_type" in execution_plan + assert "schema_details" in execution_plan + + schema_details = execution_plan["schema_details"] + assert "field_results" in schema_details + assert "extras" in schema_details + assert "table_exists" in schema_details + + # Verify field_results have expected legacy fields + field_results = schema_details["field_results"] + for field_result in field_results: + assert "column" in field_result + assert "existence" in field_result + assert "type" in field_result + assert "failure_code" in field_result + + # NEW: Also verify enhanced fields are added + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result \ No newline at end of file diff --git a/tests/integration/core/executors/test_simple_native_type_reporting.py b/tests/integration/core/executors/test_simple_native_type_reporting.py new file mode 100644 index 0000000..3b4974e --- /dev/null +++ b/tests/integration/core/executors/test_simple_native_type_reporting.py @@ -0,0 +1,139 @@ +""" +Simple integration test to verify native type reporting functionality works. + +This is a minimal test to demonstrate that the native type reporting enhancements +work correctly with a real MySQL database. +""" + +import pytest +from sqlalchemy import text + +from core.executors.schema_executor import SchemaExecutor +from shared.enums import DataType, RuleType +from shared.enums.connection_types import ConnectionType +from shared.schema.connection_schema import ConnectionSchema +from shared.schema.rule_schema import RuleSchema +from tests.shared.builders.test_builders import TestDataBuilder +from tests.shared.utils.database_utils import ( + get_available_databases, + get_mysql_connection_params, +) + +pytestmark = pytest.mark.asyncio + + +def _skip_if_mysql_unavailable() -> None: + if "mysql" not in get_available_databases(): + pytest.skip("MySQL not configured; skipping integration tests") + + +def build_simple_schema_rule(columns: dict) -> RuleSchema: + """Build a simple SCHEMA rule for testing.""" + builder = TestDataBuilder.rule() + rule = ( + builder.with_name("test_native_reporting") + .with_target("test_db", "native_test_table", None) # Table-level rule + .with_type(RuleType.SCHEMA) + .with_parameter("columns", columns) + .build() + ) + return rule + + +@pytest.mark.integration +@pytest.mark.database +class TestSimpleNativeTypeReporting: + """Simple test for native type reporting.""" + + async def test_native_type_reporting_works(self): + """Test that native type information is included in schema validation results.""" + _skip_if_mysql_unavailable() + + # Create connection + params = get_mysql_connection_params() + from typing import cast + + connection = ConnectionSchema( + name="test_native_types", + description="Test connection for native type reporting", + connection_type=ConnectionType.MYSQL, + host=str(params["host"]), + port=cast(int, params["port"]), + db_name=str(params["database"]), + username=str(params["username"]), + password=str(params["password"]), + ) + + # Create executor + executor = SchemaExecutor(connection, test_mode=True) + + # Create and setup table + engine = await executor.get_engine() + + # Use regular connection (not transaction) for DDL + async with engine.connect() as conn: + # Drop and create table + await conn.execute(text("DROP TABLE IF EXISTS native_test_table")) + await conn.execute(text(""" + CREATE TABLE native_test_table ( + id INT PRIMARY KEY, + name VARCHAR(50) NOT NULL, + score DECIMAL(5,2) + ) + """)) + await conn.commit() + + try: + # Create schema rule + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value, "max_length": 50}, + "score": {"expected_type": DataType.FLOAT.value, "precision": 5, "scale": 2}, + } + rule = build_simple_schema_rule(columns) + + # Execute rule + result = await executor.execute_rule(rule) + + # Basic validation + print(f"Rule execution status: {result.status}") + print(f"Execution message: {result.execution_message}") + + # Check that we have schema details + execution_plan = result.execution_plan + assert "schema_details" in execution_plan + + schema_details = execution_plan["schema_details"] + assert "field_results" in schema_details + + field_results = schema_details["field_results"] + assert len(field_results) >= 1 # Should have at least one field result + + # Check that native type information is present + for field_result in field_results: + print(f"Field: {field_result.get('column')}") + print(f" - Native type: {field_result.get('native_type')}") + print(f" - Canonical type: {field_result.get('canonical_type')}") + print(f" - Native metadata: {field_result.get('native_metadata')}") + + # Verify enhanced fields are present + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result + + # Verify they have meaningful values + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + # Print overall result for debugging + print(f"Test completed with result status: {result.status}") + + finally: + # Clean up + async with engine.connect() as conn: + await conn.execute(text("DROP TABLE IF EXISTS native_test_table")) + await conn.commit() + + # Close engine + await engine.dispose() \ No newline at end of file diff --git a/tests/unit/shared/utils/test_type_parser.py b/tests/unit/shared/utils/test_type_parser.py new file mode 100644 index 0000000..637e836 --- /dev/null +++ b/tests/unit/shared/utils/test_type_parser.py @@ -0,0 +1,294 @@ +""" +Tests for TypeParser utility + +Comprehensive test coverage for syntactic sugar type parsing and backward compatibility. +""" + +import pytest + +from shared.enums.data_types import DataType +from shared.utils.type_parser import TypeParser, TypeParseError, parse_type, is_syntactic_sugar, normalize_type + + +class TestTypeParser: + """Test TypeParser class methods""" + + def test_parse_simple_types(self): + """Test parsing of simple type names.""" + # Test all supported simple types + test_cases = [ + ("string", {"type": DataType.STRING.value}), + ("str", {"type": DataType.STRING.value}), + ("integer", {"type": DataType.INTEGER.value}), + ("int", {"type": DataType.INTEGER.value}), + ("float", {"type": DataType.FLOAT.value}), + ("boolean", {"type": DataType.BOOLEAN.value}), + ("bool", {"type": DataType.BOOLEAN.value}), + ("date", {"type": DataType.DATE.value}), + ("datetime", {"type": DataType.DATETIME.value}), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_parse_case_insensitive(self): + """Test that parsing is case insensitive.""" + test_cases = ["STRING", "String", "sTrInG", "INTEGER", "Int", "FLOAT", "Float"] + + for input_type in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert "type" in result + assert result["type"] in [dt.value for dt in DataType] + + def test_parse_string_with_length(self): + """Test parsing string with length specification.""" + test_cases = [ + ("string(50)", {"type": DataType.STRING.value, "max_length": 50}), + ("STRING(255)", {"type": DataType.STRING.value, "max_length": 255}), + ("str(10)", {"type": DataType.STRING.value, "max_length": 10}), + ("string( 100 )", {"type": DataType.STRING.value, "max_length": 100}), # with spaces + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_parse_float_with_precision_scale(self): + """Test parsing float with precision and scale.""" + test_cases = [ + ("float(10,2)", {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}), + ("FLOAT(12,4)", {"type": DataType.FLOAT.value, "precision": 12, "scale": 4}), + ("float( 8 , 3 )", {"type": DataType.FLOAT.value, "precision": 8, "scale": 3}), # with spaces + ("float(15,0)", {"type": DataType.FLOAT.value, "precision": 15, "scale": 0}), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_parse_datetime_with_format(self): + """Test parsing datetime with format specification.""" + test_cases = [ + ("datetime('yyyymmdd')", {"type": DataType.DATETIME.value, "format": "yyyymmdd"}), + ("DATETIME(\"yyyy-mm-dd\")", {"type": DataType.DATETIME.value, "format": "yyyy-mm-dd"}), + ("datetime( 'dd/mm/yyyy hh:mm:ss' )", {"type": DataType.DATETIME.value, "format": "dd/mm/yyyy hh:mm:ss"}), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_parse_detailed_format_backward_compatibility(self): + """Test parsing detailed JSON format for backward compatibility.""" + test_cases = [ + ({"type": "string"}, {"type": DataType.STRING.value}), + ({"type": "string", "max_length": 100}, {"type": DataType.STRING.value, "max_length": 100}), + ({"type": "float", "precision": 10, "scale": 2}, {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}), + ({"type": "datetime", "format": "yyyy-mm-dd"}, {"type": DataType.DATETIME.value, "format": "yyyy-mm-dd"}), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_error_cases(self): + """Test error handling for invalid type definitions.""" + error_cases = [ + ("invalid_type", "Cannot parse type definition"), + ("string(-1)", "String length must be positive"), + ("float(0,2)", "Float precision must be positive"), + ("float(5,-1)", "Float scale cannot be negative"), + ("float(3,5)", "Float scale cannot be greater than precision"), + ({"type": "unknown"}, "Unsupported type 'unknown'"), + ({"missing_type": "value"}, "Detailed format must include 'type' field"), + (123, "Type definition must be string or dict"), + (None, "Type definition must be string or dict"), + ] + + for input_type, expected_error in error_cases: + with pytest.raises(TypeParseError) as exc_info: + TypeParser.parse_type_definition(input_type) + assert expected_error in str(exc_info.value) + + def test_metadata_validation(self): + """Test metadata validation for type consistency.""" + # Test invalid metadata combinations in detailed format + invalid_cases = [ + ({"type": "integer", "max_length": 10}, "max_length can only be specified for STRING type"), + ({"type": "string", "precision": 5}, "precision/scale can only be specified for FLOAT type"), + ({"type": "boolean", "scale": 2}, "precision/scale can only be specified for FLOAT type"), + ({"type": "date", "format": "yyyy"}, "format can only be specified for DATETIME type"), + ({"type": "string", "max_length": 0}, "max_length must be a positive integer"), + ({"type": "float", "precision": 0}, "precision must be a positive integer"), + ({"type": "float", "scale": -1}, "scale must be a non-negative integer"), + ({"type": "float", "precision": 3, "scale": 5}, "scale cannot be greater than precision"), + ] + + for input_type, expected_error in invalid_cases: + with pytest.raises(TypeParseError) as exc_info: + TypeParser.parse_type_definition(input_type) + assert expected_error in str(exc_info.value) + + def test_is_syntactic_sugar(self): + """Test identification of syntactic sugar formats.""" + sugar_cases = [ + "string(50)", + "float(10,2)", + "datetime('yyyy-mm-dd')", + "integer", + "boolean", + ] + + detailed_cases = [ + {"type": "string"}, + {"type": "float", "precision": 10}, + 123, + None, + ] + + for case in sugar_cases: + assert TypeParser.is_syntactic_sugar(case) is True + + for case in detailed_cases: + assert TypeParser.is_syntactic_sugar(case) is False + + def test_normalize_to_detailed_format(self): + """Test normalization to detailed format.""" + test_cases = [ + ("string(50)", {"type": "string", "expected_type": "STRING", "max_length": 50}), + ("float(10,2)", {"type": "float", "expected_type": "FLOAT", "precision": 10, "scale": 2}), + ({"type": "boolean"}, {"type": "boolean", "expected_type": "BOOLEAN"}), + ] + + for input_type, expected_keys in test_cases: + result = TypeParser.normalize_to_detailed_format(input_type) + for key, value in expected_keys.items(): + assert result[key] == value + + +class TestConvenienceFunctions: + """Test convenience functions""" + + def test_parse_type_function(self): + """Test parse_type convenience function.""" + result = parse_type("string(100)") + assert result == {"type": DataType.STRING.value, "max_length": 100} + + def test_is_syntactic_sugar_function(self): + """Test is_syntactic_sugar convenience function.""" + assert is_syntactic_sugar("float(10,2)") is True + assert is_syntactic_sugar({"type": "string"}) is False + + def test_normalize_type_function(self): + """Test normalize_type convenience function.""" + result = normalize_type("string(50)") + assert result["type"] == "string" + assert result["expected_type"] == "STRING" + assert result["max_length"] == 50 + + +class TestEdgeCases: + """Test edge cases and boundary conditions""" + + def test_whitespace_handling(self): + """Test handling of various whitespace scenarios.""" + test_cases = [ + (" string ", {"type": DataType.STRING.value}), + ("string( 50 )", {"type": DataType.STRING.value, "max_length": 50}), + ("float( 10 , 2 )", {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}), + ("datetime( ' format ' )", {"type": DataType.DATETIME.value, "format": " format "}), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_boundary_values(self): + """Test boundary values for numeric parameters.""" + # Test valid boundary values + valid_cases = [ + ("string(1)", {"type": DataType.STRING.value, "max_length": 1}), + ("float(1,0)", {"type": DataType.FLOAT.value, "precision": 1, "scale": 0}), + ("float(1,1)", {"type": DataType.FLOAT.value, "precision": 1, "scale": 1}), + ] + + for input_type, expected in valid_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + # Test invalid boundary values + invalid_cases = [ + ("string(0)", "String length must be positive"), + ("float(0,0)", "Float precision must be positive"), + ] + + for input_type, expected_error in invalid_cases: + with pytest.raises(TypeParseError) as exc_info: + TypeParser.parse_type_definition(input_type) + assert expected_error in str(exc_info.value) + + def test_quote_variations(self): + """Test different quote styles for datetime format.""" + test_cases = [ + ("datetime('format')", "format"), + ("datetime(\"format\")", "format"), + ("datetime('format with spaces')", "format with spaces"), + ("datetime(\"format with 'quotes'\")", "format with 'quotes'"), + ] + + for input_type, expected_format in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == {"type": DataType.DATETIME.value, "format": expected_format} + + def test_large_numbers(self): + """Test handling of large numeric values.""" + test_cases = [ + ("string(65535)", {"type": DataType.STRING.value, "max_length": 65535}), + ("float(38,10)", {"type": DataType.FLOAT.value, "precision": 38, "scale": 10}), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + +class TestIntegrationWithDataTypeEnum: + """Test integration with DataType enum""" + + def test_all_data_types_supported(self): + """Test that all DataType enum values are supported.""" + type_mappings = { + "string": DataType.STRING, + "integer": DataType.INTEGER, + "float": DataType.FLOAT, + "boolean": DataType.BOOLEAN, + "date": DataType.DATE, + "datetime": DataType.DATETIME, + } + + for type_name, expected_enum in type_mappings.items(): + result = TypeParser.parse_type_definition(type_name) + assert result["type"] == expected_enum.value + + def test_enum_value_consistency(self): + """Test that returned type values match DataType enum values.""" + result = TypeParser.parse_type_definition("string") + assert result["type"] == DataType.STRING.value == "STRING" + + result = TypeParser.parse_type_definition("float(10,2)") + assert result["type"] == DataType.FLOAT.value == "FLOAT" + + +@pytest.mark.parametrize("input_type,expected", [ + ("string(50)", {"type": "STRING", "max_length": 50}), + ("float(12,2)", {"type": "FLOAT", "precision": 12, "scale": 2}), + ("datetime('yyyymmdd')", {"type": "DATETIME", "format": "yyyymmdd"}), + ("integer", {"type": "INTEGER"}), + ("boolean", {"type": "BOOLEAN"}), + ("date", {"type": "DATE"}), +]) +def test_acceptance_criteria_examples(input_type, expected): + """Test the specific examples from the acceptance criteria.""" + result = parse_type(input_type) + assert result == expected \ No newline at end of file From 4d373689b87cad461f44ead0eaeb46afcebbc33a Mon Sep 17 00:00:00 2001 From: litedatum Date: Fri, 12 Sep 2025 16:07:49 -0400 Subject: [PATCH 2/2] test: regression test and preparation for summit PR --- CHANGELOG.md | 14 +- cli/commands/schema.py | 35 +- core/executors/schema_executor.py | 28 +- shared/utils/type_parser.py | 200 +++-- .../executors/test_native_type_integration.py | 739 ++++++++++++++++-- .../test_schema_executor_native_types.py | 392 ---------- .../test_simple_native_type_reporting.py | 139 ---- tests/unit/shared/utils/test_type_parser.py | 239 ++++-- 8 files changed, 1018 insertions(+), 768 deletions(-) delete mode 100644 tests/integration/core/executors/test_schema_executor_native_types.py delete mode 100644 tests/integration/core/executors/test_simple_native_type_reporting.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7af93f9..1a2dd09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,10 +8,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added -- None +- feat(schema): Implement syntactic sugar for type definitions in schema rules +- feat(core): Add TypeParser utility for parsing compact type definitions (e.g., `string(50)`, `float(12,2)`) +- feat(schema): Support shorthand type syntax: `string(50)` → `{"type": "string", "max_length": 50}` +- feat(schema): Support float precision/scale syntax: `float(12,2)` → `{"type": "float", "precision": 12, "scale": 2}` +- feat(schema): Support datetime format syntax: `datetime('yyyymmdd')` → `{"type": "datetime", "format": "yyyymmdd"}` +- feat(core): Enhanced schema executor with native database type reporting capabilities +- feat(core): Add comprehensive type aliases support (str→string, int→integer, bool→boolean) +- feat(tests): Comprehensive test coverage for type parser with unit and integration tests +- feat(tests): Native type integration testing for enhanced schema validation ### Changed -- None +- enhance(cli): Updated schema command to support both syntactic sugar and detailed JSON type definitions +- enhance(core): Improved schema executor to handle parsed type definitions with metadata +- enhance(validation): Maintain backward compatibility with existing detailed JSON schema format ### Fixed - None diff --git a/cli/commands/schema.py b/cli/commands/schema.py index 6ff434a..946bec5 100644 --- a/cli/commands/schema.py +++ b/cli/commands/schema.py @@ -137,16 +137,18 @@ def _validate_single_rule_item(item: Dict[str, Any], context: str) -> None: type_name = item["type"] if not isinstance(type_name, str): raise click.UsageError(f"{context}.type must be a string when provided") - + # Use TypeParser to validate the type definition - from shared.utils.type_parser import TypeParser, TypeParseError + from shared.utils.type_parser import TypeParseError, TypeParser + try: TypeParser.parse_type_definition(type_name) except TypeParseError as e: allowed = ", ".join(sorted(_ALLOWED_TYPE_NAMES)) raise click.UsageError( f"{context}.type '{type_name}' is not supported. Error: {str(e)}. " - f"Supported formats: {allowed} or syntactic sugar like string(50), float(12,2), datetime('format')" + f"Supported formats: {allowed} or syntactic sugar like string(50), " + "float(12,2), datetime('format')" ) # required @@ -361,40 +363,43 @@ def _decompose_single_table_schema( # Handle type definition using TypeParser (supports syntactic sugar) if "type" in item and item["type"] is not None: - from shared.utils.type_parser import TypeParser, TypeParseError - + from shared.utils.type_parser import TypeParseError, TypeParser + try: # Create a type definition dict for the parser type_def = {"type": item["type"]} - + # Add metadata fields if present in the item for metadata_field in ["max_length", "precision", "scale", "format"]: if metadata_field in item: type_def[metadata_field] = item[metadata_field] - - # Parse using TypeParser (handles both syntactic sugar and detailed format) + + # Parse using TypeParser (handles both syntactic sugar + # and detailed format) parsed_type = TypeParser.parse_type_definition(item["type"]) - + # Add expected_type for schema validation column_metadata["expected_type"] = parsed_type["type"] - + # Add any parsed metadata for metadata_field in ["max_length", "precision", "scale", "format"]: if metadata_field in parsed_type: column_metadata[metadata_field] = parsed_type[metadata_field] - + # Also add any explicit metadata from the item (overrides parsed values) for metadata_field in ["max_length", "precision", "scale", "format"]: if metadata_field in item: column_metadata[metadata_field] = item[metadata_field] - + except TypeParseError as e: - raise click.UsageError(f"Invalid type definition for field '{field_name}': {str(e)}") - except Exception as e: + raise click.UsageError( + f"Invalid type definition for field '{field_name}': {str(e)}" + ) + except Exception: # Fallback to original parsing for backward compatibility dt = _map_type_name_to_datatype(str(item["type"])) column_metadata["expected_type"] = dt.value - + # Add metadata fields if present if "max_length" in item: column_metadata["max_length"] = item["max_length"] diff --git a/core/executors/schema_executor.py b/core/executors/schema_executor.py index d3f54bc..7576136 100644 --- a/core/executors/schema_executor.py +++ b/core/executors/schema_executor.py @@ -305,7 +305,7 @@ def compare_metadata( # Count failures across declared columns and strict-mode extras total_declared = len(columns_cfg) failures = 0 - field_results: list[dict[str, str]] = [] + field_results: list[dict[str, Any]] = [] for declared_name, cfg in columns_cfg.items(): expected_type_raw = cfg.get("expected_type") @@ -333,7 +333,7 @@ def compare_metadata( "failure_code": "FIELD_MISSING", "native_type": None, "canonical_type": None, - "native_metadata": {} + "native_metadata": {}, } ) continue @@ -363,9 +363,11 @@ def compare_metadata( "native_type": actual_meta.get("type"), "canonical_type": actual_meta.get("canonical_type"), "native_metadata": { - k: v for k, v in actual_meta.items() - if k in ["max_length", "precision", "scale"] and v is not None - } + k: v + for k, v in actual_meta.items() + if k in ["max_length", "precision", "scale"] + and v is not None + }, } ) elif comparison_result["metadata_status"] == "FAILED": @@ -380,9 +382,11 @@ def compare_metadata( "native_type": actual_meta.get("type"), "canonical_type": actual_meta.get("canonical_type"), "native_metadata": { - k: v for k, v in actual_meta.items() - if k in ["max_length", "precision", "scale"] and v is not None - } + k: v + for k, v in actual_meta.items() + if k in ["max_length", "precision", "scale"] + and v is not None + }, } ) else: @@ -395,9 +399,11 @@ def compare_metadata( "native_type": actual_meta.get("type"), "canonical_type": actual_meta.get("canonical_type"), "native_metadata": { - k: v for k, v in actual_meta.items() - if k in ["max_length", "precision", "scale"] and v is not None - } + k: v + for k, v in actual_meta.items() + if k in ["max_length", "precision", "scale"] + and v is not None + }, } ) diff --git a/shared/utils/type_parser.py b/shared/utils/type_parser.py index 46ef670..d6efa42 100644 --- a/shared/utils/type_parser.py +++ b/shared/utils/type_parser.py @@ -18,16 +18,20 @@ class TypeParseError(Exception): """Raised when type definition parsing fails.""" + pass class TypeParser: - """Parser for type definitions supporting both syntactic sugar and detailed JSON formats.""" - + """ + Parser for type definitions supporting both syntactic sugar and + detailed JSON formats. + """ + # Supported base types _SUPPORTED_TYPES = { "string": DataType.STRING, - "str": DataType.STRING, # Allow str as alias for string + "str": DataType.STRING, # Allow str as alias for string "integer": DataType.INTEGER, "int": DataType.INTEGER, # Allow int as alias for integer "float": DataType.FLOAT, @@ -36,28 +40,37 @@ class TypeParser: "date": DataType.DATE, "datetime": DataType.DATETIME, } - + # Regex patterns for syntactic sugar parsing - _STRING_PATTERN = re.compile(r'^(string|str)\s*\(\s*(-?\d+)\s*\)$', re.IGNORECASE) - _FLOAT_PATTERN = re.compile(r'^float\s*\(\s*(-?\d+)\s*,\s*(-?\d+)\s*\)$', re.IGNORECASE) - _DATETIME_PATTERN = re.compile(r'^datetime\s*\(\s*[\'"](.+?)[\'"]\s*\)$', re.IGNORECASE) - _SIMPLE_TYPE_PATTERN = re.compile(r'^(string|str|integer|int|float|boolean|bool|date|datetime)$', re.IGNORECASE) + _STRING_PATTERN = re.compile(r"^(string|str)\s*\(\s*(-?\d+)\s*\)$", re.IGNORECASE) + _FLOAT_PATTERN = re.compile( + r"^float\s*\(\s*(-?\d+)\s*,\s*(-?\d+)\s*\)$", re.IGNORECASE + ) + _DATETIME_PATTERN = re.compile( + r'^datetime\s*\(\s*[\'"](.+?)[\'"]\s*\)$', re.IGNORECASE + ) + _SIMPLE_TYPE_PATTERN = re.compile( + r"^(string|str|integer|int|float|boolean|bool|date|datetime)$", re.IGNORECASE + ) @classmethod - def parse_type_definition(cls, type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: + def parse_type_definition( + cls, type_def: Union[str, Dict[str, Any]] + ) -> Dict[str, Any]: """ Parse a type definition that can be either: 1. A string with syntactic sugar (e.g., "string(50)", "float(12,2)") 2. A detailed JSON object (backward compatibility) - + Args: type_def: Type definition as string or dict - + Returns: Dict containing parsed type information with keys: - type: Canonical type name (STRING, INTEGER, etc.) - - Additional metadata keys based on type (max_length, precision, scale, format) - + - Additional metadata keys based on type (max_length, precision, + scale, format) + Raises: TypeParseError: If parsing fails or type is unsupported """ @@ -66,34 +79,34 @@ def parse_type_definition(cls, type_def: Union[str, Dict[str, Any]]) -> Dict[str elif isinstance(type_def, str): return cls._parse_syntactic_sugar(type_def.strip()) else: - raise TypeParseError(f"Type definition must be string or dict, got {type(type_def)}") + raise TypeParseError( + f"Type definition must be string or dict, got {type(type_def)}" + ) @classmethod def _parse_detailed_format(cls, type_def: Dict[str, Any]) -> Dict[str, Any]: """Parse detailed JSON format (backward compatibility).""" if "type" not in type_def: raise TypeParseError("Detailed format must include 'type' field") - + type_name = str(type_def["type"]).lower() if type_name not in cls._SUPPORTED_TYPES: raise TypeParseError(f"Unsupported type '{type_name}' in detailed format") - - result = { - "type": cls._SUPPORTED_TYPES[type_name].value - } - + + result = {"type": cls._SUPPORTED_TYPES[type_name].value} + # Copy over additional metadata metadata_fields = ["max_length", "precision", "scale", "format"] for field in metadata_fields: if field in type_def: result[field] = type_def[field] - + # Validate metadata consistency cls._validate_metadata(result) - + return result - @classmethod + @classmethod def _parse_syntactic_sugar(cls, type_str: str) -> Dict[str, Any]: """Parse syntactic sugar format.""" # Try string(length) pattern @@ -102,11 +115,8 @@ def _parse_syntactic_sugar(cls, type_str: str) -> Dict[str, Any]: length = int(match.group(2)) if length <= 0: raise TypeParseError("String length must be positive") - return { - "type": DataType.STRING.value, - "max_length": length - } - + return {"type": DataType.STRING.value, "max_length": length} + # Try float(precision,scale) pattern match = cls._FLOAT_PATTERN.match(type_str) if match: @@ -121,59 +131,65 @@ def _parse_syntactic_sugar(cls, type_str: str) -> Dict[str, Any]: return { "type": DataType.FLOAT.value, "precision": precision, - "scale": scale + "scale": scale, } - + # Try datetime('format') pattern match = cls._DATETIME_PATTERN.match(type_str) if match: format_str = match.group(1) - return { - "type": DataType.DATETIME.value, - "format": format_str - } - + return {"type": DataType.DATETIME.value, "format": format_str} + # Try simple type names match = cls._SIMPLE_TYPE_PATTERN.match(type_str) if match: type_name = match.group(1).lower() - return { - "type": cls._SUPPORTED_TYPES[type_name].value - } - + return {"type": cls._SUPPORTED_TYPES[type_name].value} + raise TypeParseError(f"Cannot parse type definition '{type_str}'") @classmethod def _validate_metadata(cls, parsed_type: Dict[str, Any]) -> None: """Validate that metadata is consistent with type.""" type_value = parsed_type.get("type") - + # Validate max_length is only for strings if "max_length" in parsed_type: if type_value != DataType.STRING.value: raise TypeParseError( - f"max_length can only be specified for STRING type, not {type_value}" + "max_length can only be specified for STRING type, " + f"not {type_value}" ) - if not isinstance(parsed_type["max_length"], int) or parsed_type["max_length"] <= 0: + if ( + not isinstance(parsed_type["max_length"], int) + or parsed_type["max_length"] <= 0 + ): raise TypeParseError("max_length must be a positive integer") - + # Validate precision/scale are only for floats if "precision" in parsed_type or "scale" in parsed_type: if type_value != DataType.FLOAT.value: raise TypeParseError( - f"precision/scale can only be specified for FLOAT type, not {type_value}" + "precision/scale can only be specified for FLOAT type, " + f"not {type_value}" ) - + if "precision" in parsed_type: - if not isinstance(parsed_type["precision"], int) or parsed_type["precision"] <= 0: + if ( + not isinstance(parsed_type["precision"], int) + or parsed_type["precision"] <= 0 + ): raise TypeParseError("precision must be a positive integer") - + if "scale" in parsed_type: if not isinstance(parsed_type["scale"], int) or parsed_type["scale"] < 0: raise TypeParseError("scale must be a non-negative integer") - if "precision" in parsed_type and parsed_type["scale"] > parsed_type["precision"]: + if ( + "precision" in parsed_type + and parsed_type["scale"] > parsed_type["precision"] + ): raise TypeParseError("scale cannot be greater than precision") - + # Validate format is only for datetime if "format" in parsed_type: if type_value != DataType.DATETIME.value: @@ -186,36 +202,88 @@ def is_syntactic_sugar(cls, type_def: Union[str, Dict[str, Any]]) -> bool: """Check if a type definition uses syntactic sugar format.""" if not isinstance(type_def, str): return False - + type_str = type_def.strip() return bool( - cls._STRING_PATTERN.match(type_str) or - cls._FLOAT_PATTERN.match(type_str) or - cls._DATETIME_PATTERN.match(type_str) or - cls._SIMPLE_TYPE_PATTERN.match(type_str) + cls._STRING_PATTERN.match(type_str) + or cls._FLOAT_PATTERN.match(type_str) + or cls._DATETIME_PATTERN.match(type_str) + or cls._SIMPLE_TYPE_PATTERN.match(type_str) ) @classmethod - def normalize_to_detailed_format(cls, type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: + def normalize_to_detailed_format( + cls, type_def: Union[str, Dict[str, Any]] + ) -> Dict[str, Any]: """ Normalize any type definition to detailed format for backward compatibility. - + Args: type_def: Type definition in any supported format - + Returns: Dict in detailed format that existing code can use """ parsed = cls.parse_type_definition(type_def) - + # Convert canonical type back to lowercase for existing code compatibility if "type" in parsed: # Keep the canonical uppercase form for new code, but also provide lowercase - parsed["expected_type"] = parsed["type"] # For schema executor + parsed["desired_type"] = parsed["type"] # For schema executor parsed["type"] = parsed["type"].lower() # For backward compatibility - + return parsed + @classmethod + def parse_desired_type_for_core( + cls, desired_type_def: Union[str, Dict[str, Any]] + ) -> Dict[str, Any]: + """ + Parse desired_type definition and return fields with desired_ prefix + for core layer. + + This method handles the CLI-to-core interface naming for desired_type + fields, ensuring no conflicts with existing type field names. + + Args: + desired_type_def: Desired type definition in syntactic sugar or + detailed format + + Returns: + Dict with desired_ prefixed field names suitable for core layer: + { + "desired_type": "STRING", + "desired_max_length": 50, + "desired_precision": 10, + "desired_scale": 2, + "desired_format": "YYYY-MM-DD" + } + + Example: + parse_desired_type_for_core("string(50)") + → {"desired_type": "STRING", "desired_max_length": 50} + + parse_desired_type_for_core("float(10,2)") + → {"desired_type": "FLOAT", "desired_precision": 10, "desired_scale": 2} + """ + # Parse the desired type definition using existing logic + parsed = cls.parse_type_definition(desired_type_def) + + # Transform to core layer format with desired_ prefix + core_format = {} + + # Main type field + if "type" in parsed: + core_format["desired_type"] = parsed["type"] + + # Metadata fields with desired_ prefix + metadata_fields = ["max_length", "precision", "scale", "format"] + for field in metadata_fields: + if field in parsed: + core_format[f"desired_{field}"] = parsed[field] + + return core_format + # Convenience functions for common usage patterns def parse_type(type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: @@ -230,4 +298,14 @@ def is_syntactic_sugar(type_def: Union[str, Dict[str, Any]]) -> bool: def normalize_type(type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: """Convenience function to normalize type definition to detailed format.""" - return TypeParser.normalize_to_detailed_format(type_def) \ No newline at end of file + return TypeParser.normalize_to_detailed_format(type_def) + + +def parse_desired_type_for_core( + desired_type_def: Union[str, Dict[str, Any]] +) -> Dict[str, Any]: + """ + Convenience function to parse desired_type with proper core layer + field naming. + """ + return TypeParser.parse_desired_type_for_core(desired_type_def) diff --git a/tests/integration/core/executors/test_native_type_integration.py b/tests/integration/core/executors/test_native_type_integration.py index a6265c3..d25e0e5 100644 --- a/tests/integration/core/executors/test_native_type_integration.py +++ b/tests/integration/core/executors/test_native_type_integration.py @@ -17,10 +17,8 @@ from shared.schema.connection_schema import ConnectionSchema from shared.schema.rule_schema import RuleSchema from shared.utils.logger import get_logger -from tests.shared.utils.database_utils import ( - get_available_databases, - get_mysql_connection_params, -) +from tests.shared.builders.test_builders import TestDataBuilder +from tests.shared.utils.database_utils import get_available_databases pytestmark = pytest.mark.asyncio @@ -32,16 +30,40 @@ def _skip_if_mysql_unavailable() -> None: pytest.skip("MySQL not configured; skipping integration tests") +def build_schema_rule_with_native_reporting( + columns: dict, + table_name: str = "test_table", + database_name: str = "test_db", + strict_mode: bool = False, + case_insensitive: bool = False, +) -> RuleSchema: + """Build a SCHEMA rule for testing native type reporting.""" + builder = TestDataBuilder.rule() + rule = ( + builder.with_name(f"schema_{table_name}") + .with_target(database_name, table_name, "") # Table-level rule + .with_type(RuleType.SCHEMA) + .with_parameter("columns", columns) + .with_parameter("strict_mode", strict_mode) + .with_parameter("case_insensitive", case_insensitive) + .build() + ) + return rule + + @pytest.mark.integration @pytest.mark.database class TestNativeTypeIntegration: """Test native type reporting functionality with real MySQL database.""" - async def _prepare_test_environment(self, mysql_connection_params): + async def _prepare_test_environment( + self, mysql_connection_params: dict + ) -> QueryExecutor: """Prepare MySQL test environment with test table.""" - from shared.database.connection import get_db_url, get_engine from typing import cast - + + from shared.database.connection import get_db_url, get_engine + # Create engine for setup db_url = get_db_url( str(mysql_connection_params["db_type"]), @@ -58,7 +80,7 @@ async def _prepare_test_environment(self, mysql_connection_params): await executor.execute_query( "DROP TABLE IF EXISTS native_type_test", fetch=False ) - + await executor.execute_query( """ CREATE TABLE native_type_test ( @@ -79,8 +101,8 @@ async def _prepare_test_environment(self, mysql_connection_params): # Insert test data await executor.execute_query( """ - INSERT INTO native_type_test - (name, email, age, score, is_active, birth_date) VALUES + INSERT INTO native_type_test + (name, email, age, score, is_active, birth_date) VALUES ('Alice', 'alice@example.com', 25, 85.50, TRUE, '1998-05-15'), ('Bob', 'bob@example.com', 30, 92.75, FALSE, '1993-08-20') """, @@ -90,10 +112,12 @@ async def _prepare_test_environment(self, mysql_connection_params): await engine.dispose() return executor - async def test_native_type_reporting_comprehensive(self, mysql_connection_params): + async def test_native_type_reporting_comprehensive( + self, mysql_connection_params: dict + ) -> None: """Test that native type information is correctly reported for various MySQL types.""" _skip_if_mysql_unavailable() - + # Prepare test environment await self._prepare_test_environment(mysql_connection_params) @@ -118,8 +142,14 @@ async def test_native_type_reporting_comprehensive(self, mysql_connection_params "name": {"expected_type": DataType.STRING.value, "max_length": 50}, "email": {"expected_type": DataType.STRING.value, "max_length": 100}, "age": {"expected_type": DataType.INTEGER.value}, - "score": {"expected_type": DataType.FLOAT.value, "precision": 5, "scale": 2}, - "is_active": {"expected_type": DataType.INTEGER.value}, # MySQL BOOLEAN -> TINYINT(1) -> INTEGER + "score": { + "expected_type": DataType.FLOAT.value, + "precision": 5, + "scale": 2, + }, + "is_active": { + "expected_type": DataType.INTEGER.value + }, # MySQL BOOLEAN -> TINYINT(1) -> INTEGER "birth_date": {"expected_type": DataType.DATE.value}, "created_at": {"expected_type": DataType.DATETIME.value}, "description": {"expected_type": DataType.STRING.value}, @@ -134,11 +164,13 @@ async def test_native_type_reporting_comprehensive(self, mysql_connection_params severity=SeverityLevel.MEDIUM, action=RuleAction.LOG, target=RuleTarget( - entities=[TargetEntity( - database=mysql_connection_params["database"], - table="native_type_test", - column=None - )], + entities=[ + TargetEntity( + database=mysql_connection_params["database"], + table="native_type_test", + column=None, + ) + ], relationship_type="single_table", ), parameters={"columns": columns}, @@ -147,58 +179,70 @@ async def test_native_type_reporting_comprehensive(self, mysql_connection_params try: # Execute the schema rule result = await executor.execute_rule(rule) - + logger.info(f"Schema rule execution status: {result.status}") logger.info(f"Execution message: {result.execution_message}") # Debug: print detailed information execution_plan = result.execution_plan + assert execution_plan is not None if "schema_details" in execution_plan: schema_details = execution_plan["schema_details"] if "field_results" in schema_details: field_results = schema_details["field_results"] logger.info(f"Number of field results: {len(field_results)}") for fr in field_results: - logger.info(f"Field {fr.get('column')}: existence={fr.get('existence')}, type={fr.get('type')}, failure_code={fr.get('failure_code')}") - if fr.get('failure_code') != 'NONE': - logger.info(f" Failure details: {fr.get('failure_details')}") + logger.info( + f"Field {fr.get('column')}: existence={fr.get('existence')}, type={fr.get('type')}, failure_code={fr.get('failure_code')}" + ) + if fr.get("failure_code") != "NONE": + logger.info( + f" Failure details: {fr.get('failure_details')}" + ) # Verify basic execution - should pass now with corrected type expectations - assert result.status == "PASSED", f"Expected PASSED, got {result.status}: {result.execution_message}" - + assert ( + result.status == "PASSED" + ), f"Expected PASSED, got {result.status}: {result.execution_message}" + # Verify execution plan contains schema details + assert execution_plan is not None assert "schema_details" in execution_plan - + schema_details = execution_plan["schema_details"] assert "field_results" in schema_details assert schema_details["table_exists"] is True - + field_results = schema_details["field_results"] - assert len(field_results) == len(columns), f"Expected {len(columns)} field results, got {len(field_results)}" + assert len(field_results) == len( + columns + ), f"Expected {len(columns)} field results, got {len(field_results)}" # Test native type information for each field field_map = {fr["column"]: fr for fr in field_results} - + # Test INTEGER type (id, age) for col in ["id", "age"]: field_result = field_map[col] assert "native_type" in field_result - assert "canonical_type" in field_result + assert "canonical_type" in field_result assert "native_metadata" in field_result - + assert field_result["canonical_type"] == DataType.INTEGER.value assert field_result["native_type"] is not None assert isinstance(field_result["native_metadata"], dict) - - logger.info(f"{col}: native_type={field_result['native_type']}, " - f"canonical_type={field_result['canonical_type']}") + + logger.info( + f"{col}: native_type={field_result['native_type']}, " + f"canonical_type={field_result['canonical_type']}" + ) # Test STRING type with length (name, email) name_result = field_map["name"] assert name_result["canonical_type"] == DataType.STRING.value assert name_result["native_metadata"].get("max_length") == 50 - - email_result = field_map["email"] + + email_result = field_map["email"] assert email_result["canonical_type"] == DataType.STRING.value assert email_result["native_metadata"].get("max_length") == 100 @@ -212,7 +256,9 @@ async def test_native_type_reporting_comprehensive(self, mysql_connection_params boolean_result = field_map["is_active"] # In MySQL, BOOLEAN is actually stored as TINYINT(1) which maps to INTEGER assert boolean_result["canonical_type"] == DataType.INTEGER.value - logger.info(f"is_active correctly identified as INTEGER (MySQL BOOLEAN -> TINYINT mapping)") + logger.info( + f"is_active correctly identified as INTEGER (MySQL BOOLEAN -> TINYINT mapping)" + ) # Test DATE type (birth_date) date_result = field_map["birth_date"] @@ -229,26 +275,29 @@ async def test_native_type_reporting_comprehensive(self, mysql_connection_params # Verify all fields have the required enhanced information for field_result in field_results: assert field_result["existence"] == "PASSED" - assert field_result["type"] == "PASSED" + assert field_result["type"] == "PASSED" assert field_result["failure_code"] == "NONE" - + # Verify enhanced fields exist and have meaningful values assert field_result["native_type"] is not None assert field_result["canonical_type"] is not None assert isinstance(field_result["native_metadata"], dict) - - logger.info(f"✓ {field_result['column']}: " - f"native='{field_result['native_type']}', " - f"canonical='{field_result['canonical_type']}', " - f"metadata={field_result['native_metadata']}") + + logger.info( + f"✓ {field_result['column']}: " + f"native='{field_result['native_type']}', " + f"canonical='{field_result['canonical_type']}', " + f"metadata={field_result['native_metadata']}" + ) logger.info("✅ Native type reporting test completed successfully") finally: # Cleanup - from shared.database.connection import get_db_url, get_engine from typing import cast - + + from shared.database.connection import get_db_url, get_engine + db_url = get_db_url( str(mysql_connection_params["db_type"]), str(mysql_connection_params["host"]), @@ -259,22 +308,24 @@ async def test_native_type_reporting_comprehensive(self, mysql_connection_params ) cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) cleanup_executor = QueryExecutor(cleanup_engine) - + await cleanup_executor.execute_query( "DROP TABLE IF EXISTS native_type_test", fetch=False ) await cleanup_engine.dispose() - async def test_native_type_reporting_with_type_mismatch(self, mysql_connection_params): + async def test_native_type_reporting_with_type_mismatch( + self, mysql_connection_params: dict + ) -> None: """Test native type information is included even for TYPE_MISMATCH cases.""" _skip_if_mysql_unavailable() - - # Prepare test environment + + # Prepare test environment await self._prepare_test_environment(mysql_connection_params) # Create connection schema connection = ConnectionSchema( - name="type_mismatch_test_connection", + name="type_mismatch_test_connection", description="Connection for testing type mismatch scenarios", connection_type=ConnectionType.MYSQL, host=mysql_connection_params["host"], @@ -290,8 +341,12 @@ async def test_native_type_reporting_with_type_mismatch(self, mysql_connection_p # Define schema rule with intentional type mismatches columns = { "id": {"expected_type": DataType.STRING.value}, # Mismatch: actual is INT - "name": {"expected_type": DataType.INTEGER.value}, # Mismatch: actual is VARCHAR - "age": {"expected_type": DataType.FLOAT.value}, # Mismatch: actual is SMALLINT + "name": { + "expected_type": DataType.INTEGER.value + }, # Mismatch: actual is VARCHAR + "age": { + "expected_type": DataType.FLOAT.value + }, # Mismatch: actual is SMALLINT } rule = RuleSchema( @@ -303,11 +358,13 @@ async def test_native_type_reporting_with_type_mismatch(self, mysql_connection_p severity=SeverityLevel.MEDIUM, action=RuleAction.LOG, target=RuleTarget( - entities=[TargetEntity( - database=mysql_connection_params["database"], - table="native_type_test", - column=None - )], + entities=[ + TargetEntity( + database=mysql_connection_params["database"], + table="native_type_test", + column=None, + ) + ], relationship_type="single_table", ), parameters={"columns": columns}, @@ -316,14 +373,15 @@ async def test_native_type_reporting_with_type_mismatch(self, mysql_connection_p try: # Execute the schema rule result = await executor.execute_rule(rule) - + logger.info(f"Type mismatch test status: {result.status}") logger.info(f"Execution message: {result.execution_message}") # Should fail due to type mismatches assert result.status == "FAILED" - + # Verify schema details + assert result.execution_plan is not None schema_details = result.execution_plan["schema_details"] field_results = schema_details["field_results"] assert len(field_results) == 3 @@ -333,27 +391,132 @@ async def test_native_type_reporting_with_type_mismatch(self, mysql_connection_p assert field_result["existence"] == "PASSED" assert field_result["type"] == "FAILED" assert field_result["failure_code"] == "TYPE_MISMATCH" - + # Critical: native type info should still be present for failed validations assert "native_type" in field_result assert "canonical_type" in field_result assert "native_metadata" in field_result - + assert field_result["native_type"] is not None assert field_result["canonical_type"] is not None assert isinstance(field_result["native_metadata"], dict) - - logger.info(f"❌ {field_result['column']}: TYPE_MISMATCH but still has " - f"native='{field_result['native_type']}', " - f"canonical='{field_result['canonical_type']}'") + + logger.info( + f"❌ {field_result['column']}: TYPE_MISMATCH but still has " + f"native='{field_result['native_type']}', " + f"canonical='{field_result['canonical_type']}'" + ) logger.info("✅ Type mismatch native type reporting test completed") finally: # Cleanup + from typing import cast + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS native_type_test", fetch=False + ) + await cleanup_engine.dispose() + + async def test_native_type_reporting_missing_field( + self, mysql_connection_params: dict + ) -> None: + """Test native type information handling for missing fields.""" + _skip_if_mysql_unavailable() + + # Prepare test environment with limited fields + await self._prepare_test_environment(mysql_connection_params) + + # Create connection schema + connection = ConnectionSchema( + name="missing_field_test_connection", + description="Connection for testing missing field scenarios", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + executor = SchemaExecutor(connection, test_mode=True) + + # Define schema rule expecting more fields than exist in native_type_test + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value}, + "missing_field": { + "expected_type": DataType.STRING.value + }, # This field doesn't exist + } + + rule = build_schema_rule_with_native_reporting( + columns, "native_type_test", mysql_connection_params["database"] + ) + + try: + # Execute the schema rule + result = await executor.execute_rule(rule) + + logger.info(f"Missing field test status: {result.status}") + logger.info(f"Execution message: {result.execution_message}") + + # Should fail due to missing field + assert result.status == "FAILED" + + # Verify schema details + assert result.execution_plan is not None + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + assert len(field_results) == 3 + + # Find results for each field + field_map = {fr["column"]: fr for fr in field_results} + + # Existing fields should have native type information + for existing_field in ["id", "name"]: + field_result = field_map[existing_field] + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "PASSED" + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + logger.info( + f"✓ {existing_field}: native_type={field_result['native_type']}" + ) + + # Missing field should have null native type information + missing_result = field_map["missing_field"] + assert missing_result["existence"] == "FAILED" + assert missing_result["type"] == "SKIPPED" + assert missing_result["failure_code"] == "FIELD_MISSING" + assert missing_result["native_type"] is None + assert missing_result["canonical_type"] is None + assert missing_result["native_metadata"] == {} + logger.info("✓ missing_field: correctly handled as FIELD_MISSING") + + logger.info("✅ Missing field native type reporting test completed") + + finally: + # Cleanup from typing import cast - + + from shared.database.connection import get_db_url, get_engine + db_url = get_db_url( str(mysql_connection_params["db_type"]), str(mysql_connection_params["host"]), @@ -364,8 +527,446 @@ async def test_native_type_reporting_with_type_mismatch(self, mysql_connection_p ) cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) cleanup_executor = QueryExecutor(cleanup_engine) - + await cleanup_executor.execute_query( "DROP TABLE IF EXISTS native_type_test", fetch=False ) - await cleanup_engine.dispose() \ No newline at end of file + await cleanup_engine.dispose() + + async def test_native_metadata_precision_scale( + self, mysql_connection_params: dict + ) -> None: + """Test native metadata reporting for decimal types with precision/scale.""" + _skip_if_mysql_unavailable() + + # Create test environment with decimal types + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + engine = await get_engine(db_url, pool_size=1, echo=False) + executor = QueryExecutor(engine) + + # Clean up and create test table with decimal types + await executor.execute_query("DROP TABLE IF EXISTS precision_test", fetch=False) + + await executor.execute_query( + """ + CREATE TABLE precision_test ( + price DECIMAL(10,2), + amount NUMERIC(8,3), + ratio FLOAT(7,4) + ) ENGINE=InnoDB + """, + fetch=False, + ) + + await engine.dispose() + + # Create connection schema + connection = ConnectionSchema( + name="precision_test_connection", + description="Connection for testing precision/scale metadata", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + schema_executor = SchemaExecutor(connection, test_mode=True) + + # Define schema rule for decimal types + columns = { + "price": { + "expected_type": DataType.FLOAT.value, + "precision": 10, + "scale": 2, + }, + "amount": { + "expected_type": DataType.FLOAT.value, + "precision": 8, + "scale": 3, + }, + "ratio": {"expected_type": DataType.FLOAT.value}, + } + rule = build_schema_rule_with_native_reporting( + columns, "precision_test", mysql_connection_params["database"] + ) + + try: + # Execute rule + result = await schema_executor.execute_rule(rule) + + logger.info(f"Precision/scale test status: {result.status}") + + # Verify field_results include precision/scale metadata + assert result.execution_plan is not None + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + + assert len(field_results) == 3 + + for field_result in field_results: + assert "native_metadata" in field_result + native_metadata = field_result["native_metadata"] + + # Verify the native type is captured + assert field_result["native_type"] is not None + assert field_result["canonical_type"] == DataType.FLOAT.value + + # Verify structure (MySQL may provide precision/scale info) + assert isinstance(native_metadata, dict) + + column_name = field_result["column"] + logger.info( + f"✓ {column_name}: native_type={field_result['native_type']}, " + f"metadata={native_metadata}" + ) + + logger.info("✅ Precision/scale metadata test completed") + + finally: + # Cleanup + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS precision_test", fetch=False + ) + await cleanup_engine.dispose() + + async def test_comprehensive_type_coverage_extended( + self, mysql_connection_params: dict + ) -> None: + """Test native type reporting across extended variety of database types.""" + _skip_if_mysql_unavailable() + + # Create test environment with comprehensive type coverage + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + engine = await get_engine(db_url, pool_size=1, echo=False) + executor = QueryExecutor(engine) + + # Clean up and create comprehensive test table + await executor.execute_query( + "DROP TABLE IF EXISTS comprehensive_test", fetch=False + ) + + await executor.execute_query( + """ + CREATE TABLE comprehensive_test ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + tiny_num TINYINT, + small_num SMALLINT, + medium_num MEDIUMINT, + big_num BIGINT, + float_num FLOAT, + double_num DOUBLE, + decimal_num DECIMAL(15,4), + char_field CHAR(10), + varchar_field VARCHAR(255), + text_field TEXT, + bool_field BOOLEAN, + date_field DATE, + datetime_field DATETIME, + timestamp_field TIMESTAMP + ) ENGINE=InnoDB + """, + fetch=False, + ) + + await engine.dispose() + + # Create connection schema + connection = ConnectionSchema( + name="comprehensive_test_connection", + description="Connection for comprehensive type coverage testing", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + schema_executor = SchemaExecutor(connection, test_mode=True) + + # Define comprehensive schema rule + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "tiny_num": {"expected_type": DataType.INTEGER.value}, + "small_num": {"expected_type": DataType.INTEGER.value}, + "medium_num": {"expected_type": DataType.INTEGER.value}, + "big_num": {"expected_type": DataType.INTEGER.value}, + "float_num": {"expected_type": DataType.FLOAT.value}, + "double_num": {"expected_type": DataType.FLOAT.value}, + "decimal_num": {"expected_type": DataType.FLOAT.value}, + "char_field": {"expected_type": DataType.STRING.value}, + "varchar_field": {"expected_type": DataType.STRING.value}, + "text_field": {"expected_type": DataType.STRING.value}, + "bool_field": { + "expected_type": DataType.INTEGER.value + }, # MySQL BOOLEAN -> TINYINT + "date_field": {"expected_type": DataType.DATE.value}, + "datetime_field": {"expected_type": DataType.DATETIME.value}, + "timestamp_field": {"expected_type": DataType.DATETIME.value}, + } + + rule = build_schema_rule_with_native_reporting( + columns, "comprehensive_test", mysql_connection_params["database"] + ) + + try: + # Execute rule + result = await schema_executor.execute_rule(rule) + + logger.info(f"Comprehensive type coverage test status: {result.status}") + logger.info(f"Execution message: {result.execution_message}") + + # Debug field-level failures before asserting + if result.status == "FAILED": + assert result.execution_plan is not None + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + + for field_result in field_results: + if field_result["failure_code"] != "NONE": + logger.error( + f"❌ {field_result['column']}: {field_result['failure_code']} - " + f"native='{field_result.get('native_type')}', " + f"canonical='{field_result.get('canonical_type')}'" + ) + if field_result.get("failure_details"): + logger.error( + f" Details: {field_result['failure_details']}" + ) + + # Should pass with correct type mappings + assert result.status == "PASSED" + + # Verify all fields have complete native type information + assert result.execution_plan is not None + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + + assert len(field_results) == len(columns) + + for field_result in field_results: + # Every field should have complete native type information + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "PASSED" + assert field_result["failure_code"] == "NONE" + + column_name = field_result["column"] + logger.info( + f"✓ {column_name}: native='{field_result['native_type']}', " + f"canonical='{field_result['canonical_type']}', " + f"metadata={field_result['native_metadata']}" + ) + + logger.info("✅ Comprehensive type coverage test completed successfully") + + finally: + # Cleanup + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS comprehensive_test", fetch=False + ) + await cleanup_engine.dispose() + + +@pytest.mark.integration +@pytest.mark.database +class TestNativeTypeReportingBackwardCompatibility: + """Test that native type enhancements maintain backward compatibility.""" + + async def _prepare_compatibility_test_environment( + self, mysql_connection_params: dict + ) -> QueryExecutor: + """Prepare MySQL test environment for compatibility testing.""" + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + # Create engine for setup + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + engine = await get_engine(db_url, pool_size=1, echo=False) + executor = QueryExecutor(engine) + + # Clean up and create test table + await executor.execute_query("DROP TABLE IF EXISTS compat_test", fetch=False) + + await executor.execute_query( + """ + CREATE TABLE compat_test ( + id INT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(50) NOT NULL, + status BOOLEAN DEFAULT TRUE + ) ENGINE=InnoDB + """, + fetch=False, + ) + + await engine.dispose() + return executor + + async def test_existing_functionality_unchanged( + self, mysql_connection_params: dict + ) -> None: + """Test that existing schema validation functionality is unchanged.""" + _skip_if_mysql_unavailable() + + # Prepare test environment + await self._prepare_compatibility_test_environment(mysql_connection_params) + + # Create connection schema + connection = ConnectionSchema( + name="compat_test_connection", + description="Connection for backward compatibility testing", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + executor = SchemaExecutor(connection, test_mode=True) + + # Use existing schema rule format + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value}, + "status": { + "expected_type": DataType.INTEGER.value + }, # BOOLEAN -> INTEGER in MySQL + } + + rule = build_schema_rule_with_native_reporting( + columns, "compat_test", mysql_connection_params["database"] + ) + + try: + # Execute rule + result = await executor.execute_rule(rule) + + logger.info(f"Backward compatibility test status: {result.status}") + + # Verify existing fields are still present and working + assert result.status == "PASSED" + assert result.rule_id == rule.id + assert len(result.dataset_metrics) == 1 + + # Verify execution_plan structure is maintained + execution_plan = result.execution_plan + assert execution_plan is not None + assert "execution_type" in execution_plan + assert "schema_details" in execution_plan + + schema_details = execution_plan["schema_details"] + assert "field_results" in schema_details + assert "extras" in schema_details + assert "table_exists" in schema_details + + # Verify field_results have expected legacy fields + field_results = schema_details["field_results"] + assert len(field_results) == 3 + + for field_result in field_results: + # Legacy fields must be present + assert "column" in field_result + assert "existence" in field_result + assert "type" in field_result + assert "failure_code" in field_result + + # Enhanced fields should also be present + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result + + # Values should be meaningful + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "PASSED" + assert field_result["failure_code"] == "NONE" + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + logger.info( + f"✓ {field_result['column']}: legacy + enhanced fields present" + ) + + logger.info("✅ Backward compatibility test completed successfully") + + finally: + # Cleanup + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS compat_test", fetch=False + ) + await cleanup_engine.dispose() diff --git a/tests/integration/core/executors/test_schema_executor_native_types.py b/tests/integration/core/executors/test_schema_executor_native_types.py deleted file mode 100644 index 889b49a..0000000 --- a/tests/integration/core/executors/test_schema_executor_native_types.py +++ /dev/null @@ -1,392 +0,0 @@ -""" -Integration tests for SchemaExecutor native type reporting enhancements - -Tests the new functionality that includes native_type, canonical_type, -and native_metadata in field_results for all scenarios including TYPE_MISMATCH. -""" - -import pytest - -from core.executors.schema_executor import SchemaExecutor -from shared.enums import DataType, RuleType -from shared.schema.connection_schema import ConnectionSchema -from shared.schema.rule_schema import RuleSchema -from tests.shared.builders.test_builders import TestDataBuilder -from tests.shared.utils.database_utils import ( - get_available_databases, - get_mysql_connection_params, -) - -pytestmark = pytest.mark.asyncio - - -def _skip_if_mysql_unavailable() -> None: - if "mysql" not in get_available_databases(): - pytest.skip("MySQL not configured; skipping integration tests") - - -@pytest.fixture -def mysql_connection(): - """Create MySQL connection for testing.""" - _skip_if_mysql_unavailable() - params = get_mysql_connection_params() - from shared.enums.connection_types import ConnectionType - from typing import cast - - return ConnectionSchema( - name="mysql_native_type_test", - description="MySQL connection for native type testing", - connection_type=ConnectionType.MYSQL, - host=str(params["host"]), - port=cast(int, params["port"]), - db_name=str(params["database"]), - username=str(params["username"]), - password=str(params["password"]), - ) - - -@pytest.fixture -async def schema_executor(mysql_connection): - """Create SchemaExecutor with MySQL connection.""" - return SchemaExecutor(mysql_connection, test_mode=True) - - -def build_schema_rule_with_native_reporting( - columns: dict, - table_name: str = "test_table", - strict_mode: bool = False, - case_insensitive: bool = False -) -> RuleSchema: - """Build a SCHEMA rule for testing native type reporting.""" - builder = TestDataBuilder.rule() - rule = ( - builder.with_name(f"schema_{table_name}") - .with_target("test_db", table_name, None) # Table-level rule - .with_type(RuleType.SCHEMA) - .with_parameter("columns", columns) - .with_parameter("strict_mode", strict_mode) - .with_parameter("case_insensitive", case_insensitive) - .build() - ) - return rule - - -@pytest.mark.integration -@pytest.mark.database -class TestSchemaExecutorNativeTypeReporting: - """Test native type reporting enhancements in SchemaExecutor.""" - - async def test_native_type_reporting_successful_case(self, schema_executor): - """Test that native type information is included in successful validation.""" - # Create test table with known types - from sqlalchemy import text - engine = await schema_executor.get_engine() - async with engine.begin() as conn: - await conn.execute(text("DROP TABLE IF EXISTS test_users")) - await conn.execute(text( - "CREATE TABLE test_users (id INT, name VARCHAR(50), active BOOLEAN)" - )) - - # Define schema rule that should pass - columns = { - "id": {"expected_type": DataType.INTEGER.value}, - "name": {"expected_type": DataType.STRING.value, "max_length": 50}, - "active": {"expected_type": DataType.BOOLEAN.value}, - } - rule = build_schema_rule_with_native_reporting(columns, "test_users") - - # Execute rule - result = await schema_executor.execute_rule(rule) - - # Verify result structure - assert result.status == "PASSED" - - # Verify enhanced field_results include native type information - schema_details = result.execution_plan.get("schema_details", {}) - field_results = schema_details.get("field_results", []) - - assert len(field_results) == 3 - - for field_result in field_results: - # Each field result should have native type information - assert "native_type" in field_result - assert "canonical_type" in field_result - assert "native_metadata" in field_result - - # Native type should be the database-specific type - assert field_result["native_type"] is not None - assert isinstance(field_result["native_type"], str) - - # Canonical type should be the standardized type - assert field_result["canonical_type"] in [dt.value for dt in DataType] - - # Native metadata should be a dict - assert isinstance(field_result["native_metadata"], dict) - - # Verify specific field expectations - if field_result["column"] == "id": - assert field_result["canonical_type"] == DataType.INTEGER.value - assert field_result["failure_code"] == "NONE" - elif field_result["column"] == "name": - assert field_result["canonical_type"] == DataType.STRING.value - # Should include max_length in native_metadata for VARCHAR(50) - assert "max_length" in field_result["native_metadata"] - assert field_result["native_metadata"]["max_length"] == 50 - elif field_result["column"] == "active": - assert field_result["canonical_type"] == DataType.BOOLEAN.value - - async def test_native_type_reporting_type_mismatch(self, schema_executor): - """Test that native type information is included even for TYPE_MISMATCH cases.""" - # Create test table - from sqlalchemy import text - engine = await schema_executor.get_engine() - async with engine.begin() as conn: - await conn.execute(text("DROP TABLE IF EXISTS test_mismatch")) - await conn.execute(text( - "CREATE TABLE test_mismatch (id INT, name VARCHAR(100))" - )) - - # Define schema rule with type mismatches - columns = { - "id": {"expected_type": DataType.STRING.value}, # Mismatch: expecting string, actual is integer - "name": {"expected_type": DataType.INTEGER.value}, # Mismatch: expecting integer, actual is string - } - rule = build_schema_rule_with_native_reporting(columns, "test_mismatch") - - # Execute rule - result = await schema_executor.execute_rule(rule) - - # Should fail due to type mismatches - assert result.status == "FAILED" - - # Verify enhanced field_results include native type information even for failures - schema_details = result.execution_plan.get("schema_details", {}) - field_results = schema_details.get("field_results", []) - - assert len(field_results) == 2 - - for field_result in field_results: - # Even with type mismatches, native type information should be present - assert "native_type" in field_result - assert "canonical_type" in field_result - assert "native_metadata" in field_result - - # Should have failed type validation but passed existence - assert field_result["existence"] == "PASSED" - assert field_result["type"] == "FAILED" - assert field_result["failure_code"] == "TYPE_MISMATCH" - - # Native type information should still be accurate - assert field_result["native_type"] is not None - assert field_result["canonical_type"] is not None - - # Verify the actual vs expected mismatch - if field_result["column"] == "id": - # Actual type is INTEGER, but expected STRING - assert field_result["canonical_type"] == DataType.INTEGER.value - elif field_result["column"] == "name": - # Actual type is STRING, but expected INTEGER - assert field_result["canonical_type"] == DataType.STRING.value - # Should include max_length from VARCHAR(100) - assert "max_length" in field_result["native_metadata"] - assert field_result["native_metadata"]["max_length"] == 100 - - async def test_native_type_reporting_field_missing(self, schema_executor): - """Test native type information handling for missing fields.""" - # Create test table with only some of the expected fields - from sqlalchemy import text - engine = await schema_executor.get_engine() - async with engine.begin() as conn: - await conn.execute(text("DROP TABLE IF EXISTS test_partial")) - await conn.execute(text("CREATE TABLE test_partial (id INT)")) - - # Define schema rule expecting more fields than exist - columns = { - "id": {"expected_type": DataType.INTEGER.value}, - "missing_field": {"expected_type": DataType.STRING.value}, - } - rule = build_schema_rule_with_native_reporting(columns, "test_partial") - - # Execute rule - result = await schema_executor.execute_rule(rule) - - # Should fail due to missing field - assert result.status == "FAILED" - - # Verify field_results - schema_details = result.execution_plan.get("schema_details", {}) - field_results = schema_details.get("field_results", []) - - assert len(field_results) == 2 - - # Find results for each field - id_result = next(fr for fr in field_results if fr["column"] == "id") - missing_result = next(fr for fr in field_results if fr["column"] == "missing_field") - - # Existing field should have native type information - assert id_result["existence"] == "PASSED" - assert id_result["type"] == "PASSED" - assert id_result["native_type"] is not None - assert id_result["canonical_type"] == DataType.INTEGER.value - assert isinstance(id_result["native_metadata"], dict) - - # Missing field should have null native type information - assert missing_result["existence"] == "FAILED" - assert missing_result["type"] == "SKIPPED" - assert missing_result["failure_code"] == "FIELD_MISSING" - assert missing_result["native_type"] is None - assert missing_result["canonical_type"] is None - assert missing_result["native_metadata"] == {} - - async def test_native_metadata_precision_scale(self, schema_executor): - """Test native metadata reporting for float types with precision/scale.""" - # Create test table with decimal/numeric types - from sqlalchemy import text - engine = await schema_executor.get_engine() - async with engine.begin() as conn: - await conn.execute(text("DROP TABLE IF EXISTS test_decimal")) - # MySQL supports DECIMAL with precision/scale - await conn.execute(text("CREATE TABLE test_decimal (price DECIMAL(10,2), amount NUMERIC(8,3))")) - - # Define schema rule for decimal types - columns = { - "price": {"expected_type": DataType.FLOAT.value, "precision": 10, "scale": 2}, - "amount": {"expected_type": DataType.FLOAT.value, "precision": 8, "scale": 3}, - } - rule = build_schema_rule_with_native_reporting(columns, "test_decimal") - - # Execute rule - result = await schema_executor.execute_rule(rule) - - # Verify field_results include precision/scale metadata - schema_details = result.execution_plan.get("schema_details", {}) - field_results = schema_details.get("field_results", []) - - for field_result in field_results: - assert "native_metadata" in field_result - native_metadata = field_result["native_metadata"] - - # Verify the native type is captured - assert field_result["native_type"] is not None - assert field_result["canonical_type"] == DataType.FLOAT.value - - # Note: SQLite might not preserve exact precision/scale, but the structure should be correct - assert isinstance(native_metadata, dict) - - async def test_comprehensive_native_type_coverage(self, schema_executor): - """Test native type reporting across various database type scenarios.""" - # Create table with various data types - from sqlalchemy import text - engine = await schema_executor.get_engine() - async with engine.begin() as conn: - await conn.execute(text("DROP TABLE IF EXISTS test_comprehensive")) - await conn.execute(text(""" - CREATE TABLE test_comprehensive ( - id INT, - name TEXT, - email VARCHAR(255), - age SMALLINT, - salary DOUBLE, - is_active BOOLEAN, - birth_date DATE, - created_at DATETIME - ) - """)) - - # Define schema rule covering all types - columns = { - "id": {"expected_type": DataType.INTEGER.value}, - "name": {"expected_type": DataType.STRING.value}, - "email": {"expected_type": DataType.STRING.value, "max_length": 255}, - "age": {"expected_type": DataType.INTEGER.value}, - "salary": {"expected_type": DataType.FLOAT.value}, - "is_active": {"expected_type": DataType.BOOLEAN.value}, - "birth_date": {"expected_type": DataType.DATE.value}, - "created_at": {"expected_type": DataType.DATETIME.value}, - } - rule = build_schema_rule_with_native_reporting(columns, "test_comprehensive") - - # Execute rule - result = await schema_executor.execute_rule(rule) - - # Verify all fields have complete native type information - schema_details = result.execution_plan.get("schema_details", {}) - field_results = schema_details.get("field_results", []) - - assert len(field_results) == 8 - - for field_result in field_results: - # Every field should have complete native type information - assert field_result["native_type"] is not None - assert field_result["canonical_type"] is not None - assert isinstance(field_result["native_metadata"], dict) - - # Verify canonical type mapping is correct - column_name = field_result["column"] - canonical_type = field_result["canonical_type"] - - type_expectations = { - "id": DataType.INTEGER.value, - "name": DataType.STRING.value, - "email": DataType.STRING.value, - "age": DataType.INTEGER.value, - "salary": DataType.FLOAT.value, - "is_active": DataType.BOOLEAN.value, - "birth_date": DataType.DATE.value, - "created_at": DataType.DATETIME.value, - } - - assert canonical_type == type_expectations[column_name] - - -@pytest.mark.integration -@pytest.mark.database -class TestSchemaExecutorBackwardCompatibility: - """Test that enhancements maintain backward compatibility.""" - - async def test_existing_functionality_unchanged(self, schema_executor): - """Test that existing schema validation functionality is unchanged.""" - # Create test table - from sqlalchemy import text - engine = await schema_executor.get_engine() - async with engine.begin() as conn: - await conn.execute(text("DROP TABLE IF EXISTS test_compat")) - await conn.execute(text("CREATE TABLE test_compat (id INT, name VARCHAR(50))")) - - # Use existing schema rule format - columns = { - "id": {"expected_type": DataType.INTEGER.value}, - "name": {"expected_type": DataType.STRING.value}, - } - rule = build_schema_rule_with_native_reporting(columns, "test_compat") - - # Execute rule - result = await schema_executor.execute_rule(rule) - - # Verify existing fields are still present and working - assert result.status == "PASSED" - assert result.rule_id == rule.id - assert len(result.dataset_metrics) == 1 - - # Verify execution_plan structure is maintained - execution_plan = result.execution_plan - assert "execution_type" in execution_plan - assert "schema_details" in execution_plan - - schema_details = execution_plan["schema_details"] - assert "field_results" in schema_details - assert "extras" in schema_details - assert "table_exists" in schema_details - - # Verify field_results have expected legacy fields - field_results = schema_details["field_results"] - for field_result in field_results: - assert "column" in field_result - assert "existence" in field_result - assert "type" in field_result - assert "failure_code" in field_result - - # NEW: Also verify enhanced fields are added - assert "native_type" in field_result - assert "canonical_type" in field_result - assert "native_metadata" in field_result \ No newline at end of file diff --git a/tests/integration/core/executors/test_simple_native_type_reporting.py b/tests/integration/core/executors/test_simple_native_type_reporting.py deleted file mode 100644 index 3b4974e..0000000 --- a/tests/integration/core/executors/test_simple_native_type_reporting.py +++ /dev/null @@ -1,139 +0,0 @@ -""" -Simple integration test to verify native type reporting functionality works. - -This is a minimal test to demonstrate that the native type reporting enhancements -work correctly with a real MySQL database. -""" - -import pytest -from sqlalchemy import text - -from core.executors.schema_executor import SchemaExecutor -from shared.enums import DataType, RuleType -from shared.enums.connection_types import ConnectionType -from shared.schema.connection_schema import ConnectionSchema -from shared.schema.rule_schema import RuleSchema -from tests.shared.builders.test_builders import TestDataBuilder -from tests.shared.utils.database_utils import ( - get_available_databases, - get_mysql_connection_params, -) - -pytestmark = pytest.mark.asyncio - - -def _skip_if_mysql_unavailable() -> None: - if "mysql" not in get_available_databases(): - pytest.skip("MySQL not configured; skipping integration tests") - - -def build_simple_schema_rule(columns: dict) -> RuleSchema: - """Build a simple SCHEMA rule for testing.""" - builder = TestDataBuilder.rule() - rule = ( - builder.with_name("test_native_reporting") - .with_target("test_db", "native_test_table", None) # Table-level rule - .with_type(RuleType.SCHEMA) - .with_parameter("columns", columns) - .build() - ) - return rule - - -@pytest.mark.integration -@pytest.mark.database -class TestSimpleNativeTypeReporting: - """Simple test for native type reporting.""" - - async def test_native_type_reporting_works(self): - """Test that native type information is included in schema validation results.""" - _skip_if_mysql_unavailable() - - # Create connection - params = get_mysql_connection_params() - from typing import cast - - connection = ConnectionSchema( - name="test_native_types", - description="Test connection for native type reporting", - connection_type=ConnectionType.MYSQL, - host=str(params["host"]), - port=cast(int, params["port"]), - db_name=str(params["database"]), - username=str(params["username"]), - password=str(params["password"]), - ) - - # Create executor - executor = SchemaExecutor(connection, test_mode=True) - - # Create and setup table - engine = await executor.get_engine() - - # Use regular connection (not transaction) for DDL - async with engine.connect() as conn: - # Drop and create table - await conn.execute(text("DROP TABLE IF EXISTS native_test_table")) - await conn.execute(text(""" - CREATE TABLE native_test_table ( - id INT PRIMARY KEY, - name VARCHAR(50) NOT NULL, - score DECIMAL(5,2) - ) - """)) - await conn.commit() - - try: - # Create schema rule - columns = { - "id": {"expected_type": DataType.INTEGER.value}, - "name": {"expected_type": DataType.STRING.value, "max_length": 50}, - "score": {"expected_type": DataType.FLOAT.value, "precision": 5, "scale": 2}, - } - rule = build_simple_schema_rule(columns) - - # Execute rule - result = await executor.execute_rule(rule) - - # Basic validation - print(f"Rule execution status: {result.status}") - print(f"Execution message: {result.execution_message}") - - # Check that we have schema details - execution_plan = result.execution_plan - assert "schema_details" in execution_plan - - schema_details = execution_plan["schema_details"] - assert "field_results" in schema_details - - field_results = schema_details["field_results"] - assert len(field_results) >= 1 # Should have at least one field result - - # Check that native type information is present - for field_result in field_results: - print(f"Field: {field_result.get('column')}") - print(f" - Native type: {field_result.get('native_type')}") - print(f" - Canonical type: {field_result.get('canonical_type')}") - print(f" - Native metadata: {field_result.get('native_metadata')}") - - # Verify enhanced fields are present - assert "native_type" in field_result - assert "canonical_type" in field_result - assert "native_metadata" in field_result - - # Verify they have meaningful values - assert field_result["native_type"] is not None - assert field_result["canonical_type"] is not None - assert isinstance(field_result["native_metadata"], dict) - - # Print overall result for debugging - print(f"Test completed with result status: {result.status}") - - finally: - # Clean up - async with engine.connect() as conn: - await conn.execute(text("DROP TABLE IF EXISTS native_test_table")) - await conn.commit() - - # Close engine - await engine.dispose() \ No newline at end of file diff --git a/tests/unit/shared/utils/test_type_parser.py b/tests/unit/shared/utils/test_type_parser.py index 637e836..fb6b7de 100644 --- a/tests/unit/shared/utils/test_type_parser.py +++ b/tests/unit/shared/utils/test_type_parser.py @@ -4,16 +4,24 @@ Comprehensive test coverage for syntactic sugar type parsing and backward compatibility. """ +from typing import Any + import pytest from shared.enums.data_types import DataType -from shared.utils.type_parser import TypeParser, TypeParseError, parse_type, is_syntactic_sugar, normalize_type +from shared.utils.type_parser import ( + TypeParseError, + TypeParser, + is_syntactic_sugar, + normalize_type, + parse_type, +) class TestTypeParser: """Test TypeParser class methods""" - def test_parse_simple_types(self): + def test_parse_simple_types(self) -> None: """Test parsing of simple type names.""" # Test all supported simple types test_cases = [ @@ -27,74 +35,107 @@ def test_parse_simple_types(self): ("date", {"type": DataType.DATE.value}), ("datetime", {"type": DataType.DATETIME.value}), ] - + for input_type, expected in test_cases: result = TypeParser.parse_type_definition(input_type) assert result == expected - def test_parse_case_insensitive(self): + def test_parse_case_insensitive(self) -> None: """Test that parsing is case insensitive.""" test_cases = ["STRING", "String", "sTrInG", "INTEGER", "Int", "FLOAT", "Float"] - + for input_type in test_cases: result = TypeParser.parse_type_definition(input_type) assert "type" in result assert result["type"] in [dt.value for dt in DataType] - def test_parse_string_with_length(self): + def test_parse_string_with_length(self) -> None: """Test parsing string with length specification.""" test_cases = [ ("string(50)", {"type": DataType.STRING.value, "max_length": 50}), ("STRING(255)", {"type": DataType.STRING.value, "max_length": 255}), ("str(10)", {"type": DataType.STRING.value, "max_length": 10}), - ("string( 100 )", {"type": DataType.STRING.value, "max_length": 100}), # with spaces + ( + "string( 100 )", + {"type": DataType.STRING.value, "max_length": 100}, + ), # with spaces ] - + for input_type, expected in test_cases: result = TypeParser.parse_type_definition(input_type) assert result == expected - def test_parse_float_with_precision_scale(self): + def test_parse_float_with_precision_scale(self) -> None: """Test parsing float with precision and scale.""" test_cases = [ - ("float(10,2)", {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}), - ("FLOAT(12,4)", {"type": DataType.FLOAT.value, "precision": 12, "scale": 4}), - ("float( 8 , 3 )", {"type": DataType.FLOAT.value, "precision": 8, "scale": 3}), # with spaces - ("float(15,0)", {"type": DataType.FLOAT.value, "precision": 15, "scale": 0}), + ( + "float(10,2)", + {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}, + ), + ( + "FLOAT(12,4)", + {"type": DataType.FLOAT.value, "precision": 12, "scale": 4}, + ), + ( + "float( 8 , 3 )", + {"type": DataType.FLOAT.value, "precision": 8, "scale": 3}, + ), # with spaces + ( + "float(15,0)", + {"type": DataType.FLOAT.value, "precision": 15, "scale": 0}, + ), ] - + for input_type, expected in test_cases: result = TypeParser.parse_type_definition(input_type) assert result == expected - def test_parse_datetime_with_format(self): + def test_parse_datetime_with_format(self) -> None: """Test parsing datetime with format specification.""" test_cases = [ - ("datetime('yyyymmdd')", {"type": DataType.DATETIME.value, "format": "yyyymmdd"}), - ("DATETIME(\"yyyy-mm-dd\")", {"type": DataType.DATETIME.value, "format": "yyyy-mm-dd"}), - ("datetime( 'dd/mm/yyyy hh:mm:ss' )", {"type": DataType.DATETIME.value, "format": "dd/mm/yyyy hh:mm:ss"}), + ( + "datetime('yyyymmdd')", + {"type": DataType.DATETIME.value, "format": "yyyymmdd"}, + ), + ( + 'DATETIME("yyyy-mm-dd")', + {"type": DataType.DATETIME.value, "format": "yyyy-mm-dd"}, + ), + ( + "datetime( 'dd/mm/yyyy hh:mm:ss' )", + {"type": DataType.DATETIME.value, "format": "dd/mm/yyyy hh:mm:ss"}, + ), ] - + for input_type, expected in test_cases: result = TypeParser.parse_type_definition(input_type) assert result == expected - def test_parse_detailed_format_backward_compatibility(self): + def test_parse_detailed_format_backward_compatibility(self) -> None: """Test parsing detailed JSON format for backward compatibility.""" - test_cases = [ + test_cases: list[tuple[dict, dict]] = [ ({"type": "string"}, {"type": DataType.STRING.value}), - ({"type": "string", "max_length": 100}, {"type": DataType.STRING.value, "max_length": 100}), - ({"type": "float", "precision": 10, "scale": 2}, {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}), - ({"type": "datetime", "format": "yyyy-mm-dd"}, {"type": DataType.DATETIME.value, "format": "yyyy-mm-dd"}), + ( + {"type": "string", "max_length": 100}, + {"type": DataType.STRING.value, "max_length": 100}, + ), + ( + {"type": "float", "precision": 10, "scale": 2}, + {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}, + ), + ( + {"type": "datetime", "format": "yyyy-mm-dd"}, + {"type": DataType.DATETIME.value, "format": "yyyy-mm-dd"}, + ), ] - + for input_type, expected in test_cases: result = TypeParser.parse_type_definition(input_type) assert result == expected - def test_error_cases(self): + def test_error_cases(self) -> None: """Test error handling for invalid type definitions.""" - error_cases = [ + error_cases: list[tuple[Any, str]] = [ ("invalid_type", "Cannot parse type definition"), ("string(-1)", "String length must be positive"), ("float(0,2)", "Float precision must be positive"), @@ -105,32 +146,50 @@ def test_error_cases(self): (123, "Type definition must be string or dict"), (None, "Type definition must be string or dict"), ] - + for input_type, expected_error in error_cases: with pytest.raises(TypeParseError) as exc_info: TypeParser.parse_type_definition(input_type) assert expected_error in str(exc_info.value) - def test_metadata_validation(self): + def test_metadata_validation(self) -> None: """Test metadata validation for type consistency.""" # Test invalid metadata combinations in detailed format - invalid_cases = [ - ({"type": "integer", "max_length": 10}, "max_length can only be specified for STRING type"), - ({"type": "string", "precision": 5}, "precision/scale can only be specified for FLOAT type"), - ({"type": "boolean", "scale": 2}, "precision/scale can only be specified for FLOAT type"), - ({"type": "date", "format": "yyyy"}, "format can only be specified for DATETIME type"), - ({"type": "string", "max_length": 0}, "max_length must be a positive integer"), + invalid_cases: list[tuple[dict, str]] = [ + ( + {"type": "integer", "max_length": 10}, + "max_length can only be specified for STRING type", + ), + ( + {"type": "string", "precision": 5}, + "precision/scale can only be specified for FLOAT type", + ), + ( + {"type": "boolean", "scale": 2}, + "precision/scale can only be specified for FLOAT type", + ), + ( + {"type": "date", "format": "yyyy"}, + "format can only be specified for DATETIME type", + ), + ( + {"type": "string", "max_length": 0}, + "max_length must be a positive integer", + ), ({"type": "float", "precision": 0}, "precision must be a positive integer"), ({"type": "float", "scale": -1}, "scale must be a non-negative integer"), - ({"type": "float", "precision": 3, "scale": 5}, "scale cannot be greater than precision"), + ( + {"type": "float", "precision": 3, "scale": 5}, + "scale cannot be greater than precision", + ), ] - + for input_type, expected_error in invalid_cases: with pytest.raises(TypeParseError) as exc_info: TypeParser.parse_type_definition(input_type) assert expected_error in str(exc_info.value) - def test_is_syntactic_sugar(self): + def test_is_syntactic_sugar(self) -> None: """Test identification of syntactic sugar formats.""" sugar_cases = [ "string(50)", @@ -139,28 +198,35 @@ def test_is_syntactic_sugar(self): "integer", "boolean", ] - + detailed_cases = [ {"type": "string"}, {"type": "float", "precision": 10}, 123, None, ] - + + case: Any = None for case in sugar_cases: assert TypeParser.is_syntactic_sugar(case) is True - + for case in detailed_cases: assert TypeParser.is_syntactic_sugar(case) is False - def test_normalize_to_detailed_format(self): + def test_normalize_to_detailed_format(self) -> None: """Test normalization to detailed format.""" - test_cases = [ - ("string(50)", {"type": "string", "expected_type": "STRING", "max_length": 50}), - ("float(10,2)", {"type": "float", "expected_type": "FLOAT", "precision": 10, "scale": 2}), - ({"type": "boolean"}, {"type": "boolean", "expected_type": "BOOLEAN"}), + test_cases: list[tuple[str | dict, dict]] = [ + ( + "string(50)", + {"type": "string", "desired_type": "STRING", "max_length": 50}, + ), + ( + "float(10,2)", + {"type": "float", "desired_type": "FLOAT", "precision": 10, "scale": 2}, + ), + ({"type": "boolean"}, {"type": "boolean", "desired_type": "BOOLEAN"}), ] - + for input_type, expected_keys in test_cases: result = TypeParser.normalize_to_detailed_format(input_type) for key, value in expected_keys.items(): @@ -170,41 +236,47 @@ def test_normalize_to_detailed_format(self): class TestConvenienceFunctions: """Test convenience functions""" - def test_parse_type_function(self): + def test_parse_type_function(self) -> None: """Test parse_type convenience function.""" result = parse_type("string(100)") assert result == {"type": DataType.STRING.value, "max_length": 100} - def test_is_syntactic_sugar_function(self): + def test_is_syntactic_sugar_function(self) -> None: """Test is_syntactic_sugar convenience function.""" assert is_syntactic_sugar("float(10,2)") is True assert is_syntactic_sugar({"type": "string"}) is False - def test_normalize_type_function(self): + def test_normalize_type_function(self) -> None: """Test normalize_type convenience function.""" result = normalize_type("string(50)") assert result["type"] == "string" - assert result["expected_type"] == "STRING" + assert result["desired_type"] == "STRING" assert result["max_length"] == 50 class TestEdgeCases: """Test edge cases and boundary conditions""" - def test_whitespace_handling(self): + def test_whitespace_handling(self) -> None: """Test handling of various whitespace scenarios.""" test_cases = [ (" string ", {"type": DataType.STRING.value}), ("string( 50 )", {"type": DataType.STRING.value, "max_length": 50}), - ("float( 10 , 2 )", {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}), - ("datetime( ' format ' )", {"type": DataType.DATETIME.value, "format": " format "}), + ( + "float( 10 , 2 )", + {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}, + ), + ( + "datetime( ' format ' )", + {"type": DataType.DATETIME.value, "format": " format "}, + ), ] - + for input_type, expected in test_cases: result = TypeParser.parse_type_definition(input_type) assert result == expected - def test_boundary_values(self): + def test_boundary_values(self) -> None: """Test boundary values for numeric parameters.""" # Test valid boundary values valid_cases = [ @@ -212,7 +284,7 @@ def test_boundary_values(self): ("float(1,0)", {"type": DataType.FLOAT.value, "precision": 1, "scale": 0}), ("float(1,1)", {"type": DataType.FLOAT.value, "precision": 1, "scale": 1}), ] - + for input_type, expected in valid_cases: result = TypeParser.parse_type_definition(input_type) assert result == expected @@ -222,32 +294,38 @@ def test_boundary_values(self): ("string(0)", "String length must be positive"), ("float(0,0)", "Float precision must be positive"), ] - + for input_type, expected_error in invalid_cases: with pytest.raises(TypeParseError) as exc_info: TypeParser.parse_type_definition(input_type) assert expected_error in str(exc_info.value) - def test_quote_variations(self): + def test_quote_variations(self) -> None: """Test different quote styles for datetime format.""" test_cases = [ ("datetime('format')", "format"), - ("datetime(\"format\")", "format"), + ('datetime("format")', "format"), ("datetime('format with spaces')", "format with spaces"), ("datetime(\"format with 'quotes'\")", "format with 'quotes'"), ] - + for input_type, expected_format in test_cases: result = TypeParser.parse_type_definition(input_type) - assert result == {"type": DataType.DATETIME.value, "format": expected_format} + assert result == { + "type": DataType.DATETIME.value, + "format": expected_format, + } - def test_large_numbers(self): + def test_large_numbers(self) -> None: """Test handling of large numeric values.""" test_cases = [ ("string(65535)", {"type": DataType.STRING.value, "max_length": 65535}), - ("float(38,10)", {"type": DataType.FLOAT.value, "precision": 38, "scale": 10}), + ( + "float(38,10)", + {"type": DataType.FLOAT.value, "precision": 38, "scale": 10}, + ), ] - + for input_type, expected in test_cases: result = TypeParser.parse_type_definition(input_type) assert result == expected @@ -256,7 +334,7 @@ def test_large_numbers(self): class TestIntegrationWithDataTypeEnum: """Test integration with DataType enum""" - def test_all_data_types_supported(self): + def test_all_data_types_supported(self) -> None: """Test that all DataType enum values are supported.""" type_mappings = { "string": DataType.STRING, @@ -266,29 +344,32 @@ def test_all_data_types_supported(self): "date": DataType.DATE, "datetime": DataType.DATETIME, } - + for type_name, expected_enum in type_mappings.items(): result = TypeParser.parse_type_definition(type_name) assert result["type"] == expected_enum.value - def test_enum_value_consistency(self): + def test_enum_value_consistency(self) -> None: """Test that returned type values match DataType enum values.""" result = TypeParser.parse_type_definition("string") assert result["type"] == DataType.STRING.value == "STRING" - + result = TypeParser.parse_type_definition("float(10,2)") assert result["type"] == DataType.FLOAT.value == "FLOAT" -@pytest.mark.parametrize("input_type,expected", [ - ("string(50)", {"type": "STRING", "max_length": 50}), - ("float(12,2)", {"type": "FLOAT", "precision": 12, "scale": 2}), - ("datetime('yyyymmdd')", {"type": "DATETIME", "format": "yyyymmdd"}), - ("integer", {"type": "INTEGER"}), - ("boolean", {"type": "BOOLEAN"}), - ("date", {"type": "DATE"}), -]) -def test_acceptance_criteria_examples(input_type, expected): +@pytest.mark.parametrize( + "input_type,expected", + [ + ("string(50)", {"type": "STRING", "max_length": 50}), + ("float(12,2)", {"type": "FLOAT", "precision": 12, "scale": 2}), + ("datetime('yyyymmdd')", {"type": "DATETIME", "format": "yyyymmdd"}), + ("integer", {"type": "INTEGER"}), + ("boolean", {"type": "BOOLEAN"}), + ("date", {"type": "DATE"}), + ], +) +def test_acceptance_criteria_examples(input_type: str, expected: dict) -> None: """Test the specific examples from the acceptance criteria.""" result = parse_type(input_type) - assert result == expected \ No newline at end of file + assert result == expected