Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
6bac41e
feat: annotate traces with `de.*` attributes via metadata channel + p…
Jun 12, 2026
fb07061
fix: guard `column_lineage` extraction with `Array.isArray` + narrow …
Jun 15, 2026
37306de
fix: classify `altimate-dbt` commands before falling through to dbt r…
Jun 15, 2026
1de2e64
fix: cap span-attribute size by UTF-8 bytes, not JS string length
Jun 15, 2026
310ef3d
fix: derive `de.env.dbt_manifest_present` from file existence, not RP…
Jun 15, 2026
d87be06
fix: import `DE_*` vocab constants in tool opt-ins; drop hardcoded `d…
Jun 15, 2026
e303bc1
fix: bash SQL lineage extracts from full query, not truncated 8 KB slice
Jun 15, 2026
b44c03a
fix: hoist `classifyBash` regexes to module scope
Jun 15, 2026
f75079c
fix: `extractColumn` strips table prefix in endpoint-string fallback
Jun 15, 2026
8ec4b99
fix: `extractColumn` mirrors `extractTable` object-form branch
Jun 15, 2026
5ea44cd
fix: `dbt_troubleshoot` heuristic accepts `write` in tool-mix check
Jun 15, 2026
6b3cac5
fix: `de.outcome.change_applied` requires successful write/edit span
Jun 15, 2026
9220058
fix: drop annotator-side SQL char cap; let tracer enforce byte limit
Jun 15, 2026
81ae7b9
fix: extend SQL-text/lineage extraction to all SQL-handling tools
Jun 15, 2026
c09893d
fix: anchor `classifyBash` dbt/altimate-dbt patterns to command bound…
Jun 15, 2026
3cda78f
fix: `de.outcome.executed` reads cached bash_intent/dbt.command attri…
Jun 15, 2026
6766457
fix: session env reads Layer-1 attrs from latest `project_scan` span
Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
640 changes: 640 additions & 0 deletions packages/opencode/src/altimate/observability/annotator.ts

Large diffs are not rendered by default.

82 changes: 82 additions & 0 deletions packages/opencode/src/altimate/observability/de-attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ export const DE_WAREHOUSE = {
ROWS_RETURNED: "de.warehouse.rows_returned",
/** Rows affected (INSERT/UPDATE/DELETE) */
ROWS_AFFECTED: "de.warehouse.rows_affected",
/** Total rows in a table (from schema_inspect, not query result) */
ROWS_TOTAL: "de.warehouse.rows_total",
/** Query ID from the warehouse (for linking to warehouse query history) */
QUERY_ID: "de.warehouse.query_id",
/** Whether the query hit a warehouse cache (Snowflake result cache, BQ cache) */
Expand Down Expand Up @@ -102,6 +104,8 @@ export const DE_SQL = {
export const DE_DBT = {
/** dbt command: run, test, build, compile, seed, snapshot */
COMMAND: "de.dbt.command",
/** dbt project layer derived from model path: staging, intermediate, dim, fact, agg, mart, source, seed, macro, test */
LAYER: "de.dbt.layer",
/** Model unique_id (e.g., model.my_project.stg_orders) */
MODEL_UNIQUE_ID: "de.dbt.model.unique_id",
/** Model short name */
Expand Down Expand Up @@ -206,6 +210,79 @@ export const DE_COST = {
ATTRIBUTION_PROJECT: "de.cost.attribution.project",
} as const

// ---------------------------------------------------------------------------
// Workflow & session classification (Layer 6) — populated by derived annotator
// ---------------------------------------------------------------------------

export const DE_WORKFLOW = {
/** Session workflow type: dbt_develop, dbt_troubleshoot, dbt_test, dbt_docs, sql_analysis, warehouse_exploration, project_scan, generic_file_edit */
TYPE: "de.workflow.type",
/** Session intent verb-phrase: "create model", "fix error", "add tests", "refactor model", "inspect schema", "run query" */
INTENT: "de.workflow.intent",
/** Confidence of the classifier (0.0-1.0) */
TYPE_CONFIDENCE: "de.workflow.type_confidence",
} as const

// ---------------------------------------------------------------------------
// Session outcome (Layer 7) — populated by derived annotator
// ---------------------------------------------------------------------------

export const DE_OUTCOME = {
/** success, failure, interrupted, provider_routing_failure, validation_failure, no_op, partial_fix */
CLASS: "de.outcome.class",
/** Whether a build/test/run was actually executed */
EXECUTED: "de.outcome.executed",
/** Whether a fix or change was applied */
CHANGE_APPLIED: "de.outcome.change_applied",
} as const

// ---------------------------------------------------------------------------
// Touched artifacts (Layer 8) — populated by derived annotator
// ---------------------------------------------------------------------------

export const DE_ARTIFACTS = {
/** File paths read (JSON array) */
FILES_READ: "de.artifacts.files_read",
/** File paths written or edited (JSON array) */
FILES_EDITED: "de.artifacts.files_edited",
/** dbt model names mentioned in the user prompt (JSON array) */
MODELS_MENTIONED: "de.artifacts.models_mentioned",
/** Fully-qualified tables referenced by executed SQL (JSON array) */
TABLES_REFERENCED: "de.artifacts.tables_referenced",
} as const

// ---------------------------------------------------------------------------
// Environment capabilities (Layer 9) — populated by derived annotator
// ---------------------------------------------------------------------------

export const DE_ENV = {
/** Whether dbt is present in the working directory */
DBT_PRESENT: "de.env.dbt_present",
/** Whether dbt manifest.json exists */
DBT_MANIFEST_PRESENT: "de.env.dbt_manifest_present",
/** Warehouse adapter detected: snowflake, bigquery, postgres, duckdb, databricks, redshift */
WAREHOUSE_TYPE: "de.env.warehouse_type",
/** Orchestrators / data-quality / lint tools detected (JSON array): airflow, dagster, prefect, soda, sqlmesh, great_expectations, sqlfluff */
TOOLS_DETECTED: "de.env.tools_detected",
} as const

// ---------------------------------------------------------------------------
// Generic tool classification (Layer 10) — populated by derived annotator
// ---------------------------------------------------------------------------

export const DE_TOOL = {
/** Tool category: warehouse, sql, dbt, schema, lineage, quality, finops, fs, exec, planning, generic */
CATEGORY: "de.tool.category",
/** Finer-grained subcategory (e.g., "sql.execute", "dbt.build", "fs.read") */
SUBCATEGORY: "de.tool.subcategory",
/** Vendor or framework the tool wraps: snowflake, bigquery, duckdb, dbt, altimate-core, etc. */
VENDOR: "de.tool.vendor",
/** bash-command intent when kind=tool name=bash: dbt, altimate_dbt, python_sql, sql, fs, vcs, install, other */
BASH_INTENT: "de.tool.bash_intent",
/** Concrete CLI invoked in a bash command: dbt, altimate-dbt, python3, psql, sqlfluff, etc. */
BASH_INVOKED: "de.tool.bash_invoked",
} as const

// ---------------------------------------------------------------------------
// Convenience namespace
// ---------------------------------------------------------------------------
Expand All @@ -217,4 +294,9 @@ export const DE = {
DBT: DE_DBT,
QUALITY: DE_QUALITY,
COST: DE_COST,
WORKFLOW: DE_WORKFLOW,
OUTCOME: DE_OUTCOME,
ARTIFACTS: DE_ARTIFACTS,
ENV: DE_ENV,
TOOL: DE_TOOL,
} as const
124 changes: 124 additions & 0 deletions packages/opencode/src/altimate/observability/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import path from "path"
import { Global } from "../../global"
import { randomUUIDv7 } from "bun"
import { Log } from "../../util/log"
// altimate_change start — trace augmentation: procedural classifier
import { annotateToolSpan, annotateSession } from "./annotator"
// altimate_change end

// ---------------------------------------------------------------------------
// Trace data types — v2 schema
Expand Down Expand Up @@ -841,12 +844,14 @@ export class Trace {
status: "completed"
input: Record<string, unknown>
output: string
metadata?: Record<string, unknown>
time: { start: number; end: number }
}
| {
status: "error"
input: Record<string, unknown>
error: string
metadata?: Record<string, unknown>
time: { start: number; end: number }
}
}) {
Expand Down Expand Up @@ -874,6 +879,78 @@ export class Trace {
safeInput = { _serialization_error: "Input contained circular references or non-serializable data" }
}

// altimate_change start — trace augmentation: lift structured tool metadata
// (carried via ToolStateCompleted/Error.metadata in message-v2) onto the
// span. Only keys matching the `de.` semantic-convention prefix are
// promoted — other metadata stays internal to the tool framework
// (e.g., `truncated`, `outputPath`, `findings`, `success`).
//
// This is the "metadata channel": tools surface structured fields by
// setting `de.*` keys on their returned metadata object, without ever
// importing the observability layer.
//
// Values must be JSON-compatible (Date / BigInt / class instances are
// silently dropped or stringified, matching the trace file's serialization
// contract). Per-value and total byte caps below prevent runaway payloads
// from ballooning snapshots and HTTP exports — both truncation layers
// upstream (`tool.ts` output truncation, `tracing.ts` 10 KB output slice)
// operate on `output: string`, not on `metadata`.
//
// Byte size is computed as UTF-8 byte length, not JS string length
// (which counts UTF-16 code units): non-ASCII payloads (e.g.,
// CJK identifiers, emoji in error messages) would otherwise bypass
// the cap and balloon trace exports.
const ATTR_VALUE_MAX_BYTES = 10_000
const ATTR_TOTAL_MAX_BYTES = 32_000
const spanAttributes: Record<string, unknown> = {}
let totalBytes = 0

// Layer 1: tool-provided structured metadata (high fidelity — driver
// values, parser output). Filtered to the de.* prefix.
const rawMetadata = state.metadata
if (rawMetadata && typeof rawMetadata === "object") {
for (const [k, v] of Object.entries(rawMetadata)) {
if (typeof k !== "string" || !k.startsWith("de.") || v === undefined) continue
try {
const serialized = JSON.stringify(v)
if (serialized === undefined) continue
const valueBytes = Buffer.byteLength(serialized, "utf8")
if (valueBytes > ATTR_VALUE_MAX_BYTES) continue
if (totalBytes + valueBytes > ATTR_TOTAL_MAX_BYTES) continue
// Store original value (matches setSpanAttributes() at line ~1135 for
// consistent overwrite semantics if both paths target the same key).
spanAttributes[k] = v
totalBytes += valueBytes
} catch {
// Bad metadata value must never break the tracer
}
}
}

// Layer 2: derived classification from (name, input, output). Best-effort
// procedural — taxonomy lookup, bash intent, dbt layer from path, etc.
// Tool-provided metadata (Layer 1) wins on conflicts.
try {
const derived = annotateToolSpan(toolName, safeInput, isError ? errorStr : outputStr)
for (const [k, v] of Object.entries(derived)) {
if (v === undefined || k in spanAttributes) continue
try {
const serialized = JSON.stringify(v)
if (serialized === undefined) continue
const valueBytes = Buffer.byteLength(serialized, "utf8")
if (valueBytes > ATTR_VALUE_MAX_BYTES) continue
if (totalBytes + valueBytes > ATTR_TOTAL_MAX_BYTES) continue
spanAttributes[k] = v
totalBytes += valueBytes
} catch {
// best-effort
}
}
} catch {
// Annotator must never break the tracer
}
// altimate_change end

this.spans.push({
spanId: randomUUIDv7(),
parentSpanId: this.currentGenerationSpanId ?? this.rootSpanId,
Expand All @@ -889,6 +966,7 @@ export class Trace {
},
input: safeInput,
output: isError ? { error: errorStr } : outputStr.slice(0, 10000),
...(Object.keys(spanAttributes).length > 0 && { attributes: spanAttributes }),
})
this.toolCallCount++

Expand Down Expand Up @@ -1233,6 +1311,34 @@ export class Trace {

const trace = this.buildTraceFile(error)

// altimate_change start — trace augmentation: session-level rollup.
// Pure-function classifier attaches workflow / outcome / artifacts / env
// attributes to the root (session) span. Runs over the snapshotted trace
// (post buildTraceFile) so it sees the final state. Best-effort.
//
// Merge semantics: derived attributes fill only ABSENT keys. Anything an
// upstream caller set explicitly via setSpanAttributes(..., "session")
// wins — Layer 1 (caller-provided) > Layer 2 (derived), same rule as
// logToolCall.
try {
const sessionAttrs = annotateSession(trace)
if (Object.keys(sessionAttrs).length > 0) {
const rootSnapshotSpan = trace.spans.find((s) => s.spanId === this.rootSpanId)
if (rootSnapshotSpan) {
rootSnapshotSpan.attributes = { ...sessionAttrs, ...(rootSnapshotSpan.attributes ?? {}) }
}
// Also mirror onto the live root span so a subsequent snapshot()
// (e.g., via flushSync) sees the same attributes.
const liveRoot = this.spans.find((s) => s.spanId === this.rootSpanId)
if (liveRoot) {
liveRoot.attributes = { ...sessionAttrs, ...(liveRoot.attributes ?? {}) }
}
}
} catch {
// Session annotation must never break the trace
}
// altimate_change end

// altimate_change start — trace: post-session summary (narrative, loops, topTools)
try {
// Top tools by call count
Expand Down Expand Up @@ -1346,6 +1452,24 @@ export class Trace {
}
const trace = this.buildTraceFile(error || "Process exited before trace completed")
trace.summary.status = "crashed"

// altimate_change start — trace augmentation: session-level rollup on crash.
// Without this, crashed/interrupted sessions get per-tool attributes but
// no root rollup (workflow/outcome/artifacts/env). Pure-function call,
// best-effort, must never throw.
try {
const sessionAttrs = annotateSession(trace)
if (Object.keys(sessionAttrs).length > 0) {
const rootSnapshotSpan = trace.spans.find((s) => s.spanId === this.rootSpanId)
if (rootSnapshotSpan) {
rootSnapshotSpan.attributes = { ...sessionAttrs, ...(rootSnapshotSpan.attributes ?? {}) }
}
}
} catch {
// best-effort
}
// altimate_change end

const safeId = (this.sessionId || "unknown").replace(/[/\\.:]/g, "_") || "unknown"
const filePath = path.join(this.snapshotDir, `${safeId}.json`)
// Must be synchronous — async writes won't complete before signal handler exits
Expand Down
2 changes: 1 addition & 1 deletion packages/opencode/src/altimate/observability/viewer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ function showDetail(span) {
}
// DE attributes grouped
var a = span.attributes || {};
var groups = [['de.warehouse.','Warehouse','cyan'],['de.sql.','SQL','secondary'],['de.dbt.','dbt','orange'],['de.quality.','Quality','green'],['de.cost.','Cost','orange']];
var groups = [['de.warehouse.','Warehouse','cyan'],['de.sql.','SQL','secondary'],['de.dbt.','dbt','orange'],['de.quality.','Quality','green'],['de.cost.','Cost','orange'],['de.workflow.','Workflow','accent'],['de.outcome.','Outcome','green'],['de.artifacts.','Artifacts','secondary'],['de.env.','Environment','cyan'],['de.tool.','Tool','accent']];

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[🔵 LOW] The use of var for variable declarations is strictly prohibited. Please use const or let instead. In this case, since groups is not reassigned, const is the appropriate choice.

Suggested change:

Suggested change
var groups = [['de.warehouse.','Warehouse','cyan'],['de.sql.','SQL','secondary'],['de.dbt.','dbt','orange'],['de.quality.','Quality','green'],['de.cost.','Cost','orange'],['de.workflow.','Workflow','accent'],['de.outcome.','Outcome','green'],['de.artifacts.','Artifacts','secondary'],['de.env.','Environment','cyan'],['de.tool.','Tool','accent']];
const groups = [['de.warehouse.','Warehouse','cyan'],['de.sql.','SQL','secondary'],['de.dbt.','dbt','orange'],['de.quality.','Quality','green'],['de.cost.','Cost','orange'],['de.workflow.','Workflow','accent'],['de.outcome.','Outcome','green'],['de.artifacts.','Artifacts','secondary'],['de.env.','Environment','cyan'],['de.tool.','Tool','accent']];

var used = {};
groups.forEach(function(g) {
var entries = Object.keys(a).filter(function(k){return k.indexOf(g[0])===0;});
Expand Down
Loading
Loading