From 4d125aec374bcfae7c73f362663f3b73c5d016d5 Mon Sep 17 00:00:00 2001 From: youngdong Date: Sat, 9 May 2026 15:44:30 +0900 Subject: [PATCH] fix: detect obfuscated email evasions --- backend/app/detection/email_normalization.py | 98 ++++++++++++++++++++ backend/app/detection/hybrid_detector.py | 6 ++ backend/app/detection/model_detector.py | 25 ++++- backend/app/detection/pii_detector.py | 22 ++++- backend/app/detection/reason_codes.py | 3 + backend/app/engine/masking.py | 4 + backend/app/services/proxy_service.py | 13 ++- backend/tests/test_hybrid_detector.py | 49 ++++++++++ backend/tests/test_mock_llm.py | 28 ++++++ backend/tests/test_pii_detector.py | 16 ++++ backend/tests/test_proxy_api.py | 31 +++++++ policies/policy.yaml | 5 + policies/strict.yaml | 5 + tools/mock_llm.py | 21 ++++- 14 files changed, 318 insertions(+), 8 deletions(-) create mode 100644 backend/app/detection/email_normalization.py create mode 100644 backend/tests/test_mock_llm.py diff --git a/backend/app/detection/email_normalization.py b/backend/app/detection/email_normalization.py new file mode 100644 index 0000000..2b361f9 --- /dev/null +++ b/backend/app/detection/email_normalization.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass + + +EMAIL_PATTERN = re.compile( + r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,24}\b", + flags=re.IGNORECASE, +) +_AT_TOKEN_PATTERN = re.compile( + r"(?:\[\s*at\s*\]|\(\s*at\s*\)|\{\s*at\s*\}|\bat\b)", + flags=re.IGNORECASE, +) +_DOT_TOKEN_PATTERN = re.compile( + r"(?:\[\s*dot\s*\]|\(\s*dot\s*\)|\{\s*dot\s*\}|\bdot\b)", + flags=re.IGNORECASE, +) +_EXPLICIT_MARKER_PATTERN = re.compile( + r"[\[\(\{]\s*(?:at|dot)\s*[\]\)\}]", + flags=re.IGNORECASE, +) +_OBFUSCATED_EMAIL_PATTERN = re.compile( + rf"(? str: + normalized = _AT_TOKEN_PATTERN.sub("@", text) + normalized = _DOT_TOKEN_PATTERN.sub(".", normalized) + return re.sub(r"\s+", "", normalized) + + +def _should_accept_bare_word_form(raw_text: str, normalized_email: str) -> bool: + if _EXPLICIT_MARKER_PATTERN.search(raw_text): + return True + local_part, _sep, _domain = normalized_email.partition("@") + return len(local_part) >= 3 + + +def extract_obfuscated_emails(text: str) -> list[ObfuscatedEmailMatch]: + matches: list[ObfuscatedEmailMatch] = [] + seen: set[tuple[int, int, str]] = set() + + for match in _OBFUSCATED_EMAIL_PATTERN.finditer(text): + raw_text = match.group(0) + normalized_email = normalize_obfuscated_email_candidate(raw_text) + if EMAIL_PATTERN.fullmatch(normalized_email) is None: + continue + if not _should_accept_bare_word_form(raw_text, normalized_email): + continue + + key = (match.start(), match.end(), normalized_email.lower()) + if key in seen: + continue + seen.add(key) + matches.append( + ObfuscatedEmailMatch( + start=match.start(), + end=match.end(), + raw_text=raw_text, + normalized_email=normalized_email, + ) + ) + + return matches + + +def restore_obfuscated_emails(text: str) -> tuple[str, list[str]]: + matches = extract_obfuscated_emails(text) + if not matches: + return text, [] + + parts: list[str] = [] + restored_emails: list[str] = [] + cursor = 0 + + for match in matches: + parts.append(text[cursor:match.start]) + parts.append(match.normalized_email) + restored_emails.append(match.normalized_email) + cursor = match.end + + parts.append(text[cursor:]) + return "".join(parts), restored_emails diff --git a/backend/app/detection/hybrid_detector.py b/backend/app/detection/hybrid_detector.py index 8f9b64d..f108d51 100644 --- a/backend/app/detection/hybrid_detector.py +++ b/backend/app/detection/hybrid_detector.py @@ -28,6 +28,9 @@ class HybridDetectionResult: fallback_used: bool model_label: str | None = None model_confidence: float | None = None + model_threshold: float | None = None + model_prediction_accepted: bool = False + model_reason_code: str | None = None reason_codes: list[str] = field(default_factory=list) primary_reason_code: str | None = None risk_score: float = 0.0 @@ -155,6 +158,9 @@ def detect_hybrid( model_status=model_result.model_status, model_label=model_result.model_label, model_confidence=model_result.model_confidence, + model_threshold=model_result.model_threshold, + model_prediction_accepted=model_result.model_prediction_accepted, + model_reason_code=model_result.model_reason_code, fallback_used=model_result.fallback_used, reason_codes=reason_codes, primary_reason_code=( diff --git a/backend/app/detection/model_detector.py b/backend/app/detection/model_detector.py index 9643e60..b58ec0f 100644 --- a/backend/app/detection/model_detector.py +++ b/backend/app/detection/model_detector.py @@ -137,6 +137,9 @@ class ModelDetectionResult: fallback_used: bool = False model_label: str | None = None model_confidence: float | None = None + model_threshold: float | None = None + model_prediction_accepted: bool = False + model_reason_code: str | None = None model_prediction: LightweightPrediction | None = None @@ -195,6 +198,18 @@ def _prediction_reasons(prediction: LightweightPrediction) -> list[str]: return [prediction.reason_code] +def _prediction_reason_code(prediction: LightweightPrediction) -> str | None: + if prediction.reason_code: + return prediction.reason_code + + normalized = prediction.label.strip().lower() + if "pii" in normalized or "privacy" in normalized: + return ReasonCode.MODEL_PII_RISK.value + if "inj" in normalized or "prompt" in normalized or "jailbreak" in normalized: + return ReasonCode.MODEL_INJECTION_RISK.value + return None + + def _fallback_reason_code(status: str) -> str | None: if status == "artifact_missing": return ReasonCode.MODEL_ARTIFACT_MISSING.value @@ -268,6 +283,8 @@ def _error_result(settings: DetectionSettings) -> ModelDetectionResult: fallback_used=True, model_label="ERROR", model_confidence=0.0, + model_threshold=settings.model_detector_threshold, + model_prediction_accepted=False, ) @@ -288,6 +305,7 @@ def detect_model( model_enabled=False, model_status="disabled", fallback_used=False, + model_prediction_accepted=False, ) active_classifier = classifier or get_lightweight_classifier() @@ -299,6 +317,7 @@ def detect_model( prediction = detect_lightweight(text, active_classifier) heuristic_reasons = _heuristic_reasons(text) prediction_reasons = _prediction_reasons(prediction) + predicted_reason_code = _prediction_reason_code(prediction) signal_reasons = ordered_reason_codes( [*heuristic_reasons, *prediction_reasons] ) @@ -320,6 +339,7 @@ def detect_model( else action ) label = _prediction_label(prediction) + prediction_accepted = bool(prediction.detected and predicted_reason_code is not None) detections = _build_detections(reasons, confidence) summary = DetectorRunSummary( detector="llm", @@ -347,7 +367,10 @@ def detect_model( model_status=status, fallback_used=not classifier_status.enabled, model_label=label, - model_confidence=summary.confidence, + model_confidence=prediction.confidence, + model_threshold=active_settings.model_detector_threshold, + model_prediction_accepted=prediction_accepted, + model_reason_code=predicted_reason_code, model_prediction=prediction, ) except Exception as exc: # pragma: no cover - defensive path diff --git a/backend/app/detection/pii_detector.py b/backend/app/detection/pii_detector.py index 97b853c..253bf52 100644 --- a/backend/app/detection/pii_detector.py +++ b/backend/app/detection/pii_detector.py @@ -2,6 +2,7 @@ import re +from .email_normalization import EMAIL_PATTERN, extract_obfuscated_emails from .models import DetectionResult, DetectorType from .reason_codes import ReasonCode @@ -10,10 +11,7 @@ ( "EMAIL", ReasonCode.PII_EMAIL_DETECTED.value, - re.compile( - r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,24}\b", - flags=re.IGNORECASE, - ), + EMAIL_PATTERN, 0.95, ), ( @@ -305,6 +303,21 @@ def _intent_detections(text: str) -> list[DetectionResult]: return results +def _obfuscated_email_detections(text: str) -> list[DetectionResult]: + return [ + DetectionResult( + detector_type=DetectorType.PII, + category="EMAIL_OBFUSCATED", + reason_code=ReasonCode.PII_EMAIL_OBFUSCATED.value, + start=match.start, + end=match.end, + matched_text=match.raw_text, + score=0.95, + ) + for match in extract_obfuscated_emails(text) + ] + + def _looks_like_math_expression(candidate: str, context: str) -> bool: if _has_any(context, _MATH_CONTEXT_TERMS): return True @@ -366,6 +379,7 @@ def detect_pii(text: str) -> list[DetectionResult]: results: list[DetectionResult] = [] results.extend(_intent_detections(text)) + results.extend(_obfuscated_email_detections(text)) for category, reason_code, pattern, score in _PII_PATTERNS: for match in pattern.finditer(text): matched_text = match.group(0) diff --git a/backend/app/detection/reason_codes.py b/backend/app/detection/reason_codes.py index ed2811a..a197e6a 100644 --- a/backend/app/detection/reason_codes.py +++ b/backend/app/detection/reason_codes.py @@ -5,6 +5,7 @@ class ReasonCode(str, Enum): PII_EMAIL_DETECTED = "PII_EMAIL_DETECTED" + PII_EMAIL_OBFUSCATED = "PII_EMAIL_OBFUSCATED" PII_PHONE_DETECTED = "PII_PHONE_DETECTED" PII_ADDRESS_DETECTED = "PII_ADDRESS_DETECTED" PII_RRN_DETECTED = "PII_RRN_DETECTED" @@ -41,6 +42,7 @@ class ReasonCode(str, Enum): ReasonCode.PII_EXFILTRATION_REQUEST.value, ReasonCode.PII_RRN_DETECTED.value, ReasonCode.PII_PHONE_DETECTED.value, + ReasonCode.PII_EMAIL_OBFUSCATED.value, ReasonCode.PII_EMAIL_DETECTED.value, ReasonCode.MODEL_DETECTOR_ERROR.value, ReasonCode.MODEL_ARTIFACT_MISSING.value, @@ -52,6 +54,7 @@ class ReasonCode(str, Enum): _REASON_ACTIONS = { ReasonCode.PII_EMAIL_DETECTED.value: PolicyAction.MASK.value, + ReasonCode.PII_EMAIL_OBFUSCATED.value: PolicyAction.MASK.value, ReasonCode.PII_PHONE_DETECTED.value: PolicyAction.MASK.value, ReasonCode.PII_ADDRESS_DETECTED.value: PolicyAction.MASK.value, ReasonCode.PII_RRN_DETECTED.value: PolicyAction.BLOCK.value, diff --git a/backend/app/engine/masking.py b/backend/app/engine/masking.py index 3dc9100..070d946 100644 --- a/backend/app/engine/masking.py +++ b/backend/app/engine/masking.py @@ -2,6 +2,7 @@ import re +from backend.app.detection.email_normalization import normalize_obfuscated_email_candidate from backend.app.detection.models import DetectionResult from backend.app.detection.reason_codes import ReasonCode @@ -59,6 +60,9 @@ def _mask_address(value: str) -> str: def _mask_by_reason(reason_code: str, value: str) -> str: if reason_code == ReasonCode.PII_EMAIL_DETECTED.value: return _mask_email(value) + if reason_code == ReasonCode.PII_EMAIL_OBFUSCATED.value: + normalized = normalize_obfuscated_email_candidate(value) + return _mask_email(normalized) if "@" in normalized else "*" * len(value) if reason_code == ReasonCode.PII_PHONE_DETECTED.value: return _mask_phone(value) if reason_code == ReasonCode.PII_ADDRESS_DETECTED.value: diff --git a/backend/app/services/proxy_service.py b/backend/app/services/proxy_service.py index 257c373..207304a 100644 --- a/backend/app/services/proxy_service.py +++ b/backend/app/services/proxy_service.py @@ -82,7 +82,7 @@ def _audit_from_detections( if hybrid_result is not None: detector_counts = dict(hybrid_result.detector_counts) - total_detections = len([item for item in hybrid_result.detector_results if item.reasons]) + total_detections = len(detections) pii_detected = hybrid_result.pii_detected injection_detected = hybrid_result.injection_detected detector_results = [] @@ -97,6 +97,12 @@ def _audit_from_detections( } if result.confidence is not None: item["confidence"] = round(result.confidence, 3) + if result.detector == "llm": + if hybrid_result.model_threshold is not None: + item["model_threshold"] = hybrid_result.model_threshold + item["model_prediction_accepted"] = hybrid_result.model_prediction_accepted + if hybrid_result.model_reason_code is not None: + item["model_reason_code"] = hybrid_result.model_reason_code detector_results.append(item) summary = { @@ -126,6 +132,11 @@ def _audit_from_detections( hybrid_detection["model_label"] = hybrid_result.model_label if hybrid_result.model_confidence is not None: hybrid_detection["model_confidence"] = hybrid_result.model_confidence + if hybrid_result.model_threshold is not None: + hybrid_detection["model_threshold"] = hybrid_result.model_threshold + hybrid_detection["model_prediction_accepted"] = hybrid_result.model_prediction_accepted + if hybrid_result.model_reason_code is not None: + hybrid_detection["model_reason_code"] = hybrid_result.model_reason_code summary["hybrid_detection"] = hybrid_detection return summary diff --git a/backend/tests/test_hybrid_detector.py b/backend/tests/test_hybrid_detector.py index ff35224..2ed5bd8 100644 --- a/backend/tests/test_hybrid_detector.py +++ b/backend/tests/test_hybrid_detector.py @@ -1,11 +1,34 @@ +from pathlib import Path + import pytest from backend.app.detection.hybrid_detector import detect_hybrid from backend.app.detection.lightweight_classifier import ( LightweightClassifier, + LightweightModelStatus, + LightweightPrediction, get_lightweight_classifier, ) from backend.app.detection.reason_codes import ReasonCode +from backend.app.config import DetectionSettings + + +class _StubClassifier: + def __init__(self, prediction: LightweightPrediction) -> None: + self._prediction = prediction + self.threshold = 0.0 + + def status(self) -> LightweightModelStatus: + return LightweightModelStatus( + enabled=True, + status="enabled", + note="stub", + vectorizer_path=Path("vectorizer.joblib"), + classifier_path=Path("classifier.joblib"), + ) + + def classify(self, _text: str) -> LightweightPrediction: + return self._prediction def test_lightweight_classifier_is_safe_when_artifacts_are_missing() -> None: @@ -130,3 +153,29 @@ def test_hybrid_detector_contextual_attacks_are_not_safe_when_model_is_loaded() assert result.model_status == "enabled" assert result.action != "ALLOW" assert ReasonCode.SAFE_INPUT.value not in result.reason_codes + + +def test_hybrid_detector_keeps_low_confidence_prediction_out_of_counts() -> None: + classifier = _StubClassifier( + LightweightPrediction( + detected=False, + confidence=0.349, + reason_code=None, + label="pii", + source="lightweight_model", + ) + ) + settings = DetectionSettings(model_detector_threshold=0.7) + + result = detect_hybrid("완전히 안전한 문장입니다.", classifier=classifier, settings=settings) + llm_summary = next(item for item in result.detector_results if item.detector == "llm") + + assert result.action == "ALLOW" + assert result.detector_counts == {} + assert llm_summary.action == "ALLOW" + assert llm_summary.reasons == [] + assert result.model_label == "PII" + assert result.model_confidence == 0.349 + assert result.model_threshold == 0.7 + assert result.model_prediction_accepted is False + assert result.model_reason_code == ReasonCode.MODEL_PII_RISK.value diff --git a/backend/tests/test_mock_llm.py b/backend/tests/test_mock_llm.py new file mode 100644 index 0000000..95fed7e --- /dev/null +++ b/backend/tests/test_mock_llm.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from fastapi.testclient import TestClient + +from tools.mock_llm import app + + +client = TestClient(app) + + +def test_mock_llm_restores_obfuscated_email_response() -> None: + response = client.post( + "/v1/chat/completions", + json={ + "messages": [ + { + "role": "user", + "content": "메일은 hong [at] test [dot] co [dot] kr 이야. 실제 이메일 형식으로 바꿔줘.", + } + ] + }, + ) + + assert response.status_code == 200 + payload = response.json() + content = payload["choices"][0]["message"]["content"] + + assert "hong@test.co.kr" in content diff --git a/backend/tests/test_pii_detector.py b/backend/tests/test_pii_detector.py index 0ecce18..e82c875 100644 --- a/backend/tests/test_pii_detector.py +++ b/backend/tests/test_pii_detector.py @@ -1,5 +1,6 @@ from backend.app.detection.pii_detector import detect_pii from backend.app.detection.reason_codes import ReasonCode +import pytest def test_detect_pii_multiple_categories() -> None: @@ -16,6 +17,21 @@ def test_detect_pii_safe_text() -> None: assert detect_pii("오늘은 보안 정책 회의가 있습니다.") == [] +@pytest.mark.parametrize( + "text", + [ + "hong [at] test [dot] co [dot] kr", + "hong(at)test(dot)co(dot)kr", + "hong at test dot co dot kr", + ], +) +def test_detect_pii_obfuscated_email_variants(text: str) -> None: + results = detect_pii(text) + reason_codes = {item.reason_code for item in results} + + assert ReasonCode.PII_EMAIL_OBFUSCATED.value in reason_codes + + def test_detect_pii_phone_variants() -> None: text = "연락처는 +82 10 1234 5678 또는 01012345678 입니다." results = detect_pii(text) diff --git a/backend/tests/test_proxy_api.py b/backend/tests/test_proxy_api.py index ebb3ab6..78db6cc 100644 --- a/backend/tests/test_proxy_api.py +++ b/backend/tests/test_proxy_api.py @@ -6,6 +6,7 @@ import pytest from backend.app.api.proxy import ProxyRequest, proxy_chat +from backend.app.detection.reason_codes import ReasonCode from backend.app.services import llm_service from backend.app.services import proxy_service @@ -123,6 +124,36 @@ def test_proxy_blocks_on_output_injection(monkeypatch) -> None: assert "hybrid_detection" in result.audit_summary["output"] +def test_proxy_masks_obfuscated_input_and_detects_restored_output_email(monkeypatch) -> None: + payload = {"choices": [{"message": {"content": "변환된 이메일은 hong@test.co.kr 입니다."}}]} + monkeypatch.setattr(llm_service.httpx, "AsyncClient", _build_fake_client(payload)) + + req = ProxyRequest(message="메일은 hong [at] test [dot] co [dot] kr 이야. 실제 이메일 형식으로 바꿔줘.") + result = asyncio.run(proxy_chat(req)) + + assert result.action == "MASK" + assert result.input_action == "MASK" + assert result.output_action == "MASK" + assert result.content is not None + assert "hong@test.co.kr" not in result.content + assert "ho***@test.co.kr" in result.content + assert ReasonCode.PII_EMAIL_OBFUSCATED.value in result.reasons + assert ReasonCode.PII_EMAIL_DETECTED.value in result.reasons + + input_summary = result.audit_summary["input"] + output_summary = result.audit_summary["output"] + hybrid_detection = result.audit_summary["hybrid_detection"] + + assert input_summary["pii_detected"] is True + assert output_summary["pii_detected"] is True + assert input_summary["detector_counts"]["regex"] >= 1 + assert output_summary["detector_counts"]["regex"] >= 1 + assert hybrid_detection["input"]["model_threshold"] == 0.7 + assert "model_prediction_accepted" in hybrid_detection["input"] + assert hybrid_detection["output"]["model_threshold"] == 0.7 + assert "model_prediction_accepted" in hybrid_detection["output"] + + def test_proxy_returns_timeout_error(monkeypatch) -> None: # 타임아웃은 안정적인 프록시 오류 응답으로 변환되어야 합니다. class _TimeoutAsyncClient: diff --git a/policies/policy.yaml b/policies/policy.yaml index 4a52082..2d5031e 100644 --- a/policies/policy.yaml +++ b/policies/policy.yaml @@ -5,6 +5,11 @@ rules: priority: 60 threshold: 0.7 description: "Mask email address before forwarding to model." + PII_EMAIL_OBFUSCATED: + action: MASK + priority: 61 + threshold: 0.7 + description: "Mask obfuscated email expressions such as [at]/[dot] before forwarding to model." PII_PHONE_DETECTED: action: MASK priority: 70 diff --git a/policies/strict.yaml b/policies/strict.yaml index d6a28a8..90ee9bb 100644 --- a/policies/strict.yaml +++ b/policies/strict.yaml @@ -5,6 +5,11 @@ rules: priority: 60 threshold: 0.7 description: "Mask email address before forwarding to model." + PII_EMAIL_OBFUSCATED: + action: MASK + priority: 61 + threshold: 0.7 + description: "Mask obfuscated email expressions such as [at]/[dot] before forwarding to model." PII_PHONE_DETECTED: action: MASK priority: 70 diff --git a/tools/mock_llm.py b/tools/mock_llm.py index 59ccf11..8f9e167 100644 --- a/tools/mock_llm.py +++ b/tools/mock_llm.py @@ -1,22 +1,39 @@ +import re + +import uvicorn from fastapi import FastAPI from pydantic import BaseModel -import uvicorn + +from backend.app.detection.email_normalization import restore_obfuscated_emails app = FastAPI() +_EMAIL_RESTORE_HINT = re.compile( + r"(실제\s*이메일\s*형식|이메일\s*형식으로\s*바꿔|restore\s+the\s+email|convert\s+to\s+an?\s+email)", + flags=re.IGNORECASE, +) + class ChatRequest(BaseModel): messages: list model: str = "mock" + @app.post("/v1/chat/completions") async def mock_chat(req: ChatRequest): last_msg = req.messages[-1]["content"] if req.messages else "" + _, restored_emails = restore_obfuscated_emails(last_msg) + if restored_emails and _EMAIL_RESTORE_HINT.search(last_msg): + content = f"변환된 이메일은 {restored_emails[0]} 입니다." + elif restored_emails: + content = f"복원된 이메일 후보는 {restored_emails[0]} 입니다." + else: + content = f"[Mock 응답] 입력 받음: {last_msg[:30]}..." return { "id": "mock-001", "choices": [{ "message": { "role": "assistant", - "content": f"[Mock 응답] 입력 받음: {last_msg[:30]}..." + "content": content } }] }