diff --git a/CHANGELOG.md b/CHANGELOG.md index 7af93f9..1a2dd09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,10 +8,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added -- None +- feat(schema): Implement syntactic sugar for type definitions in schema rules +- feat(core): Add TypeParser utility for parsing compact type definitions (e.g., `string(50)`, `float(12,2)`) +- feat(schema): Support shorthand type syntax: `string(50)` → `{"type": "string", "max_length": 50}` +- feat(schema): Support float precision/scale syntax: `float(12,2)` → `{"type": "float", "precision": 12, "scale": 2}` +- feat(schema): Support datetime format syntax: `datetime('yyyymmdd')` → `{"type": "datetime", "format": "yyyymmdd"}` +- feat(core): Enhanced schema executor with native database type reporting capabilities +- feat(core): Add comprehensive type aliases support (str→string, int→integer, bool→boolean) +- feat(tests): Comprehensive test coverage for type parser with unit and integration tests +- feat(tests): Native type integration testing for enhanced schema validation ### Changed -- None +- enhance(cli): Updated schema command to support both syntactic sugar and detailed JSON type definitions +- enhance(core): Improved schema executor to handle parsed type definitions with metadata +- enhance(validation): Maintain backward compatibility with existing detailed JSON schema format ### Fixed - None diff --git a/cli/commands/schema.py b/cli/commands/schema.py index f0d304f..946bec5 100644 --- a/cli/commands/schema.py +++ b/cli/commands/schema.py @@ -132,15 +132,23 @@ def _validate_single_rule_item(item: Dict[str, Any], context: str) -> None: if not isinstance(field_name, str) or not field_name: raise click.UsageError(f"{context}.field must be a non-empty string") - # type + # type - validate using TypeParser to support syntactic sugar if "type" in item: type_name = item["type"] if not isinstance(type_name, str): raise click.UsageError(f"{context}.type must be a string when provided") - if type_name.lower() not in _ALLOWED_TYPE_NAMES: + + # Use TypeParser to validate the type definition + from shared.utils.type_parser import TypeParseError, TypeParser + + try: + TypeParser.parse_type_definition(type_name) + except TypeParseError as e: allowed = ", ".join(sorted(_ALLOWED_TYPE_NAMES)) raise click.UsageError( - f"{context}.type '{type_name}' is not supported. " f"Allowed: {allowed}" + f"{context}.type '{type_name}' is not supported. Error: {str(e)}. " + f"Supported formats: {allowed} or syntactic sugar like string(50), " + "float(12,2), datetime('format')" ) # required @@ -160,58 +168,29 @@ def _validate_single_rule_item(item: Dict[str, Any], context: str) -> None: f"{context}.{bound_key} must be numeric when provided" ) - # max_length + # max_length - basic validation, TypeParser will handle type consistency if "max_length" in item: value = item["max_length"] if not isinstance(value, int) or value < 0: raise click.UsageError( f"{context}.max_length must be a non-negative integer when provided" ) - # Validate max_length is only for string types - type_name = item.get("type", "").lower() if item.get("type") else None - if type_name and type_name != "string": - raise click.UsageError( - f"{context}.max_length can only be specified for 'string' type " - f"fields, not '{type_name}'" - ) - # precision + # precision - basic validation, TypeParser will handle type consistency if "precision" in item: value = item["precision"] if not isinstance(value, int) or value < 0: raise click.UsageError( f"{context}.precision must be a non-negative integer when provided" ) - # Validate precision is only for float types - type_name = item.get("type", "").lower() if item.get("type") else None - if type_name and type_name != "float": - raise click.UsageError( - f"{context}.precision can only be specified for 'float' type " - f"fields, not '{type_name}'" - ) - # scale + # scale - basic validation, TypeParser will handle type consistency if "scale" in item: value = item["scale"] if not isinstance(value, int) or value < 0: raise click.UsageError( f"{context}.scale must be a non-negative integer when provided" ) - # Validate scale is only for float types - type_name = item.get("type", "").lower() if item.get("type") else None - if type_name and type_name != "float": - raise click.UsageError( - f"{context}.scale can only be specified for 'float' type " - f"fields, not '{type_name}'" - ) - # Validate scale <= precision when both are specified - if "precision" in item: - precision_val = item["precision"] - if isinstance(precision_val, int) and value > precision_val: - raise click.UsageError( - f"{context}.scale ({value}) cannot be greater than precision " - f"({precision_val})" - ) def _validate_rules_payload(payload: Any) -> Tuple[List[str], int]: @@ -379,21 +358,55 @@ def _decompose_single_table_schema( # Should have been validated earlier; keep defensive check raise click.UsageError("Each rule item must have a non-empty 'field'") - # SCHEMA: collect column metadata + # SCHEMA: collect column metadata using new TypeParser column_metadata = {} - # Add expected_type if type is specified + # Handle type definition using TypeParser (supports syntactic sugar) if "type" in item and item["type"] is not None: - dt = _map_type_name_to_datatype(str(item["type"])) - column_metadata["expected_type"] = dt.value - - # Add metadata fields if present - if "max_length" in item: - column_metadata["max_length"] = item["max_length"] - if "precision" in item: - column_metadata["precision"] = item["precision"] - if "scale" in item: - column_metadata["scale"] = item["scale"] + from shared.utils.type_parser import TypeParseError, TypeParser + + try: + # Create a type definition dict for the parser + type_def = {"type": item["type"]} + + # Add metadata fields if present in the item + for metadata_field in ["max_length", "precision", "scale", "format"]: + if metadata_field in item: + type_def[metadata_field] = item[metadata_field] + + # Parse using TypeParser (handles both syntactic sugar + # and detailed format) + parsed_type = TypeParser.parse_type_definition(item["type"]) + + # Add expected_type for schema validation + column_metadata["expected_type"] = parsed_type["type"] + + # Add any parsed metadata + for metadata_field in ["max_length", "precision", "scale", "format"]: + if metadata_field in parsed_type: + column_metadata[metadata_field] = parsed_type[metadata_field] + + # Also add any explicit metadata from the item (overrides parsed values) + for metadata_field in ["max_length", "precision", "scale", "format"]: + if metadata_field in item: + column_metadata[metadata_field] = item[metadata_field] + + except TypeParseError as e: + raise click.UsageError( + f"Invalid type definition for field '{field_name}': {str(e)}" + ) + except Exception: + # Fallback to original parsing for backward compatibility + dt = _map_type_name_to_datatype(str(item["type"])) + column_metadata["expected_type"] = dt.value + + # Add metadata fields if present + if "max_length" in item: + column_metadata["max_length"] = item["max_length"] + if "precision" in item: + column_metadata["precision"] = item["precision"] + if "scale" in item: + column_metadata["scale"] = item["scale"] # Only add to columns_map if we have any metadata to store if column_metadata: diff --git a/core/executors/schema_executor.py b/core/executors/schema_executor.py index 62a3b31..7576136 100644 --- a/core/executors/schema_executor.py +++ b/core/executors/schema_executor.py @@ -305,7 +305,7 @@ def compare_metadata( # Count failures across declared columns and strict-mode extras total_declared = len(columns_cfg) failures = 0 - field_results: list[dict[str, str]] = [] + field_results: list[dict[str, Any]] = [] for declared_name, cfg in columns_cfg.items(): expected_type_raw = cfg.get("expected_type") @@ -331,6 +331,9 @@ def compare_metadata( "existence": "FAILED", "type": "SKIPPED", "failure_code": "FIELD_MISSING", + "native_type": None, + "canonical_type": None, + "native_metadata": {}, } ) continue @@ -357,6 +360,14 @@ def compare_metadata( "type": "FAILED", "failure_code": "TYPE_MISMATCH", "failure_details": comparison_result["failure_details"], + "native_type": actual_meta.get("type"), + "canonical_type": actual_meta.get("canonical_type"), + "native_metadata": { + k: v + for k, v in actual_meta.items() + if k in ["max_length", "precision", "scale"] + and v is not None + }, } ) elif comparison_result["metadata_status"] == "FAILED": @@ -368,6 +379,14 @@ def compare_metadata( "type": "PASSED", "failure_code": "METADATA_MISMATCH", "failure_details": comparison_result["failure_details"], + "native_type": actual_meta.get("type"), + "canonical_type": actual_meta.get("canonical_type"), + "native_metadata": { + k: v + for k, v in actual_meta.items() + if k in ["max_length", "precision", "scale"] + and v is not None + }, } ) else: @@ -377,6 +396,14 @@ def compare_metadata( "existence": "PASSED", "type": "PASSED", "failure_code": "NONE", + "native_type": actual_meta.get("type"), + "canonical_type": actual_meta.get("canonical_type"), + "native_metadata": { + k: v + for k, v in actual_meta.items() + if k in ["max_length", "precision", "scale"] + and v is not None + }, } ) diff --git a/shared/utils/type_parser.py b/shared/utils/type_parser.py new file mode 100644 index 0000000..d6efa42 --- /dev/null +++ b/shared/utils/type_parser.py @@ -0,0 +1,311 @@ +""" +Type Definition Parser + +Provides reusable parsing logic for syntactic sugar type definitions +while maintaining backward compatibility with detailed JSON format. + +Supports formats like: +- string(50) → {"type": "string", "max_length": 50} +- float(12,2) → {"type": "float", "precision": 12, "scale": 2} +- datetime('yyyymmdd') → {"type": "datetime", "format": "yyyymmdd"} +""" + +import re +from typing import Any, Dict, Union + +from shared.enums.data_types import DataType + + +class TypeParseError(Exception): + """Raised when type definition parsing fails.""" + + pass + + +class TypeParser: + """ + Parser for type definitions supporting both syntactic sugar and + detailed JSON formats. + """ + + # Supported base types + _SUPPORTED_TYPES = { + "string": DataType.STRING, + "str": DataType.STRING, # Allow str as alias for string + "integer": DataType.INTEGER, + "int": DataType.INTEGER, # Allow int as alias for integer + "float": DataType.FLOAT, + "boolean": DataType.BOOLEAN, + "bool": DataType.BOOLEAN, # Allow bool as alias for boolean + "date": DataType.DATE, + "datetime": DataType.DATETIME, + } + + # Regex patterns for syntactic sugar parsing + _STRING_PATTERN = re.compile(r"^(string|str)\s*\(\s*(-?\d+)\s*\)$", re.IGNORECASE) + _FLOAT_PATTERN = re.compile( + r"^float\s*\(\s*(-?\d+)\s*,\s*(-?\d+)\s*\)$", re.IGNORECASE + ) + _DATETIME_PATTERN = re.compile( + r'^datetime\s*\(\s*[\'"](.+?)[\'"]\s*\)$', re.IGNORECASE + ) + _SIMPLE_TYPE_PATTERN = re.compile( + r"^(string|str|integer|int|float|boolean|bool|date|datetime)$", re.IGNORECASE + ) + + @classmethod + def parse_type_definition( + cls, type_def: Union[str, Dict[str, Any]] + ) -> Dict[str, Any]: + """ + Parse a type definition that can be either: + 1. A string with syntactic sugar (e.g., "string(50)", "float(12,2)") + 2. A detailed JSON object (backward compatibility) + + Args: + type_def: Type definition as string or dict + + Returns: + Dict containing parsed type information with keys: + - type: Canonical type name (STRING, INTEGER, etc.) + - Additional metadata keys based on type (max_length, precision, + scale, format) + + Raises: + TypeParseError: If parsing fails or type is unsupported + """ + if isinstance(type_def, dict): + return cls._parse_detailed_format(type_def) + elif isinstance(type_def, str): + return cls._parse_syntactic_sugar(type_def.strip()) + else: + raise TypeParseError( + f"Type definition must be string or dict, got {type(type_def)}" + ) + + @classmethod + def _parse_detailed_format(cls, type_def: Dict[str, Any]) -> Dict[str, Any]: + """Parse detailed JSON format (backward compatibility).""" + if "type" not in type_def: + raise TypeParseError("Detailed format must include 'type' field") + + type_name = str(type_def["type"]).lower() + if type_name not in cls._SUPPORTED_TYPES: + raise TypeParseError(f"Unsupported type '{type_name}' in detailed format") + + result = {"type": cls._SUPPORTED_TYPES[type_name].value} + + # Copy over additional metadata + metadata_fields = ["max_length", "precision", "scale", "format"] + for field in metadata_fields: + if field in type_def: + result[field] = type_def[field] + + # Validate metadata consistency + cls._validate_metadata(result) + + return result + + @classmethod + def _parse_syntactic_sugar(cls, type_str: str) -> Dict[str, Any]: + """Parse syntactic sugar format.""" + # Try string(length) pattern + match = cls._STRING_PATTERN.match(type_str) + if match: + length = int(match.group(2)) + if length <= 0: + raise TypeParseError("String length must be positive") + return {"type": DataType.STRING.value, "max_length": length} + + # Try float(precision,scale) pattern + match = cls._FLOAT_PATTERN.match(type_str) + if match: + precision = int(match.group(1)) + scale = int(match.group(2)) + if precision <= 0: + raise TypeParseError("Float precision must be positive") + if scale < 0: + raise TypeParseError("Float scale cannot be negative") + if scale > precision: + raise TypeParseError("Float scale cannot be greater than precision") + return { + "type": DataType.FLOAT.value, + "precision": precision, + "scale": scale, + } + + # Try datetime('format') pattern + match = cls._DATETIME_PATTERN.match(type_str) + if match: + format_str = match.group(1) + return {"type": DataType.DATETIME.value, "format": format_str} + + # Try simple type names + match = cls._SIMPLE_TYPE_PATTERN.match(type_str) + if match: + type_name = match.group(1).lower() + return {"type": cls._SUPPORTED_TYPES[type_name].value} + + raise TypeParseError(f"Cannot parse type definition '{type_str}'") + + @classmethod + def _validate_metadata(cls, parsed_type: Dict[str, Any]) -> None: + """Validate that metadata is consistent with type.""" + type_value = parsed_type.get("type") + + # Validate max_length is only for strings + if "max_length" in parsed_type: + if type_value != DataType.STRING.value: + raise TypeParseError( + "max_length can only be specified for STRING type, " + f"not {type_value}" + ) + if ( + not isinstance(parsed_type["max_length"], int) + or parsed_type["max_length"] <= 0 + ): + raise TypeParseError("max_length must be a positive integer") + + # Validate precision/scale are only for floats + if "precision" in parsed_type or "scale" in parsed_type: + if type_value != DataType.FLOAT.value: + raise TypeParseError( + "precision/scale can only be specified for FLOAT type, " + f"not {type_value}" + ) + + if "precision" in parsed_type: + if ( + not isinstance(parsed_type["precision"], int) + or parsed_type["precision"] <= 0 + ): + raise TypeParseError("precision must be a positive integer") + + if "scale" in parsed_type: + if not isinstance(parsed_type["scale"], int) or parsed_type["scale"] < 0: + raise TypeParseError("scale must be a non-negative integer") + if ( + "precision" in parsed_type + and parsed_type["scale"] > parsed_type["precision"] + ): + raise TypeParseError("scale cannot be greater than precision") + + # Validate format is only for datetime + if "format" in parsed_type: + if type_value != DataType.DATETIME.value: + raise TypeParseError( + f"format can only be specified for DATETIME type, not {type_value}" + ) + + @classmethod + def is_syntactic_sugar(cls, type_def: Union[str, Dict[str, Any]]) -> bool: + """Check if a type definition uses syntactic sugar format.""" + if not isinstance(type_def, str): + return False + + type_str = type_def.strip() + return bool( + cls._STRING_PATTERN.match(type_str) + or cls._FLOAT_PATTERN.match(type_str) + or cls._DATETIME_PATTERN.match(type_str) + or cls._SIMPLE_TYPE_PATTERN.match(type_str) + ) + + @classmethod + def normalize_to_detailed_format( + cls, type_def: Union[str, Dict[str, Any]] + ) -> Dict[str, Any]: + """ + Normalize any type definition to detailed format for backward compatibility. + + Args: + type_def: Type definition in any supported format + + Returns: + Dict in detailed format that existing code can use + """ + parsed = cls.parse_type_definition(type_def) + + # Convert canonical type back to lowercase for existing code compatibility + if "type" in parsed: + # Keep the canonical uppercase form for new code, but also provide lowercase + parsed["desired_type"] = parsed["type"] # For schema executor + parsed["type"] = parsed["type"].lower() # For backward compatibility + + return parsed + + @classmethod + def parse_desired_type_for_core( + cls, desired_type_def: Union[str, Dict[str, Any]] + ) -> Dict[str, Any]: + """ + Parse desired_type definition and return fields with desired_ prefix + for core layer. + + This method handles the CLI-to-core interface naming for desired_type + fields, ensuring no conflicts with existing type field names. + + Args: + desired_type_def: Desired type definition in syntactic sugar or + detailed format + + Returns: + Dict with desired_ prefixed field names suitable for core layer: + { + "desired_type": "STRING", + "desired_max_length": 50, + "desired_precision": 10, + "desired_scale": 2, + "desired_format": "YYYY-MM-DD" + } + + Example: + parse_desired_type_for_core("string(50)") + → {"desired_type": "STRING", "desired_max_length": 50} + + parse_desired_type_for_core("float(10,2)") + → {"desired_type": "FLOAT", "desired_precision": 10, "desired_scale": 2} + """ + # Parse the desired type definition using existing logic + parsed = cls.parse_type_definition(desired_type_def) + + # Transform to core layer format with desired_ prefix + core_format = {} + + # Main type field + if "type" in parsed: + core_format["desired_type"] = parsed["type"] + + # Metadata fields with desired_ prefix + metadata_fields = ["max_length", "precision", "scale", "format"] + for field in metadata_fields: + if field in parsed: + core_format[f"desired_{field}"] = parsed[field] + + return core_format + + +# Convenience functions for common usage patterns +def parse_type(type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: + """Convenience function to parse a type definition.""" + return TypeParser.parse_type_definition(type_def) + + +def is_syntactic_sugar(type_def: Union[str, Dict[str, Any]]) -> bool: + """Convenience function to check if type definition uses syntactic sugar.""" + return TypeParser.is_syntactic_sugar(type_def) + + +def normalize_type(type_def: Union[str, Dict[str, Any]]) -> Dict[str, Any]: + """Convenience function to normalize type definition to detailed format.""" + return TypeParser.normalize_to_detailed_format(type_def) + + +def parse_desired_type_for_core( + desired_type_def: Union[str, Dict[str, Any]] +) -> Dict[str, Any]: + """ + Convenience function to parse desired_type with proper core layer + field naming. + """ + return TypeParser.parse_desired_type_for_core(desired_type_def) diff --git a/test_data/schema.json b/test_data/schema.json index 5ce4404..d557a38 100644 --- a/test_data/schema.json +++ b/test_data/schema.json @@ -15,7 +15,7 @@ { "field": "customer_id", "type": "integer", "required": true }, { "field": "product_name", "type": "string", "max_length": 155, "required": true }, { "field": "quantity", "type": "integer", "required": true }, - { "field": "price", "type": "float", "precision": 8, "scale": 2, "required": true}, + { "field": "price", "type": "float(10,2)", "required": true}, { "field": "status", "type": "string", "max_length": 50, "required": true }, { "field": "order_date", "type": "date", "required": true } ], diff --git a/tests/integration/core/executors/test_native_type_integration.py b/tests/integration/core/executors/test_native_type_integration.py new file mode 100644 index 0000000..d25e0e5 --- /dev/null +++ b/tests/integration/core/executors/test_native_type_integration.py @@ -0,0 +1,972 @@ +""" +Integration test for native type reporting functionality using MySQL. + +Based on the established pattern from test_mysql_integration.py. +Tests the enhanced SchemaExecutor that includes native_type, canonical_type, +and native_metadata in field_results. +""" + +import pytest + +from core.executors.schema_executor import SchemaExecutor +from shared.database.query_executor import QueryExecutor +from shared.enums import RuleAction, RuleCategory, RuleType, SeverityLevel +from shared.enums.connection_types import ConnectionType +from shared.enums.data_types import DataType +from shared.schema.base import RuleTarget, TargetEntity +from shared.schema.connection_schema import ConnectionSchema +from shared.schema.rule_schema import RuleSchema +from shared.utils.logger import get_logger +from tests.shared.builders.test_builders import TestDataBuilder +from tests.shared.utils.database_utils import get_available_databases + +pytestmark = pytest.mark.asyncio + +logger = get_logger(__name__) + + +def _skip_if_mysql_unavailable() -> None: + if "mysql" not in get_available_databases(): + pytest.skip("MySQL not configured; skipping integration tests") + + +def build_schema_rule_with_native_reporting( + columns: dict, + table_name: str = "test_table", + database_name: str = "test_db", + strict_mode: bool = False, + case_insensitive: bool = False, +) -> RuleSchema: + """Build a SCHEMA rule for testing native type reporting.""" + builder = TestDataBuilder.rule() + rule = ( + builder.with_name(f"schema_{table_name}") + .with_target(database_name, table_name, "") # Table-level rule + .with_type(RuleType.SCHEMA) + .with_parameter("columns", columns) + .with_parameter("strict_mode", strict_mode) + .with_parameter("case_insensitive", case_insensitive) + .build() + ) + return rule + + +@pytest.mark.integration +@pytest.mark.database +class TestNativeTypeIntegration: + """Test native type reporting functionality with real MySQL database.""" + + async def _prepare_test_environment( + self, mysql_connection_params: dict + ) -> QueryExecutor: + """Prepare MySQL test environment with test table.""" + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + # Create engine for setup + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + engine = await get_engine(db_url, pool_size=1, echo=False) + executor = QueryExecutor(engine) + + # Clean up and create test table + await executor.execute_query( + "DROP TABLE IF EXISTS native_type_test", fetch=False + ) + + await executor.execute_query( + """ + CREATE TABLE native_type_test ( + id INT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(50) NOT NULL, + email VARCHAR(100), + age SMALLINT, + score DECIMAL(5,2), + is_active BOOLEAN DEFAULT TRUE, + birth_date DATE, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + description TEXT + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """, + fetch=False, + ) + + # Insert test data + await executor.execute_query( + """ + INSERT INTO native_type_test + (name, email, age, score, is_active, birth_date) VALUES + ('Alice', 'alice@example.com', 25, 85.50, TRUE, '1998-05-15'), + ('Bob', 'bob@example.com', 30, 92.75, FALSE, '1993-08-20') + """, + fetch=False, + ) + + await engine.dispose() + return executor + + async def test_native_type_reporting_comprehensive( + self, mysql_connection_params: dict + ) -> None: + """Test that native type information is correctly reported for various MySQL types.""" + _skip_if_mysql_unavailable() + + # Prepare test environment + await self._prepare_test_environment(mysql_connection_params) + + # Create connection schema + connection = ConnectionSchema( + name="native_type_test_connection", + description="Connection for testing native type reporting", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + executor = SchemaExecutor(connection, test_mode=True) + + # Define schema rule with expected types + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value, "max_length": 50}, + "email": {"expected_type": DataType.STRING.value, "max_length": 100}, + "age": {"expected_type": DataType.INTEGER.value}, + "score": { + "expected_type": DataType.FLOAT.value, + "precision": 5, + "scale": 2, + }, + "is_active": { + "expected_type": DataType.INTEGER.value + }, # MySQL BOOLEAN -> TINYINT(1) -> INTEGER + "birth_date": {"expected_type": DataType.DATE.value}, + "created_at": {"expected_type": DataType.DATETIME.value}, + "description": {"expected_type": DataType.STRING.value}, + } + + rule = RuleSchema( + id="native_type_test_rule", + name="Native Type Reporting Test", + description="Test rule for native type reporting", + type=RuleType.SCHEMA, + category=RuleCategory.VALIDITY, + severity=SeverityLevel.MEDIUM, + action=RuleAction.LOG, + target=RuleTarget( + entities=[ + TargetEntity( + database=mysql_connection_params["database"], + table="native_type_test", + column=None, + ) + ], + relationship_type="single_table", + ), + parameters={"columns": columns}, + ) + + try: + # Execute the schema rule + result = await executor.execute_rule(rule) + + logger.info(f"Schema rule execution status: {result.status}") + logger.info(f"Execution message: {result.execution_message}") + + # Debug: print detailed information + execution_plan = result.execution_plan + assert execution_plan is not None + if "schema_details" in execution_plan: + schema_details = execution_plan["schema_details"] + if "field_results" in schema_details: + field_results = schema_details["field_results"] + logger.info(f"Number of field results: {len(field_results)}") + for fr in field_results: + logger.info( + f"Field {fr.get('column')}: existence={fr.get('existence')}, type={fr.get('type')}, failure_code={fr.get('failure_code')}" + ) + if fr.get("failure_code") != "NONE": + logger.info( + f" Failure details: {fr.get('failure_details')}" + ) + + # Verify basic execution - should pass now with corrected type expectations + assert ( + result.status == "PASSED" + ), f"Expected PASSED, got {result.status}: {result.execution_message}" + + # Verify execution plan contains schema details + assert execution_plan is not None + assert "schema_details" in execution_plan + + schema_details = execution_plan["schema_details"] + assert "field_results" in schema_details + assert schema_details["table_exists"] is True + + field_results = schema_details["field_results"] + assert len(field_results) == len( + columns + ), f"Expected {len(columns)} field results, got {len(field_results)}" + + # Test native type information for each field + field_map = {fr["column"]: fr for fr in field_results} + + # Test INTEGER type (id, age) + for col in ["id", "age"]: + field_result = field_map[col] + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result + + assert field_result["canonical_type"] == DataType.INTEGER.value + assert field_result["native_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + logger.info( + f"{col}: native_type={field_result['native_type']}, " + f"canonical_type={field_result['canonical_type']}" + ) + + # Test STRING type with length (name, email) + name_result = field_map["name"] + assert name_result["canonical_type"] == DataType.STRING.value + assert name_result["native_metadata"].get("max_length") == 50 + + email_result = field_map["email"] + assert email_result["canonical_type"] == DataType.STRING.value + assert email_result["native_metadata"].get("max_length") == 100 + + # Test FLOAT type with precision/scale (score) + score_result = field_map["score"] + assert score_result["canonical_type"] == DataType.FLOAT.value + # Note: MySQL may return precision/scale info in native_metadata + logger.info(f"score native_metadata: {score_result['native_metadata']}") + + # Test BOOLEAN type (is_active) - Note: MySQL maps BOOLEAN to TINYINT(1) -> INTEGER + boolean_result = field_map["is_active"] + # In MySQL, BOOLEAN is actually stored as TINYINT(1) which maps to INTEGER + assert boolean_result["canonical_type"] == DataType.INTEGER.value + logger.info( + f"is_active correctly identified as INTEGER (MySQL BOOLEAN -> TINYINT mapping)" + ) + + # Test DATE type (birth_date) + date_result = field_map["birth_date"] + assert date_result["canonical_type"] == DataType.DATE.value + + # Test DATETIME type (created_at) + datetime_result = field_map["created_at"] + assert datetime_result["canonical_type"] == DataType.DATETIME.value + + # Test TEXT type (description) - should map to STRING + desc_result = field_map["description"] + assert desc_result["canonical_type"] == DataType.STRING.value + + # Verify all fields have the required enhanced information + for field_result in field_results: + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "PASSED" + assert field_result["failure_code"] == "NONE" + + # Verify enhanced fields exist and have meaningful values + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + logger.info( + f"✓ {field_result['column']}: " + f"native='{field_result['native_type']}', " + f"canonical='{field_result['canonical_type']}', " + f"metadata={field_result['native_metadata']}" + ) + + logger.info("✅ Native type reporting test completed successfully") + + finally: + # Cleanup + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS native_type_test", fetch=False + ) + await cleanup_engine.dispose() + + async def test_native_type_reporting_with_type_mismatch( + self, mysql_connection_params: dict + ) -> None: + """Test native type information is included even for TYPE_MISMATCH cases.""" + _skip_if_mysql_unavailable() + + # Prepare test environment + await self._prepare_test_environment(mysql_connection_params) + + # Create connection schema + connection = ConnectionSchema( + name="type_mismatch_test_connection", + description="Connection for testing type mismatch scenarios", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + executor = SchemaExecutor(connection, test_mode=True) + + # Define schema rule with intentional type mismatches + columns = { + "id": {"expected_type": DataType.STRING.value}, # Mismatch: actual is INT + "name": { + "expected_type": DataType.INTEGER.value + }, # Mismatch: actual is VARCHAR + "age": { + "expected_type": DataType.FLOAT.value + }, # Mismatch: actual is SMALLINT + } + + rule = RuleSchema( + id="type_mismatch_test_rule", + name="Type Mismatch Test", + description="Test rule for type mismatch scenarios", + type=RuleType.SCHEMA, + category=RuleCategory.VALIDITY, + severity=SeverityLevel.MEDIUM, + action=RuleAction.LOG, + target=RuleTarget( + entities=[ + TargetEntity( + database=mysql_connection_params["database"], + table="native_type_test", + column=None, + ) + ], + relationship_type="single_table", + ), + parameters={"columns": columns}, + ) + + try: + # Execute the schema rule + result = await executor.execute_rule(rule) + + logger.info(f"Type mismatch test status: {result.status}") + logger.info(f"Execution message: {result.execution_message}") + + # Should fail due to type mismatches + assert result.status == "FAILED" + + # Verify schema details + assert result.execution_plan is not None + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + assert len(field_results) == 3 + + # Verify that native type information is provided even for failed cases + for field_result in field_results: + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "FAILED" + assert field_result["failure_code"] == "TYPE_MISMATCH" + + # Critical: native type info should still be present for failed validations + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result + + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + logger.info( + f"❌ {field_result['column']}: TYPE_MISMATCH but still has " + f"native='{field_result['native_type']}', " + f"canonical='{field_result['canonical_type']}'" + ) + + logger.info("✅ Type mismatch native type reporting test completed") + + finally: + # Cleanup + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS native_type_test", fetch=False + ) + await cleanup_engine.dispose() + + async def test_native_type_reporting_missing_field( + self, mysql_connection_params: dict + ) -> None: + """Test native type information handling for missing fields.""" + _skip_if_mysql_unavailable() + + # Prepare test environment with limited fields + await self._prepare_test_environment(mysql_connection_params) + + # Create connection schema + connection = ConnectionSchema( + name="missing_field_test_connection", + description="Connection for testing missing field scenarios", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + executor = SchemaExecutor(connection, test_mode=True) + + # Define schema rule expecting more fields than exist in native_type_test + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value}, + "missing_field": { + "expected_type": DataType.STRING.value + }, # This field doesn't exist + } + + rule = build_schema_rule_with_native_reporting( + columns, "native_type_test", mysql_connection_params["database"] + ) + + try: + # Execute the schema rule + result = await executor.execute_rule(rule) + + logger.info(f"Missing field test status: {result.status}") + logger.info(f"Execution message: {result.execution_message}") + + # Should fail due to missing field + assert result.status == "FAILED" + + # Verify schema details + assert result.execution_plan is not None + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + assert len(field_results) == 3 + + # Find results for each field + field_map = {fr["column"]: fr for fr in field_results} + + # Existing fields should have native type information + for existing_field in ["id", "name"]: + field_result = field_map[existing_field] + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "PASSED" + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + logger.info( + f"✓ {existing_field}: native_type={field_result['native_type']}" + ) + + # Missing field should have null native type information + missing_result = field_map["missing_field"] + assert missing_result["existence"] == "FAILED" + assert missing_result["type"] == "SKIPPED" + assert missing_result["failure_code"] == "FIELD_MISSING" + assert missing_result["native_type"] is None + assert missing_result["canonical_type"] is None + assert missing_result["native_metadata"] == {} + logger.info("✓ missing_field: correctly handled as FIELD_MISSING") + + logger.info("✅ Missing field native type reporting test completed") + + finally: + # Cleanup + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS native_type_test", fetch=False + ) + await cleanup_engine.dispose() + + async def test_native_metadata_precision_scale( + self, mysql_connection_params: dict + ) -> None: + """Test native metadata reporting for decimal types with precision/scale.""" + _skip_if_mysql_unavailable() + + # Create test environment with decimal types + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + engine = await get_engine(db_url, pool_size=1, echo=False) + executor = QueryExecutor(engine) + + # Clean up and create test table with decimal types + await executor.execute_query("DROP TABLE IF EXISTS precision_test", fetch=False) + + await executor.execute_query( + """ + CREATE TABLE precision_test ( + price DECIMAL(10,2), + amount NUMERIC(8,3), + ratio FLOAT(7,4) + ) ENGINE=InnoDB + """, + fetch=False, + ) + + await engine.dispose() + + # Create connection schema + connection = ConnectionSchema( + name="precision_test_connection", + description="Connection for testing precision/scale metadata", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + schema_executor = SchemaExecutor(connection, test_mode=True) + + # Define schema rule for decimal types + columns = { + "price": { + "expected_type": DataType.FLOAT.value, + "precision": 10, + "scale": 2, + }, + "amount": { + "expected_type": DataType.FLOAT.value, + "precision": 8, + "scale": 3, + }, + "ratio": {"expected_type": DataType.FLOAT.value}, + } + rule = build_schema_rule_with_native_reporting( + columns, "precision_test", mysql_connection_params["database"] + ) + + try: + # Execute rule + result = await schema_executor.execute_rule(rule) + + logger.info(f"Precision/scale test status: {result.status}") + + # Verify field_results include precision/scale metadata + assert result.execution_plan is not None + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + + assert len(field_results) == 3 + + for field_result in field_results: + assert "native_metadata" in field_result + native_metadata = field_result["native_metadata"] + + # Verify the native type is captured + assert field_result["native_type"] is not None + assert field_result["canonical_type"] == DataType.FLOAT.value + + # Verify structure (MySQL may provide precision/scale info) + assert isinstance(native_metadata, dict) + + column_name = field_result["column"] + logger.info( + f"✓ {column_name}: native_type={field_result['native_type']}, " + f"metadata={native_metadata}" + ) + + logger.info("✅ Precision/scale metadata test completed") + + finally: + # Cleanup + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS precision_test", fetch=False + ) + await cleanup_engine.dispose() + + async def test_comprehensive_type_coverage_extended( + self, mysql_connection_params: dict + ) -> None: + """Test native type reporting across extended variety of database types.""" + _skip_if_mysql_unavailable() + + # Create test environment with comprehensive type coverage + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + engine = await get_engine(db_url, pool_size=1, echo=False) + executor = QueryExecutor(engine) + + # Clean up and create comprehensive test table + await executor.execute_query( + "DROP TABLE IF EXISTS comprehensive_test", fetch=False + ) + + await executor.execute_query( + """ + CREATE TABLE comprehensive_test ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + tiny_num TINYINT, + small_num SMALLINT, + medium_num MEDIUMINT, + big_num BIGINT, + float_num FLOAT, + double_num DOUBLE, + decimal_num DECIMAL(15,4), + char_field CHAR(10), + varchar_field VARCHAR(255), + text_field TEXT, + bool_field BOOLEAN, + date_field DATE, + datetime_field DATETIME, + timestamp_field TIMESTAMP + ) ENGINE=InnoDB + """, + fetch=False, + ) + + await engine.dispose() + + # Create connection schema + connection = ConnectionSchema( + name="comprehensive_test_connection", + description="Connection for comprehensive type coverage testing", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + schema_executor = SchemaExecutor(connection, test_mode=True) + + # Define comprehensive schema rule + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "tiny_num": {"expected_type": DataType.INTEGER.value}, + "small_num": {"expected_type": DataType.INTEGER.value}, + "medium_num": {"expected_type": DataType.INTEGER.value}, + "big_num": {"expected_type": DataType.INTEGER.value}, + "float_num": {"expected_type": DataType.FLOAT.value}, + "double_num": {"expected_type": DataType.FLOAT.value}, + "decimal_num": {"expected_type": DataType.FLOAT.value}, + "char_field": {"expected_type": DataType.STRING.value}, + "varchar_field": {"expected_type": DataType.STRING.value}, + "text_field": {"expected_type": DataType.STRING.value}, + "bool_field": { + "expected_type": DataType.INTEGER.value + }, # MySQL BOOLEAN -> TINYINT + "date_field": {"expected_type": DataType.DATE.value}, + "datetime_field": {"expected_type": DataType.DATETIME.value}, + "timestamp_field": {"expected_type": DataType.DATETIME.value}, + } + + rule = build_schema_rule_with_native_reporting( + columns, "comprehensive_test", mysql_connection_params["database"] + ) + + try: + # Execute rule + result = await schema_executor.execute_rule(rule) + + logger.info(f"Comprehensive type coverage test status: {result.status}") + logger.info(f"Execution message: {result.execution_message}") + + # Debug field-level failures before asserting + if result.status == "FAILED": + assert result.execution_plan is not None + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + + for field_result in field_results: + if field_result["failure_code"] != "NONE": + logger.error( + f"❌ {field_result['column']}: {field_result['failure_code']} - " + f"native='{field_result.get('native_type')}', " + f"canonical='{field_result.get('canonical_type')}'" + ) + if field_result.get("failure_details"): + logger.error( + f" Details: {field_result['failure_details']}" + ) + + # Should pass with correct type mappings + assert result.status == "PASSED" + + # Verify all fields have complete native type information + assert result.execution_plan is not None + schema_details = result.execution_plan["schema_details"] + field_results = schema_details["field_results"] + + assert len(field_results) == len(columns) + + for field_result in field_results: + # Every field should have complete native type information + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "PASSED" + assert field_result["failure_code"] == "NONE" + + column_name = field_result["column"] + logger.info( + f"✓ {column_name}: native='{field_result['native_type']}', " + f"canonical='{field_result['canonical_type']}', " + f"metadata={field_result['native_metadata']}" + ) + + logger.info("✅ Comprehensive type coverage test completed successfully") + + finally: + # Cleanup + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS comprehensive_test", fetch=False + ) + await cleanup_engine.dispose() + + +@pytest.mark.integration +@pytest.mark.database +class TestNativeTypeReportingBackwardCompatibility: + """Test that native type enhancements maintain backward compatibility.""" + + async def _prepare_compatibility_test_environment( + self, mysql_connection_params: dict + ) -> QueryExecutor: + """Prepare MySQL test environment for compatibility testing.""" + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + # Create engine for setup + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + engine = await get_engine(db_url, pool_size=1, echo=False) + executor = QueryExecutor(engine) + + # Clean up and create test table + await executor.execute_query("DROP TABLE IF EXISTS compat_test", fetch=False) + + await executor.execute_query( + """ + CREATE TABLE compat_test ( + id INT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(50) NOT NULL, + status BOOLEAN DEFAULT TRUE + ) ENGINE=InnoDB + """, + fetch=False, + ) + + await engine.dispose() + return executor + + async def test_existing_functionality_unchanged( + self, mysql_connection_params: dict + ) -> None: + """Test that existing schema validation functionality is unchanged.""" + _skip_if_mysql_unavailable() + + # Prepare test environment + await self._prepare_compatibility_test_environment(mysql_connection_params) + + # Create connection schema + connection = ConnectionSchema( + name="compat_test_connection", + description="Connection for backward compatibility testing", + connection_type=ConnectionType.MYSQL, + host=mysql_connection_params["host"], + port=mysql_connection_params["port"], + username=mysql_connection_params["username"], + password=mysql_connection_params["password"], + db_name=mysql_connection_params["database"], + ) + + # Create schema executor + executor = SchemaExecutor(connection, test_mode=True) + + # Use existing schema rule format + columns = { + "id": {"expected_type": DataType.INTEGER.value}, + "name": {"expected_type": DataType.STRING.value}, + "status": { + "expected_type": DataType.INTEGER.value + }, # BOOLEAN -> INTEGER in MySQL + } + + rule = build_schema_rule_with_native_reporting( + columns, "compat_test", mysql_connection_params["database"] + ) + + try: + # Execute rule + result = await executor.execute_rule(rule) + + logger.info(f"Backward compatibility test status: {result.status}") + + # Verify existing fields are still present and working + assert result.status == "PASSED" + assert result.rule_id == rule.id + assert len(result.dataset_metrics) == 1 + + # Verify execution_plan structure is maintained + execution_plan = result.execution_plan + assert execution_plan is not None + assert "execution_type" in execution_plan + assert "schema_details" in execution_plan + + schema_details = execution_plan["schema_details"] + assert "field_results" in schema_details + assert "extras" in schema_details + assert "table_exists" in schema_details + + # Verify field_results have expected legacy fields + field_results = schema_details["field_results"] + assert len(field_results) == 3 + + for field_result in field_results: + # Legacy fields must be present + assert "column" in field_result + assert "existence" in field_result + assert "type" in field_result + assert "failure_code" in field_result + + # Enhanced fields should also be present + assert "native_type" in field_result + assert "canonical_type" in field_result + assert "native_metadata" in field_result + + # Values should be meaningful + assert field_result["existence"] == "PASSED" + assert field_result["type"] == "PASSED" + assert field_result["failure_code"] == "NONE" + assert field_result["native_type"] is not None + assert field_result["canonical_type"] is not None + assert isinstance(field_result["native_metadata"], dict) + + logger.info( + f"✓ {field_result['column']}: legacy + enhanced fields present" + ) + + logger.info("✅ Backward compatibility test completed successfully") + + finally: + # Cleanup + from typing import cast + + from shared.database.connection import get_db_url, get_engine + + db_url = get_db_url( + str(mysql_connection_params["db_type"]), + str(mysql_connection_params["host"]), + cast(int, mysql_connection_params["port"]), + str(mysql_connection_params["database"]), + str(mysql_connection_params["username"]), + str(mysql_connection_params["password"]), + ) + cleanup_engine = await get_engine(db_url, pool_size=1, echo=False) + cleanup_executor = QueryExecutor(cleanup_engine) + + await cleanup_executor.execute_query( + "DROP TABLE IF EXISTS compat_test", fetch=False + ) + await cleanup_engine.dispose() diff --git a/tests/unit/shared/utils/test_type_parser.py b/tests/unit/shared/utils/test_type_parser.py new file mode 100644 index 0000000..fb6b7de --- /dev/null +++ b/tests/unit/shared/utils/test_type_parser.py @@ -0,0 +1,375 @@ +""" +Tests for TypeParser utility + +Comprehensive test coverage for syntactic sugar type parsing and backward compatibility. +""" + +from typing import Any + +import pytest + +from shared.enums.data_types import DataType +from shared.utils.type_parser import ( + TypeParseError, + TypeParser, + is_syntactic_sugar, + normalize_type, + parse_type, +) + + +class TestTypeParser: + """Test TypeParser class methods""" + + def test_parse_simple_types(self) -> None: + """Test parsing of simple type names.""" + # Test all supported simple types + test_cases = [ + ("string", {"type": DataType.STRING.value}), + ("str", {"type": DataType.STRING.value}), + ("integer", {"type": DataType.INTEGER.value}), + ("int", {"type": DataType.INTEGER.value}), + ("float", {"type": DataType.FLOAT.value}), + ("boolean", {"type": DataType.BOOLEAN.value}), + ("bool", {"type": DataType.BOOLEAN.value}), + ("date", {"type": DataType.DATE.value}), + ("datetime", {"type": DataType.DATETIME.value}), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_parse_case_insensitive(self) -> None: + """Test that parsing is case insensitive.""" + test_cases = ["STRING", "String", "sTrInG", "INTEGER", "Int", "FLOAT", "Float"] + + for input_type in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert "type" in result + assert result["type"] in [dt.value for dt in DataType] + + def test_parse_string_with_length(self) -> None: + """Test parsing string with length specification.""" + test_cases = [ + ("string(50)", {"type": DataType.STRING.value, "max_length": 50}), + ("STRING(255)", {"type": DataType.STRING.value, "max_length": 255}), + ("str(10)", {"type": DataType.STRING.value, "max_length": 10}), + ( + "string( 100 )", + {"type": DataType.STRING.value, "max_length": 100}, + ), # with spaces + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_parse_float_with_precision_scale(self) -> None: + """Test parsing float with precision and scale.""" + test_cases = [ + ( + "float(10,2)", + {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}, + ), + ( + "FLOAT(12,4)", + {"type": DataType.FLOAT.value, "precision": 12, "scale": 4}, + ), + ( + "float( 8 , 3 )", + {"type": DataType.FLOAT.value, "precision": 8, "scale": 3}, + ), # with spaces + ( + "float(15,0)", + {"type": DataType.FLOAT.value, "precision": 15, "scale": 0}, + ), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_parse_datetime_with_format(self) -> None: + """Test parsing datetime with format specification.""" + test_cases = [ + ( + "datetime('yyyymmdd')", + {"type": DataType.DATETIME.value, "format": "yyyymmdd"}, + ), + ( + 'DATETIME("yyyy-mm-dd")', + {"type": DataType.DATETIME.value, "format": "yyyy-mm-dd"}, + ), + ( + "datetime( 'dd/mm/yyyy hh:mm:ss' )", + {"type": DataType.DATETIME.value, "format": "dd/mm/yyyy hh:mm:ss"}, + ), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_parse_detailed_format_backward_compatibility(self) -> None: + """Test parsing detailed JSON format for backward compatibility.""" + test_cases: list[tuple[dict, dict]] = [ + ({"type": "string"}, {"type": DataType.STRING.value}), + ( + {"type": "string", "max_length": 100}, + {"type": DataType.STRING.value, "max_length": 100}, + ), + ( + {"type": "float", "precision": 10, "scale": 2}, + {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}, + ), + ( + {"type": "datetime", "format": "yyyy-mm-dd"}, + {"type": DataType.DATETIME.value, "format": "yyyy-mm-dd"}, + ), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_error_cases(self) -> None: + """Test error handling for invalid type definitions.""" + error_cases: list[tuple[Any, str]] = [ + ("invalid_type", "Cannot parse type definition"), + ("string(-1)", "String length must be positive"), + ("float(0,2)", "Float precision must be positive"), + ("float(5,-1)", "Float scale cannot be negative"), + ("float(3,5)", "Float scale cannot be greater than precision"), + ({"type": "unknown"}, "Unsupported type 'unknown'"), + ({"missing_type": "value"}, "Detailed format must include 'type' field"), + (123, "Type definition must be string or dict"), + (None, "Type definition must be string or dict"), + ] + + for input_type, expected_error in error_cases: + with pytest.raises(TypeParseError) as exc_info: + TypeParser.parse_type_definition(input_type) + assert expected_error in str(exc_info.value) + + def test_metadata_validation(self) -> None: + """Test metadata validation for type consistency.""" + # Test invalid metadata combinations in detailed format + invalid_cases: list[tuple[dict, str]] = [ + ( + {"type": "integer", "max_length": 10}, + "max_length can only be specified for STRING type", + ), + ( + {"type": "string", "precision": 5}, + "precision/scale can only be specified for FLOAT type", + ), + ( + {"type": "boolean", "scale": 2}, + "precision/scale can only be specified for FLOAT type", + ), + ( + {"type": "date", "format": "yyyy"}, + "format can only be specified for DATETIME type", + ), + ( + {"type": "string", "max_length": 0}, + "max_length must be a positive integer", + ), + ({"type": "float", "precision": 0}, "precision must be a positive integer"), + ({"type": "float", "scale": -1}, "scale must be a non-negative integer"), + ( + {"type": "float", "precision": 3, "scale": 5}, + "scale cannot be greater than precision", + ), + ] + + for input_type, expected_error in invalid_cases: + with pytest.raises(TypeParseError) as exc_info: + TypeParser.parse_type_definition(input_type) + assert expected_error in str(exc_info.value) + + def test_is_syntactic_sugar(self) -> None: + """Test identification of syntactic sugar formats.""" + sugar_cases = [ + "string(50)", + "float(10,2)", + "datetime('yyyy-mm-dd')", + "integer", + "boolean", + ] + + detailed_cases = [ + {"type": "string"}, + {"type": "float", "precision": 10}, + 123, + None, + ] + + case: Any = None + for case in sugar_cases: + assert TypeParser.is_syntactic_sugar(case) is True + + for case in detailed_cases: + assert TypeParser.is_syntactic_sugar(case) is False + + def test_normalize_to_detailed_format(self) -> None: + """Test normalization to detailed format.""" + test_cases: list[tuple[str | dict, dict]] = [ + ( + "string(50)", + {"type": "string", "desired_type": "STRING", "max_length": 50}, + ), + ( + "float(10,2)", + {"type": "float", "desired_type": "FLOAT", "precision": 10, "scale": 2}, + ), + ({"type": "boolean"}, {"type": "boolean", "desired_type": "BOOLEAN"}), + ] + + for input_type, expected_keys in test_cases: + result = TypeParser.normalize_to_detailed_format(input_type) + for key, value in expected_keys.items(): + assert result[key] == value + + +class TestConvenienceFunctions: + """Test convenience functions""" + + def test_parse_type_function(self) -> None: + """Test parse_type convenience function.""" + result = parse_type("string(100)") + assert result == {"type": DataType.STRING.value, "max_length": 100} + + def test_is_syntactic_sugar_function(self) -> None: + """Test is_syntactic_sugar convenience function.""" + assert is_syntactic_sugar("float(10,2)") is True + assert is_syntactic_sugar({"type": "string"}) is False + + def test_normalize_type_function(self) -> None: + """Test normalize_type convenience function.""" + result = normalize_type("string(50)") + assert result["type"] == "string" + assert result["desired_type"] == "STRING" + assert result["max_length"] == 50 + + +class TestEdgeCases: + """Test edge cases and boundary conditions""" + + def test_whitespace_handling(self) -> None: + """Test handling of various whitespace scenarios.""" + test_cases = [ + (" string ", {"type": DataType.STRING.value}), + ("string( 50 )", {"type": DataType.STRING.value, "max_length": 50}), + ( + "float( 10 , 2 )", + {"type": DataType.FLOAT.value, "precision": 10, "scale": 2}, + ), + ( + "datetime( ' format ' )", + {"type": DataType.DATETIME.value, "format": " format "}, + ), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + def test_boundary_values(self) -> None: + """Test boundary values for numeric parameters.""" + # Test valid boundary values + valid_cases = [ + ("string(1)", {"type": DataType.STRING.value, "max_length": 1}), + ("float(1,0)", {"type": DataType.FLOAT.value, "precision": 1, "scale": 0}), + ("float(1,1)", {"type": DataType.FLOAT.value, "precision": 1, "scale": 1}), + ] + + for input_type, expected in valid_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + # Test invalid boundary values + invalid_cases = [ + ("string(0)", "String length must be positive"), + ("float(0,0)", "Float precision must be positive"), + ] + + for input_type, expected_error in invalid_cases: + with pytest.raises(TypeParseError) as exc_info: + TypeParser.parse_type_definition(input_type) + assert expected_error in str(exc_info.value) + + def test_quote_variations(self) -> None: + """Test different quote styles for datetime format.""" + test_cases = [ + ("datetime('format')", "format"), + ('datetime("format")', "format"), + ("datetime('format with spaces')", "format with spaces"), + ("datetime(\"format with 'quotes'\")", "format with 'quotes'"), + ] + + for input_type, expected_format in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == { + "type": DataType.DATETIME.value, + "format": expected_format, + } + + def test_large_numbers(self) -> None: + """Test handling of large numeric values.""" + test_cases = [ + ("string(65535)", {"type": DataType.STRING.value, "max_length": 65535}), + ( + "float(38,10)", + {"type": DataType.FLOAT.value, "precision": 38, "scale": 10}, + ), + ] + + for input_type, expected in test_cases: + result = TypeParser.parse_type_definition(input_type) + assert result == expected + + +class TestIntegrationWithDataTypeEnum: + """Test integration with DataType enum""" + + def test_all_data_types_supported(self) -> None: + """Test that all DataType enum values are supported.""" + type_mappings = { + "string": DataType.STRING, + "integer": DataType.INTEGER, + "float": DataType.FLOAT, + "boolean": DataType.BOOLEAN, + "date": DataType.DATE, + "datetime": DataType.DATETIME, + } + + for type_name, expected_enum in type_mappings.items(): + result = TypeParser.parse_type_definition(type_name) + assert result["type"] == expected_enum.value + + def test_enum_value_consistency(self) -> None: + """Test that returned type values match DataType enum values.""" + result = TypeParser.parse_type_definition("string") + assert result["type"] == DataType.STRING.value == "STRING" + + result = TypeParser.parse_type_definition("float(10,2)") + assert result["type"] == DataType.FLOAT.value == "FLOAT" + + +@pytest.mark.parametrize( + "input_type,expected", + [ + ("string(50)", {"type": "STRING", "max_length": 50}), + ("float(12,2)", {"type": "FLOAT", "precision": 12, "scale": 2}), + ("datetime('yyyymmdd')", {"type": "DATETIME", "format": "yyyymmdd"}), + ("integer", {"type": "INTEGER"}), + ("boolean", {"type": "BOOLEAN"}), + ("date", {"type": "DATE"}), + ], +) +def test_acceptance_criteria_examples(input_type: str, expected: dict) -> None: + """Test the specific examples from the acceptance criteria.""" + result = parse_type(input_type) + assert result == expected