Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,21 @@ The pipeline should treat **export formats** as first-class requirements so the
5. Specify **one SFT JSONL schema** and **one DPO JSONL schema** (and chat template) validated end-to-end with a **LoRA** dry run and a **small DPO** dry run on toy data.
6. Define **student model** constraints (context length, tool set) and a **filter + eval** plan for teacher-to-student parity before production swap.

---

## Prototype contribution

A minimal privacy-preserving prototype has been added in `pipeline/`:

- `pipeline/pii_redactor.py`: rule-based detection and placeholder replacement for email, phone, URL token, UUID, credit card, SSN, and IP address patterns.
- `pipeline/normalize.py`: canonical event normalization for user, assistant, tool, and tool_result log entries.
- `pipeline/cli.py`: sample runner that audits PII, applies redaction, and exports `sft_ready.jsonl` and `dpo_ready.jsonl`.
- `sample_logs/example.jsonl`: representative log sample containing both Q&A and agentic tool workflow traces.

Run the prototype from the repo root:

```bash
python pipeline/cli.py sample_logs/example.jsonl output
```


24 changes: 24 additions & 0 deletions pipeline/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# PII Redaction Pipeline

This pipeline demonstrates a privacy-preserving preprocessing flow for sample production logs.

## What it does

- Normalizes heterogeneous log entries into a canonical event schema.
- Detects and redacts PII categories such as emails, phone numbers, URLs with tokens, credit card numbers, and UUIDs.
- Produces SFT-ready JSONL and DPO-ready JSONL exports from redacted events.

## Usage

From the repository root:

```bash
cd training_setup_logs
python pipeline/cli.py sample_logs/example.jsonl output
```

This writes:

- `output/sft_ready.jsonl`
- `output/dpo_ready.jsonl`
- `output/redacted_events.jsonl`
1 change: 1 addition & 0 deletions pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Pipeline package for log normalization and PII sanitization."""
Binary file added pipeline/__pycache__/__init__.cpython-314.pyc
Binary file not shown.
Binary file added pipeline/__pycache__/cli.cpython-314.pyc
Binary file not shown.
Binary file added pipeline/__pycache__/normalize.cpython-314.pyc
Binary file not shown.
Binary file added pipeline/__pycache__/pii_redactor.cpython-314.pyc
Binary file not shown.
79 changes: 79 additions & 0 deletions pipeline/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import argparse
import json
from pathlib import Path
from typing import Any, Dict, List

from pipeline.normalize import build_dpo_example, build_sft_example, normalize_log_entry
from pipeline.pii_redactor import audit_json, redact_json


def read_jsonl(path: Path) -> List[Dict[str, Any]]:
records: List[Dict[str, Any]] = []
with path.open("r", encoding="utf-8") as handle:
for line in handle:
line = line.strip()
if not line:
continue
records.append(json.loads(line))
return records


def write_jsonl(path: Path, rows: List[Dict[str, Any]]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
for row in rows:
handle.write(json.dumps(row, ensure_ascii=False) + "\n")


def load_and_process(path: Path) -> List[Dict[str, Any]]:
raw_entries = read_jsonl(path)
events: List[Dict[str, Any]] = []
for raw in raw_entries:
normalized = normalize_log_entry(raw)
if normalized["event_type"] == "tool" and "tool_result" in normalized["payload"]:
normalized["event_type"] = "tool_result"
events.append(normalized)
return events


def main() -> None:
parser = argparse.ArgumentParser(
description="Run a small pipeline to normalize logs, audit/redact PII, and emit SFT/DPO JSONL."
)
parser.add_argument("input", type=Path, help="Input JSONL sample log path")
parser.add_argument("output_dir", type=Path, help="Output directory for redacted data")
args = parser.parse_args()

events = load_and_process(args.input)

audit_findings = audit_json(events)
if audit_findings:
print(f"Detected {len(audit_findings)} PII items before redaction.")
for finding in audit_findings:
print(f"- {finding['path']}: {finding['category']} -> {finding['match']}")
else:
print("No PII detected in the sample input.")

redacted_events = redact_json(events)
sft = [build_sft_example(redacted_events)]
dpo = [
build_dpo_example(
[e for e in redacted_events if e["event_type"] in {"user", "assistant", "system"}],
[
{
"event_type": "assistant",
"payload": {"text": "I cannot help with that request."},
}
],
)
]

write_jsonl(args.output_dir / "sft_ready.jsonl", sft)
write_jsonl(args.output_dir / "dpo_ready.jsonl", dpo)
write_jsonl(args.output_dir / "redacted_events.jsonl", redacted_events)

print(f"Wrote {len(sft)} SFT example(s) and {len(dpo)} DPO example(s) to {args.output_dir}")


if __name__ == "__main__":
main()
113 changes: 113 additions & 0 deletions pipeline/normalize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import json
from typing import Any, Dict, List


def normalize_log_entry(raw: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize heterogeneous log entries into a canonical event schema."""
event_type = raw.get("event_type")
if not event_type:
role = raw.get("role")
if role == "user":
event_type = "user"
elif role in {"assistant", "system"}:
event_type = "assistant" if role == "assistant" else "system"
elif "tool_name" in raw or raw.get("tool"):
event_type = "tool"
else:
event_type = raw.get("type", "unknown")

payload: Dict[str, Any] = {}
if "content" in raw:
payload["text"] = raw["content"]
if "role" in raw:
payload["role"] = raw["role"]
if "tool_name" in raw:
payload["tool_name"] = raw["tool_name"]
if "tool" in raw:
payload["tool_name"] = raw["tool"]
if "tool_args" in raw:
payload["tool_args"] = raw["tool_args"]
if "result" in raw:
payload["tool_result"] = raw["result"]
if "metadata" in raw:
payload["metadata"] = raw["metadata"]
if "error" in raw:
payload["error"] = raw["error"]

normalized = {
"session_id": raw.get("session_id", raw.get("conversation_id", "unknown-session")),
"timestamp": raw.get("timestamp"),
"turn_id": raw.get("turn_id", raw.get("index")),
"event_type": event_type,
"payload": payload,
}
return normalized


def build_sft_example(events: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Build a simple SFT chat example from normalized events."""
messages = []
for event in events:
if event["event_type"] in {"user", "assistant", "system"}:
role = event["payload"].get("role", event["event_type"])
text = event["payload"].get("text", "")
if text:
messages.append({"role": role, "content": text})
elif event["event_type"] == "tool":
messages.append(
{
"role": "tool",
"name": event["payload"].get("tool_name", "unknown_tool"),
"content": event["payload"].get("tool_args", ""),
}
)
elif event["event_type"] == "tool_result":
messages.append(
{
"role": "assistant",
"name": event["payload"].get("tool_name", "unknown_tool"),
"content": event["payload"].get("tool_result", ""),
}
)

return {
"session_id": events[0].get("session_id", "unknown-session") if events else "unknown-session",
"messages": messages,
"metadata": {
"trajectory_length": len(events),
"has_tool_usage": any(e["event_type"] in {"tool", "tool_result"} for e in events),
},
}


def build_dpo_example(events: List[Dict[str, Any]], rejected: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Build a DPO-style prompt/completion pair from event trajectories."""
prompt_messages = []
for event in events:
if event["event_type"] == "user":
prompt_messages.append({"role": "user", "content": event["payload"].get("text", "")})
elif event["event_type"] == "assistant" and event["payload"].get("role") == "system":
prompt_messages.append({"role": "system", "content": event["payload"].get("text", "")})

chosen = "\n".join(
event["payload"].get("text", "")
for event in events
if event["event_type"] == "assistant"
)
rejected_texts = []
for event in rejected:
if event["event_type"] == "assistant":
rejected_texts.append(event["payload"].get("text", ""))
rejected = "\n".join(rejected_texts)

return {
"session_id": events[0].get("session_id", "unknown-session") if events else "unknown-session",
"prompt": prompt_messages,
"chosen": chosen,
"rejected": rejected,
"metadata": {
"prefix_length": len(prompt_messages),
"chosen_length": len(chosen.split()),
"rejected_length": len(rejected.split()),
},
}
122 changes: 122 additions & 0 deletions pipeline/pii_redactor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import re
from typing import Any, Dict, Iterable, List, Tuple, Union

REDACTION_PATTERNS = [
(
"EMAIL",
re.compile(r"(?i)\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
),
(
"CREDIT_CARD",
re.compile(r"\b(?:\d[ -]*?){13,16}\b"),
),
(
"PHONE",
re.compile(
r"(?x)(?:\+?\d{1,3}[\s-]?)?(?:\(\d{2,4}\)|\d{2,4})[\s-]?\d{3,4}[\s-]?\d{3,4}\b"
),
),
(
"SSN",
re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
),
(
"UUID",
re.compile(
r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b"
),
),
(
"URL_TOKEN",
re.compile(
r"\bhttps?://[^\s]*?(?:token|secret|api[_-]?key|access[_-]?token|auth)[^\s]*\b",
re.IGNORECASE,
),
),
(
"URL",
re.compile(r"\bhttps?://[^\s]+\b", re.IGNORECASE),
),
(
"IP_ADDRESS",
re.compile(
r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
),
),
]

PLACEHOLDER_MAP = {
"EMAIL": "<EMAIL>",
"PHONE": "<PHONE>",
"CREDIT_CARD": "<CREDIT_CARD>",
"SSN": "<SSN>",
"UUID": "<UUID>",
"URL_TOKEN": "<URL>",
"URL": "<URL>",
"IP_ADDRESS": "<IP_ADDRESS>",
}


def redact_text(text: str) -> str:
"""Redact known PII patterns from a text string."""
redacted = text
for category, pattern in REDACTION_PATTERNS:
redacted = pattern.sub(PLACEHOLDER_MAP[category], redacted)
return redacted


def spans_overlap(span_a: Tuple[int, int], span_b: Tuple[int, int]) -> bool:
start_a, end_a = span_a
start_b, end_b = span_b
return not (end_a <= start_b or end_b <= start_a)


def find_pii(text: str) -> List[Tuple[str, str]]:
"""Return a list of detected PII category matches in text."""
matches: List[Tuple[str, str]] = []
seen_spans: List[Tuple[int, int]] = []
for category, pattern in REDACTION_PATTERNS:
for match in pattern.finditer(text):
span = match.span()
if any(spans_overlap(span, seen) for seen in seen_spans):
continue
seen_spans.append(span)
matches.append((category, match.group(0)))
return matches


def redact_json(obj: Any) -> Any:
"""Recursively redact PII in JSON-like objects."""
if isinstance(obj, str):
return redact_text(obj)
if isinstance(obj, dict):
return {key: redact_json(value) for key, value in obj.items()}
if isinstance(obj, list):
return [redact_json(item) for item in obj]
return obj


def audit_json(obj: Any, path: str = "") -> List[Dict[str, Any]]:
"""Collect detected PII items in a nested JSON object for audit review."""
findings: List[Dict[str, Any]] = []
if isinstance(obj, str):
seen_spans: List[Tuple[int, int]] = []
for category, pattern in REDACTION_PATTERNS:
for match in pattern.finditer(obj):
span = match.span()
if any(spans_overlap(span, seen) for seen in seen_spans):
continue
seen_spans.append(span)
findings.append(
{"path": path, "category": category, "match": match.group(0)}
)
return findings
if isinstance(obj, dict):
for key, value in obj.items():
findings.extend(audit_json(value, f"{path}.{key}" if path else key))
return findings
if isinstance(obj, list):
for index, item in enumerate(obj):
findings.extend(audit_json(item, f"{path}[{index}]") )
return findings
return findings
27 changes: 27 additions & 0 deletions pipeline/pii_taxonomy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
pii_taxonomy:
description: "List of PII categories detected and replaced by the pipeline."
categories:
- name: EMAIL
description: "Email addresses or email-like identifiers."
example: "user@example.com"
- name: PHONE
description: "Phone numbers or voice contact strings."
example: "+1-555-123-4567"
- name: CREDIT_CARD
description: "Credit/debit card numbers or card-like digit sequences."
example: "4111 1111 1111 1111"
- name: SSN
description: "Social Security numbers or national ID numbers in common numeric format."
example: "123-45-6789"
- name: UUID
description: "Standard UUID/GUID values used for tokens, request IDs, or session IDs."
example: "3f8f9b8e-4d49-4e00-8c1d-1a4e1dbfe9e4"
- name: URL_TOKEN
description: "URLs containing auth tokens, API keys, or secrets in the query string or path."
example: "https://api.example.com/data?api_key=abcd1234"
- name: URL
description: "Generic HTTP(S) URLs that may contain sensitive endpoints or links."
example: "https://example.com/resource"
- name: IP_ADDRESS
description: "IPv4 addresses found in logs or runtime traces."
example: "192.168.1.100"
Loading