From 7bdf23d5e140cc0ab548caae2f83e5dafbef0e9b Mon Sep 17 00:00:00 2001 From: VarshiniGunti Date: Fri, 8 May 2026 19:19:36 +0530 Subject: [PATCH] feat: add canonical log schema and normalization parser prototype --- .gitignore | 3 ++ README.md | 25 +++++++++ examples/raw_logs_sample.jsonl | 4 ++ requirements-dev.txt | 2 + tests/test_parser.py | 21 ++++++++ training_setup_logs/__init__.py | 1 + training_setup_logs/cli.py | 32 +++++++++++ training_setup_logs/parser.py | 95 +++++++++++++++++++++++++++++++++ training_setup_logs/schema.py | 37 +++++++++++++ 9 files changed, 220 insertions(+) create mode 100644 .gitignore create mode 100644 examples/raw_logs_sample.jsonl create mode 100644 requirements-dev.txt create mode 100644 tests/test_parser.py create mode 100644 training_setup_logs/__init__.py create mode 100644 training_setup_logs/cli.py create mode 100644 training_setup_logs/parser.py create mode 100644 training_setup_logs/schema.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d86882e --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*.py[cod] +examples/canonical_events.jsonl diff --git a/README.md b/README.md index 5a82102..925089c 100644 --- a/README.md +++ b/README.md @@ -90,3 +90,28 @@ The pipeline should treat **export formats** as first-class requirements so the 6. Define **student model** constraints (context length, tool set) and a **filter + eval** plan for teacher-to-student parity before production swap. + +--- + +## Prototype: PR1 (Schema + Parser) + +This repository now includes a working prototype for log normalization: + +- Canonical event schema: `training_setup_logs/schema.py` +- Parser + JSONL IO: `training_setup_logs/parser.py` +- CLI: `training_setup_logs/cli.py` +- Sample input: `examples/raw_logs_sample.jsonl` +- Tests: `tests/test_parser.py` + +### Run parser + +```bash +python -m training_setup_logs.cli --input examples/raw_logs_sample.jsonl --output examples/canonical_events.jsonl +``` + +### Run tests + +```bash +pip install -r requirements-dev.txt +pytest -q +``` diff --git a/examples/raw_logs_sample.jsonl b/examples/raw_logs_sample.jsonl new file mode 100644 index 0000000..7bbdb84 --- /dev/null +++ b/examples/raw_logs_sample.jsonl @@ -0,0 +1,4 @@ +{"timestamp":"2026-05-01T10:00:00Z","session_id":"demo-1","role":"user","message":"What is today's mandi price for wheat?"} +{"timestamp":"2026-05-01T10:00:01Z","session_id":"demo-1","type":"tool-call","tool":"mandi_prices","call_id":"call-1","args":{"crop":"wheat","market":"Pune"}} +{"timestamp":"2026-05-01T10:00:01Z","session_id":"demo-1","event_type":"tool_result","tool_name":"mandi_prices","tool_call_id":"call-1","result":{"price_per_quintal":2450}} +{"timestamp":"2026-05-01T10:00:02Z","session_id":"demo-1","role":"assistant","message":"Current wheat mandi price is ?2450/quintal."} diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..4a727b5 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,2 @@ +pytest==8.3.5 +pydantic==2.11.4 diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 0000000..03e385c --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,21 @@ +from training_setup_logs.parser import parse_rows + + +def test_parse_rows_normalizes_mixed_shapes() -> None: + rows = [ + {"timestamp": "2026-05-01T10:00:00Z", "session_id": "s1", "role": "user", "message": "Hi"}, + {"timestamp": "2026-05-01T10:00:01Z", "session_id": "s1", "event_type": "tool-call", "tool": "search", "call_id": "c1", "args": {"q": "weather"}}, + {"timestamp": "2026-05-01T10:00:02Z", "thread_id": "s1", "type": "tool_result", "tool_name": "search", "tool_call_id": "c1", "result": {"ok": True}}, + {"timestamp": "2026-05-01T10:00:03Z", "conversation_id": "s1", "kind": "assistant", "text": "Done"}, + {"timestamp": "2026-05-01T10:00:03Z", "kind": "assistant", "text": "missing session"}, + ] + + events, summary = parse_rows(rows) + + assert summary.input_rows == 5 + assert summary.output_events == 4 + assert summary.skipped_rows == 1 + assert events[0].event_type.value == "user" + assert events[1].event_type.value == "tool_call" + assert events[2].event_type.value == "tool_result" + assert events[3].event_type.value == "assistant" diff --git a/training_setup_logs/__init__.py b/training_setup_logs/__init__.py new file mode 100644 index 0000000..63e272a --- /dev/null +++ b/training_setup_logs/__init__.py @@ -0,0 +1 @@ +"""Logs-to-training pipeline package.""" diff --git a/training_setup_logs/cli.py b/training_setup_logs/cli.py new file mode 100644 index 0000000..ad8ffad --- /dev/null +++ b/training_setup_logs/cli.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +from training_setup_logs.parser import parse_rows, read_jsonl, write_jsonl + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Normalize mixed production logs into canonical training events." + ) + parser.add_argument("--input", required=True, help="Input raw log JSONL path") + parser.add_argument("--output", required=True, help="Output canonical JSONL path") + args = parser.parse_args() + + input_path = Path(args.input) + output_path = Path(args.output) + + rows = read_jsonl(input_path) + events, summary = parse_rows(rows) + write_jsonl(output_path, events) + + print( + f"Parsed {summary.input_rows} rows -> {summary.output_events} events " + f"(skipped {summary.skipped_rows})" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/training_setup_logs/parser.py b/training_setup_logs/parser.py new file mode 100644 index 0000000..b2de891 --- /dev/null +++ b/training_setup_logs/parser.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Tuple + +from training_setup_logs.schema import CanonicalEvent, EventType, ParseSummary + + +EVENT_ALIASES = { + "user": EventType.USER, + "assistant": EventType.ASSISTANT, + "tool_call": EventType.TOOL_CALL, + "tool_result": EventType.TOOL_RESULT, + "system": EventType.SYSTEM, + "error": EventType.ERROR, + "tool-call": EventType.TOOL_CALL, + "tool-return": EventType.TOOL_RESULT, +} + + +def _normalized_event_type(row: Dict[str, Any]) -> Optional[EventType]: + raw = ( + row.get("event_type") + or row.get("type") + or row.get("role") + or row.get("kind") + or "" + ) + if not raw: + return None + return EVENT_ALIASES.get(str(raw).strip().lower()) + + +def _as_event(row: Dict[str, Any]) -> Optional[CanonicalEvent]: + event_type = _normalized_event_type(row) + if event_type is None: + return None + + session_id = row.get("session_id") or row.get("thread_id") or row.get("conversation_id") + if not session_id: + return None + + return CanonicalEvent( + schema_version="v1", + timestamp=row.get("timestamp") or row.get("ts") or row.get("created_at"), + session_id=str(session_id), + event_type=event_type, + content=row.get("content") or row.get("message") or row.get("text"), + tool_name=row.get("tool_name") or row.get("tool"), + tool_call_id=row.get("tool_call_id") or row.get("call_id"), + tool_args=row.get("tool_args") or row.get("args"), + tool_result=row.get("tool_result") or row.get("result") or row.get("observation"), + error_type=row.get("error_type"), + error_message=row.get("error_message") or row.get("error"), + metadata=row.get("metadata") or {}, + ) + + +def parse_rows(rows: Iterable[Dict[str, Any]]) -> Tuple[List[CanonicalEvent], ParseSummary]: + events: List[CanonicalEvent] = [] + input_rows = 0 + skipped_rows = 0 + + for row in rows: + input_rows += 1 + event = _as_event(row) + if event is None: + skipped_rows += 1 + continue + events.append(event) + + summary = ParseSummary( + input_rows=input_rows, + output_events=len(events), + skipped_rows=skipped_rows, + ) + return events, summary + + +def read_jsonl(path: Path) -> List[Dict[str, Any]]: + rows: List[Dict[str, Any]] = [] + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line: + continue + rows.append(json.loads(line)) + return rows + + +def write_jsonl(path: Path, events: Iterable[CanonicalEvent]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as f: + for event in events: + f.write(event.model_dump_json(exclude_none=True) + "\n") diff --git a/training_setup_logs/schema.py b/training_setup_logs/schema.py new file mode 100644 index 0000000..c674f23 --- /dev/null +++ b/training_setup_logs/schema.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Dict, Optional + +from pydantic import BaseModel, Field + + +class EventType(str, Enum): + USER = "user" + ASSISTANT = "assistant" + TOOL_CALL = "tool_call" + TOOL_RESULT = "tool_result" + SYSTEM = "system" + ERROR = "error" + + +class CanonicalEvent(BaseModel): + schema_version: str = "v1" + timestamp: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + session_id: str + event_type: EventType + content: Optional[str] = None + tool_name: Optional[str] = None + tool_call_id: Optional[str] = None + tool_args: Optional[Dict[str, Any]] = None + tool_result: Optional[Any] = None + error_type: Optional[str] = None + error_message: Optional[str] = None + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class ParseSummary(BaseModel): + input_rows: int + output_events: int + skipped_rows: int