diff --git a/README.md b/README.md index 5a82102..c487e99 100644 --- a/README.md +++ b/README.md @@ -89,4 +89,21 @@ The pipeline should treat **export formats** as first-class requirements so the 5. Specify **one SFT JSONL schema** and **one DPO JSONL schema** (and chat template) validated end-to-end with a **LoRA** dry run and a **small DPO** dry run on toy data. 6. Define **student model** constraints (context length, tool set) and a **filter + eval** plan for teacher-to-student parity before production swap. +--- + +## Prototype contribution + +A minimal privacy-preserving prototype has been added in `pipeline/`: + +- `pipeline/pii_redactor.py`: rule-based detection and placeholder replacement for email, phone, URL token, UUID, credit card, SSN, and IP address patterns. +- `pipeline/normalize.py`: canonical event normalization for user, assistant, tool, and tool_result log entries. +- `pipeline/cli.py`: sample runner that audits PII, applies redaction, and exports `sft_ready.jsonl` and `dpo_ready.jsonl`. +- `sample_logs/example.jsonl`: representative log sample containing both Q&A and agentic tool workflow traces. + +Run the prototype from the repo root: + +```bash +python pipeline/cli.py sample_logs/example.jsonl output +``` + diff --git a/pipeline/README.md b/pipeline/README.md new file mode 100644 index 0000000..e89a9d6 --- /dev/null +++ b/pipeline/README.md @@ -0,0 +1,24 @@ +# PII Redaction Pipeline + +This pipeline demonstrates a privacy-preserving preprocessing flow for sample production logs. + +## What it does + +- Normalizes heterogeneous log entries into a canonical event schema. +- Detects and redacts PII categories such as emails, phone numbers, URLs with tokens, credit card numbers, and UUIDs. +- Produces SFT-ready JSONL and DPO-ready JSONL exports from redacted events. + +## Usage + +From the repository root: + +```bash +cd training_setup_logs +python pipeline/cli.py sample_logs/example.jsonl output +``` + +This writes: + +- `output/sft_ready.jsonl` +- `output/dpo_ready.jsonl` +- `output/redacted_events.jsonl` diff --git a/pipeline/__init__.py b/pipeline/__init__.py new file mode 100644 index 0000000..d51d6c9 --- /dev/null +++ b/pipeline/__init__.py @@ -0,0 +1 @@ +"""Pipeline package for log normalization and PII sanitization.""" diff --git a/pipeline/__pycache__/__init__.cpython-314.pyc b/pipeline/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000..c64e450 Binary files /dev/null and b/pipeline/__pycache__/__init__.cpython-314.pyc differ diff --git a/pipeline/__pycache__/cli.cpython-314.pyc b/pipeline/__pycache__/cli.cpython-314.pyc new file mode 100644 index 0000000..5aaf318 Binary files /dev/null and b/pipeline/__pycache__/cli.cpython-314.pyc differ diff --git a/pipeline/__pycache__/normalize.cpython-314.pyc b/pipeline/__pycache__/normalize.cpython-314.pyc new file mode 100644 index 0000000..88310f1 Binary files /dev/null and b/pipeline/__pycache__/normalize.cpython-314.pyc differ diff --git a/pipeline/__pycache__/pii_redactor.cpython-314.pyc b/pipeline/__pycache__/pii_redactor.cpython-314.pyc new file mode 100644 index 0000000..1e4ed5a Binary files /dev/null and b/pipeline/__pycache__/pii_redactor.cpython-314.pyc differ diff --git a/pipeline/cli.py b/pipeline/cli.py new file mode 100644 index 0000000..2607cc1 --- /dev/null +++ b/pipeline/cli.py @@ -0,0 +1,79 @@ +import argparse +import json +from pathlib import Path +from typing import Any, Dict, List + +from pipeline.normalize import build_dpo_example, build_sft_example, normalize_log_entry +from pipeline.pii_redactor import audit_json, redact_json + + +def read_jsonl(path: Path) -> List[Dict[str, Any]]: + records: List[Dict[str, Any]] = [] + with path.open("r", encoding="utf-8") as handle: + for line in handle: + line = line.strip() + if not line: + continue + records.append(json.loads(line)) + return records + + +def write_jsonl(path: Path, rows: List[Dict[str, Any]]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as handle: + for row in rows: + handle.write(json.dumps(row, ensure_ascii=False) + "\n") + + +def load_and_process(path: Path) -> List[Dict[str, Any]]: + raw_entries = read_jsonl(path) + events: List[Dict[str, Any]] = [] + for raw in raw_entries: + normalized = normalize_log_entry(raw) + if normalized["event_type"] == "tool" and "tool_result" in normalized["payload"]: + normalized["event_type"] = "tool_result" + events.append(normalized) + return events + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Run a small pipeline to normalize logs, audit/redact PII, and emit SFT/DPO JSONL." + ) + parser.add_argument("input", type=Path, help="Input JSONL sample log path") + parser.add_argument("output_dir", type=Path, help="Output directory for redacted data") + args = parser.parse_args() + + events = load_and_process(args.input) + + audit_findings = audit_json(events) + if audit_findings: + print(f"Detected {len(audit_findings)} PII items before redaction.") + for finding in audit_findings: + print(f"- {finding['path']}: {finding['category']} -> {finding['match']}") + else: + print("No PII detected in the sample input.") + + redacted_events = redact_json(events) + sft = [build_sft_example(redacted_events)] + dpo = [ + build_dpo_example( + [e for e in redacted_events if e["event_type"] in {"user", "assistant", "system"}], + [ + { + "event_type": "assistant", + "payload": {"text": "I cannot help with that request."}, + } + ], + ) + ] + + write_jsonl(args.output_dir / "sft_ready.jsonl", sft) + write_jsonl(args.output_dir / "dpo_ready.jsonl", dpo) + write_jsonl(args.output_dir / "redacted_events.jsonl", redacted_events) + + print(f"Wrote {len(sft)} SFT example(s) and {len(dpo)} DPO example(s) to {args.output_dir}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/normalize.py b/pipeline/normalize.py new file mode 100644 index 0000000..410a498 --- /dev/null +++ b/pipeline/normalize.py @@ -0,0 +1,113 @@ +import json +from typing import Any, Dict, List + + +def normalize_log_entry(raw: Dict[str, Any]) -> Dict[str, Any]: + """Normalize heterogeneous log entries into a canonical event schema.""" + event_type = raw.get("event_type") + if not event_type: + role = raw.get("role") + if role == "user": + event_type = "user" + elif role in {"assistant", "system"}: + event_type = "assistant" if role == "assistant" else "system" + elif "tool_name" in raw or raw.get("tool"): + event_type = "tool" + else: + event_type = raw.get("type", "unknown") + + payload: Dict[str, Any] = {} + if "content" in raw: + payload["text"] = raw["content"] + if "role" in raw: + payload["role"] = raw["role"] + if "tool_name" in raw: + payload["tool_name"] = raw["tool_name"] + if "tool" in raw: + payload["tool_name"] = raw["tool"] + if "tool_args" in raw: + payload["tool_args"] = raw["tool_args"] + if "result" in raw: + payload["tool_result"] = raw["result"] + if "metadata" in raw: + payload["metadata"] = raw["metadata"] + if "error" in raw: + payload["error"] = raw["error"] + + normalized = { + "session_id": raw.get("session_id", raw.get("conversation_id", "unknown-session")), + "timestamp": raw.get("timestamp"), + "turn_id": raw.get("turn_id", raw.get("index")), + "event_type": event_type, + "payload": payload, + } + return normalized + + +def build_sft_example(events: List[Dict[str, Any]]) -> Dict[str, Any]: + """Build a simple SFT chat example from normalized events.""" + messages = [] + for event in events: + if event["event_type"] in {"user", "assistant", "system"}: + role = event["payload"].get("role", event["event_type"]) + text = event["payload"].get("text", "") + if text: + messages.append({"role": role, "content": text}) + elif event["event_type"] == "tool": + messages.append( + { + "role": "tool", + "name": event["payload"].get("tool_name", "unknown_tool"), + "content": event["payload"].get("tool_args", ""), + } + ) + elif event["event_type"] == "tool_result": + messages.append( + { + "role": "assistant", + "name": event["payload"].get("tool_name", "unknown_tool"), + "content": event["payload"].get("tool_result", ""), + } + ) + + return { + "session_id": events[0].get("session_id", "unknown-session") if events else "unknown-session", + "messages": messages, + "metadata": { + "trajectory_length": len(events), + "has_tool_usage": any(e["event_type"] in {"tool", "tool_result"} for e in events), + }, + } + + +def build_dpo_example(events: List[Dict[str, Any]], rejected: List[Dict[str, Any]]) -> Dict[str, Any]: + """Build a DPO-style prompt/completion pair from event trajectories.""" + prompt_messages = [] + for event in events: + if event["event_type"] == "user": + prompt_messages.append({"role": "user", "content": event["payload"].get("text", "")}) + elif event["event_type"] == "assistant" and event["payload"].get("role") == "system": + prompt_messages.append({"role": "system", "content": event["payload"].get("text", "")}) + + chosen = "\n".join( + event["payload"].get("text", "") + for event in events + if event["event_type"] == "assistant" + ) + rejected_texts = [] + for event in rejected: + if event["event_type"] == "assistant": + rejected_texts.append(event["payload"].get("text", "")) + rejected = "\n".join(rejected_texts) + + return { + "session_id": events[0].get("session_id", "unknown-session") if events else "unknown-session", + "prompt": prompt_messages, + "chosen": chosen, + "rejected": rejected, + "metadata": { + "prefix_length": len(prompt_messages), + "chosen_length": len(chosen.split()), + "rejected_length": len(rejected.split()), + }, + } diff --git a/pipeline/pii_redactor.py b/pipeline/pii_redactor.py new file mode 100644 index 0000000..db3d7ab --- /dev/null +++ b/pipeline/pii_redactor.py @@ -0,0 +1,122 @@ +import re +from typing import Any, Dict, Iterable, List, Tuple, Union + +REDACTION_PATTERNS = [ + ( + "EMAIL", + re.compile(r"(?i)\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"), + ), + ( + "CREDIT_CARD", + re.compile(r"\b(?:\d[ -]*?){13,16}\b"), + ), + ( + "PHONE", + re.compile( + r"(?x)(?:\+?\d{1,3}[\s-]?)?(?:\(\d{2,4}\)|\d{2,4})[\s-]?\d{3,4}[\s-]?\d{3,4}\b" + ), + ), + ( + "SSN", + re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), + ), + ( + "UUID", + re.compile( + r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b" + ), + ), + ( + "URL_TOKEN", + re.compile( + r"\bhttps?://[^\s]*?(?:token|secret|api[_-]?key|access[_-]?token|auth)[^\s]*\b", + re.IGNORECASE, + ), + ), + ( + "URL", + re.compile(r"\bhttps?://[^\s]+\b", re.IGNORECASE), + ), + ( + "IP_ADDRESS", + re.compile( + r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b" + ), + ), +] + +PLACEHOLDER_MAP = { + "EMAIL": "", + "PHONE": "", + "CREDIT_CARD": "", + "SSN": "", + "UUID": "", + "URL_TOKEN": "", + "URL": "", + "IP_ADDRESS": "", +} + + +def redact_text(text: str) -> str: + """Redact known PII patterns from a text string.""" + redacted = text + for category, pattern in REDACTION_PATTERNS: + redacted = pattern.sub(PLACEHOLDER_MAP[category], redacted) + return redacted + + +def spans_overlap(span_a: Tuple[int, int], span_b: Tuple[int, int]) -> bool: + start_a, end_a = span_a + start_b, end_b = span_b + return not (end_a <= start_b or end_b <= start_a) + + +def find_pii(text: str) -> List[Tuple[str, str]]: + """Return a list of detected PII category matches in text.""" + matches: List[Tuple[str, str]] = [] + seen_spans: List[Tuple[int, int]] = [] + for category, pattern in REDACTION_PATTERNS: + for match in pattern.finditer(text): + span = match.span() + if any(spans_overlap(span, seen) for seen in seen_spans): + continue + seen_spans.append(span) + matches.append((category, match.group(0))) + return matches + + +def redact_json(obj: Any) -> Any: + """Recursively redact PII in JSON-like objects.""" + if isinstance(obj, str): + return redact_text(obj) + if isinstance(obj, dict): + return {key: redact_json(value) for key, value in obj.items()} + if isinstance(obj, list): + return [redact_json(item) for item in obj] + return obj + + +def audit_json(obj: Any, path: str = "") -> List[Dict[str, Any]]: + """Collect detected PII items in a nested JSON object for audit review.""" + findings: List[Dict[str, Any]] = [] + if isinstance(obj, str): + seen_spans: List[Tuple[int, int]] = [] + for category, pattern in REDACTION_PATTERNS: + for match in pattern.finditer(obj): + span = match.span() + if any(spans_overlap(span, seen) for seen in seen_spans): + continue + seen_spans.append(span) + findings.append( + {"path": path, "category": category, "match": match.group(0)} + ) + return findings + if isinstance(obj, dict): + for key, value in obj.items(): + findings.extend(audit_json(value, f"{path}.{key}" if path else key)) + return findings + if isinstance(obj, list): + for index, item in enumerate(obj): + findings.extend(audit_json(item, f"{path}[{index}]") ) + return findings + return findings diff --git a/pipeline/pii_taxonomy.yaml b/pipeline/pii_taxonomy.yaml new file mode 100644 index 0000000..3d5a919 --- /dev/null +++ b/pipeline/pii_taxonomy.yaml @@ -0,0 +1,27 @@ +pii_taxonomy: + description: "List of PII categories detected and replaced by the pipeline." + categories: + - name: EMAIL + description: "Email addresses or email-like identifiers." + example: "user@example.com" + - name: PHONE + description: "Phone numbers or voice contact strings." + example: "+1-555-123-4567" + - name: CREDIT_CARD + description: "Credit/debit card numbers or card-like digit sequences." + example: "4111 1111 1111 1111" + - name: SSN + description: "Social Security numbers or national ID numbers in common numeric format." + example: "123-45-6789" + - name: UUID + description: "Standard UUID/GUID values used for tokens, request IDs, or session IDs." + example: "3f8f9b8e-4d49-4e00-8c1d-1a4e1dbfe9e4" + - name: URL_TOKEN + description: "URLs containing auth tokens, API keys, or secrets in the query string or path." + example: "https://api.example.com/data?api_key=abcd1234" + - name: URL + description: "Generic HTTP(S) URLs that may contain sensitive endpoints or links." + example: "https://example.com/resource" + - name: IP_ADDRESS + description: "IPv4 addresses found in logs or runtime traces." + example: "192.168.1.100" diff --git a/pipeline/schema.yaml b/pipeline/schema.yaml new file mode 100644 index 0000000..0b1c080 --- /dev/null +++ b/pipeline/schema.yaml @@ -0,0 +1,19 @@ +canonical_event_schema: + description: "A normalized event schema for training data extraction." + fields: + session_id: + type: string + description: "Identifier for the user session or conversation." + timestamp: + type: string + description: "ISO-8601 or event timestamp from the source log." + turn_id: + type: string + description: "Optional ordinal or unique turn identifier within the session." + event_type: + type: string + description: "Normalized event type, e.g. user, assistant, tool, tool_result, system, error." + enum: [user, assistant, system, tool, tool_result, error, unknown] + payload: + type: object + description: "Event-specific data such as text content, tool name, tool args, results, metadata, or error details." diff --git a/sample_logs/example.jsonl b/sample_logs/example.jsonl new file mode 100644 index 0000000..14bc6f1 --- /dev/null +++ b/sample_logs/example.jsonl @@ -0,0 +1,9 @@ +{"timestamp": "2026-05-16T10:00:00Z", "session_id": "sess-123", "turn_id": 1, "role": "user", "content": "Hi, my name is Alex Rivera. Please find the nearest urgent care at 123 Main St and call me at +1-555-123-4567. Also email me at alex.rivera@example.com."} +{"timestamp": "2026-05-16T10:00:02Z", "session_id": "sess-123", "turn_id": 2, "role": "assistant", "content": "Checking the nearest urgent care locations now."} +{"timestamp": "2026-05-16T10:00:03Z", "session_id": "sess-123", "turn_id": 3, "tool": "geo_search", "tool_args": "query=urgent care near 123 Main St"} +{"timestamp": "2026-05-16T10:00:04Z", "session_id": "sess-123", "turn_id": 4, "tool_name": "geo_search", "result": "Found urgent care clinic at 125 Main St, 0.3 miles away."} +{"timestamp": "2026-05-16T10:00:05Z", "session_id": "sess-123", "turn_id": 5, "role": "assistant", "content": "I found an urgent care clinic at 125 Main St, 0.3 miles from your location."} +{"timestamp": "2026-05-16T10:05:00Z", "session_id": "sess-234", "turn_id": 1, "role": "user", "content": "Please transfer the invoice to finance@company.com and verify the transaction reference 4111 1111 1111 1111."} +{"timestamp": "2026-05-16T10:05:05Z", "session_id": "sess-234", "turn_id": 2, "tool": "payment_api", "tool_args": "POST https://api.payments.example.com/charge?api_key=secret_token_ABC123"} +{"timestamp": "2026-05-16T10:05:06Z", "session_id": "sess-234", "turn_id": 3, "tool_name": "payment_api", "result": "Payment accepted, confirmation id 3f8f9b8e-4d49-4e00-8c1d-1a4e1dbfe9e4."} +{"timestamp": "2026-05-16T10:05:07Z", "session_id": "sess-234", "turn_id": 4, "role": "assistant", "content": "The payment has been processed and the confirmation id is available."}