Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__pycache__/
*.py[cod]
examples/canonical_events.jsonl
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,28 @@ The pipeline should treat **export formats** as first-class requirements so the
6. Define **student model** constraints (context length, tool set) and a **filter + eval** plan for teacher-to-student parity before production swap.



---

## Prototype: PR1 (Schema + Parser)

This repository now includes a working prototype for log normalization:

- Canonical event schema: `training_setup_logs/schema.py`
- Parser + JSONL IO: `training_setup_logs/parser.py`
- CLI: `training_setup_logs/cli.py`
- Sample input: `examples/raw_logs_sample.jsonl`
- Tests: `tests/test_parser.py`

### Run parser

```bash
python -m training_setup_logs.cli --input examples/raw_logs_sample.jsonl --output examples/canonical_events.jsonl
```

### Run tests

```bash
pip install -r requirements-dev.txt
pytest -q
```
4 changes: 4 additions & 0 deletions examples/raw_logs_sample.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"timestamp":"2026-05-01T10:00:00Z","session_id":"demo-1","role":"user","message":"What is today's mandi price for wheat?"}
{"timestamp":"2026-05-01T10:00:01Z","session_id":"demo-1","type":"tool-call","tool":"mandi_prices","call_id":"call-1","args":{"crop":"wheat","market":"Pune"}}
{"timestamp":"2026-05-01T10:00:01Z","session_id":"demo-1","event_type":"tool_result","tool_name":"mandi_prices","tool_call_id":"call-1","result":{"price_per_quintal":2450}}
{"timestamp":"2026-05-01T10:00:02Z","session_id":"demo-1","role":"assistant","message":"Current wheat mandi price is ?2450/quintal."}
2 changes: 2 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pytest==8.3.5
pydantic==2.11.4
21 changes: 21 additions & 0 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from training_setup_logs.parser import parse_rows


def test_parse_rows_normalizes_mixed_shapes() -> None:
rows = [
{"timestamp": "2026-05-01T10:00:00Z", "session_id": "s1", "role": "user", "message": "Hi"},
{"timestamp": "2026-05-01T10:00:01Z", "session_id": "s1", "event_type": "tool-call", "tool": "search", "call_id": "c1", "args": {"q": "weather"}},
{"timestamp": "2026-05-01T10:00:02Z", "thread_id": "s1", "type": "tool_result", "tool_name": "search", "tool_call_id": "c1", "result": {"ok": True}},
{"timestamp": "2026-05-01T10:00:03Z", "conversation_id": "s1", "kind": "assistant", "text": "Done"},
{"timestamp": "2026-05-01T10:00:03Z", "kind": "assistant", "text": "missing session"},
]

events, summary = parse_rows(rows)

assert summary.input_rows == 5
assert summary.output_events == 4
assert summary.skipped_rows == 1
assert events[0].event_type.value == "user"
assert events[1].event_type.value == "tool_call"
assert events[2].event_type.value == "tool_result"
assert events[3].event_type.value == "assistant"
1 change: 1 addition & 0 deletions training_setup_logs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Logs-to-training pipeline package."""
32 changes: 32 additions & 0 deletions training_setup_logs/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations

import argparse
from pathlib import Path

from training_setup_logs.parser import parse_rows, read_jsonl, write_jsonl


def main() -> int:
parser = argparse.ArgumentParser(
description="Normalize mixed production logs into canonical training events."
)
parser.add_argument("--input", required=True, help="Input raw log JSONL path")
parser.add_argument("--output", required=True, help="Output canonical JSONL path")
args = parser.parse_args()

input_path = Path(args.input)
output_path = Path(args.output)

rows = read_jsonl(input_path)
events, summary = parse_rows(rows)
write_jsonl(output_path, events)

print(
f"Parsed {summary.input_rows} rows -> {summary.output_events} events "
f"(skipped {summary.skipped_rows})"
)
return 0


if __name__ == "__main__":
raise SystemExit(main())
95 changes: 95 additions & 0 deletions training_setup_logs/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

from training_setup_logs.schema import CanonicalEvent, EventType, ParseSummary


EVENT_ALIASES = {
"user": EventType.USER,
"assistant": EventType.ASSISTANT,
"tool_call": EventType.TOOL_CALL,
"tool_result": EventType.TOOL_RESULT,
"system": EventType.SYSTEM,
"error": EventType.ERROR,
"tool-call": EventType.TOOL_CALL,
"tool-return": EventType.TOOL_RESULT,
}


def _normalized_event_type(row: Dict[str, Any]) -> Optional[EventType]:
raw = (
row.get("event_type")
or row.get("type")
or row.get("role")
or row.get("kind")
or ""
)
if not raw:
return None
return EVENT_ALIASES.get(str(raw).strip().lower())


def _as_event(row: Dict[str, Any]) -> Optional[CanonicalEvent]:
event_type = _normalized_event_type(row)
if event_type is None:
return None

session_id = row.get("session_id") or row.get("thread_id") or row.get("conversation_id")
if not session_id:
return None

return CanonicalEvent(
schema_version="v1",
timestamp=row.get("timestamp") or row.get("ts") or row.get("created_at"),
session_id=str(session_id),
event_type=event_type,
content=row.get("content") or row.get("message") or row.get("text"),
tool_name=row.get("tool_name") or row.get("tool"),
tool_call_id=row.get("tool_call_id") or row.get("call_id"),
tool_args=row.get("tool_args") or row.get("args"),
tool_result=row.get("tool_result") or row.get("result") or row.get("observation"),
error_type=row.get("error_type"),
error_message=row.get("error_message") or row.get("error"),
metadata=row.get("metadata") or {},
)


def parse_rows(rows: Iterable[Dict[str, Any]]) -> Tuple[List[CanonicalEvent], ParseSummary]:
events: List[CanonicalEvent] = []
input_rows = 0
skipped_rows = 0

for row in rows:
input_rows += 1
event = _as_event(row)
if event is None:
skipped_rows += 1
continue
events.append(event)

summary = ParseSummary(
input_rows=input_rows,
output_events=len(events),
skipped_rows=skipped_rows,
)
return events, summary


def read_jsonl(path: Path) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
rows.append(json.loads(line))
return rows


def write_jsonl(path: Path, events: Iterable[CanonicalEvent]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
for event in events:
f.write(event.model_dump_json(exclude_none=True) + "\n")
37 changes: 37 additions & 0 deletions training_setup_logs/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


class EventType(str, Enum):
USER = "user"
ASSISTANT = "assistant"
TOOL_CALL = "tool_call"
TOOL_RESULT = "tool_result"
SYSTEM = "system"
ERROR = "error"


class CanonicalEvent(BaseModel):
schema_version: str = "v1"
timestamp: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
session_id: str
event_type: EventType
content: Optional[str] = None
tool_name: Optional[str] = None
tool_call_id: Optional[str] = None
tool_args: Optional[Dict[str, Any]] = None
tool_result: Optional[Any] = None
error_type: Optional[str] = None
error_message: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)


class ParseSummary(BaseModel):
input_rows: int
output_events: int
skipped_rows: int