Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
__pycache__/
.pytest_cache/
.venv/
*.egg-info/
out/
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,75 @@ The pipeline should treat **export formats** as first-class requirements so the
5. Specify **one SFT JSONL schema** and **one DPO JSONL schema** (and chat template) validated end-to-end with a **LoRA** dry run and a **small DPO** dry run on toy data.
6. Define **student model** constraints (context length, tool set) and a **filter + eval** plan for teacher-to-student parity before production swap.

---

## Implementation approach

This repository now includes a complete first implementation slice for the log-to-training pipeline:

- file or directory ingestion for JSON/JSONL logs
- canonical event schema for Q&A, assistant turns, tool calls, tool results, errors, and feedback
- deterministic PII/secrets redaction with stable placeholders
- redaction report with audit samples that never include raw sensitive values
- session segmentation into Q&A units and agent trajectories
- trajectory complexity tags for staged training schedules
- deterministic train/validation/test splits with leakage buckets
- optional tool registry validation for declared tool names and required arguments
- SFT JSONL export for LoRA-compatible chat training
- DPO candidate export for human-approved preference data
- redacted canonical unit export for review and downstream transforms

The implementation stays dependency-light so it can run in controlled environments, but it is structured as installable Python package code rather than ad hoc scripts.

### Quickstart

```bash
python -m pip install -e ".[dev]"
training-setup-logs examples/sample_agent_logs.jsonl \
--tool-schema examples/tool_schema.json \
--out-dir out
```

Outputs:

- `out/redacted_units.jsonl`: canonical redacted units for audit and downstream transforms
- `out/sft.jsonl`: LoRA-ready supervised chat rows
- `out/dpo_candidates.jsonl`: preference-pair candidates that require human approval
- `out/redaction_report.json`: PII finding kinds and placeholders, without raw sensitive values
- `out/manifest.json`: PII counts, validation counts, and complexity distribution

Run tests:

```bash
python -m pytest
```

## Repository structure

```text
src/training_setup_logs/
audit.py audit samples for redacted data review
cli.py command line entrypoint
export.py SFT, DPO candidate, and redacted-unit exporters
ingest.py JSON/JSONL file and directory ingestion
pii.py deterministic PII and secret redaction
schemas.py canonical dataclasses
segment.py session-to-training-unit segmentation
split.py deterministic split and leakage-bucket assignment
tagging.py trajectory complexity and scheduling tags
tool_schema.py optional tool registry loading
validate.py trajectory and tool-use validation
examples/
sample_agent_logs.jsonl
tool_schema.json
tests/
test_pipeline.py
```

## Canonical outputs

Each `redacted_units.jsonl` row contains `unit_id`, `session_id`, `unit_type`, redacted events, split metadata, complexity tags, and validation issues. `sft.jsonl` converts the same units into chat-style `messages`. `dpo_candidates.jsonl` only emits governed preference candidates, such as failed/error traces followed by a later recovery, and marks them as requiring human approval.

## Privacy assumptions

The default redactor covers common emails, phone numbers, Aadhaar-like IDs, IP addresses, bearer tokens, API-key-shaped secrets, and URL secret query parameters. Production use should add organization-specific dictionaries, policy approval, and human audit sampling before training artifacts are shipped.
9 changes: 9 additions & 0 deletions examples/sample_agent_logs.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{"session_id":"s1","timestamp":"2026-04-28T08:00:00Z","role":"user","content":"My email is farmer.ravi@example.com. Please set up irrigation advice for plot 42."}
{"session_id":"s1","timestamp":"2026-04-28T08:00:02Z","role":"assistant","content":"I will inspect the available weather and crop profile before suggesting irrigation."}
{"session_id":"s1","timestamp":"2026-04-28T08:00:03Z","type":"tool_call","tool_name":"get_weather","tool_args":{"district":"Raipur","api_key":"sk_TEST1234567890abcdef"}}
{"session_id":"s1","timestamp":"2026-04-28T08:00:04Z","type":"tool_result","tool_result":{"rain_forecast_mm":0,"temperature_c":35}}
{"session_id":"s1","timestamp":"2026-04-28T08:00:05Z","type":"tool_call","tool_name":"get_crop_profile","tool_args":{"phone":"+91 98765 43210","crop":"paddy"}}
{"session_id":"s1","timestamp":"2026-04-28T08:00:06Z","type":"error","content":"crop profile lookup timed out"}
{"session_id":"s1","timestamp":"2026-04-28T08:00:08Z","role":"assistant","content":"The crop profile lookup failed, so I will provide a conservative recommendation and flag this for retry."}
{"session_id":"s2","timestamp":"2026-04-28T09:00:00Z","role":"user","content":"What is LoRA fine-tuning?"}
{"session_id":"s2","timestamp":"2026-04-28T09:00:02Z","role":"assistant","content":"LoRA fine-tuning updates small low-rank adapter matrices instead of all model weights, making supervised fine-tuning cheaper and easier to deploy."}
10 changes: 10 additions & 0 deletions examples/tool_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"tools": {
"get_weather": {
"required_args": ["district"]
},
"get_crop_profile": {
"required_args": ["crop"]
}
}
}
25 changes: 25 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"

[project]
name = "training-setup-logs"
version = "0.1.0"
description = "Privacy-safe log-to-training-data pipeline for Q&A and agentic traces."
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "OpenAgriNet contributors" }]
dependencies = []

[project.optional-dependencies]
dev = ["pytest>=8"]

[project.scripts]
training-setup-logs = "training_setup_logs.cli:main"

[tool.setuptools.packages.find]
where = ["src"]

[tool.pytest.ini_options]
pythonpath = ["src"]
testpaths = ["tests"]
5 changes: 5 additions & 0 deletions src/training_setup_logs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Privacy-safe log-to-training-data pipeline."""

from training_setup_logs.schemas import LogEvent, TrainingUnit

__all__ = ["LogEvent", "TrainingUnit"]
42 changes: 42 additions & 0 deletions src/training_setup_logs/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Audit helpers for privacy review and data quality checks."""

from __future__ import annotations

from training_setup_logs.schemas import TrainingUnit
from training_setup_logs.validate import validate_unit


def build_audit_sample(units: list[TrainingUnit], limit: int = 10) -> list[dict[str, object]]:
"""Return a compact redacted sample for human audit workflows."""

sample: list[dict[str, object]] = []
for unit in units[:limit]:
sample.append(
{
"unit_id": unit.unit_id,
"session_id": unit.session_id,
"unit_type": unit.unit_type,
"event_count": len(unit.events),
"preview": _preview(unit),
"validation_issues": [issue.to_dict() for issue in validate_unit(unit)],
}
)
return sample


def _preview(unit: TrainingUnit) -> list[dict[str, str]]:
preview: list[dict[str, str]] = []
for event in unit.events[:6]:
text = event.content
if text is None and event.tool_name:
text = f"{event.type}: {event.tool_name}"
if text is None and event.tool_result is not None:
text = f"{event.type}: {type(event.tool_result).__name__}"
preview.append(
{
"event_id": event.event_id,
"type": event.type,
"text": (text or "")[:180],
}
)
return preview
105 changes: 105 additions & 0 deletions src/training_setup_logs/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Command line interface for the log-to-training pipeline."""

from __future__ import annotations

import argparse
import json
from pathlib import Path

from training_setup_logs.audit import build_audit_sample
from training_setup_logs.export import (
to_dpo_candidate_row,
to_redacted_unit_row,
to_sft_row,
write_jsonl,
)
from training_setup_logs.ingest import load_events_from_path
from training_setup_logs.pii import PiiRedactor
from training_setup_logs.segment import segment_events
from training_setup_logs.schemas import TrainingUnit
from training_setup_logs.tagging import tag_unit
from training_setup_logs.tool_schema import ToolRegistry
from training_setup_logs.validate import validate_unit


def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Build privacy-safe SFT and DPO JSONL from logs.")
parser.add_argument("input", type=Path, help="Input JSON/JSONL log file or directory.")
parser.add_argument("--out-dir", type=Path, default=Path("out"), help="Output directory.")
parser.add_argument("--tool-schema", type=Path, help="Optional JSON registry for tool validation.")
parser.add_argument("--audit-sample-size", type=int, default=10)
return parser


def run(
input_path: Path,
out_dir: Path,
tool_schema: Path | None = None,
audit_sample_size: int = 10,
) -> dict[str, object]:
events = load_events_from_path(input_path)
tool_registry = ToolRegistry.from_path(tool_schema)
redactor = PiiRedactor()
redacted_events = [redactor.redact_event(event) for event in events]
units = segment_events(redacted_events)

sft_rows = [to_sft_row(unit) for unit in units]
dpo_rows = [row for unit in units if (row := to_dpo_candidate_row(unit)) is not None]
redacted_unit_rows = [to_redacted_unit_row(unit) for unit in units]
manifest = {
"input": str(input_path),
"unit_count": len(units),
"sft_rows": len(sft_rows),
"dpo_candidate_rows": len(dpo_rows),
"pii_counts": redactor.report.counts_by_kind(),
"validation_issue_count": sum(len(validate_unit(unit, tool_registry)) for unit in units),
"tool_registry_count": len(tool_registry.tools),
"split_summary": _split_summary(sft_rows),
"tag_summary": _tag_summary(units),
}

out_dir.mkdir(parents=True, exist_ok=True)
write_jsonl(out_dir / "redacted_units.jsonl", redacted_unit_rows)
write_jsonl(out_dir / "sft.jsonl", sft_rows)
write_jsonl(out_dir / "dpo_candidates.jsonl", dpo_rows)
(out_dir / "redaction_report.json").write_text(
json.dumps(
{
"counts_by_kind": redactor.report.counts_by_kind(),
"findings": [finding.to_dict() for finding in redactor.report.findings],
"audit_sample": build_audit_sample(units, limit=audit_sample_size),
},
indent=2,
sort_keys=True,
),
encoding="utf-8",
)
(out_dir / "manifest.json").write_text(json.dumps(manifest, indent=2, sort_keys=True), encoding="utf-8")
return manifest


def main() -> None:
args = build_parser().parse_args()
manifest = run(args.input, args.out_dir, args.tool_schema, args.audit_sample_size)
print(json.dumps(manifest, indent=2, sort_keys=True))


def _tag_summary(units: list[TrainingUnit]) -> dict[str, int]:
summary: dict[str, int] = {}
for unit in units:
complexity = str(tag_unit(unit)["complexity"])
summary[complexity] = summary.get(complexity, 0) + 1
return summary


def _split_summary(rows: list[dict[str, object]]) -> dict[str, int]:
summary: dict[str, int] = {}
for row in rows:
metadata = row.get("metadata", {})
split = str(metadata.get("split", "unknown")) if isinstance(metadata, dict) else "unknown"
summary[split] = summary.get(split, 0) + 1
return summary


if __name__ == "__main__":
main()
Loading