From 7bdf23d5e140cc0ab548caae2f83e5dafbef0e9b Mon Sep 17 00:00:00 2001 From: VarshiniGunti Date: Fri, 8 May 2026 19:19:36 +0530 Subject: [PATCH 1/2] feat: add canonical log schema and normalization parser prototype --- .gitignore | 3 ++ README.md | 25 +++++++++ examples/raw_logs_sample.jsonl | 4 ++ requirements-dev.txt | 2 + tests/test_parser.py | 21 ++++++++ training_setup_logs/__init__.py | 1 + training_setup_logs/cli.py | 32 +++++++++++ training_setup_logs/parser.py | 95 +++++++++++++++++++++++++++++++++ training_setup_logs/schema.py | 37 +++++++++++++ 9 files changed, 220 insertions(+) create mode 100644 .gitignore create mode 100644 examples/raw_logs_sample.jsonl create mode 100644 requirements-dev.txt create mode 100644 tests/test_parser.py create mode 100644 training_setup_logs/__init__.py create mode 100644 training_setup_logs/cli.py create mode 100644 training_setup_logs/parser.py create mode 100644 training_setup_logs/schema.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d86882e --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*.py[cod] +examples/canonical_events.jsonl diff --git a/README.md b/README.md index 5a82102..925089c 100644 --- a/README.md +++ b/README.md @@ -90,3 +90,28 @@ The pipeline should treat **export formats** as first-class requirements so the 6. Define **student model** constraints (context length, tool set) and a **filter + eval** plan for teacher-to-student parity before production swap. + +--- + +## Prototype: PR1 (Schema + Parser) + +This repository now includes a working prototype for log normalization: + +- Canonical event schema: `training_setup_logs/schema.py` +- Parser + JSONL IO: `training_setup_logs/parser.py` +- CLI: `training_setup_logs/cli.py` +- Sample input: `examples/raw_logs_sample.jsonl` +- Tests: `tests/test_parser.py` + +### Run parser + +```bash +python -m training_setup_logs.cli --input examples/raw_logs_sample.jsonl --output examples/canonical_events.jsonl +``` + +### Run tests + +```bash +pip install -r requirements-dev.txt +pytest -q +``` diff --git a/examples/raw_logs_sample.jsonl b/examples/raw_logs_sample.jsonl new file mode 100644 index 0000000..7bbdb84 --- /dev/null +++ b/examples/raw_logs_sample.jsonl @@ -0,0 +1,4 @@ +{"timestamp":"2026-05-01T10:00:00Z","session_id":"demo-1","role":"user","message":"What is today's mandi price for wheat?"} +{"timestamp":"2026-05-01T10:00:01Z","session_id":"demo-1","type":"tool-call","tool":"mandi_prices","call_id":"call-1","args":{"crop":"wheat","market":"Pune"}} +{"timestamp":"2026-05-01T10:00:01Z","session_id":"demo-1","event_type":"tool_result","tool_name":"mandi_prices","tool_call_id":"call-1","result":{"price_per_quintal":2450}} +{"timestamp":"2026-05-01T10:00:02Z","session_id":"demo-1","role":"assistant","message":"Current wheat mandi price is ?2450/quintal."} diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..4a727b5 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,2 @@ +pytest==8.3.5 +pydantic==2.11.4 diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 0000000..03e385c --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,21 @@ +from training_setup_logs.parser import parse_rows + + +def test_parse_rows_normalizes_mixed_shapes() -> None: + rows = [ + {"timestamp": "2026-05-01T10:00:00Z", "session_id": "s1", "role": "user", "message": "Hi"}, + {"timestamp": "2026-05-01T10:00:01Z", "session_id": "s1", "event_type": "tool-call", "tool": "search", "call_id": "c1", "args": {"q": "weather"}}, + {"timestamp": "2026-05-01T10:00:02Z", "thread_id": "s1", "type": "tool_result", "tool_name": "search", "tool_call_id": "c1", "result": {"ok": True}}, + {"timestamp": "2026-05-01T10:00:03Z", "conversation_id": "s1", "kind": "assistant", "text": "Done"}, + {"timestamp": "2026-05-01T10:00:03Z", "kind": "assistant", "text": "missing session"}, + ] + + events, summary = parse_rows(rows) + + assert summary.input_rows == 5 + assert summary.output_events == 4 + assert summary.skipped_rows == 1 + assert events[0].event_type.value == "user" + assert events[1].event_type.value == "tool_call" + assert events[2].event_type.value == "tool_result" + assert events[3].event_type.value == "assistant" diff --git a/training_setup_logs/__init__.py b/training_setup_logs/__init__.py new file mode 100644 index 0000000..63e272a --- /dev/null +++ b/training_setup_logs/__init__.py @@ -0,0 +1 @@ +"""Logs-to-training pipeline package.""" diff --git a/training_setup_logs/cli.py b/training_setup_logs/cli.py new file mode 100644 index 0000000..ad8ffad --- /dev/null +++ b/training_setup_logs/cli.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +from training_setup_logs.parser import parse_rows, read_jsonl, write_jsonl + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Normalize mixed production logs into canonical training events." + ) + parser.add_argument("--input", required=True, help="Input raw log JSONL path") + parser.add_argument("--output", required=True, help="Output canonical JSONL path") + args = parser.parse_args() + + input_path = Path(args.input) + output_path = Path(args.output) + + rows = read_jsonl(input_path) + events, summary = parse_rows(rows) + write_jsonl(output_path, events) + + print( + f"Parsed {summary.input_rows} rows -> {summary.output_events} events " + f"(skipped {summary.skipped_rows})" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/training_setup_logs/parser.py b/training_setup_logs/parser.py new file mode 100644 index 0000000..b2de891 --- /dev/null +++ b/training_setup_logs/parser.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Tuple + +from training_setup_logs.schema import CanonicalEvent, EventType, ParseSummary + + +EVENT_ALIASES = { + "user": EventType.USER, + "assistant": EventType.ASSISTANT, + "tool_call": EventType.TOOL_CALL, + "tool_result": EventType.TOOL_RESULT, + "system": EventType.SYSTEM, + "error": EventType.ERROR, + "tool-call": EventType.TOOL_CALL, + "tool-return": EventType.TOOL_RESULT, +} + + +def _normalized_event_type(row: Dict[str, Any]) -> Optional[EventType]: + raw = ( + row.get("event_type") + or row.get("type") + or row.get("role") + or row.get("kind") + or "" + ) + if not raw: + return None + return EVENT_ALIASES.get(str(raw).strip().lower()) + + +def _as_event(row: Dict[str, Any]) -> Optional[CanonicalEvent]: + event_type = _normalized_event_type(row) + if event_type is None: + return None + + session_id = row.get("session_id") or row.get("thread_id") or row.get("conversation_id") + if not session_id: + return None + + return CanonicalEvent( + schema_version="v1", + timestamp=row.get("timestamp") or row.get("ts") or row.get("created_at"), + session_id=str(session_id), + event_type=event_type, + content=row.get("content") or row.get("message") or row.get("text"), + tool_name=row.get("tool_name") or row.get("tool"), + tool_call_id=row.get("tool_call_id") or row.get("call_id"), + tool_args=row.get("tool_args") or row.get("args"), + tool_result=row.get("tool_result") or row.get("result") or row.get("observation"), + error_type=row.get("error_type"), + error_message=row.get("error_message") or row.get("error"), + metadata=row.get("metadata") or {}, + ) + + +def parse_rows(rows: Iterable[Dict[str, Any]]) -> Tuple[List[CanonicalEvent], ParseSummary]: + events: List[CanonicalEvent] = [] + input_rows = 0 + skipped_rows = 0 + + for row in rows: + input_rows += 1 + event = _as_event(row) + if event is None: + skipped_rows += 1 + continue + events.append(event) + + summary = ParseSummary( + input_rows=input_rows, + output_events=len(events), + skipped_rows=skipped_rows, + ) + return events, summary + + +def read_jsonl(path: Path) -> List[Dict[str, Any]]: + rows: List[Dict[str, Any]] = [] + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line: + continue + rows.append(json.loads(line)) + return rows + + +def write_jsonl(path: Path, events: Iterable[CanonicalEvent]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as f: + for event in events: + f.write(event.model_dump_json(exclude_none=True) + "\n") diff --git a/training_setup_logs/schema.py b/training_setup_logs/schema.py new file mode 100644 index 0000000..c674f23 --- /dev/null +++ b/training_setup_logs/schema.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Dict, Optional + +from pydantic import BaseModel, Field + + +class EventType(str, Enum): + USER = "user" + ASSISTANT = "assistant" + TOOL_CALL = "tool_call" + TOOL_RESULT = "tool_result" + SYSTEM = "system" + ERROR = "error" + + +class CanonicalEvent(BaseModel): + schema_version: str = "v1" + timestamp: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + session_id: str + event_type: EventType + content: Optional[str] = None + tool_name: Optional[str] = None + tool_call_id: Optional[str] = None + tool_args: Optional[Dict[str, Any]] = None + tool_result: Optional[Any] = None + error_type: Optional[str] = None + error_message: Optional[str] = None + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class ParseSummary(BaseModel): + input_rows: int + output_events: int + skipped_rows: int From 9188ef3d5d71cb9b379a25f52a2b640cc6be8746 Mon Sep 17 00:00:00 2001 From: VarshiniGunti Date: Fri, 8 May 2026 19:25:36 +0530 Subject: [PATCH 2/2] feat: add split leakage and trajectory consistency validators --- README.md | 28 +++++++ examples/test_sample.jsonl | 1 + examples/train_sample.jsonl | 2 + examples/trajectory_invalid.jsonl | 1 + examples/trajectory_valid.jsonl | 4 + examples/val_sample.jsonl | 1 + tests/test_integrity.py | 14 ++++ tests/test_trajectory_validator.py | 21 +++++ training_setup_logs/integrity.py | 64 ++++++++++++++++ training_setup_logs/trajectory_validator.py | 39 ++++++++++ training_setup_logs/validate_cli.py | 85 +++++++++++++++++++++ 11 files changed, 260 insertions(+) create mode 100644 examples/test_sample.jsonl create mode 100644 examples/train_sample.jsonl create mode 100644 examples/trajectory_invalid.jsonl create mode 100644 examples/trajectory_valid.jsonl create mode 100644 examples/val_sample.jsonl create mode 100644 tests/test_integrity.py create mode 100644 tests/test_trajectory_validator.py create mode 100644 training_setup_logs/integrity.py create mode 100644 training_setup_logs/trajectory_validator.py create mode 100644 training_setup_logs/validate_cli.py diff --git a/README.md b/README.md index 925089c..a00dc4d 100644 --- a/README.md +++ b/README.md @@ -115,3 +115,31 @@ python -m training_setup_logs.cli --input examples/raw_logs_sample.jsonl --outpu pip install -r requirements-dev.txt pytest -q ``` + +--- + +## Prototype: PR2 (Integrity Validators) + +This repository now includes integrity guards to reduce data quality regressions: + +- Split leakage detection: `training_setup_logs/integrity.py` +- Trajectory consistency validation: `training_setup_logs/trajectory_validator.py` +- Validator CLI: `training_setup_logs/validate_cli.py` + +### Validate split leakage + +```bash +python -m training_setup_logs.validate_cli validate-splits \ + --train examples/train_sample.jsonl \ + --val examples/val_sample.jsonl \ + --test examples/test_sample.jsonl \ + --report examples/split_leakage_report.json +``` + +### Validate trajectories + +```bash +python -m training_setup_logs.validate_cli validate-trajectories \ + --input examples/trajectory_valid.jsonl \ + --report examples/trajectory_report.json +``` diff --git a/examples/test_sample.jsonl b/examples/test_sample.jsonl new file mode 100644 index 0000000..eeb441a --- /dev/null +++ b/examples/test_sample.jsonl @@ -0,0 +1 @@ +{"prompt":"Crop insurance eligibility","response":"Check PMFBY portal"} diff --git a/examples/train_sample.jsonl b/examples/train_sample.jsonl new file mode 100644 index 0000000..34a63ab --- /dev/null +++ b/examples/train_sample.jsonl @@ -0,0 +1,2 @@ +{"prompt":"What is weather in Pune?","response":"Sunny"} +{"prompt":"Mandi rate of wheat","response":"2500"} diff --git a/examples/trajectory_invalid.jsonl b/examples/trajectory_invalid.jsonl new file mode 100644 index 0000000..e13c539 --- /dev/null +++ b/examples/trajectory_invalid.jsonl @@ -0,0 +1 @@ +{"session_id":"s1","event_type":"tool_result","tool_name":"weather","tool_call_id":"ghost","tool_result":{"temp":33}} diff --git a/examples/trajectory_valid.jsonl b/examples/trajectory_valid.jsonl new file mode 100644 index 0000000..0e79275 --- /dev/null +++ b/examples/trajectory_valid.jsonl @@ -0,0 +1,4 @@ +{"session_id":"s1","event_type":"user","content":"Find weather"} +{"session_id":"s1","event_type":"tool_call","tool_name":"weather","tool_call_id":"c1","tool_args":{"district":"Pune"}} +{"session_id":"s1","event_type":"tool_result","tool_name":"weather","tool_call_id":"c1","tool_result":{"temp":33}} +{"session_id":"s1","event_type":"assistant","content":"It is 33C"} diff --git a/examples/val_sample.jsonl b/examples/val_sample.jsonl new file mode 100644 index 0000000..73d3793 --- /dev/null +++ b/examples/val_sample.jsonl @@ -0,0 +1 @@ +{"prompt":"what is weather in pune","response":"Clear"} diff --git a/tests/test_integrity.py b/tests/test_integrity.py new file mode 100644 index 0000000..f2dce45 --- /dev/null +++ b/tests/test_integrity.py @@ -0,0 +1,14 @@ +from training_setup_logs.integrity import find_split_leakage + + +def test_find_split_leakage_detects_cross_split_duplicate() -> None: + splits = { + "train": [{"prompt": "What is weather in Pune?"}], + "val": [{"prompt": "what is weather in pune"}], + "test": [{"prompt": "Different question"}], + } + + leaks = find_split_leakage(splits) + + assert len(leaks) == 1 + assert {leaks[0].split_a, leaks[0].split_b} == {"train", "val"} diff --git a/tests/test_trajectory_validator.py b/tests/test_trajectory_validator.py new file mode 100644 index 0000000..c5b027c --- /dev/null +++ b/tests/test_trajectory_validator.py @@ -0,0 +1,21 @@ +from training_setup_logs.trajectory_validator import validate_trajectory + + +def test_validate_trajectory_flags_orphan_tool_result() -> None: + events = [ + {"event_type": "tool_result", "tool_call_id": "ghost"}, + ] + + issues = validate_trajectory(events) + assert len(issues) == 1 + assert "without prior tool_call" in issues[0].message + + +def test_validate_trajectory_accepts_valid_pair() -> None: + events = [ + {"event_type": "tool_call", "tool_call_id": "c1"}, + {"event_type": "tool_result", "tool_call_id": "c1"}, + ] + + issues = validate_trajectory(events) + assert issues == [] diff --git a/training_setup_logs/integrity.py b/training_setup_logs/integrity.py new file mode 100644 index 0000000..0004643 --- /dev/null +++ b/training_setup_logs/integrity.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import hashlib +import re +import unicodedata +from dataclasses import dataclass +from typing import Dict, Iterable, List, Tuple + + +def normalize_text(text: str) -> str: + text = unicodedata.normalize("NFKC", text).lower() + text = re.sub(r"\s+", " ", text).strip() + text = re.sub(r"[^\w\s]", "", text) + return text + + +def text_fingerprint(text: str) -> str: + normalized = normalize_text(text) + return hashlib.sha256(normalized.encode("utf-8")).hexdigest() + + +@dataclass +class LeakageRecord: + key: str + split_a: str + split_b: str + sample_a_idx: int + sample_b_idx: int + + +def _collect_fingerprints(rows: Iterable[dict], split_name: str) -> Dict[str, Tuple[str, int]]: + fp_map: Dict[str, Tuple[str, int]] = {} + for idx, row in enumerate(rows): + text = row.get("prompt") or row.get("content") or row.get("text") or "" + if not text: + continue + fp = text_fingerprint(text) + if fp not in fp_map: + fp_map[fp] = (split_name, idx) + return fp_map + + +def find_split_leakage(split_rows: Dict[str, List[dict]]) -> List[LeakageRecord]: + seen: Dict[str, Tuple[str, int]] = {} + leaks: List[LeakageRecord] = [] + + for split_name, rows in split_rows.items(): + current = _collect_fingerprints(rows, split_name) + for key, (cur_split, cur_idx) in current.items(): + if key in seen: + prev_split, prev_idx = seen[key] + if prev_split != cur_split: + leaks.append( + LeakageRecord( + key=key, + split_a=prev_split, + split_b=cur_split, + sample_a_idx=prev_idx, + sample_b_idx=cur_idx, + ) + ) + else: + seen[key] = (cur_split, cur_idx) + return leaks diff --git a/training_setup_logs/trajectory_validator.py b/training_setup_logs/trajectory_validator.py new file mode 100644 index 0000000..ccdb60a --- /dev/null +++ b/training_setup_logs/trajectory_validator.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, List + + +@dataclass +class TrajectoryIssue: + index: int + message: str + + +def validate_trajectory(events: List[dict]) -> List[TrajectoryIssue]: + issues: List[TrajectoryIssue] = [] + pending_calls: Dict[str, int] = {} + + for idx, event in enumerate(events): + event_type = event.get("event_type") + + if event_type == "tool_call": + call_id = event.get("tool_call_id") + if not call_id: + issues.append(TrajectoryIssue(index=idx, message="tool_call missing tool_call_id")) + else: + pending_calls[call_id] = idx + + elif event_type == "tool_result": + call_id = event.get("tool_call_id") + if not call_id: + issues.append(TrajectoryIssue(index=idx, message="tool_result missing tool_call_id")) + elif call_id not in pending_calls: + issues.append( + TrajectoryIssue( + index=idx, + message=f"tool_result without prior tool_call: {call_id}", + ) + ) + + return issues diff --git a/training_setup_logs/validate_cli.py b/training_setup_logs/validate_cli.py new file mode 100644 index 0000000..36e40a0 --- /dev/null +++ b/training_setup_logs/validate_cli.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Dict, List + +from training_setup_logs.integrity import find_split_leakage +from training_setup_logs.trajectory_validator import validate_trajectory + + +def _read_json(path: Path): + with path.open("r", encoding="utf-8") as f: + return json.load(f) + + +def _read_jsonl(path: Path) -> List[dict]: + rows: List[dict] = [] + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line: + continue + rows.append(json.loads(line)) + return rows + + +def cmd_validate_splits(args: argparse.Namespace) -> int: + split_rows: Dict[str, List[dict]] = { + "train": _read_jsonl(Path(args.train)), + "val": _read_jsonl(Path(args.val)), + "test": _read_jsonl(Path(args.test)), + } + leaks = find_split_leakage(split_rows) + report = { + "leak_count": len(leaks), + "leaks": [ + { + "fingerprint": leak.key, + "split_a": leak.split_a, + "split_b": leak.split_b, + "sample_a_idx": leak.sample_a_idx, + "sample_b_idx": leak.sample_b_idx, + } + for leak in leaks + ], + } + Path(args.report).write_text(json.dumps(report, indent=2), encoding="utf-8") + print(f"Split leakage report written: {args.report} (count={len(leaks)})") + return 1 if leaks else 0 + + +def cmd_validate_trajectory(args: argparse.Namespace) -> int: + events = _read_jsonl(Path(args.input)) + issues = validate_trajectory(events) + report = { + "issue_count": len(issues), + "issues": [{"index": issue.index, "message": issue.message} for issue in issues], + } + Path(args.report).write_text(json.dumps(report, indent=2), encoding="utf-8") + print(f"Trajectory report written: {args.report} (issues={len(issues)})") + return 1 if issues else 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description="Data integrity validators for training datasets") + sub = parser.add_subparsers(dest="command", required=True) + + split_parser = sub.add_parser("validate-splits", help="Detect near-duplicate leakage across splits") + split_parser.add_argument("--train", required=True) + split_parser.add_argument("--val", required=True) + split_parser.add_argument("--test", required=True) + split_parser.add_argument("--report", required=True) + split_parser.set_defaults(func=cmd_validate_splits) + + traj_parser = sub.add_parser("validate-trajectories", help="Validate tool call/result consistency") + traj_parser.add_argument("--input", required=True) + traj_parser.add_argument("--report", required=True) + traj_parser.set_defaults(func=cmd_validate_trajectory) + + args = parser.parse_args() + return args.func(args) + + +if __name__ == "__main__": + raise SystemExit(main())