Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__pycache__/
*.py[cod]
examples/canonical_events.jsonl
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,56 @@ The pipeline should treat **export formats** as first-class requirements so the
6. Define **student model** constraints (context length, tool set) and a **filter + eval** plan for teacher-to-student parity before production swap.



---

## Prototype: PR1 (Schema + Parser)

This repository now includes a working prototype for log normalization:

- Canonical event schema: `training_setup_logs/schema.py`
- Parser + JSONL IO: `training_setup_logs/parser.py`
- CLI: `training_setup_logs/cli.py`
- Sample input: `examples/raw_logs_sample.jsonl`
- Tests: `tests/test_parser.py`

### Run parser

```bash
python -m training_setup_logs.cli --input examples/raw_logs_sample.jsonl --output examples/canonical_events.jsonl
```

### Run tests

```bash
pip install -r requirements-dev.txt
pytest -q
```

---

## Prototype: PR2 (Integrity Validators)

This repository now includes integrity guards to reduce data quality regressions:

- Split leakage detection: `training_setup_logs/integrity.py`
- Trajectory consistency validation: `training_setup_logs/trajectory_validator.py`
- Validator CLI: `training_setup_logs/validate_cli.py`

### Validate split leakage

```bash
python -m training_setup_logs.validate_cli validate-splits \
--train examples/train_sample.jsonl \
--val examples/val_sample.jsonl \
--test examples/test_sample.jsonl \
--report examples/split_leakage_report.json
```

### Validate trajectories

```bash
python -m training_setup_logs.validate_cli validate-trajectories \
--input examples/trajectory_valid.jsonl \
--report examples/trajectory_report.json
```
4 changes: 4 additions & 0 deletions examples/raw_logs_sample.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"timestamp":"2026-05-01T10:00:00Z","session_id":"demo-1","role":"user","message":"What is today's mandi price for wheat?"}
{"timestamp":"2026-05-01T10:00:01Z","session_id":"demo-1","type":"tool-call","tool":"mandi_prices","call_id":"call-1","args":{"crop":"wheat","market":"Pune"}}
{"timestamp":"2026-05-01T10:00:01Z","session_id":"demo-1","event_type":"tool_result","tool_name":"mandi_prices","tool_call_id":"call-1","result":{"price_per_quintal":2450}}
{"timestamp":"2026-05-01T10:00:02Z","session_id":"demo-1","role":"assistant","message":"Current wheat mandi price is ?2450/quintal."}
1 change: 1 addition & 0 deletions examples/test_sample.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"prompt":"Crop insurance eligibility","response":"Check PMFBY portal"}
2 changes: 2 additions & 0 deletions examples/train_sample.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"prompt":"What is weather in Pune?","response":"Sunny"}
{"prompt":"Mandi rate of wheat","response":"2500"}
1 change: 1 addition & 0 deletions examples/trajectory_invalid.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"session_id":"s1","event_type":"tool_result","tool_name":"weather","tool_call_id":"ghost","tool_result":{"temp":33}}
4 changes: 4 additions & 0 deletions examples/trajectory_valid.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"session_id":"s1","event_type":"user","content":"Find weather"}
{"session_id":"s1","event_type":"tool_call","tool_name":"weather","tool_call_id":"c1","tool_args":{"district":"Pune"}}
{"session_id":"s1","event_type":"tool_result","tool_name":"weather","tool_call_id":"c1","tool_result":{"temp":33}}
{"session_id":"s1","event_type":"assistant","content":"It is 33C"}
1 change: 1 addition & 0 deletions examples/val_sample.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"prompt":"what is weather in pune","response":"Clear"}
2 changes: 2 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pytest==8.3.5
pydantic==2.11.4
14 changes: 14 additions & 0 deletions tests/test_integrity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from training_setup_logs.integrity import find_split_leakage


def test_find_split_leakage_detects_cross_split_duplicate() -> None:
splits = {
"train": [{"prompt": "What is weather in Pune?"}],
"val": [{"prompt": "what is weather in pune"}],
"test": [{"prompt": "Different question"}],
}

leaks = find_split_leakage(splits)

assert len(leaks) == 1
assert {leaks[0].split_a, leaks[0].split_b} == {"train", "val"}
21 changes: 21 additions & 0 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from training_setup_logs.parser import parse_rows


def test_parse_rows_normalizes_mixed_shapes() -> None:
rows = [
{"timestamp": "2026-05-01T10:00:00Z", "session_id": "s1", "role": "user", "message": "Hi"},
{"timestamp": "2026-05-01T10:00:01Z", "session_id": "s1", "event_type": "tool-call", "tool": "search", "call_id": "c1", "args": {"q": "weather"}},
{"timestamp": "2026-05-01T10:00:02Z", "thread_id": "s1", "type": "tool_result", "tool_name": "search", "tool_call_id": "c1", "result": {"ok": True}},
{"timestamp": "2026-05-01T10:00:03Z", "conversation_id": "s1", "kind": "assistant", "text": "Done"},
{"timestamp": "2026-05-01T10:00:03Z", "kind": "assistant", "text": "missing session"},
]

events, summary = parse_rows(rows)

assert summary.input_rows == 5
assert summary.output_events == 4
assert summary.skipped_rows == 1
assert events[0].event_type.value == "user"
assert events[1].event_type.value == "tool_call"
assert events[2].event_type.value == "tool_result"
assert events[3].event_type.value == "assistant"
21 changes: 21 additions & 0 deletions tests/test_trajectory_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from training_setup_logs.trajectory_validator import validate_trajectory


def test_validate_trajectory_flags_orphan_tool_result() -> None:
events = [
{"event_type": "tool_result", "tool_call_id": "ghost"},
]

issues = validate_trajectory(events)
assert len(issues) == 1
assert "without prior tool_call" in issues[0].message


def test_validate_trajectory_accepts_valid_pair() -> None:
events = [
{"event_type": "tool_call", "tool_call_id": "c1"},
{"event_type": "tool_result", "tool_call_id": "c1"},
]

issues = validate_trajectory(events)
assert issues == []
1 change: 1 addition & 0 deletions training_setup_logs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Logs-to-training pipeline package."""
32 changes: 32 additions & 0 deletions training_setup_logs/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations

import argparse
from pathlib import Path

from training_setup_logs.parser import parse_rows, read_jsonl, write_jsonl


def main() -> int:
parser = argparse.ArgumentParser(
description="Normalize mixed production logs into canonical training events."
)
parser.add_argument("--input", required=True, help="Input raw log JSONL path")
parser.add_argument("--output", required=True, help="Output canonical JSONL path")
args = parser.parse_args()

input_path = Path(args.input)
output_path = Path(args.output)

rows = read_jsonl(input_path)
events, summary = parse_rows(rows)
write_jsonl(output_path, events)

print(
f"Parsed {summary.input_rows} rows -> {summary.output_events} events "
f"(skipped {summary.skipped_rows})"
)
return 0


if __name__ == "__main__":
raise SystemExit(main())
64 changes: 64 additions & 0 deletions training_setup_logs/integrity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from __future__ import annotations

import hashlib
import re
import unicodedata
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


def normalize_text(text: str) -> str:
text = unicodedata.normalize("NFKC", text).lower()
text = re.sub(r"\s+", " ", text).strip()
text = re.sub(r"[^\w\s]", "", text)
return text


def text_fingerprint(text: str) -> str:
normalized = normalize_text(text)
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


@dataclass
class LeakageRecord:
key: str
split_a: str
split_b: str
sample_a_idx: int
sample_b_idx: int


def _collect_fingerprints(rows: Iterable[dict], split_name: str) -> Dict[str, Tuple[str, int]]:
fp_map: Dict[str, Tuple[str, int]] = {}
for idx, row in enumerate(rows):
text = row.get("prompt") or row.get("content") or row.get("text") or ""
if not text:
continue
fp = text_fingerprint(text)
if fp not in fp_map:
fp_map[fp] = (split_name, idx)
return fp_map


def find_split_leakage(split_rows: Dict[str, List[dict]]) -> List[LeakageRecord]:
seen: Dict[str, Tuple[str, int]] = {}
leaks: List[LeakageRecord] = []

for split_name, rows in split_rows.items():
current = _collect_fingerprints(rows, split_name)
for key, (cur_split, cur_idx) in current.items():
if key in seen:
prev_split, prev_idx = seen[key]
if prev_split != cur_split:
leaks.append(
LeakageRecord(
key=key,
split_a=prev_split,
split_b=cur_split,
sample_a_idx=prev_idx,
sample_b_idx=cur_idx,
)
)
else:
seen[key] = (cur_split, cur_idx)
return leaks
95 changes: 95 additions & 0 deletions training_setup_logs/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

from training_setup_logs.schema import CanonicalEvent, EventType, ParseSummary


EVENT_ALIASES = {
"user": EventType.USER,
"assistant": EventType.ASSISTANT,
"tool_call": EventType.TOOL_CALL,
"tool_result": EventType.TOOL_RESULT,
"system": EventType.SYSTEM,
"error": EventType.ERROR,
"tool-call": EventType.TOOL_CALL,
"tool-return": EventType.TOOL_RESULT,
}


def _normalized_event_type(row: Dict[str, Any]) -> Optional[EventType]:
raw = (
row.get("event_type")
or row.get("type")
or row.get("role")
or row.get("kind")
or ""
)
if not raw:
return None
return EVENT_ALIASES.get(str(raw).strip().lower())


def _as_event(row: Dict[str, Any]) -> Optional[CanonicalEvent]:
event_type = _normalized_event_type(row)
if event_type is None:
return None

session_id = row.get("session_id") or row.get("thread_id") or row.get("conversation_id")
if not session_id:
return None

return CanonicalEvent(
schema_version="v1",
timestamp=row.get("timestamp") or row.get("ts") or row.get("created_at"),
session_id=str(session_id),
event_type=event_type,
content=row.get("content") or row.get("message") or row.get("text"),
tool_name=row.get("tool_name") or row.get("tool"),
tool_call_id=row.get("tool_call_id") or row.get("call_id"),
tool_args=row.get("tool_args") or row.get("args"),
tool_result=row.get("tool_result") or row.get("result") or row.get("observation"),
error_type=row.get("error_type"),
error_message=row.get("error_message") or row.get("error"),
metadata=row.get("metadata") or {},
)


def parse_rows(rows: Iterable[Dict[str, Any]]) -> Tuple[List[CanonicalEvent], ParseSummary]:
events: List[CanonicalEvent] = []
input_rows = 0
skipped_rows = 0

for row in rows:
input_rows += 1
event = _as_event(row)
if event is None:
skipped_rows += 1
continue
events.append(event)

summary = ParseSummary(
input_rows=input_rows,
output_events=len(events),
skipped_rows=skipped_rows,
)
return events, summary


def read_jsonl(path: Path) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
rows.append(json.loads(line))
return rows


def write_jsonl(path: Path, events: Iterable[CanonicalEvent]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
for event in events:
f.write(event.model_dump_json(exclude_none=True) + "\n")
37 changes: 37 additions & 0 deletions training_setup_logs/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


class EventType(str, Enum):
USER = "user"
ASSISTANT = "assistant"
TOOL_CALL = "tool_call"
TOOL_RESULT = "tool_result"
SYSTEM = "system"
ERROR = "error"


class CanonicalEvent(BaseModel):
schema_version: str = "v1"
timestamp: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
session_id: str
event_type: EventType
content: Optional[str] = None
tool_name: Optional[str] = None
tool_call_id: Optional[str] = None
tool_args: Optional[Dict[str, Any]] = None
tool_result: Optional[Any] = None
error_type: Optional[str] = None
error_message: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)


class ParseSummary(BaseModel):
input_rows: int
output_events: int
skipped_rows: int
Loading