Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions backend/app/detection/email_normalization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from __future__ import annotations

import re
from dataclasses import dataclass


# Matcher for plain (non-obfuscated) e-mail addresses.
EMAIL_PATTERN = re.compile(
    r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,24}\b",
    flags=re.IGNORECASE,
)

# Stand-ins for "@": "[at]", "(at)", "{at}" (inner whitespace allowed) or the
# bare word "at". The bare word is ignored when it is glued to '.', '%', '+'
# or '-', so that the "at" inside a label such as "stay-at-home" is left alone.
_AT_TOKEN_PATTERN = re.compile(
    r"(?:\[\s*at\s*\]|\(\s*at\s*\)|\{\s*at\s*\}|(?<![.%+-])\bat\b(?![.%+-]))",
    flags=re.IGNORECASE,
)

# Stand-ins for ".": same shapes as above, with the same guard so that the
# "dot" inside a label such as "my-dot-site" is not rewritten.
_DOT_TOKEN_PATTERN = re.compile(
    r"(?:\[\s*dot\s*\]|\(\s*dot\s*\)|\{\s*dot\s*\}|(?<![.%+-])\bdot\b(?![.%+-]))",
    flags=re.IGNORECASE,
)

# A bracketed "at"/"dot" marker; the bare-word forms do not count here.
_EXPLICIT_MARKER_PATTERN = re.compile(
    r"[\[\(\{]\s*(?:at|dot)\s*[\]\)\}]",
    flags=re.IGNORECASE,
)

# <local part> <at token> <label> (<dot token> <label>)+
# The leading lookbehind keeps a match from starting in the middle of a longer
# token. The trailing lookahead rejects a candidate that continues with more
# address characters (including ".<char>", e.g. "com.au"), but it tolerates a
# lone sentence-ending period so "john at example dot com." is still found.
_OBFUSCATED_EMAIL_PATTERN = re.compile(
    r"(?<![A-Za-z0-9._%+-])"
    r"[A-Za-z0-9][A-Za-z0-9._%+-]{0,63}"
    rf"\s*{_AT_TOKEN_PATTERN.pattern}\s*"
    rf"[A-Za-z0-9-]+(?:\s*{_DOT_TOKEN_PATTERN.pattern}\s*[A-Za-z0-9-]+)+"
    r"(?!\.?[A-Za-z0-9_%+-])",
    flags=re.IGNORECASE,
)


@dataclass(frozen=True, slots=True)
class ObfuscatedEmailMatch:
start: int
end: int
raw_text: str
normalized_email: str


def normalize_obfuscated_email_candidate(text: str) -> str:
    """Rewrite an obfuscated address candidate into plain address form.

    Every at-token becomes "@", every dot-token becomes ".", and all
    whitespace is then removed. The result is not validated here.
    """
    with_at_signs = _AT_TOKEN_PATTERN.sub("@", text)
    with_dots = _DOT_TOKEN_PATTERN.sub(".", with_at_signs)
    return re.sub(r"\s+", "", with_dots)


def _should_accept_bare_word_form(raw_text: str, normalized_email: str) -> bool:
    """Decide whether a candidate is convincing enough to report.

    A candidate written with at least one bracketed marker is always
    accepted. Otherwise it is accepted only when the part of the normalized
    address before "@" is at least three characters long.
    """
    has_explicit_marker = _EXPLICIT_MARKER_PATTERN.search(raw_text) is not None
    if has_explicit_marker:
        return True
    # Text before the first "@" (the whole string when there is no "@").
    local_part = normalized_email.split("@", 1)[0]
    return len(local_part) >= 3


def extract_obfuscated_emails(text: str) -> list[ObfuscatedEmailMatch]:
    """Locate obfuscated e-mail addresses in *text*.

    Each regex candidate is normalized, then kept only if the normalized
    form is a complete e-mail address and the candidate passes the
    bare-word acceptance check.

    Returns:
        The accepted matches in the order they occur in *text*.
    """
    matches: list[ObfuscatedEmailMatch] = []

    # finditer yields non-overlapping matches, so every (start, end) span is
    # unique and no de-duplication step is needed.
    for match in _OBFUSCATED_EMAIL_PATTERN.finditer(text):
        raw_text = match.group(0)
        normalized_email = normalize_obfuscated_email_candidate(raw_text)
        if EMAIL_PATTERN.fullmatch(normalized_email) is None:
            continue
        if not _should_accept_bare_word_form(raw_text, normalized_email):
            continue
        matches.append(
            ObfuscatedEmailMatch(
                start=match.start(),
                end=match.end(),
                raw_text=raw_text,
                normalized_email=normalized_email,
            )
        )

    return matches


def restore_obfuscated_emails(text: str) -> tuple[str, list[str]]:
    """Replace each obfuscated address in *text* with its normalized form.

    Returns:
        A pair of the rewritten text and the normalized addresses that were
        substituted, in order of occurrence. When nothing is found the text
        is returned unchanged with an empty list.
    """
    found = extract_obfuscated_emails(text)
    if not found:
        return text, []

    restored_emails = [item.normalized_email for item in found]

    # Stitch together the untouched text between matches and the replacements.
    pieces: list[str] = []
    position = 0
    for item in found:
        pieces.append(text[position:item.start])
        pieces.append(item.normalized_email)
        position = item.end
    pieces.append(text[position:])

    return "".join(pieces), restored_emails
6 changes: 6 additions & 0 deletions backend/app/detection/hybrid_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class HybridDetectionResult:
fallback_used: bool
model_label: str | None = None
model_confidence: float | None = None
model_threshold: float | None = None
model_prediction_accepted: bool = False
model_reason_code: str | None = None
reason_codes: list[str] = field(default_factory=list)
primary_reason_code: str | None = None
risk_score: float = 0.0
Expand Down Expand Up @@ -155,6 +158,9 @@ def detect_hybrid(
model_status=model_result.model_status,
model_label=model_result.model_label,
model_confidence=model_result.model_confidence,
model_threshold=model_result.model_threshold,
model_prediction_accepted=model_result.model_prediction_accepted,
model_reason_code=model_result.model_reason_code,
fallback_used=model_result.fallback_used,
reason_codes=reason_codes,
primary_reason_code=(
Expand Down
25 changes: 24 additions & 1 deletion backend/app/detection/model_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ class ModelDetectionResult:
fallback_used: bool = False
model_label: str | None = None
model_confidence: float | None = None
model_threshold: float | None = None
model_prediction_accepted: bool = False
model_reason_code: str | None = None
model_prediction: LightweightPrediction | None = None


Expand Down Expand Up @@ -195,6 +198,18 @@ def _prediction_reasons(prediction: LightweightPrediction) -> list[str]:
return [prediction.reason_code]


def _prediction_reason_code(prediction: LightweightPrediction) -> str | None:
    """Pick the reason code to report for a classifier prediction.

    The prediction's own reason code wins when it is set. Otherwise the
    code is derived from keywords in the lower-cased, stripped label; None
    is returned when no keyword matches.
    """
    explicit_code = prediction.reason_code
    if explicit_code:
        return explicit_code

    label = prediction.label.strip().lower()
    if any(keyword in label for keyword in ("pii", "privacy")):
        return ReasonCode.MODEL_PII_RISK.value
    if any(keyword in label for keyword in ("inj", "prompt", "jailbreak")):
        return ReasonCode.MODEL_INJECTION_RISK.value
    return None


def _fallback_reason_code(status: str) -> str | None:
if status == "artifact_missing":
return ReasonCode.MODEL_ARTIFACT_MISSING.value
Expand Down Expand Up @@ -268,6 +283,8 @@ def _error_result(settings: DetectionSettings) -> ModelDetectionResult:
fallback_used=True,
model_label="ERROR",
model_confidence=0.0,
model_threshold=settings.model_detector_threshold,
model_prediction_accepted=False,
)


Expand All @@ -288,6 +305,7 @@ def detect_model(
model_enabled=False,
model_status="disabled",
fallback_used=False,
model_prediction_accepted=False,
)

active_classifier = classifier or get_lightweight_classifier()
Expand All @@ -299,6 +317,7 @@ def detect_model(
prediction = detect_lightweight(text, active_classifier)
heuristic_reasons = _heuristic_reasons(text)
prediction_reasons = _prediction_reasons(prediction)
predicted_reason_code = _prediction_reason_code(prediction)
signal_reasons = ordered_reason_codes(
[*heuristic_reasons, *prediction_reasons]
)
Expand All @@ -320,6 +339,7 @@ def detect_model(
else action
)
label = _prediction_label(prediction)
prediction_accepted = bool(prediction.detected and predicted_reason_code is not None)
detections = _build_detections(reasons, confidence)
summary = DetectorRunSummary(
detector="llm",
Expand Down Expand Up @@ -347,7 +367,10 @@ def detect_model(
model_status=status,
fallback_used=not classifier_status.enabled,
model_label=label,
model_confidence=summary.confidence,
model_confidence=prediction.confidence,
model_threshold=active_settings.model_detector_threshold,
model_prediction_accepted=prediction_accepted,
model_reason_code=predicted_reason_code,
model_prediction=prediction,
)
except Exception as exc: # pragma: no cover - defensive path
Expand Down
22 changes: 18 additions & 4 deletions backend/app/detection/pii_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import re

from .email_normalization import EMAIL_PATTERN, extract_obfuscated_emails
from .models import DetectionResult, DetectorType
from .reason_codes import ReasonCode

Expand All @@ -10,10 +11,7 @@
(
"EMAIL",
ReasonCode.PII_EMAIL_DETECTED.value,
re.compile(
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,24}\b",
flags=re.IGNORECASE,
),
EMAIL_PATTERN,
0.95,
),
(
Expand Down Expand Up @@ -305,6 +303,21 @@ def _intent_detections(text: str) -> list[DetectionResult]:
return results


def _obfuscated_email_detections(text: str) -> list[DetectionResult]:
    """Build one PII detection per obfuscated e-mail found in *text*.

    Each detection carries the span and raw text of the match, the
    EMAIL_OBFUSCATED category with its reason code, and a fixed score of 0.95.
    """
    detections: list[DetectionResult] = []
    for found in extract_obfuscated_emails(text):
        detections.append(
            DetectionResult(
                detector_type=DetectorType.PII,
                category="EMAIL_OBFUSCATED",
                reason_code=ReasonCode.PII_EMAIL_OBFUSCATED.value,
                start=found.start,
                end=found.end,
                matched_text=found.raw_text,
                score=0.95,
            )
        )
    return detections


def _looks_like_math_expression(candidate: str, context: str) -> bool:
if _has_any(context, _MATH_CONTEXT_TERMS):
return True
Expand Down Expand Up @@ -366,6 +379,7 @@ def detect_pii(text: str) -> list[DetectionResult]:

results: list[DetectionResult] = []
results.extend(_intent_detections(text))
results.extend(_obfuscated_email_detections(text))
for category, reason_code, pattern, score in _PII_PATTERNS:
for match in pattern.finditer(text):
matched_text = match.group(0)
Expand Down
3 changes: 3 additions & 0 deletions backend/app/detection/reason_codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

class ReasonCode(str, Enum):
PII_EMAIL_DETECTED = "PII_EMAIL_DETECTED"
PII_EMAIL_OBFUSCATED = "PII_EMAIL_OBFUSCATED"
PII_PHONE_DETECTED = "PII_PHONE_DETECTED"
PII_ADDRESS_DETECTED = "PII_ADDRESS_DETECTED"
PII_RRN_DETECTED = "PII_RRN_DETECTED"
Expand Down Expand Up @@ -41,6 +42,7 @@ class ReasonCode(str, Enum):
ReasonCode.PII_EXFILTRATION_REQUEST.value,
ReasonCode.PII_RRN_DETECTED.value,
ReasonCode.PII_PHONE_DETECTED.value,
ReasonCode.PII_EMAIL_OBFUSCATED.value,
ReasonCode.PII_EMAIL_DETECTED.value,
ReasonCode.MODEL_DETECTOR_ERROR.value,
ReasonCode.MODEL_ARTIFACT_MISSING.value,
Expand All @@ -52,6 +54,7 @@ class ReasonCode(str, Enum):

_REASON_ACTIONS = {
ReasonCode.PII_EMAIL_DETECTED.value: PolicyAction.MASK.value,
ReasonCode.PII_EMAIL_OBFUSCATED.value: PolicyAction.MASK.value,
ReasonCode.PII_PHONE_DETECTED.value: PolicyAction.MASK.value,
ReasonCode.PII_ADDRESS_DETECTED.value: PolicyAction.MASK.value,
ReasonCode.PII_RRN_DETECTED.value: PolicyAction.BLOCK.value,
Expand Down
4 changes: 4 additions & 0 deletions backend/app/engine/masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import re

from backend.app.detection.email_normalization import normalize_obfuscated_email_candidate
from backend.app.detection.models import DetectionResult
from backend.app.detection.reason_codes import ReasonCode

Expand Down Expand Up @@ -59,6 +60,9 @@ def _mask_address(value: str) -> str:
def _mask_by_reason(reason_code: str, value: str) -> str:
if reason_code == ReasonCode.PII_EMAIL_DETECTED.value:
return _mask_email(value)
if reason_code == ReasonCode.PII_EMAIL_OBFUSCATED.value:
normalized = normalize_obfuscated_email_candidate(value)
return _mask_email(normalized) if "@" in normalized else "*" * len(value)
if reason_code == ReasonCode.PII_PHONE_DETECTED.value:
return _mask_phone(value)
if reason_code == ReasonCode.PII_ADDRESS_DETECTED.value:
Expand Down
13 changes: 12 additions & 1 deletion backend/app/services/proxy_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _audit_from_detections(

if hybrid_result is not None:
detector_counts = dict(hybrid_result.detector_counts)
total_detections = len([item for item in hybrid_result.detector_results if item.reasons])
total_detections = len(detections)
pii_detected = hybrid_result.pii_detected
injection_detected = hybrid_result.injection_detected
detector_results = []
Expand All @@ -97,6 +97,12 @@ def _audit_from_detections(
}
if result.confidence is not None:
item["confidence"] = round(result.confidence, 3)
if result.detector == "llm":
if hybrid_result.model_threshold is not None:
item["model_threshold"] = hybrid_result.model_threshold
item["model_prediction_accepted"] = hybrid_result.model_prediction_accepted
if hybrid_result.model_reason_code is not None:
item["model_reason_code"] = hybrid_result.model_reason_code
detector_results.append(item)

summary = {
Expand Down Expand Up @@ -126,6 +132,11 @@ def _audit_from_detections(
hybrid_detection["model_label"] = hybrid_result.model_label
if hybrid_result.model_confidence is not None:
hybrid_detection["model_confidence"] = hybrid_result.model_confidence
if hybrid_result.model_threshold is not None:
hybrid_detection["model_threshold"] = hybrid_result.model_threshold
hybrid_detection["model_prediction_accepted"] = hybrid_result.model_prediction_accepted
if hybrid_result.model_reason_code is not None:
hybrid_detection["model_reason_code"] = hybrid_result.model_reason_code
summary["hybrid_detection"] = hybrid_detection
return summary

Expand Down
49 changes: 49 additions & 0 deletions backend/tests/test_hybrid_detector.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
from pathlib import Path

import pytest

from backend.app.detection.hybrid_detector import detect_hybrid
from backend.app.detection.lightweight_classifier import (
LightweightClassifier,
LightweightModelStatus,
LightweightPrediction,
get_lightweight_classifier,
)
from backend.app.detection.reason_codes import ReasonCode
from backend.app.config import DetectionSettings


class _StubClassifier:
    """Test double that always answers with a preset prediction."""

    def __init__(self, prediction: LightweightPrediction) -> None:
        self.threshold = 0.0
        self._prediction = prediction

    def status(self) -> LightweightModelStatus:
        """Report an enabled model with placeholder artifact paths."""
        artifact_paths = {
            "vectorizer_path": Path("vectorizer.joblib"),
            "classifier_path": Path("classifier.joblib"),
        }
        return LightweightModelStatus(
            enabled=True,
            status="enabled",
            note="stub",
            **artifact_paths,
        )

    def classify(self, _text: str) -> LightweightPrediction:
        """Return the preset prediction; the input text is ignored."""
        return self._prediction


def test_lightweight_classifier_is_safe_when_artifacts_are_missing() -> None:
Expand Down Expand Up @@ -130,3 +153,29 @@ def test_hybrid_detector_contextual_attacks_are_not_safe_when_model_is_loaded()
assert result.model_status == "enabled"
assert result.action != "ALLOW"
assert ReasonCode.SAFE_INPUT.value not in result.reason_codes


def test_hybrid_detector_keeps_low_confidence_prediction_out_of_counts() -> None:
    """An undetected "pii" prediction is reported as metadata only.

    The stub returns a prediction with detected=False and confidence 0.349
    while the configured threshold is 0.7. The result must stay ALLOW with
    empty counts, yet still expose the model label, confidence, threshold
    and derived reason code.
    """
    low_confidence_prediction = LightweightPrediction(
        detected=False,
        confidence=0.349,
        reason_code=None,
        label="pii",
        source="lightweight_model",
    )

    result = detect_hybrid(
        "완전히 안전한 문장입니다.",
        classifier=_StubClassifier(low_confidence_prediction),
        settings=DetectionSettings(model_detector_threshold=0.7),
    )
    llm_summary = next(
        summary for summary in result.detector_results if summary.detector == "llm"
    )

    # Nothing is counted or acted upon.
    assert result.action == "ALLOW"
    assert result.detector_counts == {}
    assert llm_summary.action == "ALLOW"
    assert llm_summary.reasons == []

    # The model's view is still surfaced for auditing.
    assert result.model_label == "PII"
    assert result.model_confidence == 0.349
    assert result.model_threshold == 0.7
    assert result.model_prediction_accepted is False
    assert result.model_reason_code == ReasonCode.MODEL_PII_RISK.value
Loading
Loading