diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..de289a8 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,36 @@ +# See https://pre-commit.com for more information +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-json + - id: check-added-large-files + args: ['--maxkb=500'] + - id: check-merge-conflict + - id: detect-private-key + + - repo: https://github.com/pycqa/flake8 + rev: 7.1.2 + hooks: + - id: flake8 + args: + - --max-line-length=150 + - --extend-ignore=E501,W503,W504 + - --select=E9,F63,F7,F82 + + - repo: https://github.com/pycqa/isort + rev: 6.0.1 + hooks: + - id: isort + args: ['--profile=black', '--line-length=150'] + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.15.0 + hooks: + - id: mypy + args: ['--config-file=mypy.ini', '--ignore-missing-imports'] + additional_dependencies: [] + pass_filenames: false diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..7d7b987 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,95 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). 
+ +## [2.3.1] - 2026 + +### Fixed +- Unicode output crash on Windows (PyInstaller cp1252 encoding issue) +- Replaced Cyrillic text in argparse help with English for cross-platform compatibility +- Added PyInstaller runtime hook to force UTF-8 on Windows + +### Changed +- CI: replaced softprops/action-gh-release with `gh` CLI +- CI: opted into Node.js 24 for GitHub Actions runners +- Docs: updated copyright year range to 2025-2026 + +## [2.3.0] - 2026 + +### Added +- `modules/security.py` — AES-256-GCM encryption/decryption for mapping files +- `modules/config.py` — YAML + ENV + CLI configuration with priority chain (CLI > ENV > YAML > Default) +- `modules/masking_logger.py` — structured logging (JSON + colored console output) +- `modules/selective.py` — `--only` / `--exclude` filters for selective masking +- `modules/re_mask.py` — multi-pass re-masking with chain tracking +- `modules/tools.py` — atomic masking functions for programmatic API usage +- `modules/password_generator.py` — cryptographically secure password generation +- CI/CD: GitHub Actions for linting, testing (Python 3.13, 3.9 compat), and releases +- CI/CD: Windows binary builds via PyInstaller + +### Changed +- Complete migration to `modules/` package architecture +- UTF-8 encoding fixes (mojibake prevention) + +## [2.2.14] - 2025 + +### Changed +- Improved code documentation with detailed docstrings +- Added inline comments for complex logic +- Improved block comments for code sections + +## [2.2.13] - 2025 + +### Changed +- Merged `data_masking.py` (v2.2.10) and `data_masking_v2_2_12_fixed.py` +- Preserved all bug fixes from v2.2.12 + +## [2.2.12] - 2025 + +### Fixed +- Bug #18: `mask_rank()` did not preserve Title Case for multi-word ranks + ("Старший Лейтенант" now correctly maps to "Майор" in Title Case) + +## [2.2.11] - 2025 + +### Fixed +- Bug #16: `mask_rank()` did not preserve case when using `.title()` + ("Капітан" now correctly maps to "Майор" in Title Case) +- Bug #17: `mask_name()` did 
not apply case for names already in mapping + ("петро" now correctly maps to "павло" in lowercase) + +## [2.2.10] - 2025 + +### Fixed +- Bug #15: "старшого\nсержанта" was incorrectly masked as "старшого старшого сержанта" + Added `normalize_broken_ranks()` function to handle line-broken ranks +- Restored full report format and statistics output + +## [2.1.16] - 2025 + +### Added +- Abbreviation whitelist support (ЗСУ, МОУ, ВСУ, etc. are no longer masked) + +## [2.0.0] - 2025 + +### Added +- Instance tracking for all masked values +- Deterministic masking via blake2b hash-based seed generation +- v2.0 mapping file format with per-instance tracking +- Support for Ukrainian military ranks with all grammatical cases (nominative, genitive, dative, instrumental) +- Gender-aware masking (male/female rank forms) +- Case preservation (UPPER, Title, lower) +- Support for "у відставці" / "в запасі" / "на пенсії" suffixes + +### Supported data types +- PIB (names, surnames, patronymics) with declension support +- IPN (10-digit tax identification numbers) +- Passports (AA123456) and ID passports (9-digit) +- Military IDs (МТ123456) +- Military ranks (Army, Navy, Legal, Medical services) +- Brigades, military units (в/ч А1234) +- Order numbers (наказ №123) +- BR numbers (75/25/3400/Р) +- Dates (DD.MM.YYYY with ±30 day shift) diff --git a/data_masking.py b/data_masking.py index 59c8f62..fc39e39 100644 --- a/data_masking.py +++ b/data_masking.py @@ -83,41 +83,50 @@ # ============================================================================ # --- MODULES v2.3.0 --- +import logging as _logging +_opt_logger = _logging.getLogger(__name__) + try: from modules.selective import SelectiveFilter, apply_filter_to_globals, get_available_types SELECTIVE_AVAILABLE = True except ImportError: SELECTIVE_AVAILABLE = False + _opt_logger.debug("modules.selective not available — --only/--exclude disabled") try: from modules.re_mask import ReMasker, MappingChain, make_empty_masking_dict 
# Maximum input file size in bytes (default: 100 MB)
MAX_INPUT_FILE_SIZE = 100 * 1024 * 1024


def validate_file_size(file_path: Path, max_size: int = MAX_INPUT_FILE_SIZE) -> None:
    """Check a file's on-disk size before it is read into memory.

    Args:
        file_path: File to inspect. ``str`` / ``os.PathLike`` values are
            accepted as well and are converted to ``Path``.
        max_size: Largest permitted size in bytes (inclusive).

    Raises:
        ValueError: If the file is larger than ``max_size``.
        OSError: Propagated unchanged from ``Path.stat()`` (for example when
            the file does not exist).
    """
    # Normalising here lets callers pass the raw CLI string as well as a Path.
    path = Path(file_path)
    file_size = path.stat().st_size
    if file_size <= max_size:
        return

    bytes_per_mb = 1024 * 1024
    raise ValueError(
        f"File {path.name} ({file_size / bytes_per_mb:.1f} MB) exceeds maximum "
        f"allowed size ({max_size / bytes_per_mb:.0f} MB)"
    )
def _load_config(args) -> Optional[Any]:
    """Load the YAML configuration when configuration support is available.

    The path is taken from ``args.config``; when that is ``None`` a
    ``config.yaml`` in the current working directory is used if it exists.

    Returns:
        Whatever ``ConfigLoader(path).load()`` returns, or ``None`` when
        config support is unavailable, no file was selected, or loading
        failed with ``OSError`` / ``ValueError``.
    """
    if not CONFIG_AVAILABLE:
        return None

    config_path = args.config
    if config_path is None and Path("config.yaml").exists():
        config_path = "config.yaml"
    if not config_path:
        return None

    try:
        config = ConfigLoader(config_path).load()
    # FileNotFoundError and PermissionError are OSError subclasses, so this
    # tuple covers the same failures as listing them individually.
    # NOTE(review): parser-specific exceptions that are neither OSError nor
    # ValueError would propagate from here; confirm what ConfigLoader raises.
    except (OSError, ValueError) as e:
        print(f"Warning: could not load config from {config_path}: {e}")
        return None

    print(f"Loaded config from {config_path}")
    return config


def _setup_logger(args, config) -> Optional[Any]:
    """Set up structured logging.

    CLI values (``args.log_level`` / ``args.log_file``) win; the ``logging``
    section of ``config`` only fills in values the CLI left as ``None``.
    The level finally falls back to ``"INFO"``.

    Returns:
        The logger returned by ``setup_logging`` or ``None`` when logging
        support is unavailable or set-up failed.
    """
    if not LOGGING_AVAILABLE:
        return None

    log_level = args.log_level
    log_file = args.log_file

    logging_cfg = getattr(config, 'logging', None) if config is not None else None
    if log_level is None:
        log_level = getattr(logging_cfg, 'level', None)
    if log_file is None:
        log_file = getattr(logging_cfg, 'file', None)
    if log_level is None:
        log_level = "INFO"

    try:
        logger = setup_logging(level=log_level, log_file=log_file)
        logger.info(f"Data Masking Script v{__version__} started")
    except (ValueError, OSError, TypeError) as e:
        print(f"Warning: could not setup logging: {e}")
        return None
    return logger


def _coerce_flag(value) -> bool:
    """Interpret a configuration value as a boolean.

    ``bool("false")`` is ``True``, so string values are compared
    case-insensitively against common "false" spellings instead. Every
    non-string value goes through plain ``bool()``.
    """
    if isinstance(value, str):
        return value.strip().lower() not in ("", "0", "false", "no", "off")
    return bool(value)


def _apply_selective_filters(args, logger) -> None:
    """Apply the --only / --exclude selective masking filters.

    Rewrites the module-level ``MASK_*`` flags in place. ``--only`` first
    switches every flag off and then enables the requested types;
    ``--exclude`` disables the requested types. Type names are matched
    case-insensitively and unknown names are reported and ignored.
    """
    if not SELECTIVE_AVAILABLE:
        return

    only_types = getattr(args, 'only', None)
    exclude_types = getattr(args, 'exclude', None)
    if not only_types and not exclude_types:
        return

    type_flag_map = {
        "names": "MASK_NAMES",
        "ipn": "MASK_IPN",
        "passport": "MASK_PASSPORT",
        "military_id": "MASK_MILITARY_ID",
        "ranks": "MASK_RANKS",
        "brigades": "MASK_BRIGADES",
        "units": "MASK_UNITS",
        "orders": "MASK_ORDERS",
        "br_numbers": "MASK_BR_NUMBERS",
        "dates": "MASK_DATES",
    }
    module_flags = globals()

    if only_types:
        option, requested, enabled = "--only", only_types, True
        # Start from "nothing is masked", then enable the requested types.
        for flag_name in type_flag_map.values():
            module_flags[flag_name] = False
        if exclude_types:
            # --only takes precedence; say so instead of dropping --exclude silently.
            print("Warning: --exclude is ignored because --only was given")
            if logger:
                logger.warning("--exclude is ignored because --only was given")
    else:
        option, requested, enabled = "--exclude", exclude_types, False

    for t in requested:
        flag_name = type_flag_map.get(t.lower())
        if flag_name is None:
            print(f"Warning: unknown type '{t}', ignoring")
            if logger:
                logger.warning(f"Unknown selective type: {t}")
        else:
            module_flags[flag_name] = enabled

    summary = f"Selective masking: {option} {' '.join(requested)}"
    print(summary)
    if logger:
        logger.info(summary)


def _apply_config_settings(args, config, logger) -> None:
    """Apply config-based masking rules and system settings.

    Modifies module-level variables (``MASK_*``, ``PRESERVE_CASE``,
    ``HASH_ALGORITHM``, ``DEBUG_MODE``). Masking rules from the config are
    applied only when neither --only nor --exclude was given on the CLI, and
    ``system.debug_mode`` is applied only when ``args.debug`` is not set.
    Boolean settings are read through ``_coerce_flag`` so that string values
    such as ``"false"`` are not treated as true.
    """
    global DEBUG_MODE, PRESERVE_CASE, HASH_ALGORITHM

    if config is None:
        return

    masking_rules = getattr(config, 'masking_rules', None)
    if masking_rules is not None:
        only_types = getattr(args, 'only', None) if SELECTIVE_AVAILABLE else None
        exclude_types = getattr(args, 'exclude', None) if SELECTIVE_AVAILABLE else None

        if not only_types and not exclude_types:
            config_rules_map = {
                'enable_ranks': 'MASK_RANKS',
                'enable_names': 'MASK_NAMES',
                'enable_ipn': 'MASK_IPN',
                'enable_passport': 'MASK_PASSPORT',
                'enable_military_id': 'MASK_MILITARY_ID',
                'enable_dates': 'MASK_DATES',
                'enable_brigades': 'MASK_BRIGADES',
                'enable_units': 'MASK_UNITS',
                'enable_orders': 'MASK_ORDERS',
                'enable_br_numbers': 'MASK_BR_NUMBERS',
            }
            module_flags = globals()
            for config_key, global_var in config_rules_map.items():
                value = getattr(masking_rules, config_key, None)
                if value is None:
                    continue  # key absent from the config: keep the current flag
                module_flags[global_var] = _coerce_flag(value)
                if logger:
                    logger.debug(f"Config: {config_key} = {value}")

    system_cfg = getattr(config, 'system', None)
    if system_cfg is None:
        return

    cfg_debug = getattr(system_cfg, 'debug_mode', None)
    if cfg_debug is not None and not args.debug:
        DEBUG_MODE = _coerce_flag(cfg_debug)

    cfg_preserve_case = getattr(system_cfg, 'preserve_case', None)
    if cfg_preserve_case is not None:
        PRESERVE_CASE = _coerce_flag(cfg_preserve_case)

    cfg_hash = getattr(system_cfg, 'hash_algorithm', None)
    if cfg_hash is not None:
        HASH_ALGORITHM = str(cfg_hash)
def _read_input(input_path: Path, is_json: bool, logger):
    """Read the input file as JSON or as raw text.

    The file size is validated first. Text is read with ``newline=''`` so
    the original line endings are kept as they are in the file.

    Returns:
        The parsed JSON value or the file contents, or ``None`` on error.
        NOTE(review): a JSON document that is literally ``null`` also yields
        ``None``; callers cannot tell that apart from a read failure.
    """
    try:
        validate_file_size(input_path)
        with open(input_path, 'r', encoding='utf-8', newline='') as f:
            return json.load(f) if is_json else f.read()
    # OSError covers FileNotFoundError / PermissionError; ValueError covers
    # json.JSONDecodeError, UnicodeDecodeError and the size-limit error.
    except (OSError, ValueError) as e:
        print(f"Error reading file: {e}")
        if logger:
            logger.error(f"Error reading file: {e}")
        return None


def _run_multi_pass_masking(input_data, is_json: bool, masking_dict: Dict,
                            args, logger, re_mask_passes: int,
                            timestamp: str, random_suffix: int) -> Tuple[Any, int]:
    """Execute multi-pass re-masking with chain tracking.

    Each pass masks the output of the previous pass with a fresh mapping
    dict; all passes are collected in a ``MappingChain`` that is saved to
    ``masking_chain_<timestamp>_<suffix>.json``. Per-category statistics of
    all passes are summed into ``masking_dict["statistics"]``.

    ``args`` is accepted for signature parity with the other runners and is
    not read here.

    Returns:
        Tuple of (masked_data, total_unique_count).
    """
    if logger:
        logger.info(f"Starting multi-pass re-masking ({re_mask_passes} passes)")

    mask_once = mask_json_recursive if is_json else mask_text_context_aware
    chain = MappingChain()
    masked_data = input_data

    for pass_num in range(1, re_mask_passes + 1):
        print(f" Прохід {pass_num}/{re_mask_passes}...")
        if logger:
            logger.info(f"Re-masking pass {pass_num}/{re_mask_passes}")

        pass_dict = make_empty_masking_dict(__version__)
        pass_counters = {}
        masked_data = mask_once(masked_data, pass_dict, pass_counters)

        pass_dict["instance_tracking"] = pass_counters
        for category, mappings in pass_dict["mappings"].items():
            pass_dict["statistics"][category] = len(mappings)
        chain.add_pass(pass_dict)

    chain_path = Path(f"masking_chain_{timestamp}_{random_suffix}.json")
    chain.save(chain_path)
    print(f" Chain mapping ({re_mask_passes} passes): {chain_path}")
    if logger:
        logger.info(f"Chain mapping saved to {chain_path}")

    # Combined statistics for the report; instance tracking stays per pass.
    masking_dict["instance_tracking"] = {}
    combined = masking_dict["statistics"]
    total_unique = 0
    for p in chain.passes:
        for cat, count in p.get("statistics", {}).items():
            combined[cat] = combined.get(cat, 0) + count
            if cat != "total_masked":
                total_unique += count
    combined["total_masked"] = total_unique

    return masked_data, total_unique


def _run_single_pass_masking(input_data, is_json: bool, masking_dict: Dict,
                             instance_counters: Dict, logger) -> Tuple[Any, int]:
    """Execute single-pass masking.

    Fills ``masking_dict["instance_tracking"]`` with ``instance_counters``
    and ``masking_dict["statistics"]`` with the number of mappings per
    category plus ``total_masked``.

    Returns:
        Tuple of (masked_data, total_unique_count).
    """
    if logger:
        logger.info("Starting single-pass masking")

    mask_once = mask_json_recursive if is_json else mask_text_context_aware
    masked_data = mask_once(input_data, masking_dict, instance_counters)

    masking_dict["instance_tracking"] = instance_counters

    statistics = masking_dict["statistics"]
    total_unique = 0
    for category, mappings in masking_dict["mappings"].items():
        statistics[category] = len(mappings)
        total_unique += len(mappings)
    statistics["total_masked"] = total_unique

    return masked_data, total_unique


def _handle_encryption(args, config, masking_dict: Dict, map_path: Path,
                       logger) -> None:
    """Encrypt the mapping file if --encrypt is requested.

    Password priority: ``--password``, then the environment variable named
    by ``--password-env``, then a generated password. The encrypted file is
    written next to ``map_path`` with the ``.enc`` suffix.

    NOTE(review): the generated password is printed to stdout, which may end
    up in captured logs.
    NOTE(review): in multi-pass mode the per-pass mappings live in the chain
    file, so ``masking_dict`` holds only combined statistics here; confirm
    that encrypting it is what --encrypt should do together with --re-mask.
    """
    if not SECURITY_AVAILABLE or not getattr(args, 'encrypt', False):
        return

    enc_path = map_path.with_suffix('.enc')
    password = getattr(args, 'password', None)

    if not password:
        password_env = getattr(args, 'password_env', None)
        if password_env:
            password = os.environ.get(password_env)
            if not password:
                print(f"Warning: environment variable '{password_env}' is not set or empty")
                if logger:
                    logger.warning(f"Environment variable '{password_env}' is not set or empty")
        if not password:
            # Neither the CLI nor the environment supplied a password.
            password = generate_password_from_config(config)
            print(f" Generated password: {password}")

    manager = MappingSecurityManager()
    manager.encrypt_mapping(masking_dict, password, enc_path)
    print(f" Encrypted mapping: {enc_path}")
    if logger:
        logger.info(f"Mapping encrypted to {enc_path}")


def _write_report(report_path: Path, masking_dict: Dict, input_path: Path,
                  output_path: Path, is_json: bool, total_unique: int,
                  re_mask_passes, args, config) -> None:
    """Generate the detailed masking report file.

    The report text is assembled completely before the file is opened, so a
    failure while formatting does not leave a truncated report behind.
    ``args`` is only read (``args.config``) when ``config`` is not ``None``.
    """
    heavy_rule = "=" * 60 + "\n"
    light_rule = "-" * 60 + "\n"

    lines = [
        heavy_rule,
        "ЗВІТ МАСКУВАННЯ ДАНИХ\n",
        heavy_rule + "\n",
        f"Версія: {__version__}\n",
        f"Дата та час: {masking_dict['timestamp']}\n",
        f"Вхідний файл: {input_path}\n",
        f"Вихідний файл: {output_path}\n",
        f"Формат: {'JSON' if is_json else 'TXT'}\n",
        "\n" + light_rule,
        "КОНФІГУРАЦІЯ МАСКУВАННЯ\n",
        light_rule + "\n",
        f" Імена (MASK_NAMES): {MASK_NAMES}\n",
        f" ІПН (MASK_IPN): {MASK_IPN}\n",
        f" Паспорти (MASK_PASSPORT): {MASK_PASSPORT}\n",
        f" Військові ID (MASK_MILITARY_ID): {MASK_MILITARY_ID}\n",
        f" Звання (MASK_RANKS): {MASK_RANKS}\n",
        f" Бригади (MASK_BRIGADES): {MASK_BRIGADES}\n",
        f" Частини (MASK_UNITS): {MASK_UNITS}\n",
        f" Накази (MASK_ORDERS): {MASK_ORDERS}\n",
        f" БР номери (MASK_BR_NUMBERS): {MASK_BR_NUMBERS}\n",
        f" Дати (MASK_DATES): {MASK_DATES}\n",
        f" Збереження регістру (PRESERVE_CASE): {PRESERVE_CASE}\n",
        f" Алгоритм хешування: {HASH_ALGORITHM}\n",
    ]

    if re_mask_passes and re_mask_passes > 1:
        lines.append(f"\n Режим: повторне маскування ({re_mask_passes} проходів)\n")
    else:
        lines.append("\n Режим: одинарне маскування\n")

    if config is not None:
        lines.append(f" Конфігурація: {args.config or 'config.yaml'}\n")

    lines += [
        "\n" + light_rule,
        "СТАТИСТИКА МАСКУВАННЯ\n",
        light_rule + "\n",
        f"Загальна кількість УНІКАЛЬНИХ замаскованих елементів: {total_unique}\n\n",
    ]
    for key, value in sorted(masking_dict["statistics"].items()):
        if key != "total_masked" and value > 0:
            lines.append(f" • {key}: {value} (унікальних оригіналів)\n")

    lines += [
        "\n" + heavy_rule,
        "СТАТИСТИКА ВХОДЖЕНЬ (Instance Tracking)\n",
        light_rule + "\n",
    ]
    # Most frequent masked values first.
    by_count = sorted(
        masking_dict["instance_tracking"].items(),
        key=lambda item: item[1],
        reverse=True
    )
    for masked_val, count in by_count:
        lines.append(f" • '{masked_val}': {count} входжень\n")

    lines += [
        "\n" + heavy_rule,
        f"Кінець звіту. Всього записів у instance tracking: "
        f"{len(masking_dict['instance_tracking'])}\n",
        heavy_rule,
    ]

    with open(report_path, 'w', encoding='utf-8') as f:
        f.writelines(lines)


def _print_summary(masking_dict: Dict, total_unique: int,
                   output_path: Path, map_path: Path,
                   report_path: Optional[Path], logger) -> None:
    """Print the masking summary to the console.

    Shows the per-category statistics (non-zero categories only), the ten
    most frequent instance-tracking entries and the saved file locations.
    The report line is printed only when ``report_path`` is truthy.
    """
    print()
    print("✅ Маскування завершено успішно!")
    print()
    print("📊 Статистика:")
    print(f" Загальна кількість УНІКАЛЬНИХ замаскованих елементів: {total_unique}")
    for key, value in sorted(masking_dict["statistics"].items()):
        if key != "total_masked" and value > 0:
            print(f" • {key}: {value} (унікальних оригіналів)")
    print()
    print(" Статистика входжень (Instance Tracking):")
    sorted_instances = sorted(
        masking_dict["instance_tracking"].items(),
        key=lambda item: item[1],
        reverse=True
    )
    for masked_val, count in sorted_instances[:10]:
        print(f" • '{masked_val}': {count} входжень")
    hidden = len(sorted_instances) - 10
    if hidden > 0:
        print(f" ... та ще {hidden} записів")

    print()
    print("📁 Файли збережено:")
    print(f" • Замасковані дані: {output_path.absolute()}")
    print(f" • Словник замін: {map_path.absolute()}")
    if report_path:
        print(f" • Звіт: {report_path.absolute()}")

    if logger:
        logger.info(f"Masking completed: {total_unique} unique items masked")
        logger.info(f"Output: {output_path.absolute()}")
        logger.info(f"Mapping: {map_path.absolute()}")
ensure_ascii=False, indent=2) else: f.write(masked_data) - # Збереження мапи (single-pass only; chain saves its own file) + # Save mapping (single-pass only; chain saves its own file) if not (REMASK_AVAILABLE and re_mask_passes and re_mask_passes > 1): with open(map_path, 'w', encoding='utf-8') as f: json.dump(masking_dict, f, ensure_ascii=False, indent=2) - # Encrypt mapping if requested - if SECURITY_AVAILABLE and getattr(args, 'encrypt', False): - enc_path = map_path.with_suffix('.enc') - # Password priority: --password > --password-env > auto-generate - password = getattr(args, 'password', None) - if not password: - password_env = getattr(args, 'password_env', None) - if password_env: - password = os.environ.get(password_env) - if not password: - print(f"Warning: environment variable '{password_env}' is not set or empty") - password = generate_password_from_config(config) - print(f" Generated password: {password}") - else: - password = generate_password_from_config(config) - print(f" Generated password: {password}") - manager = MappingSecurityManager() - manager.encrypt_mapping(masking_dict, password, enc_path) - print(f" Encrypted mapping: {enc_path}") - if logger: - logger.info(f"Mapping encrypted to {enc_path}") + _handle_encryption(args, config, masking_dict, map_path, logger) - # ГЕНЕРАЦІЯ ДЕТАЛЬНОГО ЗВІТУ - if not args.no_report: - with open(report_path, 'w', encoding='utf-8') as f: - f.write("=" * 60 + "\n") - f.write("ЗВІТ МАСКУВАННЯ ДАНИХ\n") - f.write("=" * 60 + "\n\n") - f.write(f"Версія: {__version__}\n") - f.write(f"Дата та час: {masking_dict['timestamp']}\n") - f.write(f"Вхідний файл: {input_path}\n") - f.write(f"Вихідний файл: {output_path}\n") - f.write(f"Формат: {'JSON' if is_json else 'TXT'}\n") - - # Конфігурація маскування - f.write("\n" + "-" * 60 + "\n") - f.write("КОНФІГУРАЦІЯ МАСКУВАННЯ\n") - f.write("-" * 60 + "\n\n") - f.write(f" Імена (MASK_NAMES): {MASK_NAMES}\n") - f.write(f" ІПН (MASK_IPN): {MASK_IPN}\n") - f.write(f" Паспорти 
(MASK_PASSPORT): {MASK_PASSPORT}\n") - f.write(f" Військові ID (MASK_MILITARY_ID): {MASK_MILITARY_ID}\n") - f.write(f" Звання (MASK_RANKS): {MASK_RANKS}\n") - f.write(f" Бригади (MASK_BRIGADES): {MASK_BRIGADES}\n") - f.write(f" Частини (MASK_UNITS): {MASK_UNITS}\n") - f.write(f" Накази (MASK_ORDERS): {MASK_ORDERS}\n") - f.write(f" БР номери (MASK_BR_NUMBERS): {MASK_BR_NUMBERS}\n") - f.write(f" Дати (MASK_DATES): {MASK_DATES}\n") - f.write(f" Збереження регістру (PRESERVE_CASE): {PRESERVE_CASE}\n") - f.write(f" Алгоритм хешування: {HASH_ALGORITHM}\n") - - if re_mask_passes and re_mask_passes > 1: - f.write(f"\n Режим: повторне маскування ({re_mask_passes} проходів)\n") - else: - f.write(f"\n Режим: одинарне маскування\n") - - if config is not None: - f.write(f" Конфігурація: {args.config or 'config.yaml'}\n") - - f.write("\n" + "-" * 60 + "\n") - f.write("СТАТИСТИКА МАСКУВАННЯ\n") - f.write("-" * 60 + "\n\n") - f.write(f"Загальна кількість УНІКАЛЬНИХ замаскованих елементів: {total_unique}\n\n") - - for key, value in sorted(masking_dict["statistics"].items()): - if key != "total_masked" and value > 0: - f.write(f" • {key}: {value} (унікальних оригіналів)\n") - - f.write("\n" + "=" * 60 + "\n") - f.write("СТАТИСТИКА ВХОДЖЕНЬ (Instance Tracking)\n") - f.write("-" * 60 + "\n\n") - - sorted_report_instances = sorted( - masking_dict["instance_tracking"].items(), - key=lambda x: x[1], - reverse=True - ) - for masked_val, count in sorted_report_instances: - f.write(f" • '{masked_val}': {count} входжень\n") - - f.write("\n" + "=" * 60 + "\n") - f.write(f"Кінець звіту. 
Всього записів у instance tracking: " - f"{len(masking_dict['instance_tracking'])}\n") - f.write("=" * 60 + "\n") - - # ДЕТАЛЬНИЙ ВИВІД У КОНСОЛЬ - print() - print("✅ Маскування завершено успішно!") - print() - print(f"📊 Статистика:") - print(f" Загальна кількість УНІКАЛЬНИХ замаскованих елементів: {total_unique}") - for key, value in sorted(masking_dict["statistics"].items()): - if key != "total_masked" and value > 0: - print(f" • {key}: {value} (унікальних оригіналів)") - print() - print(f" Статистика входжень (Instance Tracking):") - # Виводимо тільки топ-10 для консолі, щоб не засмічувати - sorted_instances = sorted(masking_dict["instance_tracking"].items(), key=lambda x: x[1], reverse=True) - for masked_val, count in sorted_instances[:10]: - print(f" • '{masked_val}': {count} входжень") - if len(sorted_instances) > 10: - print(f" ... та ще {len(sorted_instances) - 10} записів") - - print() - print(f"📁 Файли збережено:") - print(f" • Замасковані дані: {output_path.absolute()}") - print(f" • Словник замін: {map_path.absolute()}") + # Generate report if not args.no_report: - print(f" • Звіт: {report_path.absolute()}") + _write_report(report_path, masking_dict, Path(args.input), + output_path, is_json, total_unique, + re_mask_passes, args, config) - if logger: - logger.info(f"Masking completed: {total_unique} unique items masked") - logger.info(f"Output: {output_path.absolute()}") - logger.info(f"Mapping: {map_path.absolute()}") + _print_summary( + masking_dict, total_unique, output_path, map_path, + report_path if not args.no_report else None, logger + ) - except Exception as e: + except (OSError, PermissionError, json.JSONDecodeError, UnicodeEncodeError) as e: print(f"❌ Помилка збереження файлів: {e}") if logger: logger.error(f"Error saving files: {e}") + +def main(): + parser = _build_parser() + args = parser.parse_args() + + # ================================================================ + # Handle --init-config (early exit) + # 
================================================================ + if args.init_config: + generate_default_config("config.yaml") + print("Generated config.yaml") + return + + # ================================================================ + # Handle --list-types (early exit) + # ================================================================ + if SELECTIVE_AVAILABLE and getattr(args, 'list_types', False): + types = sorted(get_available_types()) + print("Available masking types:") + for t in types: + print(f" - {t}") + return + + # ================================================================ + # Setup: config, logging, filters, system settings + # ================================================================ + config = _load_config(args) + logger = _setup_logger(args, config) + + global DEBUG_MODE + if args.debug: + DEBUG_MODE = True + + _apply_selective_filters(args, logger) + _apply_config_settings(args, config, logger) + + if logger: + logger.info(f"Masking flags: NAMES={MASK_NAMES}, IPN={MASK_IPN}, " + f"PASSPORT={MASK_PASSPORT}, MILITARY_ID={MASK_MILITARY_ID}, " + f"RANKS={MASK_RANKS}, BRIGADES={MASK_BRIGADES}, " + f"UNITS={MASK_UNITS}, ORDERS={MASK_ORDERS}, " + f"BR_NUMBERS={MASK_BR_NUMBERS}, DATES={MASK_DATES}") + + # ================================================================ + # Input file validation and path setup + # ================================================================ + input_path = Path(args.input) + if not input_path.exists(): + print(f"Error: {args.input} not found") + if logger: + logger.error(f"Input file not found: {args.input}") + return + + output_path, map_path, report_path, timestamp, random_suffix = \ + _prepare_output_paths(args, input_path) + + is_json = input_path.suffix.lower() == '.json' + + masking_dict = { + "version": __version__, + "timestamp": datetime.now().isoformat(), + "input_file": str(input_path), + "statistics": {}, + "mappings": {k: {} for k in [ + "ipn", "passport_id", "military_id", "surname", 
"name", + "military_unit", "order_number", "order_number_with_letters", + "br_number", "br_number_slash", "br_number_complex", + "rank", "brigade_number", "date", "date_text", "patronymic" + ]}, + "instance_tracking": {} + } + instance_counters = {} + + # ================================================================ + # Read input + # ================================================================ + print(f"Data Masking Script v{__version__}") + print(f"Processing {input_path}...") + print(f"Output format: {'JSON' if is_json else 'TXT'}") + if logger: + logger.info(f"Processing {input_path}") + + input_data = _read_input(input_path, is_json, logger) + if input_data is None: + return + + # ================================================================ + # Run masking pipeline + # ================================================================ + masked_data, total_unique = _run_masking( + input_data, is_json, masking_dict, instance_counters, + args, logger, timestamp, random_suffix + ) + + # ================================================================ + # Save results + # ================================================================ + _save_results( + masked_data, is_json, masking_dict, + output_path, map_path, report_path, + total_unique, args, config, logger + ) + if __name__ == "__main__": main() \ No newline at end of file diff --git a/diagnose_mapping.py b/diagnose_mapping.py index 00e9a3a..1c31035 100644 --- a/diagnose_mapping.py +++ b/diagnose_mapping.py @@ -91,7 +91,7 @@ def load_json(path: Path) -> Optional[Dict]: try: with open(path, 'r', encoding='utf-8') as f: return json.load(f) - except Exception as e: + except (FileNotFoundError, PermissionError, OSError, json.JSONDecodeError, UnicodeDecodeError) as e: print(f"❌ Помилка читання JSON {path}: {e}") return None @@ -308,7 +308,7 @@ def verify_text_recovery(original_path: Path, recovery_path: Path, ignore_flags: f2.seek(0) lines_orig = f1.readlines() lines_rec = f2.readlines() - except 
Exception as e: + except (FileNotFoundError, PermissionError, OSError, UnicodeDecodeError) as e: print(f"❌ Помилка читання файлів: {e}") return @@ -409,7 +409,7 @@ def verify_text_recovery(original_path: Path, recovery_path: Path, ignore_flags: # MAIN (Точка входу) # ============================================================================ -def main(): +def main() -> None: parser = argparse.ArgumentParser( description="Mapping diagnostics, comparison and text verification utility.", formatter_class=argparse.RawTextHelpFormatter diff --git a/modules/config.py b/modules/config.py index 79fc703..2190cf6 100644 --- a/modules/config.py +++ b/modules/config.py @@ -82,6 +82,14 @@ class MaskingRulesConfig: enable_units: bool = True enable_orders: bool = True enable_br_numbers: bool = True + # Tuning parameters + rank_shift_options: List[int] = field(default_factory=lambda: [-2, -1, 1, 2]) + date_shift_days: int = 30 + date_year_min: int = 2015 + date_year_max: int = 2035 + brigade_number_max: int = 160 + max_masking_iterations: int = 10 + name_generation_max_attempts: int = 50 @dataclass @@ -251,7 +259,8 @@ def _load_yaml(self, path: str) -> Optional[Dict[str, Any]]: return None logger.info("Loaded YAML config from %s", filepath) return data - except Exception as exc: + except (FileNotFoundError, PermissionError, OSError, ValueError, + yaml.YAMLError) as exc: # type: ignore[union-attr] logger.error("Failed to load YAML config: %s", exc) return None diff --git a/modules/tools.py b/modules/tools.py index 1962421..3e80853 100644 --- a/modules/tools.py +++ b/modules/tools.py @@ -47,7 +47,7 @@ import random import re from datetime import datetime, timedelta -from typing import Dict, Optional +from typing import Dict, Optional, Tuple # ============================================================================ # METADATA @@ -356,7 +356,7 @@ def init_instance_counters() -> Dict: # INTERNAL RANK HELPERS # 
============================================================================ -def _get_rank_category_and_match(text: str): +def _get_rank_category_and_match(text: str) -> Tuple[Optional[str], Optional[str]]: """Detect the rank category and matched text from *text*. Returns: @@ -371,7 +371,7 @@ def _get_rank_category_and_match(text: str): return None, None -def _get_rank_info(rank_form: str): +def _get_rank_info(rank_form: str) -> Tuple[Optional[str], Optional[str], Optional[str]]: """Look up a rank form in RANK_TO_NOMINATIVE. Args: @@ -538,9 +538,7 @@ def mask_date_direct( ) masked = new_date.strftime("%d.%m.%Y") - except (ValueError, OverflowError): - return value - except Exception: + except (ValueError, OverflowError, TypeError, AttributeError): return value return add_to_mapping(masking_dict, instance_counters, entity_type, diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..1ed6af8 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,16 @@ +[mypy] +python_version = 3.9 +warn_return_any = True +warn_unused_configs = True +ignore_missing_imports = True +check_untyped_defs = False +disallow_untyped_defs = False +no_implicit_optional = True + +# Strict for modules/ package +[mypy-modules.*] +check_untyped_defs = True + +# Ignore test files +[mypy-tests.*] +ignore_errors = True diff --git a/tests/test_masking_logger.py b/tests/test_masking_logger.py new file mode 100644 index 0000000..0802a74 --- /dev/null +++ b/tests/test_masking_logger.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +"""Tests for modules/masking_logger.py""" + +import json +import logging +import os +import tempfile +import pytest + +from modules.masking_logger import ( + JsonFormatter, + ConsoleFormatter, + MaskingLogger, + setup_logging, +) + + +@pytest.fixture(autouse=True) +def _clean_loggers(): + """Remove handlers added during tests to avoid side effects.""" + yield + for name in ("test_masking", "data_masking", "test_json", "test_console"): + logger = 
logging.getLogger(name) + logger.handlers.clear() + + +class TestJsonFormatter: + """Tests for JsonFormatter.""" + + def test_basic_format(self): + fmt = JsonFormatter() + record = logging.LogRecord( + name="test", level=logging.INFO, pathname="", lineno=0, + msg="hello %s", args=("world",), exc_info=None, + ) + result = fmt.format(record) + parsed = json.loads(result) + assert parsed["level"] == "INFO" + assert parsed["message"] == "hello world" + assert "timestamp" in parsed + + def test_exception_info(self): + fmt = JsonFormatter() + try: + raise ValueError("test error") + except ValueError: + import sys + exc_info = sys.exc_info() + record = logging.LogRecord( + name="test", level=logging.ERROR, pathname="", lineno=0, + msg="error occurred", args=(), exc_info=exc_info, + ) + result = fmt.format(record) + parsed = json.loads(result) + assert "exception" in parsed + assert parsed["exception"]["type"] == "ValueError" + assert "test error" in parsed["exception"]["message"] + + def test_extra_data(self): + fmt = JsonFormatter() + record = logging.LogRecord( + name="test", level=logging.INFO, pathname="", lineno=0, + msg="with data", args=(), exc_info=None, + ) + record.data = {"key": "value"} + result = fmt.format(record) + parsed = json.loads(result) + assert parsed["data"] == {"key": "value"} + + +class TestConsoleFormatter: + """Tests for ConsoleFormatter.""" + + def test_basic_format(self): + fmt = ConsoleFormatter() + record = logging.LogRecord( + name="test", level=logging.INFO, pathname="", lineno=0, + msg="hello", args=(), exc_info=None, + ) + result = fmt.format(record) + assert "hello" in result + assert "INFO" in result + + def test_color_codes(self): + fmt = ConsoleFormatter() + for level_name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"): + level = getattr(logging, level_name) + record = logging.LogRecord( + name="test", level=level, pathname="", lineno=0, + msg="msg", args=(), exc_info=None, + ) + result = fmt.format(record) + assert level_name 
in result + + +class TestMaskingLogger: + """Tests for MaskingLogger.""" + + def test_creation(self): + ml = MaskingLogger(name="test_masking", level="DEBUG") + assert ml.logger.level == logging.DEBUG + + def test_log_levels(self, capsys): + ml = MaskingLogger(name="test_masking", level="DEBUG", format_type="console") + ml.debug("debug msg") + ml.info("info msg") + ml.warning("warn msg") + ml.error("error msg") + ml.critical("critical msg") + # No assertion on output — just ensure no exceptions + + def test_json_format(self): + ml = MaskingLogger(name="test_json", level="DEBUG", format_type="json") + ml.info("test message") + # Just ensure no exceptions + + def test_file_logging(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.log', delete=False) as f: + log_path = f.name + try: + ml = MaskingLogger( + name="test_console", level="INFO", + format_type="console", log_file=log_path + ) + ml.info("file test message") + # Force flush + for handler in ml.logger.handlers: + handler.flush() + with open(log_path, 'r', encoding='utf-8') as f: + content = f.read() + assert "file test message" in content + finally: + os.unlink(log_path) + + def test_stats_tracking(self): + ml = MaskingLogger(name="test_masking", level="INFO") + assert ml._stats["entities_masked"] == 0 + assert ml._stats["collisions"] == 0 + + def test_log_with_data(self): + ml = MaskingLogger(name="test_masking", level="DEBUG", format_type="json") + ml.info("test", data={"count": 42}) + # Just ensure no exceptions + + +class TestSetupLogging: + """Tests for setup_logging() convenience function.""" + + def test_default_setup(self): + logger = setup_logging(level="WARNING") + assert logger is not None + + def test_setup_with_file(self): + with tempfile.NamedTemporaryFile(suffix='.log', delete=False) as f: + log_path = f.name + try: + logger = setup_logging(level="INFO", log_file=log_path) + assert logger is not None + finally: + os.unlink(log_path) diff --git a/tests/test_password_generator.py 
b/tests/test_password_generator.py new file mode 100644 index 0000000..43db93c --- /dev/null +++ b/tests/test_password_generator.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +"""Tests for modules/password_generator.py""" + +import string +import pytest + +from modules.password_generator import ( + generate_password, + generate_password_from_config, + generate_passwords, + PasswordConfig, + CYRILLIC_UPPER, + CYRILLIC_LOWER, + DEFAULT_SPECIAL_CHARS, +) + + +class TestGeneratePassword: + """Tests for generate_password() function.""" + + def test_default_length(self): + pwd = generate_password() + assert len(pwd) == 24 + + def test_custom_length(self): + for length in (8, 16, 32, 64): + pwd = generate_password(length=length) + assert len(pwd) == length + + def test_only_digits(self): + pwd = generate_password( + length=20, + include_ascii_upper=False, + include_ascii_lower=False, + include_special=False, + ) + assert all(c in string.digits for c in pwd) + + def test_only_uppercase(self): + pwd = generate_password( + length=20, + include_ascii_lower=False, + include_digits=False, + include_special=False, + ) + assert all(c in string.ascii_uppercase for c in pwd) + + def test_with_cyrillic(self): + pwd = generate_password( + length=50, + include_cyrillic_lower=True, + include_cyrillic_upper=True, + ) + has_cyrillic = any( + c in CYRILLIC_UPPER or c in CYRILLIC_LOWER for c in pwd + ) + assert has_cyrillic + + def test_custom_chars(self): + custom = "!@#" + pwd = generate_password( + length=50, + include_ascii_upper=False, + include_ascii_lower=False, + include_digits=False, + include_special=False, + custom_chars=custom, + ) + assert all(c in custom for c in pwd) + + def test_ensure_variety(self): + """With ensure_variety=True, password should contain chars from each enabled set.""" + # Run multiple times to reduce flakiness + found_all = False + for _ in range(10): + pwd = generate_password(length=24, ensure_variety=True) + has_upper = any(c in 
string.ascii_uppercase for c in pwd) + has_lower = any(c in string.ascii_lowercase for c in pwd) + has_digit = any(c in string.digits for c in pwd) + has_special = any(c in DEFAULT_SPECIAL_CHARS for c in pwd) + if has_upper and has_lower and has_digit and has_special: + found_all = True + break + assert found_all, "ensure_variety should include chars from all enabled sets" + + def test_no_special_chars(self): + pwd = generate_password(length=100, include_special=False) + assert not any(c in DEFAULT_SPECIAL_CHARS for c in pwd) + + def test_empty_charset_fallback(self): + """When nothing is selected, should fallback to ascii_letters + digits.""" + pwd = generate_password( + length=20, + include_ascii_upper=False, + include_ascii_lower=False, + include_digits=False, + include_special=False, + ) + assert len(pwd) == 20 + assert all(c in (string.ascii_letters + string.digits) for c in pwd) + + def test_uniqueness(self): + """Generated passwords should be unique (cryptographically random).""" + passwords = {generate_password() for _ in range(100)} + assert len(passwords) == 100 + + def test_minimum_length(self): + pwd = generate_password(length=1) + assert len(pwd) == 1 + + +class TestGeneratePasswords: + """Tests for generate_passwords() function.""" + + def test_count(self): + passwords = generate_passwords(count=5) + assert len(passwords) == 5 + + def test_count_one(self): + passwords = generate_passwords(count=1) + assert len(passwords) == 1 + + def test_count_zero_becomes_one(self): + passwords = generate_passwords(count=0) + assert len(passwords) == 1 + + def test_all_correct_length(self): + passwords = generate_passwords(count=10, length=16) + assert all(len(p) == 16 for p in passwords) + + def test_all_unique(self): + passwords = generate_passwords(count=50) + assert len(set(passwords)) == 50 + + +class TestPasswordConfig: + """Tests for PasswordConfig dataclass.""" + + def test_defaults(self): + cfg = PasswordConfig() + assert cfg.length == 24 + assert 
cfg.include_ascii_upper is True + assert cfg.include_ascii_lower is True + assert cfg.include_digits is True + assert cfg.include_special is True + assert cfg.include_cyrillic_upper is False + assert cfg.include_cyrillic_lower is False + assert cfg.ensure_variety is True + + def test_custom_config(self): + cfg = PasswordConfig(length=32, include_cyrillic_lower=True) + assert cfg.length == 32 + assert cfg.include_cyrillic_lower is True + + def test_charset_info(self): + cfg = PasswordConfig() + info = cfg.get_charset_info() + assert info['total_chars'] > 0 + assert info['entropy_bits'] > 0 + assert info['bits_per_char'] > 0 + assert len(info['components']) > 0 + + def test_charset_info_digits_only(self): + cfg = PasswordConfig( + include_ascii_upper=False, + include_ascii_lower=False, + include_special=False, + ) + info = cfg.get_charset_info() + assert info['total_chars'] == 10 + + def test_charset_info_with_cyrillic(self): + cfg = PasswordConfig( + include_cyrillic_upper=True, + include_cyrillic_lower=True, + ) + info = cfg.get_charset_info() + assert info['total_chars'] > 100 # ASCII + cyrillic + + +class TestGeneratePasswordFromConfig: + """Tests for generate_password_from_config() function.""" + + def test_basic_config(self): + cfg = PasswordConfig(length=16) + pwd = generate_password_from_config(cfg) + assert pwd is not None + assert len(pwd) == 16 + + def test_auto_generate_false(self): + cfg = PasswordConfig(auto_generate=False) + result = generate_password_from_config(cfg) + assert result is None + + def test_cyrillic_config(self): + cfg = PasswordConfig( + length=50, + include_cyrillic_lower=True, + ) + pwd = generate_password_from_config(cfg) + assert pwd is not None + has_cyrillic = any(c in CYRILLIC_LOWER for c in pwd) + assert has_cyrillic + + +class TestConstants: + """Tests for module constants.""" + + def test_cyrillic_upper_length(self): + assert len(CYRILLIC_UPPER) == 33 # Ukrainian alphabet + + def test_cyrillic_lower_length(self): + assert 
len(CYRILLIC_LOWER) == 33 + + def test_special_chars_not_empty(self): + assert len(DEFAULT_SPECIAL_CHARS) > 0 diff --git a/tests/test_selective.py b/tests/test_selective.py new file mode 100644 index 0000000..6991bf6 --- /dev/null +++ b/tests/test_selective.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +"""Tests for modules/selective.py""" + +import pytest + +from modules.selective import ( + AVAILABLE_TYPES, + TYPE_ALIASES, + TYPE_GROUPS, + SelectiveFilter, + get_available_types, + parse_type_list, + apply_filter_to_globals, +) + + +class TestAvailableTypes: + """Tests for available type definitions.""" + + def test_available_types_not_empty(self): + assert len(AVAILABLE_TYPES) > 0 + + def test_core_types_present(self): + for t in ("ipn", "passport", "surname", "rank", "date"): + assert t in AVAILABLE_TYPES + + def test_get_available_types(self): + result = get_available_types() + assert isinstance(result, (set, list)) + assert len(result) == len(AVAILABLE_TYPES) + + +class TestTypeAliases: + """Tests for type alias resolution.""" + + def test_plural_aliases(self): + assert TYPE_ALIASES["ipns"] == "ipn" + assert TYPE_ALIASES["passports"] == "passport" + assert TYPE_ALIASES["ranks"] == "rank" + assert TYPE_ALIASES["dates"] == "date" + + def test_ukrainian_aliases(self): + assert TYPE_ALIASES["іпн"] == "ipn" + assert TYPE_ALIASES["паспорт"] == "passport" + assert TYPE_ALIASES["звання"] == "rank" + assert TYPE_ALIASES["дата"] == "date" + + def test_shortcut_aliases(self): + assert TYPE_ALIASES["unit"] == "military_unit" + assert TYPE_ALIASES["order"] == "order_number" + assert TYPE_ALIASES["mid"] == "military_id" + + def test_all_aliases_resolve_to_available_types(self): + for alias, canonical in TYPE_ALIASES.items(): + assert canonical in AVAILABLE_TYPES, ( + f"Alias '{alias}' resolves to '{canonical}' " + f"which is not in AVAILABLE_TYPES" + ) + + +class TestTypeGroups: + """Tests for type group definitions.""" + + def 
test_personal_group(self): + assert "surname" in TYPE_GROUPS["personal"] + assert "name" in TYPE_GROUPS["personal"] + assert "patronymic" in TYPE_GROUPS["personal"] + + def test_ids_group(self): + assert "ipn" in TYPE_GROUPS["ids"] + assert "passport" in TYPE_GROUPS["ids"] + + def test_all_group_matches_available(self): + assert TYPE_GROUPS["all"] == AVAILABLE_TYPES + + +class TestSelectiveFilter: + """Tests for SelectiveFilter dataclass.""" + + def test_default_enables_all(self): + sf = SelectiveFilter() + assert sf.enabled_types == AVAILABLE_TYPES + + def test_only_filter(self): + sf = SelectiveFilter(only_types={"ipn", "passport"}) + assert sf.is_enabled("ipn") + assert sf.is_enabled("passport") + assert not sf.is_enabled("rank") + assert not sf.is_enabled("date") + + def test_exclude_filter(self): + sf = SelectiveFilter(exclude_types={"date", "rank"}) + assert not sf.is_enabled("date") + assert not sf.is_enabled("rank") + assert sf.is_enabled("ipn") + assert sf.is_enabled("surname") + + def test_both_only_and_exclude_raises(self): + with pytest.raises(ValueError, match="Cannot use both"): + SelectiveFilter(only_types={"ipn"}, exclude_types={"date"}) + + def test_unknown_type_in_only_raises(self): + with pytest.raises(ValueError, match="Unknown masking type"): + SelectiveFilter(only_types={"nonexistent_type"}) + + def test_unknown_type_in_exclude_raises(self): + with pytest.raises(ValueError, match="Unknown masking type"): + SelectiveFilter(exclude_types={"nonexistent_type"}) + + def test_get_enabled_list(self): + sf = SelectiveFilter(only_types={"ipn", "date"}) + enabled = sf.get_enabled_list() + assert sorted(enabled) == ["date", "ipn"] + + def test_get_disabled_list(self): + sf = SelectiveFilter(only_types={"ipn"}) + disabled = sf.get_disabled_list() + assert "ipn" not in disabled + assert len(disabled) == len(AVAILABLE_TYPES) - 1 + + def test_to_dict(self): + sf = SelectiveFilter(only_types={"ipn", "rank"}) + d = sf.to_dict() + assert "enabled_types" in d + 
assert "disabled_types" in d + assert "only_types" in d + + def test_from_dict_roundtrip(self): + sf = SelectiveFilter(only_types={"ipn", "passport"}) + d = sf.to_dict() + sf2 = SelectiveFilter.from_dict(d) + assert sf.enabled_types == sf2.enabled_types + + def test_exclude_roundtrip(self): + sf = SelectiveFilter(exclude_types={"date"}) + d = sf.to_dict() + sf2 = SelectiveFilter.from_dict(d) + assert sf.enabled_types == sf2.enabled_types + + def test_empty_only_set(self): + """Empty only_types is treated as no filter (falsy empty set).""" + sf = SelectiveFilter(only_types=set()) + # Empty set is falsy, so _compute_enabled returns all types + assert sf.enabled_types == AVAILABLE_TYPES + + def test_single_type_only(self): + sf = SelectiveFilter(only_types={"surname"}) + assert sf.is_enabled("surname") + assert len(sf.get_enabled_list()) == 1 + + +class TestParseTypeList: + """Tests for parse_type_list() function.""" + + def test_comma_separated(self): + result = parse_type_list("ipn,passport,rank") + assert result == {"ipn", "passport", "rank"} + + def test_with_spaces(self): + result = parse_type_list("ipn, passport, rank") + assert result == {"ipn", "passport", "rank"} + + def test_alias_resolution(self): + result = parse_type_list("ipns,ranks,dates") + assert result == {"ipn", "rank", "date"} + + def test_single_type(self): + result = parse_type_list("ipn") + assert result == {"ipn"} + + def test_group_expansion(self): + result = parse_type_list("personal") + assert "surname" in result + assert "name" in result + assert "patronymic" in result diff --git a/unmask_data.py b/unmask_data.py index 8cc1c79..d1a7b35 100644 --- a/unmask_data.py +++ b/unmask_data.py @@ -72,7 +72,8 @@ from modules.security import MappingSecurityManager, is_encryption_available SECURITY_AVAILABLE = True except ImportError: - pass + import logging as _logging + _logging.getLogger(__name__).debug("modules.security not available — encryption disabled") # Re-mask (ланцюгове перемаскування) 
REMASK_AVAILABLE = False @@ -80,7 +81,8 @@ from modules.re_mask import ChainUnmasker, load_chain, get_chain_info REMASK_AVAILABLE = True except ImportError: - pass + import logging as _logging + _logging.getLogger(__name__).debug("modules.re_mask not available — chain unmasking disabled") # Конфігурація CONFIG_AVAILABLE = False @@ -88,7 +90,8 @@ from modules.config import load_config, ConfigLoader CONFIG_AVAILABLE = True except ImportError: - pass + import logging as _logging + _logging.getLogger(__name__).debug("modules.config not available — YAML config disabled") # Логування LOGGING_AVAILABLE = False @@ -96,7 +99,8 @@ from modules.masking_logger import MaskingLogger, setup_logging LOGGING_AVAILABLE = True except ImportError: - pass + import logging as _logging + _logging.getLogger(__name__).debug("modules.masking_logger not available — structured logging disabled") # ============================================================================ # ІМПОРТ ДАНИХ З МОДУЛЯ @@ -138,6 +142,27 @@ Path('./result') # Підтека result ] +# Maximum input file size in bytes (default: 100 MB) +MAX_INPUT_FILE_SIZE = 100 * 1024 * 1024 + + +def validate_file_size(file_path: Path, max_size: int = MAX_INPUT_FILE_SIZE) -> None: + """ + Перевіряє розмір файлу перед зчитуванням у пам'ять. 
+ + Raises: + ValueError: якщо файл перевищує допустимий розмір + """ + file_size = file_path.stat().st_size + if file_size > max_size: + max_mb = max_size / (1024 * 1024) + actual_mb = file_size / (1024 * 1024) + raise ValueError( + f"File {file_path.name} ({actual_mb:.1f} MB) exceeds maximum " + f"allowed size ({max_mb:.0f} MB)" + ) + + # ============================================================================ # ДОПОМІЖНІ ФУНКЦІЇ - АНАЛІЗ ТА РОЗПІЗНАВАННЯ # ============================================================================ @@ -922,7 +947,7 @@ def unmask_chain(masked_text: str, chain_data: Dict) -> Tuple[str, Dict]: return text, total_stats -def unmask_json_chain(masked_data, chain_data: Dict): +def unmask_json_chain(masked_data: Any, chain_data: Dict) -> Any: """ Unmask JSON data masked with multiple passes (chain). @@ -1002,10 +1027,46 @@ def load_mapping_file(map_path: Path, password: str = None) -> Dict: security_mgr = MappingSecurityManager(password) return security_mgr.decrypt_mapping(map_path) else: + validate_file_size(map_path) with open(map_path, 'r', encoding='utf-8') as f: return json.load(f) +def validate_mapping_schema(mapping: Dict) -> None: + """ + Валідує структуру mapping-файлу. + + Перевіряє наявність обов'язкових полів та коректність формату. 
+ + Args: + mapping: Завантажений словник маппінгів + + Raises: + ValueError: якщо структура mapping невалідна + """ + if not isinstance(mapping, dict): + raise ValueError("Mapping file must be a JSON object") + + # Chain mapping має окрему структуру + if "passes" in mapping and "total_passes" in mapping: + if not isinstance(mapping["passes"], list): + raise ValueError("Chain mapping 'passes' must be a list") + return + + # v2.0+ mapping + version = mapping.get("version") + if version and version.startswith("2"): + if "mappings" not in mapping: + raise ValueError( + f"Mapping v{version} must contain 'mappings' key" + ) + if not isinstance(mapping["mappings"], dict): + raise ValueError("'mappings' must be a dictionary") + return + + # v1 mapping — flat dict with category keys, no strict validation needed + + def show_chain_info(masking_map: Dict) -> None: """ Показує інформацію про ланцюг перемаскування. @@ -1135,7 +1196,7 @@ def log_debug(msg): try: config = load_config(args.config) log_info(f"Конфігурацію завантажено з {args.config}") - except Exception as e: + except (FileNotFoundError, PermissionError, ValueError, OSError) as e: print(f"❌ Помилка завантаження конфігурації: {e}") log_error(f"Помилка завантаження конфігурації: {e}") return @@ -1210,6 +1271,7 @@ def log_debug(msg): try: # Завантажуємо словник маппінгів (з підтримкою шифрування) masking_map = load_mapping_file(map_path, password=password) + validate_mapping_schema(masking_map) log_info(f"Mapping завантажено: {map_path.name}") # Обробка --chain-info (показати інформацію та вийти) @@ -1228,12 +1290,13 @@ def log_debug(msg): print(f"✅ Конвертовано до версії {args.to_version}: {output_converted}") log_info(f"Конвертовано до версії {args.to_version}") return - except Exception as e: + except (KeyError, ValueError, TypeError, OSError) as e: print(f"❌ Помилка конвертації: {e}") log_error(f"Помилка конвертації: {e}") return # Завантажуємо замасковані дані (JSON або текст) + validate_file_size(masked_path) 
with open(masked_path, 'r', encoding='utf-8', newline='') as f: if masked_path.suffix == '.json': masked_data = json.load(f) @@ -1242,7 +1305,7 @@ def log_debug(msg): log_debug(f"Замасковані дані завантажено: {masked_path.name}") - except Exception as e: + except (FileNotFoundError, PermissionError, OSError, json.JSONDecodeError, UnicodeDecodeError) as e: print(f"❌ Помилка читання: {e}") log_error(f"Помилка читання: {e}") return @@ -1297,7 +1360,7 @@ def log_debug(msg): log_info(f"Статистика: відновлено={stats.get('restored_count', 0)}, " f"пропущено={stats.get('skipped_count', 0)}") - except Exception as e: + except (OSError, PermissionError, UnicodeEncodeError) as e: print(f"❌ Помилка збереження: {e}") log_error(f"Помилка збереження: {e}")