diff --git a/packages/opencode/src/altimate/observability/annotator.ts b/packages/opencode/src/altimate/observability/annotator.ts new file mode 100644 index 000000000..74baef2ee --- /dev/null +++ b/packages/opencode/src/altimate/observability/annotator.ts @@ -0,0 +1,640 @@ +/** + * Procedural trace annotator. + * + * Pure functions that classify spans and sessions without LLM judgment: + * - `annotateToolSpan(name, input, output)` — per-tool-span attributes + * (de.tool.*, de.dbt.*, de.sql.query_text + input tables from input fields) + * - `annotateSession(trace)` — session-level attributes + * (de.workflow.*, de.outcome.*, de.artifacts.*, de.env.*) + * + * Both return Record. Both follow the rule: + * prefer absent attribute over wrong attribute. + * When the classifier isn't confident, return undefined for that key and + * the caller writes nothing. + * + * Tools never import this module. The tracer calls it from `logToolCall` + * (per-span) and `endTrace` (session rollup). Tools opt into the metadata + * channel separately by setting `de.*` keys on their returned metadata. 
+ */ + +import { DE } from "./de-attributes" +import type { TraceFile, TraceSpan } from "./tracing" + +// --------------------------------------------------------------------------- +// Tool category lookup — deterministic taxonomy by tool name +// --------------------------------------------------------------------------- + +type ToolClassification = { + category: string + subcategory?: string + vendor?: string +} + +const TOOL_TAXONOMY: Record = { + // Built-in framework tools + bash: { category: "exec", subcategory: "shell" }, + read: { category: "fs", subcategory: "read" }, + write: { category: "fs", subcategory: "write" }, + edit: { category: "fs", subcategory: "edit" }, + glob: { category: "fs", subcategory: "glob" }, + grep: { category: "fs", subcategory: "grep" }, + todowrite: { category: "planning", subcategory: "todo" }, + skill: { category: "planning", subcategory: "skill" }, + task: { category: "planning", subcategory: "subagent" }, + + // Warehouse / SQL execution + sql_execute: { category: "warehouse", subcategory: "execute" }, + sql_analyze: { category: "sql", subcategory: "analyze" }, + sql_optimize: { category: "sql", subcategory: "optimize" }, + sql_fix: { category: "sql", subcategory: "fix" }, + sql_format: { category: "sql", subcategory: "format" }, + sql_explain: { category: "sql", subcategory: "explain" }, + sql_translate: { category: "sql", subcategory: "translate" }, + sql_rewrite: { category: "sql", subcategory: "rewrite" }, + sql_diff: { category: "sql", subcategory: "diff" }, + sql_classify: { category: "sql", subcategory: "classify" }, + sql_autocomplete: { category: "sql", subcategory: "autocomplete" }, + + // Schema + schema_inspect: { category: "schema", subcategory: "inspect" }, + schema_search: { category: "schema", subcategory: "search" }, + schema_index: { category: "schema", subcategory: "index" }, + schema_diff: { category: "schema", subcategory: "diff" }, + schema_detect_pii: { category: "schema", subcategory: "pii" }, + 
schema_tags: { category: "schema", subcategory: "tags" }, + schema_cache_status: { category: "schema", subcategory: "cache" }, + + // Warehouse management + warehouse_list: { category: "warehouse", subcategory: "list" }, + warehouse_add: { category: "warehouse", subcategory: "add" }, + warehouse_remove: { category: "warehouse", subcategory: "remove" }, + warehouse_test: { category: "warehouse", subcategory: "test" }, + warehouse_discover: { category: "warehouse", subcategory: "discover" }, + + // dbt + dbt_profiles: { category: "dbt", subcategory: "profiles" }, + dbt_manifest: { category: "dbt", subcategory: "manifest" }, + dbt_lineage: { category: "dbt", subcategory: "lineage" }, + dbt_unit_test_gen: { category: "dbt", subcategory: "testgen" }, + + // Lineage / impact + lineage_check: { category: "lineage", subcategory: "check" }, + impact_analysis: { category: "lineage", subcategory: "impact" }, + + // FinOps + finops_query_history: { category: "finops", subcategory: "history" }, + finops_expensive_queries: { category: "finops", subcategory: "expensive" }, + finops_warehouse_advice: { category: "finops", subcategory: "advice" }, + finops_analyze_credits: { category: "finops", subcategory: "credits" }, + finops_unused_resources: { category: "finops", subcategory: "unused" }, + finops_role_access: { category: "finops", subcategory: "access" }, + + // Data + data_diff: { category: "quality", subcategory: "diff" }, + datamate: { category: "platform", subcategory: "datamate", vendor: "altimate" }, + project_scan: { category: "platform", subcategory: "scan", vendor: "altimate" }, + + // Altimate-core wrappers + altimate_core_check: { category: "sql", subcategory: "check", vendor: "altimate-core" }, + altimate_core_validate: { category: "sql", subcategory: "validate", vendor: "altimate-core" }, + altimate_core_classify_pii: { category: "schema", subcategory: "pii", vendor: "altimate-core" }, + altimate_core_column_lineage: { category: "lineage", subcategory: "column", 
vendor: "altimate-core" }, + altimate_core_track_lineage: { category: "lineage", subcategory: "track", vendor: "altimate-core" }, + altimate_core_compare: { category: "sql", subcategory: "compare", vendor: "altimate-core" }, + altimate_core_complete: { category: "sql", subcategory: "complete", vendor: "altimate-core" }, + altimate_core_correct: { category: "sql", subcategory: "correct", vendor: "altimate-core" }, + altimate_core_detect_join_candidates: { category: "lineage", subcategory: "join-candidates", vendor: "altimate-core" }, + altimate_core_equivalence: { category: "sql", subcategory: "equivalence", vendor: "altimate-core" }, + altimate_core_export_ddl: { category: "schema", subcategory: "export-ddl", vendor: "altimate-core" }, + altimate_core_extract_metadata: { category: "schema", subcategory: "extract", vendor: "altimate-core" }, + altimate_core_fingerprint: { category: "sql", subcategory: "fingerprint", vendor: "altimate-core" }, + altimate_core_fix: { category: "sql", subcategory: "fix", vendor: "altimate-core" }, + altimate_core_grade: { category: "sql", subcategory: "grade", vendor: "altimate-core" }, + altimate_core_import_ddl: { category: "schema", subcategory: "import-ddl", vendor: "altimate-core" }, + altimate_core_introspection_sql: { category: "schema", subcategory: "introspection", vendor: "altimate-core" }, + altimate_core_migration: { category: "schema", subcategory: "migration", vendor: "altimate-core" }, + altimate_core_optimize_context: { category: "sql", subcategory: "optimize", vendor: "altimate-core" }, + altimate_core_parse_dbt: { category: "dbt", subcategory: "parse", vendor: "altimate-core" }, + altimate_core_policy: { category: "schema", subcategory: "policy", vendor: "altimate-core" }, + altimate_core_prune_schema: { category: "schema", subcategory: "prune", vendor: "altimate-core" }, + altimate_core_query_pii: { category: "schema", subcategory: "query-pii", vendor: "altimate-core" }, + altimate_core_resolve_term: { category: 
"schema", subcategory: "resolve-term", vendor: "altimate-core" }, + altimate_core_rewrite: { category: "sql", subcategory: "rewrite", vendor: "altimate-core" }, + altimate_core_schema_diff: { category: "schema", subcategory: "diff", vendor: "altimate-core" }, + altimate_core_semantics: { category: "sql", subcategory: "semantics", vendor: "altimate-core" }, + altimate_core_testgen: { category: "dbt", subcategory: "testgen", vendor: "altimate-core" }, +} + +// --------------------------------------------------------------------------- +// dbt project layer detection from file path +// --------------------------------------------------------------------------- + +const DBT_LAYER_PATTERNS: Array<[RegExp, string]> = [ + [/(?:^|\/)models\/staging\//i, "staging"], + [/(?:^|\/)models\/(?:stg|stage)\//i, "staging"], + [/(?:^|\/)models\/intermediate\//i, "intermediate"], + [/(?:^|\/)models\/(?:int|inter)\//i, "intermediate"], + [/(?:^|\/)models\/(?:dim|dims|dimensions)\//i, "dim"], + [/(?:^|\/)models\/(?:fact|facts|fct)\//i, "fact"], + [/(?:^|\/)models\/(?:agg|aggregates?|aggs)\//i, "agg"], + [/(?:^|\/)models\/(?:mart|marts)\//i, "mart"], + [/(?:^|\/)models\/(?:core|warehouse|curated)\//i, "mart"], + [/(?:^|\/)seeds\//i, "seed"], + [/(?:^|\/)macros\//i, "macro"], + [/(?:^|\/)tests\//i, "test"], + [/(?:^|\/)snapshots\//i, "snapshot"], + [/(?:^|\/)sources?\.yml$/i, "source"], + [/_sources?\.ya?ml$/i, "source"], +] + +function dbtLayerFromPath(filePath: string | undefined): string | undefined { + if (typeof filePath !== "string" || !filePath) return undefined + // Normalize backslashes so windows-style paths match too, and ensure the + // patterns match equally for absolute, relative, and bare paths. 
+ const norm = filePath.replace(/\\/g, "/") + for (const [re, layer] of DBT_LAYER_PATTERNS) { + if (re.test(norm)) return layer + } + return undefined +} + +// --------------------------------------------------------------------------- +// Bash command intent classification +// --------------------------------------------------------------------------- + +type BashClassification = { + intent: string + invoked?: string + dbtCommand?: string +} + +// Hoisted to module scope so the RegExp is compiled once at startup, not on +// every classifyBash call. (Gemini PR-938 review.) +// +// All `dbt` / `altimate-dbt` patterns are anchored to a command-boundary +// position — start-of-string or a shell separator (`;` `|` `&`). Plain word +// boundaries (`\b`) would substring-match `dbt` inside string literals, +// arguments to other commands, etc. — `echo "dbt run"`, +// `python -c "import dbt"`, `grep dbt file` would all misclassify as +// intent="dbt" without this guard (GPT PR-938 consensus review M1). +const DBT_VERBS = "build|run|test|seed|snapshot|compile|deps|run-operation|debug|parse|docs|clean|list|ls|source|init|show|retry|freshness" +const CMD_BOUNDARY = "(?:^|[;|&])\\s*" +const DBT_VERB_RE = new RegExp(`${CMD_BOUNDARY}dbt\\s+(${DBT_VERBS})\\b`, "i") +const ALTIMATE_DBT_RE = new RegExp(`${CMD_BOUNDARY}altimate-dbt(?:\\s|$)`, "i") +const ALTIMATE_DBT_VERB_RE = new RegExp(`${CMD_BOUNDARY}altimate-dbt\\s+([a-z][a-z0-9-]*)\\b`, "i") +const DBT_BARE_RE = new RegExp(`${CMD_BOUNDARY}dbt(?:\\s|$)`, "i") +const CD_PREFIX_RE = /^\s*cd\s+\S+\s*&&\s*/ + +function classifyBash(command: string | undefined): BashClassification | undefined { + if (typeof command !== "string" || !command) return undefined + + // Strip leading "cd &&" so we classify the actual work. 
+ const stripped = command.replace(CD_PREFIX_RE, "").trim() + if (!stripped) return undefined + + // altimate-dbt CLI — checked before plain dbt so the wrapper wins when a + // command line contains both. The dbt regexes are anchored to a command + // boundary, so `altimate-dbt build` alone never matches them. + if (ALTIMATE_DBT_RE.test(stripped)) { + const verb = stripped.match(ALTIMATE_DBT_VERB_RE) + return { intent: "altimate_dbt", invoked: "altimate-dbt", ...(verb && { dbtCommand: verb[1].toLowerCase() }) } + } + + // dbt CLI: detect verb after `dbt`. Broad list of subcommands. + const dbtMatch = stripped.match(DBT_VERB_RE) + if (dbtMatch) { + return { intent: "dbt", invoked: "dbt", dbtCommand: dbtMatch[1].toLowerCase() } + } + // dbt invoked but without a recognized verb (e.g. `dbt --version`, `dbt --help`) + if (DBT_BARE_RE.test(stripped)) { + return { intent: "dbt", invoked: "dbt" } + } + + // Python with inline SQL/DuckDB + if (/\bpython3?\b/i.test(stripped) && /\b(duckdb|select\s|from\s|insert\s|create\s+table)\b/i.test(stripped)) { + return { intent: "python_sql", invoked: "python3" } + } + + // psql / clickhouse-client / sqlfluff + if (/\bpsql\b/i.test(stripped)) return { intent: "sql", invoked: "psql" } + if (/\bclickhouse-client\b/i.test(stripped)) return { intent: "sql", invoked: "clickhouse-client" } + if (/\bsqlfluff\b/i.test(stripped)) return { intent: "sql_lint", invoked: "sqlfluff" } + + // Inline SQL (no driver, just a SELECT/INSERT/etc. 
somewhere) + if (/\b(SELECT|INSERT\s+INTO|UPDATE|DELETE\s+FROM|CREATE\s+TABLE|ALTER\s+TABLE|DROP\s+TABLE|WITH\s+\w+\s+AS)\b/i.test(stripped)) { + return { intent: "sql" } + } + + // Generic Python + if (/\bpython3?\b/i.test(stripped)) return { intent: "python", invoked: "python3" } + + // VCS + if (/^git\s/i.test(stripped)) return { intent: "vcs", invoked: "git" } + + // Install / package mgmt + if (/^(npm|pnpm|yarn|bun|pip|pip3|brew|apt-get|cargo)\s/i.test(stripped)) { + const tool = stripped.split(/\s+/)[0] + return { intent: "install", invoked: tool } + } + + // Filesystem + if (/^(ls|find|cat|head|tail|cp|mv|rm|mkdir|touch|chmod|chown|du|df)\b/i.test(stripped)) { + const tool = stripped.split(/\s+/)[0] + return { intent: "fs", invoked: tool } + } + + return { intent: "other" } +} + +// --------------------------------------------------------------------------- +// SQL fragment extraction from input/output text +// --------------------------------------------------------------------------- + +// Tools that accept a single SQL query in a `query` or `sql` parameter — the +// annotator lifts that into `de.sql.query_text` + extracts input tables. +// sql_autocomplete (takes a `prefix`, not full SQL) and sql_diff (takes +// `original`+`modified`) are handled separately. +const SQL_QUERY_TOOLS = new Set([ + "sql_execute", + "sql_analyze", + "sql_optimize", + "sql_fix", + "sql_explain", + "sql_translate", + "sql_rewrite", + "sql_format", + // altimate_core_* wrappers that consume SQL via a `sql` parameter. + // altimate_core_column_lineage has its own Layer-1 opt-in and is excluded. 
+ "altimate_core_rewrite", + "altimate_core_validate", + "altimate_core_check", + "altimate_core_grade", + "altimate_core_compare", + "altimate_core_complete", + "altimate_core_correct", + "altimate_core_fix", + "altimate_core_semantics", + "altimate_core_extract_metadata", + "altimate_core_query_pii", + "altimate_core_prune_schema", + "altimate_core_policy", +]) + +const SQL_TABLE_PATTERN = /\b(?:FROM|JOIN)\s+([A-Za-z_"`][A-Za-z0-9_."`\-]*)/gi + +function extractInputTables(sql: string): string[] | undefined { + if (typeof sql !== "string" || !sql) return undefined + const seen = new Set() + for (const m of sql.matchAll(SQL_TABLE_PATTERN)) { + // Strip quote chars; lowercase; cap individual size + const raw = m[1].replace(/[`"]/g, "").trim() + if (raw.length > 256) continue + seen.add(raw.toLowerCase()) + if (seen.size > 50) break // sanity cap + } + return seen.size > 0 ? [...seen] : undefined +} + +// --------------------------------------------------------------------------- +// PUBLIC — per-tool-span classification +// --------------------------------------------------------------------------- + +/** + * Classify a single tool span from its name + input + output. + * Returns attributes to merge into the span. Empty object = nothing to add. + * + * Pure function. Never throws (best-effort). + */ +export function annotateToolSpan( + toolName: string, + input: unknown, + output: unknown, +): Record { + const out: Record = {} + try { + // Tool taxonomy (always) + const tax = TOOL_TAXONOMY[toolName] + if (tax) { + out[DE.TOOL.CATEGORY] = tax.category + if (tax.subcategory) out[DE.TOOL.SUBCATEGORY] = tax.subcategory + if (tax.vendor) out[DE.TOOL.VENDOR] = tax.vendor + } else { + out[DE.TOOL.CATEGORY] = "generic" + } + + const inp = (input && typeof input === "object" ? (input as Record) : {}) as Record + + // Tool-specific input parsing + if (toolName === "bash") { + const bash = classifyBash(typeof inp.command === "string" ? 
inp.command : undefined) + if (bash) { + out[DE.TOOL.BASH_INTENT] = bash.intent + if (bash.invoked) out[DE.TOOL.BASH_INVOKED] = bash.invoked + if (bash.dbtCommand) out[DE.DBT.COMMAND] = bash.dbtCommand + } + } else if (toolName === "read" || toolName === "write" || toolName === "edit") { + const layer = dbtLayerFromPath(typeof inp.filePath === "string" ? inp.filePath : undefined) + if (layer) out[DE.DBT.LAYER] = layer + } else if (SQL_QUERY_TOOLS.has(toolName)) { + const q = typeof inp.query === "string" ? inp.query : (typeof inp.sql === "string" ? inp.sql : undefined) + if (q) { + // Don't pre-truncate. The tracer enforces a 10 KB UTF-8 byte cap per + // value in `logToolCall`; a char-based slice here was inconsistent + // (8000 CJK chars = ~24 KB and gets silently dropped by the byte cap). + // Let the tracer be the single authority on byte limits. + out[DE.SQL.QUERY_TEXT] = q + const tables = extractInputTables(q) + if (tables) out[DE.SQL.LINEAGE_INPUT_TABLES] = tables + } + } else if (toolName === "sql_diff") { + // sql_diff takes `original` and `modified` instead of a single query. + // Lift input_tables from both — there's no single canonical `query_text` + // to store, so we skip that key. + const both = [inp.original, inp.modified] + .filter((v): v is string => typeof v === "string" && v.length > 0) + .join("\n") + if (both) { + const tables = extractInputTables(both) + if (tables) out[DE.SQL.LINEAGE_INPUT_TABLES] = tables + } + } else if (toolName === "skill") { + const skill = typeof inp.name === "string" ? inp.name : (typeof inp.skill === "string" ? 
inp.skill : undefined) + if (skill) out[DE.TOOL.SUBCATEGORY] = `skill.${skill}` + } + + // SQL extracted from bash (overlay onto the bash classification) + if (toolName === "bash" && typeof inp.command === "string") { + const cmd = inp.command + // Find a sql_execute-shaped SELECT/INSERT/CTE inside the bash command + const sqlMatch = cmd.match(/(?:^|[\s'"`(])((?:WITH\s+\w[\s\S]{0,200}?AS\s*\(|SELECT|INSERT\s+INTO|UPDATE\s|DELETE\s+FROM|CREATE\s+TABLE|ALTER\s+TABLE)[\s\S]+)/i) + if (sqlMatch) { + // No pre-truncation here either — tracer's 10 KB UTF-8 byte cap is + // the single source of truth for span-attribute size limits. + const fullSql = sqlMatch[1] + out[DE.SQL.QUERY_TEXT] = fullSql + const tables = extractInputTables(fullSql) + if (tables) out[DE.SQL.LINEAGE_INPUT_TABLES] = tables + } + } + } catch { + // best-effort — annotator must never break the tracer + } + return out +} + +// --------------------------------------------------------------------------- +// PUBLIC — session-level rollup +// --------------------------------------------------------------------------- + +/** + * Workflow type heuristic. Returns { type, confidence } when confident, + * undefined when not. + * + * Inputs: prompt text + tool span names + skill invocations + bash-intent span attributes. + * Confidence: a fixed per-rule constant (0.5–0.95), not a computed ratio. 
+ */ +function classifyWorkflow( + prompt: string, + toolNames: string[], + skills: string[], + spans: TraceSpan[], +): { type: string; confidence: number } | undefined { + const p = prompt.toLowerCase() + const hasTool = (n: string) => toolNames.includes(n) + const hasSkill = (n: string) => skills.includes(n) + + // dbt-troubleshoot signals: skill, plus error/fix verbs + if (hasSkill("dbt-troubleshoot")) return { type: "dbt_troubleshoot", confidence: 0.95 } + if (hasSkill("debugging-dbt-errors")) return { type: "dbt_troubleshoot", confidence: 0.9 } + + // dbt-develop signals + if (hasSkill("dbt-develop")) return { type: "dbt_develop", confidence: 0.95 } + + // Other dbt skills + if (hasSkill("dbt-test") || hasSkill("testing-dbt-models")) return { type: "dbt_test", confidence: 0.9 } + if (hasSkill("dbt-docs") || hasSkill("documenting-dbt-models")) return { type: "dbt_docs", confidence: 0.9 } + if (hasSkill("dbt-pr-review")) return { type: "dbt_pr_review", confidence: 0.9 } + if (hasSkill("dbt-schema-verify")) return { type: "dbt_schema_verify", confidence: 0.9 } + + // Tool-mix heuristics — count BASH SPANS BY INTENT, not all bash spans, so + // `project_scan + wc -c` doesn't masquerade as a dbt session. 
+ const dbtBashCount = spans.filter((s) => { + if (s.kind !== "tool" || s.name !== "bash") return false + const attr = s.attributes + return attr?.[DE.TOOL.BASH_INTENT] === "dbt" || attr?.[DE.TOOL.BASH_INTENT] === "altimate_dbt" + }).length + const sqlExecCount = toolNames.filter((n) => n === "sql_execute" || n === "sql_analyze").length + const projectScan = hasTool("project_scan") + + if (projectScan && sqlExecCount === 0 && dbtBashCount === 0) { + return { type: "warehouse_exploration", confidence: 0.6 } + } + if (dbtBashCount >= 3) { + return { type: "dbt_develop", confidence: 0.6 } + } + if (sqlExecCount >= 3) { + return { type: "sql_analysis", confidence: 0.7 } + } + + // Prompt-driven fallbacks + if ( + /\b(fix|debug|troubleshoot|broken|failing|error)\b/.test(p) && + (toolNames.includes("read") || toolNames.includes("edit") || toolNames.includes("write")) + ) { + return { type: "dbt_troubleshoot", confidence: 0.5 } + } + if (/\b(create|add|build|model|refactor)\b/.test(p) && (toolNames.includes("write") || toolNames.includes("edit"))) { + return { type: "dbt_develop", confidence: 0.5 } + } + + return undefined +} + +/** + * Classify a session from its finished trace. + * Returns attributes to attach to the session's root span. + * + * Pure function. Never throws (best-effort). + */ +export function annotateSession(trace: TraceFile): Record { + const out: Record = {} + try { + const toolSpans = trace.spans.filter((s) => s.kind === "tool") + const toolNames = toolSpans.map((s) => s.name) + const skillNames = toolSpans + .filter((s) => s.name === "skill") + .map((s) => { + const inp = s.input as Record | undefined + return typeof inp?.name === "string" ? inp.name : (typeof inp?.skill === "string" ? 
inp.skill : "") + }) + .filter(Boolean) + + // Outcome (deterministic — direct map from summary.status) + const statusMap: Record = { + completed: "success", + error: "failure", + crashed: "interrupted", + running: "interrupted", + } + const outcome = statusMap[trace.summary.status] + if (outcome) out[DE.OUTCOME.CLASS] = outcome + + // Workflow (heuristic with confidence). Reads per-span `de.tool.bash_intent` + // attribute (set by Layer 2 in logToolCall) to distinguish dbt-bash spans + // from generic-bash spans. + const wf = classifyWorkflow(trace.metadata.prompt ?? "", toolNames, skillNames, trace.spans) + if (wf) { + out[DE.WORKFLOW.TYPE] = wf.type + out[DE.WORKFLOW.TYPE_CONFIDENCE] = wf.confidence + } + + // Artifacts.touched (deterministic) + const filesRead = new Set() + const filesEdited = new Set() + for (const span of toolSpans) { + const inp = span.input as Record | undefined + const filePath = typeof inp?.filePath === "string" ? inp.filePath : undefined + if (!filePath) continue + if (span.name === "read") filesRead.add(filePath) + else if (span.name === "write" || span.name === "edit") filesEdited.add(filePath) + } + if (filesRead.size > 0) out[DE.ARTIFACTS.FILES_READ] = [...filesRead].slice(0, 100) + if (filesEdited.size > 0) out[DE.ARTIFACTS.FILES_EDITED] = [...filesEdited].slice(0, 100) + + // Environment capabilities. Prefer the project_scan tool's Layer-1 + // `de.env.*` attributes (authoritative, set in project-scan.ts:935) over + // re-parsing its output text. Falls back to text parsing only when the + // Layer-1 attribute is absent. + // + // Picks the LATEST project_scan span (last one in the array — spans are + // pushed in insertion order in logToolCall) so that a session running + // project_scan twice (e.g., after adding a warehouse) reflects the + // current state, not the initial one. (GPT PR-938 consensus review M2.) 
+ const projectScanSpans = toolSpans.filter((s) => s.name === "project_scan") + const latestScan = projectScanSpans[projectScanSpans.length - 1] + if (latestScan) { + const attrs = latestScan.attributes ?? {} + // Text-parse fallback computed lazily — only used per key when Layer-1 + // is absent, so we still skip the regex work when all 4 keys hit Layer 1. + let fallback: ScanEnv | undefined + const parsed = (): ScanEnv => { + if (!fallback) fallback = detectEnvFromProjectScan(latestScan) + return fallback + } + const pickBool = (key: string, fb: () => boolean | undefined): boolean | undefined => { + const v = attrs[key] + if (typeof v === "boolean") return v + return fb() + } + const pickString = (key: string, fb: () => string | undefined): string | undefined => { + const v = attrs[key] + if (typeof v === "string" && v) return v + return fb() + } + const pickStringArray = (key: string, fb: () => string[]): string[] => { + const v = attrs[key] + if (Array.isArray(v) && v.every((x) => typeof x === "string")) return v as string[] + return fb() + } + const dbtPresent = pickBool(DE.ENV.DBT_PRESENT, () => parsed().dbtPresent) + const manifestPresent = pickBool(DE.ENV.DBT_MANIFEST_PRESENT, () => parsed().manifestPresent) + const warehouseType = pickString(DE.ENV.WAREHOUSE_TYPE, () => parsed().warehouseType) + const toolsDetected = pickStringArray(DE.ENV.TOOLS_DETECTED, () => parsed().toolsDetected) + if (dbtPresent !== undefined) out[DE.ENV.DBT_PRESENT] = dbtPresent + if (manifestPresent !== undefined) out[DE.ENV.DBT_MANIFEST_PRESENT] = manifestPresent + if (warehouseType) out[DE.ENV.WAREHOUSE_TYPE] = warehouseType + if (toolsDetected.length > 0) out[DE.ENV.TOOLS_DETECTED] = toolsDetected + } + + // Outcome.executed: was a dbt-family DML command (build/run/test/seed/ + // snapshot) actually performed? 
Reads the cached `de.tool.bash_intent` + // and `de.dbt.command` attributes set by classifyBash at logToolCall + // time — single source of truth, no regex duplication, no verb-list drift. + // (Claude PR-938 consensus review M4.) + const DBT_EXECUTED_VERBS = new Set(["build", "run", "test", "seed", "snapshot"]) + const ranDbt = toolSpans.some((s) => { + if (s.name !== "bash") return false + const intent = s.attributes?.[DE.TOOL.BASH_INTENT] + if (intent !== "dbt" && intent !== "altimate_dbt") return false + const cmd = s.attributes?.[DE.DBT.COMMAND] + return typeof cmd === "string" && DBT_EXECUTED_VERBS.has(cmd) + }) + if (ranDbt) out[DE.OUTCOME.EXECUTED] = true + + // Outcome.change_applied: any SUCCESSFUL write/edit spans? The attribute + // name says "applied," not "attempted" — a failed write that didn't touch + // the file should not flip this to true. + const changed = toolSpans.some((s) => (s.name === "write" || s.name === "edit") && s.status === "ok") + if (changed) out[DE.OUTCOME.CHANGE_APPLIED] = true + } catch { + // best-effort + } + return out +} + +// --------------------------------------------------------------------------- +// project_scan output parsing +// --------------------------------------------------------------------------- + +type ScanEnv = { + dbtPresent: boolean | undefined + manifestPresent: boolean | undefined + warehouseType: string | undefined + toolsDetected: string[] +} + +const WAREHOUSE_KEYWORDS: Array<[RegExp, string]> = [ + [/\bsnowflake\b/i, "snowflake"], + [/\bbigquery\b/i, "bigquery"], + [/\bpostgres\b/i, "postgres"], + [/\bduckdb\b/i, "duckdb"], + [/\bdatabricks\b/i, "databricks"], + [/\bredshift\b/i, "redshift"], + [/\bmysql\b/i, "mysql"], + [/\bclickhouse\b/i, "clickhouse"], +] + +const TOOL_KEYWORDS: Array<[RegExp, string]> = [ + [/\bairflow\b/i, "airflow"], + [/\bdagster\b/i, "dagster"], + [/\bprefect\b/i, "prefect"], + [/\bsoda\b/i, "soda"], + [/\bsqlmesh\b/i, "sqlmesh"], + [/\bgreat[\s_-]?expectations\b/i, 
"great_expectations"], + [/\bsqlfluff\b/i, "sqlfluff"], +] + +function detectEnvFromProjectScan(span: TraceSpan): ScanEnv { + const text = typeof span.output === "string" ? span.output : (span.output ? JSON.stringify(span.output) : "") + const env: ScanEnv = { + dbtPresent: undefined, + manifestPresent: undefined, + warehouseType: undefined, + toolsDetected: [], + } + if (!text) return env + + // project_scan output uses ✓ / ✗ glyphs around "dbt Project" and "manifest.json" + if (/✓\s*Project\s+"/i.test(text) || /✓\s*dbt\s+Project/i.test(text)) env.dbtPresent = true + else if (/✗\s*(?:dbt\s+Project|No\s+dbt)/i.test(text)) env.dbtPresent = false + + if (/✓\s*manifest\.json/i.test(text)) env.manifestPresent = true + else if (/✗\s*manifest\.json/i.test(text)) env.manifestPresent = false + + // Warehouse: look for adapter mention in profile + for (const [re, name] of WAREHOUSE_KEYWORDS) { + if (re.test(text)) { + env.warehouseType = name + break + } + } + + // Tools + const seen = new Set() + for (const [re, name] of TOOL_KEYWORDS) { + if (re.test(text)) seen.add(name) + } + env.toolsDetected = [...seen] + + return env +} diff --git a/packages/opencode/src/altimate/observability/de-attributes.ts b/packages/opencode/src/altimate/observability/de-attributes.ts index 295975cfa..b76779600 100644 --- a/packages/opencode/src/altimate/observability/de-attributes.ts +++ b/packages/opencode/src/altimate/observability/de-attributes.ts @@ -56,6 +56,8 @@ export const DE_WAREHOUSE = { ROWS_RETURNED: "de.warehouse.rows_returned", /** Rows affected (INSERT/UPDATE/DELETE) */ ROWS_AFFECTED: "de.warehouse.rows_affected", + /** Total rows in a table (from schema_inspect, not query result) */ + ROWS_TOTAL: "de.warehouse.rows_total", /** Query ID from the warehouse (for linking to warehouse query history) */ QUERY_ID: "de.warehouse.query_id", /** Whether the query hit a warehouse cache (Snowflake result cache, BQ cache) */ @@ -102,6 +104,8 @@ export const DE_SQL = { export const DE_DBT = { 
/** dbt command: run, test, build, compile, seed, snapshot */ COMMAND: "de.dbt.command", + /** dbt project layer derived from the file path: staging, intermediate, dim, fact, agg, mart, source, seed, macro, test, snapshot */ + LAYER: "de.dbt.layer", /** Model unique_id (e.g., model.my_project.stg_orders) */ MODEL_UNIQUE_ID: "de.dbt.model.unique_id", /** Model short name */ @@ -206,6 +210,79 @@ export const DE_COST = { ATTRIBUTION_PROJECT: "de.cost.attribution.project", } as const +// --------------------------------------------------------------------------- +// Workflow & session classification (Layer 6) — populated by derived annotator +// --------------------------------------------------------------------------- + +export const DE_WORKFLOW = { + /** Session workflow type: dbt_develop, dbt_troubleshoot, dbt_test, dbt_docs, dbt_pr_review, dbt_schema_verify, sql_analysis, warehouse_exploration, project_scan, generic_file_edit */ + TYPE: "de.workflow.type", + /** Session intent verb-phrase: "create model", "fix error", "add tests", "refactor model", "inspect schema", "run query" */ + INTENT: "de.workflow.intent", + /** Confidence of the classifier (0.0-1.0) */ + TYPE_CONFIDENCE: "de.workflow.type_confidence", +} as const + +// --------------------------------------------------------------------------- +// Session outcome (Layer 7) — populated by derived annotator +// --------------------------------------------------------------------------- + +export const DE_OUTCOME = { + /** success, failure, interrupted, provider_routing_failure, validation_failure, no_op, partial_fix */ + CLASS: "de.outcome.class", + /** Whether a dbt build/run/test/seed/snapshot command was actually executed */ + EXECUTED: "de.outcome.executed", + /** Whether a change was applied, i.e. at least one write/edit tool call succeeded */ + CHANGE_APPLIED: "de.outcome.change_applied", +} as const + +// --------------------------------------------------------------------------- +// Touched artifacts (Layer 8) — populated by derived annotator +// 
--------------------------------------------------------------------------- + +export const DE_ARTIFACTS = { + /** File paths read (JSON array) */ + FILES_READ: "de.artifacts.files_read", + /** File paths written or edited (JSON array) */ + FILES_EDITED: "de.artifacts.files_edited", + /** dbt model names mentioned in the user prompt (JSON array) */ + MODELS_MENTIONED: "de.artifacts.models_mentioned", + /** Fully-qualified tables referenced by executed SQL (JSON array) */ + TABLES_REFERENCED: "de.artifacts.tables_referenced", +} as const + +// --------------------------------------------------------------------------- +// Environment capabilities (Layer 9) — populated by derived annotator +// --------------------------------------------------------------------------- + +export const DE_ENV = { + /** Whether dbt is present in the working directory */ + DBT_PRESENT: "de.env.dbt_present", + /** Whether dbt manifest.json exists */ + DBT_MANIFEST_PRESENT: "de.env.dbt_manifest_present", + /** Warehouse adapter detected: snowflake, bigquery, postgres, duckdb, databricks, redshift, mysql, clickhouse */ + WAREHOUSE_TYPE: "de.env.warehouse_type", + /** Orchestrators / data-quality / lint tools detected (JSON array): airflow, dagster, prefect, soda, sqlmesh, great_expectations, sqlfluff */ + TOOLS_DETECTED: "de.env.tools_detected", +} as const + +// --------------------------------------------------------------------------- +// Generic tool classification (Layer 10) — populated by derived annotator +// --------------------------------------------------------------------------- + +export const DE_TOOL = { + /** Tool category: warehouse, sql, dbt, schema, lineage, quality, finops, platform, fs, exec, planning, generic */ + CATEGORY: "de.tool.category", + /** Finer-grained subcategory within the category (e.g., "execute", "read", "skill.<name>") */ + SUBCATEGORY: "de.tool.subcategory", + /** Vendor or framework the tool wraps: snowflake, bigquery, duckdb, dbt, altimate-core, etc. 
*/ + VENDOR: "de.tool.vendor", + /** bash-command intent when kind=tool name=bash: dbt, altimate_dbt, python_sql, python, sql, sql_lint, fs, vcs, install, other */ + BASH_INTENT: "de.tool.bash_intent", + /** Concrete CLI invoked in a bash command: dbt, altimate-dbt, python3, psql, sqlfluff, etc. */ + BASH_INVOKED: "de.tool.bash_invoked", +} as const + // --------------------------------------------------------------------------- // Convenience namespace // --------------------------------------------------------------------------- @@ -217,4 +294,9 @@ export const DE = { DBT: DE_DBT, QUALITY: DE_QUALITY, COST: DE_COST, + WORKFLOW: DE_WORKFLOW, + OUTCOME: DE_OUTCOME, + ARTIFACTS: DE_ARTIFACTS, + ENV: DE_ENV, + TOOL: DE_TOOL, } as const diff --git a/packages/opencode/src/altimate/observability/tracing.ts b/packages/opencode/src/altimate/observability/tracing.ts index e3e8b5ef6..b06ef2bcb 100644 --- a/packages/opencode/src/altimate/observability/tracing.ts +++ b/packages/opencode/src/altimate/observability/tracing.ts @@ -22,6 +22,9 @@ import path from "path" import { Global } from "../../global" import { randomUUIDv7 } from "bun" import { Log } from "../../util/log" +// altimate_change start — trace augmentation: procedural classifier +import { annotateToolSpan, annotateSession } from "./annotator" +// altimate_change end // --------------------------------------------------------------------------- // Trace data types — v2 schema @@ -841,12 +844,14 @@ export class Trace { status: "completed" input: Record output: string + metadata?: Record time: { start: number; end: number } } | { status: "error" input: Record error: string + metadata?: Record time: { start: number; end: number } } }) { @@ -874,6 +879,78 @@ export class Trace { safeInput = { _serialization_error: "Input contained circular references or non-serializable data" } } + // altimate_change start — trace augmentation: lift structured tool metadata + // (carried via ToolStateCompleted/Error.metadata in message-v2) onto the + 
// span. Only keys matching the `de.` semantic-convention prefix are + // promoted — other metadata stays internal to the tool framework + // (e.g., `truncated`, `outputPath`, `findings`, `success`). + // + // This is the "metadata channel": tools surface structured fields by + // setting `de.*` keys on their returned metadata object, without calling + // into the tracer (at most they import the `de-attributes` key constants). + // + // Values must be JSON-compatible (Date / BigInt / class instances are + // silently dropped or stringified, matching the trace file's serialization + // contract). Per-value and total byte caps below (measured on the serialized value only; key names are not counted) prevent runaway payloads + // from ballooning snapshots and HTTP exports — both truncation layers + // upstream (`tool.ts` output truncation, `tracing.ts` 10 KB output slice) + // operate on `output: string`, not on `metadata`. + // + // Byte size is computed as UTF-8 byte length, not JS string length + // (which counts UTF-16 code units): non-ASCII payloads (e.g., + // CJK identifiers, emoji in error messages) would otherwise bypass + // the cap and balloon trace exports. + const ATTR_VALUE_MAX_BYTES = 10_000 + const ATTR_TOTAL_MAX_BYTES = 32_000 + const spanAttributes: Record = {} + let totalBytes = 0 + + // Layer 1: tool-provided structured metadata (high fidelity — driver + // values, parser output). Filtered to the de.* prefix. + const rawMetadata = state.metadata + if (rawMetadata && typeof rawMetadata === "object") { + for (const [k, v] of Object.entries(rawMetadata)) { + if (typeof k !== "string" || !k.startsWith("de.") || v === undefined) continue + try { + const serialized = JSON.stringify(v) + if (serialized === undefined) continue + const valueBytes = Buffer.byteLength(serialized, "utf8") + if (valueBytes > ATTR_VALUE_MAX_BYTES) continue + if (totalBytes + valueBytes > ATTR_TOTAL_MAX_BYTES) continue + // Store original value (matches setSpanAttributes() at line ~1135 for + // consistent overwrite semantics if both paths target the same key). 
+ spanAttributes[k] = v + totalBytes += valueBytes + } catch { + // Bad metadata value must never break the tracer + } + } + } + + // Layer 2: derived classification from (name, input, output). Best-effort + // procedural — taxonomy lookup, bash intent, dbt layer from path, etc. + // Tool-provided metadata (Layer 1) wins on conflicts. + try { + const derived = annotateToolSpan(toolName, safeInput, isError ? errorStr : outputStr) + for (const [k, v] of Object.entries(derived)) { + if (v === undefined || k in spanAttributes) continue + try { + const serialized = JSON.stringify(v) + if (serialized === undefined) continue + const valueBytes = Buffer.byteLength(serialized, "utf8") + if (valueBytes > ATTR_VALUE_MAX_BYTES) continue + if (totalBytes + valueBytes > ATTR_TOTAL_MAX_BYTES) continue + spanAttributes[k] = v + totalBytes += valueBytes + } catch { + // best-effort + } + } + } catch { + // Annotator must never break the tracer + } + // altimate_change end + this.spans.push({ spanId: randomUUIDv7(), parentSpanId: this.currentGenerationSpanId ?? this.rootSpanId, @@ -889,6 +966,7 @@ export class Trace { }, input: safeInput, output: isError ? { error: errorStr } : outputStr.slice(0, 10000), + ...(Object.keys(spanAttributes).length > 0 && { attributes: spanAttributes }), }) this.toolCallCount++ @@ -1233,6 +1311,34 @@ export class Trace { const trace = this.buildTraceFile(error) + // altimate_change start — trace augmentation: session-level rollup. + // Pure-function classifier attaches workflow / outcome / artifacts / env + // attributes to the root (session) span. Runs over the snapshotted trace + // (post buildTraceFile) so it sees the final state. Best-effort. + // + // Merge semantics: derived attributes fill only ABSENT keys. Anything an + // upstream caller set explicitly via setSpanAttributes(..., "session") + // wins — Layer 1 (caller-provided) > Layer 2 (derived), same rule as + // logToolCall. 
+ try { + const sessionAttrs = annotateSession(trace) + if (Object.keys(sessionAttrs).length > 0) { + const rootSnapshotSpan = trace.spans.find((s) => s.spanId === this.rootSpanId) + if (rootSnapshotSpan) { + rootSnapshotSpan.attributes = { ...sessionAttrs, ...(rootSnapshotSpan.attributes ?? {}) } + } + // Also mirror onto the live root span so a subsequent snapshot() + // (e.g., via flushSync) sees the same attributes. + const liveRoot = this.spans.find((s) => s.spanId === this.rootSpanId) + if (liveRoot) { + liveRoot.attributes = { ...sessionAttrs, ...(liveRoot.attributes ?? {}) } + } + } + } catch { + // Session annotation must never break the trace + } + // altimate_change end + // altimate_change start — trace: post-session summary (narrative, loops, topTools) try { // Top tools by call count @@ -1346,6 +1452,24 @@ export class Trace { } const trace = this.buildTraceFile(error || "Process exited before trace completed") trace.summary.status = "crashed" + + // altimate_change start — trace augmentation: session-level rollup on crash. + // Without this, crashed/interrupted sessions get per-tool attributes but + // no root rollup (workflow/outcome/artifacts/env). Pure-function call, + // best-effort, must never throw. + try { + const sessionAttrs = annotateSession(trace) + if (Object.keys(sessionAttrs).length > 0) { + const rootSnapshotSpan = trace.spans.find((s) => s.spanId === this.rootSpanId) + if (rootSnapshotSpan) { + rootSnapshotSpan.attributes = { ...sessionAttrs, ...(rootSnapshotSpan.attributes ?? 
{}) } + } + } + } catch { + // best-effort + } + // altimate_change end + const safeId = (this.sessionId || "unknown").replace(/[/\\.:]/g, "_") || "unknown" const filePath = path.join(this.snapshotDir, `${safeId}.json`) // Must be synchronous — async writes won't complete before signal handler exits diff --git a/packages/opencode/src/altimate/observability/viewer.ts b/packages/opencode/src/altimate/observability/viewer.ts index 8fc6ab256..2c5364298 100644 --- a/packages/opencode/src/altimate/observability/viewer.ts +++ b/packages/opencode/src/altimate/observability/viewer.ts @@ -516,7 +516,7 @@ function showDetail(span) { } // DE attributes grouped var a = span.attributes || {}; - var groups = [['de.warehouse.','Warehouse','cyan'],['de.sql.','SQL','secondary'],['de.dbt.','dbt','orange'],['de.quality.','Quality','green'],['de.cost.','Cost','orange']]; + var groups = [['de.warehouse.','Warehouse','cyan'],['de.sql.','SQL','secondary'],['de.dbt.','dbt','orange'],['de.quality.','Quality','green'],['de.cost.','Cost','orange'],['de.workflow.','Workflow','accent'],['de.outcome.','Outcome','green'],['de.artifacts.','Artifacts','secondary'],['de.env.','Environment','cyan'],['de.tool.','Tool','accent']]; var used = {}; groups.forEach(function(g) { var entries = Object.keys(a).filter(function(k){return k.indexOf(g[0])===0;}); diff --git a/packages/opencode/src/altimate/tools/altimate-core-column-lineage.ts b/packages/opencode/src/altimate/tools/altimate-core-column-lineage.ts index 310001e1e..99f5eee2d 100644 --- a/packages/opencode/src/altimate/tools/altimate-core-column-lineage.ts +++ b/packages/opencode/src/altimate/tools/altimate-core-column-lineage.ts @@ -2,6 +2,7 @@ import z from "zod" import { Tool } from "../../tool/tool" import { Dispatcher } from "../native" import { isRecord, normalizeError } from "./response-normalization" +import { DE_SQL } from "../observability/de-attributes" export const AltimateCoreColumnLineageTool = 
Tool.define("altimate_core_column_lineage", {
   description:
@@ -30,9 +31,116 @@ export const AltimateCoreColumnLineageTool = Tool.define("altimate_core_column_l
       const error = normalizeError(result.error) ?? normalizeError(data.error)
       const failureMessage = error?.trim() || "Column lineage failed."
       const isFailure = error !== undefined || result.success === false || data.success === false
+
+      // altimate_change start — trace augmentation: emit structured lineage on
+      // the de.* metadata channel. Higher fidelity than the annotator's regex
+      // extraction since it comes from altimate-core's real parser.
+      //
+      // Prefer structured `source_table`/`source_column` / `target_table`/
+      // `target_column` fields when altimate-core supplies them — that
+      // preserves quoted/case-sensitive identifiers. Fall back to splitting
+      // a dotted endpoint string only when the structured fields are absent.
+      const lineageAttrs: Record<string, unknown> = {}
+      if (!isFailure) {
+        const inputTables = new Set<string>()
+        const outputs = new Set<string>()
+        const colsRead = new Set<string>()
+        const colsWritten = new Set<string>()
+
+        // Table for one side of an edge: structured `<side>_table` /
+        // `<side>Table` first (string, or object with `table` / `name`), then
+        // the `<side>` endpoint itself.
+        const extractTable = (edge: Record<string, unknown>, side: "source" | "target"): string | undefined => {
+          const direct = edge[`${side}_table`] ?? edge[`${side}Table`]
+          if (typeof direct === "string" && direct) return direct
+          if (typeof direct === "object" && direct !== null) {
+            const obj = direct as Record<string, unknown>
+            const t = obj.table ?? obj.name
+            if (typeof t === "string" && t) return t
+          }
+          const endpoint = edge[side]
+          if (typeof endpoint === "string" && endpoint.includes(".")) {
+            // Strip the trailing column segment; preserve original case
+            return endpoint.split(".").slice(0, -1).join(".") || undefined
+          }
+          // Object-form endpoint (e.g. `{ table, column }`): extractColumn reads
+          // the column from this shape, so read the table from it too rather
+          // than dropping that half of the edge. Only `table` is consulted —
+          // `name` on an endpoint object is treated as the column name.
+          if (typeof endpoint === "object" && endpoint !== null) {
+            const obj = endpoint as Record<string, unknown>
+            const t = obj.table
+            if (typeof t === "string" && t) return t
+          }
+          return undefined
+        }
+        // Column for one side of an edge, always returned bare (no table
+        // prefix) so values from every edge shape deduplicate together.
+        const extractColumn = (edge: Record<string, unknown>, side: "source" | "target"): string | undefined => {
+          const direct = edge[`${side}_column`] ?? edge[`${side}Column`]
+          if (typeof direct === "string" && direct) return direct
+          // Mirror extractTable's object-form branch — altimate-core may emit
+          // a structured `{ column?, name? }` object instead of a bare string.
+          // Without this, the column was silently dropped while extractTable
+          // still captured its half of the edge.
+          if (typeof direct === "object" && direct !== null) {
+            const obj = direct as Record<string, unknown>
+            const c = obj.column ?? obj.name
+            if (typeof c === "string" && c) return c
+          }
+          const endpoint = edge[side]
+          if (typeof endpoint === "string") {
+            // Strip the table prefix when falling back to a dotted endpoint
+            // string so this returns only the column — matching the structured
+            // path above. Without this, `columns_read` would mix bare
+            // column names (from `source_column`) with fully-qualified
+            // strings (from `source` as fallback), breaking deduplication.
+            const lastDot = endpoint.lastIndexOf(".")
+            return lastDot >= 0 ? endpoint.slice(lastDot + 1) : endpoint
+          }
+          if (typeof endpoint === "object" && endpoint !== null) {
+            const obj = endpoint as Record<string, unknown>
+            const c = obj.column ?? obj.name
+            if (typeof c === "string" && c) return c
+          }
+          return undefined
+        }
+
+        // Guard with Array.isArray — the `?? []` fallback only handles
+        // null/undefined. If the dispatcher returns a non-array shape we
+        // skip lineage extraction rather than throwing in the for-of.
+        // Use unknown[] at the array boundary and narrow per element with
+        // isRecord — Array.isArray only proves array, not array-of-records.
+        const edges: unknown[] = Array.isArray(data.column_lineage) ? data.column_lineage : []
+        for (const edge of edges) {
+          if (!isRecord(edge)) continue
+          const srcTable = extractTable(edge, "source")
+          const tgtTable = extractTable(edge, "target")
+          const srcCol = extractColumn(edge, "source")
+          const tgtCol = extractColumn(edge, "target")
+          if (srcTable) inputTables.add(srcTable)
+          if (tgtTable) outputs.add(tgtTable)
+          if (srcCol) colsRead.add(srcCol)
+          if (tgtCol) colsWritten.add(tgtCol)
+        }
+        if (inputTables.size > 0) lineageAttrs[DE_SQL.LINEAGE_INPUT_TABLES] = [...inputTables].slice(0, 50)
+        // Keep output_table scalar: with multiple distinct outputs, omit the
+        // attribute rather than switching its type to an array.
+        if (outputs.size === 1) lineageAttrs[DE_SQL.LINEAGE_OUTPUT_TABLE] = [...outputs][0]
+        if (colsRead.size > 0) lineageAttrs[DE_SQL.LINEAGE_COLUMNS_READ] = [...colsRead].slice(0, 100)
+        if (colsWritten.size > 0) lineageAttrs[DE_SQL.LINEAGE_COLUMNS_WRITTEN] = [...colsWritten].slice(0, 100)
+        if (args.dialect) lineageAttrs[DE_SQL.DIALECT] = args.dialect
+      }
+      // altimate_change end
+
       return {
         title: isFailure ? "Column Lineage: ERROR" : `Column Lineage: ${edgeCount} edge(s)`,
-        metadata: { success: !isFailure, edge_count: edgeCount, ...(isFailure && { error: failureMessage }) },
+        metadata: {
+          success: !isFailure,
+          edge_count: edgeCount,
+          ...(isFailure && { error: failureMessage }),
+          ...lineageAttrs,
+        },
         output: isFailure ? 
`Failed: ${failureMessage}` : formatColumnLineage(data), } } catch (e) { diff --git a/packages/opencode/src/altimate/tools/project-scan.ts b/packages/opencode/src/altimate/tools/project-scan.ts index f7fa4e96b..d92004412 100644 --- a/packages/opencode/src/altimate/tools/project-scan.ts +++ b/packages/opencode/src/altimate/tools/project-scan.ts @@ -7,6 +7,9 @@ import { Telemetry } from "@/telemetry" import { Config } from "@/config/config" import { Flag } from "@/flag/flag" import { Skill } from "../../skill" +// altimate_change start — trace augmentation: use vocab constants for de.* keys +import { DE_ENV } from "../observability/de-attributes" +// altimate_change end // --- Types --- @@ -929,6 +932,28 @@ export const ProjectScanTool = Tool.define("project_scan", { const degradedSuffix = degradedList.length > 0 ? ` (${degradedList.length} degraded)` : "" + // altimate_change start — trace augmentation: lift environment signals onto + // the de.* metadata channel so the session-level rollup gets authoritative + // values instead of regex-parsing output text. + const deWhTypes = [ + ...new Set( + (schemaCache?.warehouses ?? []) + .map((w: any) => (typeof w?.type === "string" ? w.type.toLowerCase() : "")) + .filter(Boolean), + ), + ] + // `manifest_present` reflects on-disk file existence (set at line 218 via + // existsSync(target/manifest.json)), not RPC success. A transient + // `dbt.manifest` failure would otherwise emit `false` even when the + // file is there, breaking the attribute's semantic contract. + const deAttrs: Record = { + [DE_ENV.DBT_PRESENT]: dbtProject.found, + [DE_ENV.DBT_MANIFEST_PRESENT]: typeof dbtProject.manifestPath === "string" && dbtProject.manifestPath.length > 0, + ...(toolsFound.length > 0 && { [DE_ENV.TOOLS_DETECTED]: toolsFound }), + ...(deWhTypes.length === 1 && { [DE_ENV.WAREHOUSE_TYPE]: deWhTypes[0] }), + } + // altimate_change end + return { title: `Scan: ${totalConnections} connection(s), ${dbtProject.found ? 
"dbt found" : "no dbt"}${degradedSuffix}`, metadata: { @@ -962,6 +987,7 @@ export const ProjectScanTool = Tool.define("project_scan", { // so dashboard queries don't need null coalescing. Empty array means // "no degradation" cleanly. degraded: degradedList, + ...deAttrs, }, output: lines.join("\n"), } diff --git a/packages/opencode/src/altimate/tools/schema-inspect.ts b/packages/opencode/src/altimate/tools/schema-inspect.ts index cbdeff815..f06f7370e 100644 --- a/packages/opencode/src/altimate/tools/schema-inspect.ts +++ b/packages/opencode/src/altimate/tools/schema-inspect.ts @@ -6,6 +6,9 @@ import type { SchemaInspectResult } from "../native/types" import { PostConnectSuggestions } from "./post-connect-suggestions" // altimate_change end import { isRecord, normalizeError } from "./response-normalization" +// altimate_change start — trace augmentation: use vocab constants for de.* keys +import { DE_WAREHOUSE, DE_SQL } from "../observability/de-attributes" +// altimate_change end export const SchemaInspectTool = Tool.define("schema_inspect", { description: "Inspect database schema — list columns, types, and constraints for a table.", @@ -45,9 +48,26 @@ export const SchemaInspectTool = Tool.define("schema_inspect", { }) } // altimate_change end + // altimate_change start — trace augmentation: surface row/column counts + // on the de.* metadata channel. + const qualifiedTable = schemaResult.schema_name + ? `${schemaResult.schema_name}.${schemaResult.table ?? args.table}` + : (schemaResult.table ?? args.table) + const deAttrs: Record = { + ...(schemaResult.row_count !== undefined && schemaResult.row_count !== null && { + [DE_WAREHOUSE.ROWS_TOTAL]: schemaResult.row_count, + }), + ...(qualifiedTable && { [DE_SQL.LINEAGE_OUTPUT_TABLE]: qualifiedTable }), + } + // altimate_change end return { title: `Schema: ${schemaResult.table ?? args.table}`, - metadata: { success: true, columnCount: (schemaResult.columns ?? 
[]).length, rowCount: schemaResult.row_count }, + metadata: { + success: true, + columnCount: (schemaResult.columns ?? []).length, + rowCount: schemaResult.row_count, + ...deAttrs, + }, output, } } catch (e) { diff --git a/packages/opencode/src/altimate/tools/sql-execute.ts b/packages/opencode/src/altimate/tools/sql-execute.ts index 4647c7564..f5938353d 100644 --- a/packages/opencode/src/altimate/tools/sql-execute.ts +++ b/packages/opencode/src/altimate/tools/sql-execute.ts @@ -13,6 +13,9 @@ import { PostConnectSuggestions } from "./post-connect-suggestions" import { getCache } from "../native/schema/cache" import * as Registry from "../native/connections/registry" // altimate_change end +// altimate_change start — trace augmentation: use vocab constants for de.* keys +import { DE_WAREHOUSE, DE_SQL } from "../observability/de-attributes" +// altimate_change end export const SqlExecuteTool = Tool.define("sql_execute", { description: "Execute SQL against a connected data warehouse. Returns results as a formatted table.", @@ -87,9 +90,30 @@ export const SqlExecuteTool = Tool.define("sql_execute", { }) } // altimate_change end + // altimate_change start — trace augmentation: surface driver-reported + // values via the de.* metadata channel. Tools never import the tracer; + // they just set keys (named by the de-attributes constants) on the returned metadata object and the tracer's + // logToolCall hook lifts `de.*` keys onto the tool span's attributes. The warehouse entry is looked up by name, defaulting to the first registered warehouse when `args.warehouse` is unset; any Registry error yields undefined. + const warehouseEntry = (() => { + try { + const registered = Registry.list().warehouses + return registered.find((w) => w.name === (args.warehouse ?? registered[0]?.name)) + } catch { + return undefined + } + })() + // altimate_change end return { title: `SQL: ${args.query.slice(0, 60)}${args.query.length > 60 ? "..." 
: ""}`, - metadata: { rowCount: result.row_count, truncated: result.truncated }, + metadata: { + rowCount: result.row_count, + truncated: result.truncated, + // altimate_change start — de.* attributes lifted onto span by tracer + [DE_WAREHOUSE.ROWS_RETURNED]: result.row_count, + ...(warehouseEntry?.type && { [DE_WAREHOUSE.SYSTEM]: warehouseEntry.type }), + ...(warehouseEntry?.type && { [DE_SQL.DIALECT]: warehouseEntry.type }), + // altimate_change end + }, output, } } catch (e) {