From 6b554e66e177b5bba90d06dc3effb0c96628e911 Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Fri, 27 Mar 2026 00:16:49 -0400 Subject: [PATCH 1/4] fix(rerun): track requested model provenance (#556) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Preserve explicit requested model/provider separately from effective execution settings so reruns can reevaluate current defaults when no request was pinned. Also exposes requested model/provider as optional hidden TUI queue columns and updates SQLite/PostgreSQL sync schema accordingly. Validation: - go test ./internal/storage ./internal/daemon ./cmd/roborev/tui - go vet ./... 🤖 Generated with [OpenAI Codex](https://openai.com/codex) Co-authored-by: OpenAI Codex --- cmd/roborev/tui/queue_test.go | 17 +++ cmd/roborev/tui/render_queue.go | 107 ++++++++------- cmd/roborev/tui/tui.go | 2 +- cmd/roborev/tui/types.go | 2 +- internal/daemon/server.go | 150 ++++++++++++++-------- internal/daemon/server_actions_test.go | 35 +++++ internal/daemon/server_jobs_test.go | 1 + internal/storage/db.go | 49 ++++++- internal/storage/db_job_test.go | 55 ++++++-- internal/storage/hydration.go | 58 +++++---- internal/storage/jobs.go | 118 +++++++++-------- internal/storage/models.go | 58 +++++---- internal/storage/postgres.go | 86 ++++++++----- internal/storage/reviews.go | 8 +- internal/storage/schemas/postgres_v10.sql | 105 +++++++++++++++ internal/storage/sync.go | 74 ++++++----- 16 files changed, 630 insertions(+), 295 deletions(-) create mode 100644 internal/storage/schemas/postgres_v10.sql diff --git a/cmd/roborev/tui/queue_test.go b/cmd/roborev/tui/queue_test.go index 9ce85f503..e72bc95c6 100644 --- a/cmd/roborev/tui/queue_test.go +++ b/cmd/roborev/tui/queue_test.go @@ -2200,12 +2200,22 @@ func TestParseColumnOrderAppendsMissing(t *testing.T) { assert.True(t, slices.Equal(got[:len(wantPrefix)], wantPrefix)) pfCount := 0 + requestedModelCount := 0 + requestedProviderCount := 0 for _, c := range got { if c == colPF { pfCount++ } + if c == colRequestedModel { + requestedModelCount++ + } + if c == colRequestedProvider { + requestedProviderCount++ + } } assert.Equal(t, 1, pfCount) + assert.Equal(t, 1, requestedModelCount) + assert.Equal(t, 1, requestedProviderCount) } func TestDefaultColumnOrderDetection(t *testing.T) { @@ -2221,3 +2231,10 @@ func TestDefaultColumnOrderDetection(t *testing.T) { assert.False(t, slices.Equal(customOrder, toggleableColumns)) } + +func TestDefaultHiddenColumnsIncludeRequestedFields(t *testing.T) { + hidden := parseHiddenColumns(nil) + assert.True(t, hidden[colSessionID]) + assert.True(t, hidden[colRequestedModel]) + assert.True(t, hidden[colRequestedProvider]) +} diff --git a/cmd/roborev/tui/render_queue.go b/cmd/roborev/tui/render_queue.go index 238edfb38..a76246c71 100644 --- a/cmd/roborev/tui/render_queue.go +++ b/cmd/roborev/tui/render_queue.go @@ -100,19 +100,21 @@ func (m model) getVisibleSelectedIdx() int { // Queue table column indices. const ( - colSel = iota // "> " selection indicator - colJobID // Job ID - colRef // Git ref (short SHA or range) - colBranch // Branch name - colRepo // Repository display name - colAgent // Agent name - colQueued // Enqueue timestamp - colElapsed // Elapsed time - colStatus // Job status - colPF // Pass/Fail verdict - colHandled // Done status - colSessionID // Session ID - colCount // total number of columns + colSel = iota // "> " selection indicator + colJobID // Job ID + colRef // Git ref (short SHA or range) + colBranch // Branch name + colRepo // Repository display name + colAgent // Agent name + colQueued // Enqueue timestamp + colElapsed // Elapsed time + colStatus // Job status + colPF // Pass/Fail verdict + colHandled // Done status + colSessionID // Session ID + colRequestedModel // Explicitly requested model + colRequestedProvider // Explicitly requested provider + colCount // total number of columns ) func (m model) renderQueueView() string { @@ -309,15 +311,17 @@ func (m model) renderQueueView() string { // Fixed-width columns: exact sizes (content + padding, not counting inter-column spacing) fixedWidth := map[int]int{ - colSel: 2, - colJobID: idWidth, - colStatus: max(contentWidth[colStatus], 6), // "Status" header = 6, auto-sizes to content - colQueued: 12, - colElapsed: 8, - colPF: 3, // "P/F" header = 3 - colHandled: max(contentWidth[colHandled], 6), // "Closed" header = 6 - colAgent: min(max(contentWidth[colAgent], 5), 12), // "Agent" header = 5, cap at 12 - colSessionID: min(max(contentWidth[colSessionID], 7), 12), // "Session" header = 7, cap at 12 + colSel: 2, + colJobID: idWidth, + colStatus: max(contentWidth[colStatus], 6), // "Status" header = 6, auto-sizes to content + colQueued: 12, + colElapsed: 8, + colPF: 3, // "P/F" header = 3 + colHandled: max(contentWidth[colHandled], 6), // "Closed" header = 6 + colAgent: min(max(contentWidth[colAgent], 5), 12), // "Agent" header = 5, cap at 12 + colSessionID: min(max(contentWidth[colSessionID], 7), 12), // "Session" header = 7, cap at 12 + colRequestedModel: min(max(contentWidth[colRequestedModel], 15), 24), // "Req Model" header = 9 + colRequestedProvider: min(max(contentWidth[colRequestedProvider], 18), 24), // "Req Provider" header = 12 } // Flexible columns absorb excess space @@ -593,8 +597,8 @@ func (m model) renderQueueView() string { } // jobCells returns plain text cell values for a job row. -// Order: ref, branch, repo, agent, queued, elapsed, status, pf, handled -// (colRef through colHandled, 9 values). +// Order: ref, branch, repo, agent, queued, elapsed, status, pf, handled, +// session, requested model, requested provider. func (m model) jobCells(job storage.ReviewJob) []string { ref := shortJobRef(job) if !config.IsDefaultReviewType(job.ReviewType) { @@ -645,7 +649,10 @@ func (m model) jobCells(job storage.ReviewJob) []string { sessionID = string(runes[:12]) } - return []string{ref, branch, repo, agentName, enqueued, elapsed, status, verdict, handled, sessionID} + requestedModel := stripControlChars(job.RequestedModel) + requestedProvider := stripControlChars(job.RequestedProvider) + + return []string{ref, branch, repo, agentName, enqueued, elapsed, status, verdict, handled, sessionID, requestedModel, requestedProvider} } // statusLabel returns a capitalized display label for the job status. @@ -770,34 +777,38 @@ func migrateColumnConfig(cfg *config.Config) bool { // toggleableColumns is the ordered list of columns the user can show/hide. // colSel and colJobID are always visible and not included here. -var toggleableColumns = []int{colRef, colBranch, colRepo, colAgent, colQueued, colElapsed, colStatus, colPF, colHandled, colSessionID} +var toggleableColumns = []int{colRef, colBranch, colRepo, colAgent, colQueued, colElapsed, colStatus, colPF, colHandled, colSessionID, colRequestedModel, colRequestedProvider} // columnNames maps column constants to display names. var columnNames = map[int]string{ - colRef: "Ref", - colBranch: "Branch", - colRepo: "Repo", - colAgent: "Agent", - colStatus: "Status", - colQueued: "Queued", - colElapsed: "Elapsed", - colPF: "P/F", - colHandled: "Closed", - colSessionID: "Session", + colRef: "Ref", + colBranch: "Branch", + colRepo: "Repo", + colAgent: "Agent", + colStatus: "Status", + colQueued: "Queued", + colElapsed: "Elapsed", + colPF: "P/F", + colHandled: "Closed", + colSessionID: "Session", + colRequestedModel: "Req Model", + colRequestedProvider: "Req Provider", } // columnConfigNames maps column constants to config file names (lowercase). var columnConfigNames = map[int]string{ - colRef: "ref", - colBranch: "branch", - colRepo: "repo", - colAgent: "agent", - colStatus: "status", - colQueued: "queued", - colElapsed: "elapsed", - colPF: "pf", - colHandled: "closed", - colSessionID: "session_id", + colRef: "ref", + colBranch: "branch", + colRepo: "repo", + colAgent: "agent", + colStatus: "status", + colQueued: "queued", + colElapsed: "elapsed", + colPF: "pf", + colHandled: "closed", + colSessionID: "session_id", + colRequestedModel: "requested_model", + colRequestedProvider: "requested_provider", } // drainFlexOverflow reduces flex column widths to absorb overflow, @@ -838,7 +849,9 @@ func columnDisplayName(col int) string { // defaultHiddenColumns lists columns that are hidden by default. // Users can enable them via the column options modal. var defaultHiddenColumns = map[int]bool{ - colSessionID: true, + colSessionID: true, + colRequestedModel: true, + colRequestedProvider: true, } // parseHiddenColumns converts config hidden_columns strings to column ID set. diff --git a/cmd/roborev/tui/tui.go b/cmd/roborev/tui/tui.go index 44e9256ac..e4e10e193 100644 --- a/cmd/roborev/tui/tui.go +++ b/cmd/roborev/tui/tui.go @@ -453,7 +453,7 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model { tabWidth := 2 columnBorders := false tasksEnabled := false - hiddenCols := map[int]bool{} + hiddenCols := parseHiddenColumns(nil) colOrder := parseColumnOrder(nil) taskColOrder := parseTaskColumnOrder(nil) var cwdRepoRoot, cwdBranch string diff --git a/cmd/roborev/tui/types.go b/cmd/roborev/tui/types.go index 78a6c4bbb..bc82a5ae3 100644 --- a/cmd/roborev/tui/types.go +++ b/cmd/roborev/tui/types.go @@ -89,7 +89,7 @@ const ( ) // columnOption represents an item in the column options modal. -// id is a column constant (colRef..colHandled) or a sentinel option ID. +// id is a queue column constant or a sentinel option ID. type columnOption struct { id int // column constant or sentinel option ID name string // display label diff --git a/internal/daemon/server.go b/internal/daemon/server.go index eaa7f928b..4ff00c339 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -642,6 +642,27 @@ func (s *Server) writeInternalError(w http.ResponseWriter, msg string) { } } +func workflowForJob(jobType, reviewType string) string { + // "default" uses the standard "review" workflow; others use their own name. + // Compact jobs use the "fix" workflow since they're part of that pipeline. + workflow := "review" + if jobType == storage.JobTypeCompact { + return "fix" + } + if !config.IsDefaultReviewType(reviewType) { + return reviewType + } + return workflow +} + +func resolveRerunModelProvider(job *storage.ReviewJob, cfg *config.Config) (string, string) { + workflow := workflowForJob(job.JobType, job.ReviewType) + resolution := agent.ResolveWorkflowConfig("", job.RepoPath, cfg, workflow, job.Reasoning) + model := resolution.ModelForSelectedAgent(job.Agent, job.RequestedModel) + provider := strings.TrimSpace(job.RequestedProvider) + return model, provider +} + func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed") @@ -754,15 +775,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } - // Map review_type to config workflow for agent/model resolution. - // "default" uses the standard "review" workflow; others use their own name. - // Compact jobs use the "fix" workflow since they're part of that pipeline. - workflow := "review" - if req.JobType == "compact" { - workflow = "fix" - } else if !config.IsDefaultReviewType(req.ReviewType) { - workflow = req.ReviewType - } + workflow := workflowForJob(req.JobType, req.ReviewType) // Resolve reasoning level for the determined workflow. // Compact jobs use fix reasoning (default "standard"), not review @@ -778,6 +791,9 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } + requestedModel := strings.TrimSpace(req.Model) + requestedProvider := strings.TrimSpace(req.Provider) + // Resolve agent for workflow at this reasoning level cfg := s.configWatcher.Config() resolution := agent.ResolveWorkflowConfig( @@ -802,7 +818,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { agentName = resolved.Name() } - model := resolution.ModelForSelectedAgent(agentName, req.Model) + model := resolution.ModelForSelectedAgent(agentName, requestedModel) if req.JobType == storage.JobTypeInsights { if req.Since == "" { @@ -854,19 +870,21 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { if isPrompt { // Custom prompt job - use provided prompt directly job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - Branch: req.Branch, - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - Prompt: req.CustomPrompt, - OutputPrefix: req.OutputPrefix, - Agentic: req.Agentic, - Label: gitRef, // Use git_ref as TUI label (run, analyze type, custom) - JobType: req.JobType, - Provider: req.Provider, - WorktreePath: worktreePath, + RepoID: repo.ID, + Branch: req.Branch, + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + Prompt: req.CustomPrompt, + OutputPrefix: req.OutputPrefix, + Agentic: req.Agentic, + Label: gitRef, // Use git_ref as TUI label (run, analyze type, custom) + JobType: req.JobType, + Provider: requestedProvider, + RequestedModel: requestedModel, + RequestedProvider: requestedProvider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue prompt job: %v", err)) @@ -876,17 +894,19 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Dirty review - use pre-captured diff targetSHA, _ := git.ResolveSHA(checkoutRoot, "HEAD") job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: gitRef, - Branch: req.Branch, - SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, targetSHA), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - DiffContent: req.DiffContent, - Provider: req.Provider, - WorktreePath: worktreePath, + RepoID: repo.ID, + GitRef: gitRef, + Branch: req.Branch, + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, targetSHA), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + DiffContent: req.DiffContent, + Provider: requestedProvider, + RequestedModel: requestedModel, + RequestedProvider: requestedProvider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue dirty job: %v", err)) @@ -951,16 +971,18 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: fullRef, - Branch: req.Branch, - SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, endSHA), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - Provider: req.Provider, - WorktreePath: worktreePath, + RepoID: repo.ID, + GitRef: fullRef, + Branch: req.Branch, + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, endSHA), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + Provider: requestedProvider, + RequestedModel: requestedModel, + RequestedProvider: requestedProvider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) @@ -1001,18 +1023,20 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { patchID := git.GetPatchID(checkoutRoot, sha) job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: sha, - Branch: req.Branch, - SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, sha), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - PatchID: patchID, - Provider: req.Provider, - WorktreePath: worktreePath, + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: sha, + Branch: req.Branch, + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, sha), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + PatchID: patchID, + Provider: requestedProvider, + RequestedModel: requestedModel, + RequestedProvider: requestedProvider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) @@ -1698,7 +1722,19 @@ func (s *Server) handleRerunJob(w http.ResponseWriter, r *http.Request) { return } - if err := s.db.ReenqueueJob(req.JobID); err != nil { + job, err := s.db.GetJobByID(req.JobID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + writeError(w, http.StatusNotFound, "job not found or not rerunnable") + return + } + writeError(w, http.StatusInternalServerError, fmt.Sprintf("load job: %v", err)) + return + } + + model, provider := resolveRerunModelProvider(job, s.configWatcher.Config()) + + if err := s.db.ReenqueueJob(req.JobID, storage.ReenqueueOpts{Model: model, Provider: provider}); err != nil { if errors.Is(err, sql.ErrNoRows) { writeError(w, http.StatusNotFound, "job not found or not rerunnable") return diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index 8f30239b6..8e93aca72 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -356,6 +356,41 @@ func TestHandleRerunJob(t *testing.T) { } }) + t.Run("rerun reevaluates implicit effective model", func(t *testing.T) { + isolatedDB, isolatedDir := testutil.OpenTestDBWithDir(t) + server := NewServer(isolatedDB, config.DefaultConfig(), "") + + repo, err := isolatedDB.GetOrCreateRepo(isolatedDir) + require.NoError(t, err) + commit, err := isolatedDB.GetOrCreateCommit(repo.ID, "rerun-implicit-model", "Author", "Subject", time.Now()) + require.NoError(t, err) + job, err := isolatedDB.EnqueueJob(storage.EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "rerun-implicit-model", + Agent: "opencode", + Model: "minimax-m2.5-free", + }) + require.NoError(t, err) + + claimed, err := isolatedDB.ClaimJob("worker-1") + require.NoError(t, err) + require.NotNil(t, claimed) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, isolatedDB.CompleteJob(job.ID, "opencode", "prompt", "output")) + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) + w := httptest.NewRecorder() + + server.handleRerunJob(w, req) + testutil.AssertStatusCode(t, w, http.StatusOK) + + updated, err := isolatedDB.GetJobByID(job.ID) + require.NoError(t, err) + assert.Equal(t, storage.JobStatusQueued, updated.Status) + assert.Empty(t, updated.Model, "rerun should recompute implicit model instead of preserving stale effective value") + }) + t.Run("rerun queued job fails", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-queued", "Author", "Subject", time.Now()) job, _ := db.EnqueueJob(storage.EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "rerun-queued", Agent: "test"}) diff --git a/internal/daemon/server_jobs_test.go b/internal/daemon/server_jobs_test.go index 034ad81c0..2b4b99064 100644 --- a/internal/daemon/server_jobs_test.go +++ b/internal/daemon/server_jobs_test.go @@ -2569,6 +2569,7 @@ func TestHandleEnqueueAgentOverrideModel(t *testing.T) { return false }, "model = %q, want %q", job.Model, tt.wantModel) } + assert.Equal(t, tt.reqModel, job.RequestedModel) }) } } diff --git a/internal/storage/db.go b/internal/storage/db.go index f0219eed7..e053cecfc 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -41,6 +41,8 @@ CREATE TABLE IF NOT EXISTS review_jobs ( session_id TEXT, agent TEXT NOT NULL DEFAULT 'codex', model TEXT, + requested_model TEXT, + requested_provider TEXT, reasoning TEXT NOT NULL DEFAULT 'thorough', status TEXT NOT NULL CHECK(status IN ('queued','running','done','failed','canceled','applied','rebased')) DEFAULT 'queued', enqueued_at TEXT NOT NULL DEFAULT (datetime('now')), @@ -334,6 +336,9 @@ func (db *DB) migrate() error { session_id TEXT, agent TEXT NOT NULL DEFAULT 'codex', model TEXT, + provider TEXT, + requested_model TEXT, + requested_provider TEXT, reasoning TEXT NOT NULL DEFAULT 'thorough', status TEXT NOT NULL CHECK(status IN ('queued','running','done','failed','canceled','applied','rebased')) DEFAULT 'queued', enqueued_at TEXT NOT NULL DEFAULT (datetime('now')), @@ -352,8 +357,8 @@ func (db *DB) migrate() error { } // Check which optional columns exist in source table - var hasDiffContent, hasReasoning, hasAgentic, hasModel, hasBranch, hasSessionID bool - checkRows, checkErr := tx.Query(`SELECT name FROM pragma_table_info('review_jobs') WHERE name IN ('diff_content', 'reasoning', 'agentic', 'model', 'branch', 'session_id')`) + var hasDiffContent, hasReasoning, hasAgentic, hasModel, hasProvider, hasRequestedModel, hasRequestedProvider, hasBranch, hasSessionID bool + checkRows, checkErr := tx.Query(`SELECT name FROM pragma_table_info('review_jobs') WHERE name IN ('diff_content', 'reasoning', 'agentic', 'model', 'provider', 'requested_model', 'requested_provider', 'branch', 'session_id')`) if checkErr == nil { for checkRows.Next() { var colName string @@ -364,9 +369,14 @@ func (db *DB) migrate() error { case "reasoning": hasReasoning = true case "agentic": - hasAgentic = true case "model": hasModel = true + case "provider": + hasProvider = true + case "requested_model": + hasRequestedModel = true + case "requested_provider": + hasRequestedProvider = true case "branch": hasBranch = true case "session_id": @@ -391,6 +401,15 @@ func (db *DB) migrate() error { if hasModel { baseCols = append(baseCols, "model") } + if hasProvider { + baseCols = append(baseCols, "provider") + } + if hasRequestedModel { + baseCols = append(baseCols, "requested_model") + } + if hasRequestedProvider { + baseCols = append(baseCols, "requested_provider") + } if hasReasoning { baseCols = append(baseCols, "reasoning") } @@ -695,6 +714,30 @@ func (db *DB) migrate() error { } } + // Migration: add requested_model column to review_jobs if missing + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'requested_model'`).Scan(&count) + if err != nil { + return fmt.Errorf("check requested_model column: %w", err) + } + if count == 0 { + _, err = db.Exec(`ALTER TABLE review_jobs ADD COLUMN requested_model TEXT`) + if err != nil { + return fmt.Errorf("add requested_model column: %w", err) + } + } + + // Migration: add requested_provider column to review_jobs if missing + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'requested_provider'`).Scan(&count) + if err != nil { + return fmt.Errorf("check requested_provider column: %w", err) + } + if count == 0 { + _, err = db.Exec(`ALTER TABLE review_jobs ADD COLUMN requested_provider TEXT`) + if err != nil { + return fmt.Errorf("add requested_provider column: %w", err) + } + } + // Migration: add token_usage column to review_jobs if missing err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'token_usage'`).Scan(&count) if err != nil { diff --git a/internal/storage/db_job_test.go b/internal/storage/db_job_test.go index 55697b014..02fed8261 100644 --- a/internal/storage/db_job_test.go +++ b/internal/storage/db_job_test.go @@ -808,7 +808,7 @@ func TestReenqueueJob(t *testing.T) { db.ClaimJob("worker-1") db.FailJob(job.ID, "", "some error") - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob failed: %v") updated, _ := db.GetJobByID(job.ID) @@ -822,7 +822,7 @@ func TestReenqueueJob(t *testing.T) { _, _, job := createJobChain(t, db, "/tmp/test-repo", "rerun-canceled") db.CancelJob(job.ID) - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob failed: %v") updated, _ := db.GetJobByID(job.ID) @@ -844,7 +844,7 @@ func TestReenqueueJob(t *testing.T) { } db.CompleteJob(job.ID, "codex", "prompt", "output") - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob failed: %v") updated, _ := db.GetJobByID(job.ID) @@ -854,7 +854,7 @@ func TestReenqueueJob(t *testing.T) { t.Run("rerun queued job fails", func(t *testing.T) { _, _, job := createJobChain(t, db, "/tmp/test-repo", "rerun-queued") - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.Error(t, err) }) @@ -862,15 +862,52 @@ func TestReenqueueJob(t *testing.T) { _, _, job := createJobChain(t, db, "/tmp/test-repo", "rerun-running") db.ClaimJob("worker-1") - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.Error(t, err) }) t.Run("rerun nonexistent job fails", func(t *testing.T) { - err := db.ReenqueueJob(99999) + err := db.ReenqueueJob(99999, ReenqueueOpts{}) require.Error(t, err) }) + t.Run("rerun updates effective model and preserves requested model", func(t *testing.T) { + isolatedDB := openTestDB(t) + defer isolatedDB.Close() + + repo := createRepo(t, isolatedDB, "/tmp/rerun-requested-model") + commit := createCommit(t, isolatedDB, repo.ID, "rerun-requested-model-sha") + + job, err := isolatedDB.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "rerun-requested-model-sha", + Agent: "opencode", + Model: "minimax-m2.5-free", + RequestedModel: "minimax-m2.5-free", + RequestedProvider: "anthropic", + Provider: "anthropic", + }) + require.NoError(t, err) + + claimed, err := isolatedDB.ClaimJob("worker-1") + require.NoError(t, err) + require.NotNil(t, claimed) + assert.Equal(t, job.ID, claimed.ID) + require.NoError(t, isolatedDB.CompleteJob(job.ID, "opencode", "prompt", "output")) + + err = isolatedDB.ReenqueueJob(job.ID, ReenqueueOpts{Model: "openai/gpt-5", Provider: "openai"}) + require.NoError(t, err) + + updated, err := isolatedDB.GetJobByID(job.ID) + require.NoError(t, err) + assert.Equal(t, JobStatusQueued, updated.Status) + assert.Equal(t, "openai/gpt-5", updated.Model) + assert.Equal(t, "openai", updated.Provider) + assert.Equal(t, "minimax-m2.5-free", updated.RequestedModel) + assert.Equal(t, "anthropic", updated.RequestedProvider) + }) + t.Run("rerun preserves worktree_path", func(t *testing.T) { isolatedDB := openTestDB(t) defer isolatedDB.Close() @@ -895,7 +932,7 @@ func TestReenqueueJob(t *testing.T) { err = isolatedDB.CompleteJob(job.ID, "test", "prompt", "output") require.NoError(t, err) - err = isolatedDB.ReenqueueJob(job.ID) + err = isolatedDB.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err) updated, err := isolatedDB.GetJobByID(job.ID) @@ -924,7 +961,7 @@ func TestReenqueueJob(t *testing.T) { assert.Equal(t, "first output", review1.Output) // Re-enqueue the done job - err = isolatedDB.ReenqueueJob(job.ID) + err = isolatedDB.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob failed: %v") // Verify review was deleted @@ -1113,7 +1150,7 @@ func TestSaveJobSessionID_StaleWorkerIgnored(t *testing.T) { err = db.CancelJob(job.ID) require.NoError(t, err, "CancelJob: %v", err) - err = db.ReenqueueJob(job.ID) + err = db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob: %v", err) j, err = db.GetJobByID(job.ID) diff --git a/internal/storage/hydration.go b/internal/storage/hydration.go index 804324f35..bde009c00 100644 --- a/internal/storage/hydration.go +++ b/internal/storage/hydration.go @@ -7,31 +7,33 @@ type sqlScanner interface { } type reviewJobScanFields struct { - EnqueuedAt string - StartedAt sql.NullString - FinishedAt sql.NullString - WorkerID sql.NullString - Error sql.NullString - Prompt sql.NullString - SourceMachineID sql.NullString - UUID sql.NullString - Model sql.NullString - Provider sql.NullString - Branch sql.NullString - SessionID sql.NullString - CommitID sql.NullInt64 - CommitSubject sql.NullString - JobType sql.NullString - ReviewType sql.NullString - PatchID sql.NullString - ParentJobID sql.NullInt64 - Patch sql.NullString - DiffContent sql.NullString - OutputPrefix sql.NullString - TokenUsage sql.NullString - Agentic int - Closed sql.NullInt64 - WorktreePath string + EnqueuedAt string + StartedAt sql.NullString + FinishedAt sql.NullString + WorkerID sql.NullString + Error sql.NullString + Prompt sql.NullString + SourceMachineID sql.NullString + UUID sql.NullString + Model sql.NullString + Provider sql.NullString + RequestedModel sql.NullString + RequestedProvider sql.NullString + Branch sql.NullString + SessionID sql.NullString + CommitID sql.NullInt64 + CommitSubject sql.NullString + JobType sql.NullString + ReviewType sql.NullString + PatchID sql.NullString + ParentJobID sql.NullInt64 + Patch sql.NullString + DiffContent sql.NullString + OutputPrefix sql.NullString + TokenUsage sql.NullString + Agentic int + Closed sql.NullInt64 + WorktreePath string } func applyReviewJobScan(job *ReviewJob, fields reviewJobScanFields) { @@ -53,6 +55,12 @@ func applyReviewJobScan(job *ReviewJob, fields reviewJobScanFields) { if fields.Provider.Valid { job.Provider = fields.Provider.String } + if fields.RequestedModel.Valid { + job.RequestedModel = fields.RequestedModel.String + } + if fields.RequestedProvider.Valid { + job.RequestedProvider = fields.RequestedProvider.String + } if fields.JobType.Valid { job.JobType = fields.JobType.String } diff --git a/internal/storage/jobs.go b/internal/storage/jobs.go index cf4f69cd1..09f5a65ae 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -40,25 +40,27 @@ func parseSQLiteTime(s string) time.Time { // - CommitID > 0 → "review" (single commit) // - otherwise → "range" (commit range) type EnqueueOpts struct { - RepoID int64 - CommitID int64 // >0 for single-commit reviews - GitRef string // SHA, "start..end" range, or "dirty" - Branch string - SessionID string - Agent string - Model string - Provider string // e.g. "anthropic", "openai" - Reasoning string - ReviewType string // e.g. "security" — changes which system prompt is used - PatchID string // Stable patch-id for rebase tracking - DiffContent string // For dirty reviews (captured at enqueue time) - Prompt string // For task jobs (pre-stored prompt) - OutputPrefix string // Prefix to prepend to review output - Agentic bool // Allow file edits and command execution - Label string // Display label in TUI for task jobs (default: "prompt") - JobType string // Explicit job type (review/range/dirty/task/compact/fix); inferred if empty - ParentJobID int64 // Parent job being fixed (for fix jobs) - WorktreePath string // Worktree checkout path (empty = use main repo root) + RepoID int64 + CommitID int64 // >0 for single-commit reviews + GitRef string // SHA, "start..end" range, or "dirty" + Branch string + SessionID string + Agent string + Model string // Effective model for this run + Provider string // Effective provider for this run (e.g. "anthropic", "openai") + RequestedModel string // Explicitly requested model; empty means reevaluate on rerun + RequestedProvider string // Explicitly requested provider; empty means reevaluate on rerun + Reasoning string + ReviewType string // e.g. "security" — changes which system prompt is used + PatchID string // Stable patch-id for rebase tracking + DiffContent string // For dirty reviews (captured at enqueue time) + Prompt string // For task jobs (pre-stored prompt) + OutputPrefix string // Prefix to prepend to review output + Agentic bool // Allow file edits and command execution + Label string // Display label in TUI for task jobs (default: "prompt") + JobType string // Explicit job type (review/range/dirty/task/compact/fix); inferred if empty + ParentJobID int64 // Parent job being fixed (for fix jobs) + WorktreePath string // Worktree checkout path (empty = use main repo root) } // EnqueueJob creates a new review job. The job type is inferred from opts. @@ -118,12 +120,12 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { } result, err := db.Exec(` - INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, session_id, agent, model, provider, reasoning, + INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, session_id, agent, model, provider, requested_model, requested_provider, reasoning, status, job_type, review_type, patch_id, diff_content, prompt, agentic, output_prefix, parent_job_id, uuid, source_machine_id, updated_at, worktree_path) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, opts.RepoID, commitIDParam, gitRef, nullString(opts.Branch), nullString(opts.SessionID), - opts.Agent, nullString(opts.Model), nullString(opts.Provider), reasoning, + opts.Agent, nullString(opts.Model), nullString(opts.Provider), nullString(opts.RequestedModel), nullString(opts.RequestedProvider), reasoning, jobType, opts.ReviewType, nullString(opts.PatchID), nullString(opts.DiffContent), nullString(opts.Prompt), agenticInt, nullString(opts.OutputPrefix), parentJobIDParam, @@ -134,27 +136,29 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { id, _ := result.LastInsertId() job := &ReviewJob{ - ID: id, - RepoID: opts.RepoID, - GitRef: gitRef, - Branch: opts.Branch, - SessionID: opts.SessionID, - Agent: opts.Agent, - Model: opts.Model, - Provider: opts.Provider, - Reasoning: reasoning, - JobType: jobType, - ReviewType: opts.ReviewType, - PatchID: opts.PatchID, - Status: JobStatusQueued, - EnqueuedAt: now, - Prompt: opts.Prompt, - Agentic: opts.Agentic, - OutputPrefix: opts.OutputPrefix, - UUID: uid, - SourceMachineID: machineID, - UpdatedAt: &now, - WorktreePath: opts.WorktreePath, + ID: id, + RepoID: opts.RepoID, + GitRef: gitRef, + Branch: opts.Branch, + SessionID: opts.SessionID, + Agent: opts.Agent, + Model: opts.Model, + Provider: opts.Provider, + RequestedModel: opts.RequestedModel, + RequestedProvider: opts.RequestedProvider, + Reasoning: reasoning, + JobType: jobType, + ReviewType: opts.ReviewType, + PatchID: opts.PatchID, + Status: JobStatusQueued, + EnqueuedAt: now, + Prompt: opts.Prompt, + Agentic: opts.Agentic, + OutputPrefix: opts.OutputPrefix, + UUID: uid, + SourceMachineID: machineID, + UpdatedAt: &now, + WorktreePath: opts.WorktreePath, } if opts.ParentJobID > 0 { job.ParentJobID = &opts.ParentJobID @@ -202,7 +206,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { var job ReviewJob var fields reviewJobScanFields err = db.QueryRow(` - SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.model, j.provider, j.reasoning, j.status, j.enqueued_at, + SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.model, j.provider, j.requested_model, j.requested_provider, j.reasoning, j.status, j.enqueued_at, r.root_path, r.name, c.subject, j.diff_content, j.prompt, COALESCE(j.agentic, 0), j.job_type, j.review_type, j.output_prefix, j.patch_id, j.parent_job_id, COALESCE(j.worktree_path, '') FROM review_jobs j @@ -211,7 +215,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { WHERE j.worker_id = ? AND j.status = 'running' ORDER BY j.started_at DESC LIMIT 1 - `, workerID).Scan(&job.ID, &job.RepoID, &fields.CommitID, &job.GitRef, &fields.Branch, &fields.SessionID, &job.Agent, &fields.Model, &fields.Provider, &job.Reasoning, &job.Status, &fields.EnqueuedAt, + `, workerID).Scan(&job.ID, &job.RepoID, &fields.CommitID, &job.GitRef, &fields.Branch, &fields.SessionID, &job.Agent, &fields.Model, &fields.Provider, &fields.RequestedModel, &fields.RequestedProvider, &job.Reasoning, &job.Status, &fields.EnqueuedAt, &job.RepoPath, &job.RepoName, &fields.CommitSubject, &fields.DiffContent, &fields.Prompt, &fields.Agentic, &fields.JobType, &fields.ReviewType, &fields.OutputPrefix, &fields.PatchID, &fields.ParentJobID, &fields.WorktreePath) if err != nil { @@ -514,10 +518,15 @@ func (db *DB) MarkJobRebased(jobID int64) error { return nil } +type ReenqueueOpts struct { + Model string + Provider string +} + // ReenqueueJob resets a completed, failed, or canceled job back to queued status. // This allows manual re-running of jobs to get a fresh review. // For done jobs, the existing review is deleted to avoid unique constraint violations. -func (db *DB) ReenqueueJob(jobID int64) error { +func (db *DB) ReenqueueJob(jobID int64, opts ReenqueueOpts) error { ctx := context.Background() conn, err := db.Conn(ctx) if err != nil { @@ -543,12 +552,15 @@ func (db *DB) ReenqueueJob(jobID int64) error { return err } - // Reset job status + nowStr := time.Now().Format(time.RFC3339) + + // Reset job status and replace effective execution settings with the + // newly resolved values for this rerun. result, err := conn.ExecContext(ctx, ` UPDATE review_jobs - SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = 0, patch = NULL, session_id = NULL + SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = 0, patch = NULL, session_id = NULL, model = ?, provider = ?, updated_at = ? WHERE id = ? AND status IN ('done', 'failed', 'canceled') - `, jobID) + `, nullString(opts.Model), nullString(opts.Provider), nowStr, jobID) if err != nil { return err } @@ -771,7 +783,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, j.retry_count, COALESCE(j.agentic, 0), r.root_path, r.name, c.subject, rv.closed, rv.output, rv.verdict_bool, j.source_machine_id, j.uuid, j.model, j.job_type, j.review_type, j.patch_id, - j.parent_job_id, j.provider, j.token_usage, COALESCE(j.worktree_path, '') + j.parent_job_id, j.provider, j.requested_model, j.requested_provider, j.token_usage, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -809,7 +821,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int &fields.StartedAt, &fields.FinishedAt, &fields.WorkerID, &fields.Error, &fields.Prompt, &j.RetryCount, &fields.Agentic, &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Closed, &output, &verdictBool, &fields.SourceMachineID, &fields.UUID, &fields.Model, &fields.JobType, &fields.ReviewType, &fields.PatchID, - &fields.ParentJobID, &fields.Provider, &fields.TokenUsage, &fields.WorktreePath) + &fields.ParentJobID, &fields.Provider, &fields.RequestedModel, &fields.RequestedProvider, &fields.TokenUsage, &fields.WorktreePath) if err != nil { return nil, err } @@ -858,7 +870,7 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { err := db.QueryRow(` SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, COALESCE(j.agentic, 0), - r.root_path, r.name, c.subject, j.model, j.provider, j.job_type, j.review_type, j.patch_id, + r.root_path, r.name, c.subject, j.model, j.provider, j.requested_model, j.requested_provider, j.job_type, j.review_type, j.patch_id, j.parent_job_id, j.patch, j.token_usage, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id @@ -866,7 +878,7 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { WHERE j.id = ? `, id).Scan(&j.ID, &j.RepoID, &fields.CommitID, &j.GitRef, &fields.Branch, &fields.SessionID, &j.Agent, &j.Reasoning, &j.Status, &fields.EnqueuedAt, &fields.StartedAt, &fields.FinishedAt, &fields.WorkerID, &fields.Error, &fields.Prompt, &fields.Agentic, - &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Model, &fields.Provider, &fields.JobType, &fields.ReviewType, &fields.PatchID, + &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Model, &fields.Provider, &fields.RequestedModel, &fields.RequestedProvider, &fields.JobType, &fields.ReviewType, &fields.PatchID, &fields.ParentJobID, &fields.Patch, &fields.TokenUsage, &fields.WorktreePath) if err != nil { return nil, err diff --git a/internal/storage/models.go b/internal/storage/models.go index 91caa51fa..5a08e38af 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -47,34 +47,36 @@ const ( ) type ReviewJob struct { - ID int64 `json:"id"` - RepoID int64 `json:"repo_id"` - CommitID *int64 `json:"commit_id,omitempty"` // nil for ranges - GitRef string `json:"git_ref"` // SHA or "start..end" for ranges - Branch string `json:"branch,omitempty"` // Branch name at time of job creation - SessionID string `json:"session_id,omitempty"` // Reused prior session or captured current session ID - Agent string `json:"agent"` - Model string `json:"model,omitempty"` // Model to use (for opencode: provider/model format) - Provider string `json:"provider,omitempty"` // Provider to use (e.g., anthropic, openai) - Reasoning string `json:"reasoning,omitempty"` // thorough, standard, fast (default: thorough) - JobType string `json:"job_type"` // review, range, dirty, task, insights, compact, fix - Status JobStatus `json:"status"` - EnqueuedAt time.Time `json:"enqueued_at"` - StartedAt *time.Time `json:"started_at,omitempty"` - FinishedAt *time.Time `json:"finished_at,omitempty"` - WorkerID string `json:"worker_id,omitempty"` - Error string `json:"error,omitempty"` - Prompt string `json:"prompt,omitempty"` - RetryCount int `json:"retry_count"` - DiffContent *string `json:"diff_content,omitempty"` // For dirty reviews (uncommitted changes) - Agentic bool `json:"agentic"` // Enable agentic mode (allow file edits) - ReviewType string `json:"review_type,omitempty"` // Review type (e.g., "security") - changes system prompt - PatchID string `json:"patch_id,omitempty"` // Stable patch-id for rebase tracking - OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output - ParentJobID *int64 `json:"parent_job_id,omitempty"` // Job being fixed (for fix jobs) - Patch *string `json:"patch,omitempty"` // Generated diff patch (for completed fix jobs) - WorktreePath string `json:"worktree_path,omitempty"` // Worktree checkout path (empty = use RepoPath) - TokenUsage string `json:"token_usage,omitempty"` // JSON blob from agentsview (token consumption) + ID int64 `json:"id"` + RepoID int64 `json:"repo_id"` + CommitID *int64 `json:"commit_id,omitempty"` // nil for ranges + GitRef string `json:"git_ref"` // SHA or "start..end" for ranges + Branch string `json:"branch,omitempty"` // Branch name at time of job creation + SessionID string `json:"session_id,omitempty"` // Reused prior session or captured current session ID + Agent string `json:"agent"` + Model string `json:"model,omitempty"` // Effective model for this run (for opencode: provider/model format) + Provider string `json:"provider,omitempty"` // Effective provider for this run (e.g., anthropic, openai) + RequestedModel string `json:"requested_model,omitempty"` // Explicitly requested model; empty means reevaluate on rerun + RequestedProvider string `json:"requested_provider,omitempty"` // Explicitly requested provider; empty means reevaluate on rerun + Reasoning string `json:"reasoning,omitempty"` // thorough, standard, fast (default: thorough) + JobType string `json:"job_type"` // review, range, dirty, task, insights, compact, fix + Status JobStatus `json:"status"` + EnqueuedAt time.Time `json:"enqueued_at"` + StartedAt *time.Time `json:"started_at,omitempty"` + FinishedAt *time.Time `json:"finished_at,omitempty"` + WorkerID string `json:"worker_id,omitempty"` + Error string `json:"error,omitempty"` + Prompt string `json:"prompt,omitempty"` + RetryCount int `json:"retry_count"` + DiffContent *string `json:"diff_content,omitempty"` // For dirty reviews (uncommitted changes) + Agentic bool `json:"agentic"` // Enable agentic mode (allow file edits) + ReviewType string `json:"review_type,omitempty"` // Review type (e.g., "security") - changes system prompt + PatchID string `json:"patch_id,omitempty"` // Stable patch-id for rebase tracking + OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output + ParentJobID *int64 `json:"parent_job_id,omitempty"` // Job being fixed (for fix jobs) + Patch *string `json:"patch,omitempty"` // Generated diff patch (for completed fix jobs) + WorktreePath string `json:"worktree_path,omitempty"` // Worktree checkout path (empty = use RepoPath) + TokenUsage string `json:"token_usage,omitempty"` // JSON blob from agentsview (token consumption) // Sync fields UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync SourceMachineID string `json:"source_machine_id,omitempty"` // Machine that created this job diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index ac9193360..3133edf89 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -14,12 +14,12 @@ import ( ) // PostgreSQL schema version - increment when schema changes -const pgSchemaVersion = 9 +const pgSchemaVersion = 10 // pgSchemaName is the PostgreSQL schema used to isolate roborev tables const pgSchemaName = "roborev" -//go:embed schemas/postgres_v9.sql +//go:embed schemas/postgres_v10.sql var pgSchemaSQL string // pgSchemaStatements returns the individual DDL statements for schema creation. @@ -288,6 +288,20 @@ func (p *PgPool) EnsureSchema(ctx context.Context) error { return fmt.Errorf("migrate to v9 (add worktree_path column): %w", err) } } + if currentVersion < 10 { + _, err = p.pool.Exec(ctx, `ALTER TABLE review_jobs ADD COLUMN IF NOT EXISTS provider TEXT`) + if err != nil { + return fmt.Errorf("migrate to v10 (add provider column): %w", err) + } + _, err = p.pool.Exec(ctx, `ALTER TABLE review_jobs ADD COLUMN IF NOT EXISTS requested_model TEXT`) + if err != nil { + return fmt.Errorf("migrate to v10 (add requested_model column): %w", err) + } + _, err = p.pool.Exec(ctx, `ALTER TABLE review_jobs ADD COLUMN IF NOT EXISTS requested_provider TEXT`) + if err != nil { + return fmt.Errorf("migrate to v10 (add requested_provider column): %w", err) + } + } // Update version _, err = p.pool.Exec(ctx, `INSERT INTO schema_version (version) VALUES ($1) ON CONFLICT (version) DO NOTHING`, pgSchemaVersion) if err != nil { @@ -539,15 +553,18 @@ func (p *PgPool) Tx(ctx context.Context, fn func(tx pgx.Tx) error) error { func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, pgCommitID *int64) error { _, err := p.pool.Exec(ctx, ` INSERT INTO review_jobs ( - uuid, repo_id, commit_id, git_ref, session_id, agent, model, reasoning, job_type, review_type, patch_id, status, agentic, + uuid, repo_id, commit_id, git_ref, session_id, agent, model, provider, requested_model, requested_provider, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, worktree_path, source_machine_id, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, NOW()) + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, NOW()) ON CONFLICT (uuid) DO UPDATE SET status = EXCLUDED.status, finished_at = EXCLUDED.finished_at, error = EXCLUDED.error, model = COALESCE(EXCLUDED.model, review_jobs.model), + provider = COALESCE(EXCLUDED.provider, review_jobs.provider), + requested_model = COALESCE(EXCLUDED.requested_model, review_jobs.requested_model), + requested_provider = COALESCE(EXCLUDED.requested_provider, review_jobs.requested_provider), git_ref = EXCLUDED.git_ref, session_id = COALESCE(EXCLUDED.session_id, review_jobs.session_id), commit_id = EXCLUDED.commit_id, @@ -555,7 +572,7 @@ func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, p token_usage = COALESCE(EXCLUDED.token_usage, review_jobs.token_usage), worktree_path = COALESCE(EXCLUDED.worktree_path, review_jobs.worktree_path), updated_at = NOW() - `, j.UUID, pgRepoID, pgCommitID, j.GitRef, nullString(j.SessionID), j.Agent, nullString(j.Model), nullString(j.Reasoning), + `, j.UUID, pgRepoID, pgCommitID, j.GitRef, nullString(j.SessionID), j.Agent, nullString(j.Model), nullString(j.Provider), nullString(j.RequestedModel), nullString(j.RequestedProvider), nullString(j.Reasoning), defaultStr(j.JobType, "review"), j.ReviewType, nullString(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt, j.StartedAt, j.FinishedAt, nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), nullString(j.WorktreePath), j.SourceMachineID) return err @@ -590,32 +607,35 @@ func (p *PgPool) InsertResponse(ctx context.Context, r SyncableResponse) error { // PulledJob represents a job pulled from PostgreSQL type PulledJob struct { - UUID string - RepoIdentity string - CommitSHA string - CommitAuthor string - CommitSubject string - CommitTimestamp time.Time - GitRef string - SessionID string - Agent string - Model string - Reasoning string - JobType string - ReviewType string - PatchID string - Status string - Agentic bool - EnqueuedAt time.Time - StartedAt *time.Time - FinishedAt *time.Time - Prompt string - DiffContent *string - Error string - TokenUsage string - WorktreePath string - SourceMachineID string - UpdatedAt time.Time + UUID string + RepoIdentity string + CommitSHA string + CommitAuthor string + CommitSubject string + CommitTimestamp time.Time + GitRef string + SessionID string + Agent string + Model string + Provider string + RequestedModel string + RequestedProvider string + Reasoning string + JobType string + ReviewType string + PatchID string + Status string + Agentic bool + EnqueuedAt time.Time + StartedAt *time.Time + FinishedAt *time.Time + Prompt string + DiffContent *string + Error string + TokenUsage string + WorktreePath string + SourceMachineID string + UpdatedAt time.Time } // PullJobs fetches jobs from PostgreSQL updated after the given cursor. @@ -636,7 +656,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s rows, err := p.pool.Query(ctx, ` SELECT j.uuid, r.identity, COALESCE(c.sha, ''), COALESCE(c.author, ''), COALESCE(c.subject, ''), COALESCE(c.timestamp, '1970-01-01'::timestamptz), - j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, + j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.provider, ''), COALESCE(j.requested_model, ''), COALESCE(j.requested_provider, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, j.enqueued_at, j.started_at, j.finished_at, COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), COALESCE(j.token_usage, ''), COALESCE(j.worktree_path, ''), j.source_machine_id, j.updated_at, j.id @@ -663,7 +683,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s err := rows.Scan( &j.UUID, &j.RepoIdentity, &j.CommitSHA, &j.CommitAuthor, &j.CommitSubject, &j.CommitTimestamp, - &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, + &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Provider, &j.RequestedModel, &j.RequestedProvider, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, &j.EnqueuedAt, &j.StartedAt, &j.FinishedAt, &j.Prompt, &diffContent, &j.Error, &j.TokenUsage, &j.WorktreePath, &j.SourceMachineID, &j.UpdatedAt, &lastID, diff --git a/internal/storage/reviews.go b/internal/storage/reviews.go index 850ff6630..7805e30c5 100644 --- a/internal/storage/reviews.go +++ b/internal/storage/reviews.go @@ -18,7 +18,7 @@ func (db *DB) GetReviewByJobID(jobID int64) (*Review, error) { err := db.QueryRow(` SELECT rv.id, rv.job_id, rv.agent, rv.prompt, rv.output, rv.created_at, rv.closed, rv.uuid, rv.verdict_bool, j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.reasoning, j.status, j.enqueued_at, - j.started_at, j.finished_at, j.worker_id, j.error, j.model, j.job_type, j.review_type, j.patch_id, + j.started_at, j.finished_at, j.worker_id, j.error, j.model, j.provider, j.requested_model, j.requested_provider, j.job_type, j.review_type, j.patch_id, rp.root_path, rp.name, c.subject, j.token_usage FROM reviews rv JOIN review_jobs j ON j.id = rv.job_id @@ -27,7 +27,7 @@ func (db *DB) GetReviewByJobID(jobID int64) (*Review, error) { WHERE rv.job_id = ? `, jobID).Scan(&r.ID, &r.JobID, &r.Agent, &r.Prompt, &r.Output, &reviewFields.CreatedAt, &reviewFields.Closed, &reviewFields.UUID, &reviewFields.VerdictBool, &job.ID, &job.RepoID, &jobFields.CommitID, &job.GitRef, &jobFields.Branch, &jobFields.SessionID, &job.Agent, &job.Reasoning, &job.Status, &jobFields.EnqueuedAt, - &jobFields.StartedAt, &jobFields.FinishedAt, &jobFields.WorkerID, &jobFields.Error, &jobFields.Model, &jobFields.JobType, &jobFields.ReviewType, &jobFields.PatchID, + &jobFields.StartedAt, &jobFields.FinishedAt, &jobFields.WorkerID, &jobFields.Error, &jobFields.Model, &jobFields.Provider, &jobFields.RequestedModel, &jobFields.RequestedProvider, &jobFields.JobType, &jobFields.ReviewType, &jobFields.PatchID, &job.RepoPath, &job.RepoName, &jobFields.CommitSubject, &jobFields.TokenUsage) if err != nil { return nil, err @@ -50,7 +50,7 @@ func (db *DB) GetReviewByCommitSHA(sha string) (*Review, error) { err := db.QueryRow(` SELECT rv.id, rv.job_id, rv.agent, rv.prompt, rv.output, rv.created_at, rv.closed, rv.uuid, rv.verdict_bool, j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.reasoning, j.status, j.enqueued_at, - j.started_at, j.finished_at, j.worker_id, j.error, j.model, j.job_type, j.review_type, j.patch_id, + j.started_at, j.finished_at, j.worker_id, j.error, j.model, j.provider, j.requested_model, j.requested_provider, j.job_type, j.review_type, j.patch_id, rp.root_path, rp.name, c.subject, j.token_usage FROM reviews rv JOIN review_jobs j ON j.id = rv.job_id @@ -61,7 +61,7 @@ func (db *DB) GetReviewByCommitSHA(sha string) (*Review, error) { LIMIT 1 `, sha).Scan(&r.ID, &r.JobID, &r.Agent, &r.Prompt, &r.Output, &reviewFields.CreatedAt, &reviewFields.Closed, &reviewFields.UUID, &reviewFields.VerdictBool, &job.ID, &job.RepoID, &jobFields.CommitID, &job.GitRef, &jobFields.Branch, &jobFields.SessionID, &job.Agent, &job.Reasoning, &job.Status, &jobFields.EnqueuedAt, - &jobFields.StartedAt, &jobFields.FinishedAt, &jobFields.WorkerID, &jobFields.Error, &jobFields.Model, &jobFields.JobType, &jobFields.ReviewType, &jobFields.PatchID, + &jobFields.StartedAt, &jobFields.FinishedAt, &jobFields.WorkerID, &jobFields.Error, &jobFields.Model, &jobFields.Provider, &jobFields.RequestedModel, &jobFields.RequestedProvider, &jobFields.JobType, &jobFields.ReviewType, &jobFields.PatchID, &job.RepoPath, &job.RepoName, &jobFields.CommitSubject, &jobFields.TokenUsage) if err != nil { return nil, err diff --git a/internal/storage/schemas/postgres_v10.sql b/internal/storage/schemas/postgres_v10.sql new file mode 100644 index 000000000..b8157ae14 --- /dev/null +++ b/internal/storage/schemas/postgres_v10.sql @@ -0,0 +1,105 @@ +-- PostgreSQL schema version 10 +-- Adds provider plus requested model/provider provenance to review_jobs. +-- Note: Version is managed by EnsureSchema(), not this file. + +CREATE SCHEMA IF NOT EXISTS roborev; + +CREATE TABLE IF NOT EXISTS roborev.schema_version ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.machines ( + id SERIAL PRIMARY KEY, + machine_id UUID UNIQUE NOT NULL, + name TEXT, + last_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.repos ( + id SERIAL PRIMARY KEY, + identity TEXT UNIQUE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.commits ( + id SERIAL PRIMARY KEY, + repo_id INTEGER REFERENCES roborev.repos(id), + sha TEXT NOT NULL, + author TEXT NOT NULL, + subject TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(repo_id, sha) +); + +CREATE TABLE IF NOT EXISTS roborev.review_jobs ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + repo_id INTEGER NOT NULL REFERENCES roborev.repos(id), + commit_id INTEGER REFERENCES roborev.commits(id), + git_ref TEXT NOT NULL, + branch TEXT, + session_id TEXT, + agent TEXT NOT NULL, + model TEXT, + provider TEXT, + requested_model TEXT, + requested_provider TEXT, + reasoning TEXT, + job_type TEXT NOT NULL DEFAULT 'review', + review_type TEXT NOT NULL DEFAULT '', + patch_id TEXT, + status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'done', 'failed', 'canceled', 'applied', 'rebased')), + agentic BOOLEAN DEFAULT FALSE, + enqueued_at TIMESTAMP WITH TIME ZONE NOT NULL, + started_at TIMESTAMP WITH TIME ZONE, + finished_at TIMESTAMP WITH TIME ZONE, + prompt TEXT, + diff_content TEXT, + error TEXT, + token_usage TEXT, + worktree_path TEXT, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.reviews ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + agent TEXT NOT NULL, + prompt TEXT NOT NULL, + output TEXT NOT NULL, + closed BOOLEAN NOT NULL DEFAULT FALSE, + updated_by_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.responses ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + responder TEXT NOT NULL, + response TEXT NOT NULL, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_review_jobs_source ON roborev.review_jobs(source_machine_id); +CREATE INDEX IF NOT EXISTS idx_review_jobs_updated ON roborev.review_jobs(updated_at); +-- Note: idx_review_jobs_branch, idx_review_jobs_job_type, and +-- idx_review_jobs_patch_id are created by migration code, not here +-- (to support upgrades from older versions where those columns +-- don't exist yet). +CREATE INDEX IF NOT EXISTS idx_reviews_job_uuid ON roborev.reviews(job_uuid); +CREATE INDEX IF NOT EXISTS idx_reviews_updated ON roborev.reviews(updated_at); +CREATE INDEX IF NOT EXISTS idx_responses_job_uuid ON roborev.responses(job_uuid); +CREATE INDEX IF NOT EXISTS idx_responses_id ON roborev.responses(id); + +CREATE TABLE IF NOT EXISTS roborev.sync_metadata ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); diff --git a/internal/storage/sync.go b/internal/storage/sync.go index 6a54d9518..dad9bcf93 100644 --- a/internal/storage/sync.go +++ b/internal/storage/sync.go @@ -260,35 +260,38 @@ func (db *DB) GetRepoByIdentityCaseInsensitive(identity string) (*Repo, error) { // SyncableJob contains job data needed for sync type SyncableJob struct { - ID int64 - UUID string - RepoID int64 - RepoIdentity string - CommitID *int64 - CommitSHA string - CommitAuthor string - CommitSubject string - CommitTimestamp time.Time - GitRef string - SessionID string - Agent string - Model string - Reasoning string - JobType string - ReviewType string - PatchID string - Status string - Agentic bool - EnqueuedAt time.Time - StartedAt *time.Time - FinishedAt *time.Time - Prompt string - DiffContent *string - Error string - TokenUsage string - WorktreePath string - SourceMachineID string - UpdatedAt time.Time + ID int64 + UUID string + RepoID int64 + RepoIdentity string + CommitID *int64 + CommitSHA string + CommitAuthor string + CommitSubject string + CommitTimestamp time.Time + GitRef string + SessionID string + Agent string + Model string + Provider string + RequestedModel string + RequestedProvider string + Reasoning string + JobType string + ReviewType string + PatchID string + Status string + Agentic bool + EnqueuedAt time.Time + StartedAt *time.Time + FinishedAt *time.Time + Prompt string + DiffContent *string + Error string + TokenUsage string + WorktreePath string + SourceMachineID string + UpdatedAt time.Time } // GetJobsToSync returns terminal jobs that need to be pushed to PostgreSQL. @@ -298,7 +301,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) SELECT j.id, j.uuid, j.repo_id, COALESCE(r.identity, ''), j.commit_id, COALESCE(c.sha, ''), COALESCE(c.author, ''), COALESCE(c.subject, ''), COALESCE(c.timestamp, ''), - j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, + j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.provider, ''), COALESCE(j.requested_model, ''), COALESCE(j.requested_provider, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, j.enqueued_at, COALESCE(j.started_at, ''), COALESCE(j.finished_at, ''), COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), COALESCE(j.token_usage, ''), COALESCE(j.worktree_path, ''), j.source_machine_id, j.updated_at @@ -333,7 +336,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) err := rows.Scan( &j.ID, &j.UUID, &j.RepoID, &j.RepoIdentity, &commitID, &j.CommitSHA, &j.CommitAuthor, &j.CommitSubject, &commitTimestamp, - &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, + &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Provider, &j.RequestedModel, &j.RequestedProvider, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, &enqueuedAt, &startedAt, &finishedAt, &j.Prompt, &diffContent, &j.Error, &j.TokenUsage, &j.WorktreePath, &j.SourceMachineID, &updatedAt, @@ -577,15 +580,18 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error now := time.Now().UTC().Format(time.RFC3339) _, err := db.Exec(` INSERT INTO review_jobs ( - uuid, repo_id, commit_id, git_ref, session_id, agent, model, reasoning, job_type, review_type, patch_id, status, agentic, + uuid, repo_id, commit_id, git_ref, session_id, agent, model, provider, requested_model, requested_provider, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, worktree_path, source_machine_id, updated_at, synced_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(uuid) DO UPDATE SET status = excluded.status, finished_at = excluded.finished_at, error = excluded.error, model = COALESCE(excluded.model, review_jobs.model), + provider = COALESCE(excluded.provider, review_jobs.provider), + requested_model = COALESCE(excluded.requested_model, review_jobs.requested_model), + requested_provider = COALESCE(excluded.requested_provider, review_jobs.requested_provider), git_ref = excluded.git_ref, session_id = COALESCE(excluded.session_id, review_jobs.session_id), commit_id = excluded.commit_id, @@ -594,7 +600,7 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error worktree_path = COALESCE(excluded.worktree_path, review_jobs.worktree_path), updated_at = excluded.updated_at, synced_at = ? - `, j.UUID, repoID, commitID, j.GitRef, nullStr(j.SessionID), j.Agent, nullStr(j.Model), j.Reasoning, j.JobType, + `, j.UUID, repoID, commitID, j.GitRef, nullStr(j.SessionID), j.Agent, nullStr(j.Model), nullStr(j.Provider), nullStr(j.RequestedModel), nullStr(j.RequestedProvider), j.Reasoning, j.JobType, j.ReviewType, nullStr(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt.Format(time.RFC3339), nullTimeStr(j.StartedAt), nullTimeStr(j.FinishedAt), nullStr(j.Prompt), j.DiffContent, nullStr(j.Error), nullStr(j.TokenUsage), From 686ddbeae96d1a61a934c4c38419dd1026d2c988 Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Fri, 27 Mar 2026 00:34:42 -0400 Subject: [PATCH 2/4] fix(postgres): correct review_jobs upsert placeholders MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix the PostgreSQL UpsertJob INSERT to match the target column count. This addresses the GitHub Actions integration failure in TestIntegration_UpsertJob_BackfillsModel. Validation: - go test ./internal/storage ./internal/daemon ./cmd/roborev/tui - go vet ./... 🤖 Generated with [OpenAI Codex](https://openai.com/codex) Co-authored-by: OpenAI Codex --- internal/storage/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 3133edf89..3a517390b 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -556,7 +556,7 @@ func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, p uuid, repo_id, commit_id, git_ref, session_id, agent, model, provider, requested_model, requested_provider, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, worktree_path, source_machine_id, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, NOW()) + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, NOW()) ON CONFLICT (uuid) DO UPDATE SET status = EXCLUDED.status, finished_at = EXCLUDED.finished_at, From fce64372e5f88ff13fefc66f87f10d714a81a0bf Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Fri, 27 Mar 2026 00:52:17 -0400 Subject: [PATCH 3/4] fix: address PR review findings on rerun provenance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the three issues raised on PR #581: - preserve agentic during SQLite rebuild migration - resolve rerun config from worktree path when present - allow sync upserts to clear model/provider/requested fields Validation: - go test ./internal/storage ./internal/daemon ./cmd/roborev/tui - go vet ./... 🤖 Generated with [OpenAI Codex](https://openai.com/codex) Co-authored-by: OpenAI Codex --- internal/daemon/server.go | 6 ++- internal/daemon/server_actions_test.go | 22 +++++++++++ internal/storage/db.go | 1 + internal/storage/db_migration_test.go | 8 +++- internal/storage/postgres.go | 8 ++-- internal/storage/postgres_test.go | 46 +++++++++++++--------- internal/storage/sync.go | 8 ++-- internal/storage/sync_backfill_test.go | 54 ++++++++++++++++++++++---- 8 files changed, 116 insertions(+), 37 deletions(-) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 4ff00c339..8e152b86b 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -657,7 +657,11 @@ func workflowForJob(jobType, reviewType string) string { func resolveRerunModelProvider(job *storage.ReviewJob, cfg *config.Config) (string, string) { workflow := workflowForJob(job.JobType, job.ReviewType) - resolution := agent.ResolveWorkflowConfig("", job.RepoPath, cfg, workflow, job.Reasoning) + resolutionPath := job.RepoPath + if strings.TrimSpace(job.WorktreePath) != "" { + resolutionPath = job.WorktreePath + } + resolution := agent.ResolveWorkflowConfig("", resolutionPath, cfg, workflow, job.Reasoning) model := resolution.ModelForSelectedAgent(job.Agent, job.RequestedModel) provider := strings.TrimSpace(job.RequestedProvider) return model, provider diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index 8e93aca72..01111e4de 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "net/http" "net/http/httptest" + "os" "path/filepath" "strings" "testing" @@ -447,6 +448,27 @@ func TestHandleRerunJob(t *testing.T) { }) } +func TestResolveRerunModelProviderUsesWorktreeConfig(t *testing.T) { + mainRepo := t.TempDir() + worktreeRepo := t.TempDir() + + require.NoError(t, os.WriteFile(filepath.Join(mainRepo, ".roborev.toml"), []byte("review_model = \"main-model\"\n"), 0o644)) + require.NoError(t, os.WriteFile(filepath.Join(worktreeRepo, ".roborev.toml"), []byte("review_model = \"worktree-model\"\n"), 0o644)) + + job := &storage.ReviewJob{ + Agent: "test", + JobType: storage.JobTypeReview, + ReviewType: config.ReviewTypeDefault, + Reasoning: "thorough", + RepoPath: mainRepo, + WorktreePath: worktreeRepo, + } + + model, provider := resolveRerunModelProvider(job, config.DefaultConfig()) + assert.Equal(t, "worktree-model", model) + assert.Empty(t, provider) +} + // TestHandleAddCommentToJobStates tests that comments can be added to jobs // in any state: queued, running, done, failed, and canceled. func TestHandleAddCommentToJobStates(t *testing.T) { diff --git a/internal/storage/db.go b/internal/storage/db.go index e053cecfc..620383f01 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -369,6 +369,7 @@ func (db *DB) migrate() error { case "reasoning": hasReasoning = true case "agentic": + hasAgentic = true case "model": hasModel = true case "provider": diff --git a/internal/storage/db_migration_test.go b/internal/storage/db_migration_test.go index bbfbaca55..5a6352ce6 100644 --- a/internal/storage/db_migration_test.go +++ b/internal/storage/db_migration_test.go @@ -575,8 +575,8 @@ func TestMigrationQuotedTableWithOrphanedFK(t *testing.T) { VALUES (1, '/tmp/test', 'test'); INSERT INTO commits (id, repo_id, sha, author, subject, timestamp) VALUES (1, 1, 'abc123', 'Author', 'Subject', '2024-01-01'); - INSERT INTO review_jobs (id, repo_id, commit_id, git_ref, agent, status) - VALUES (1, 1, 1, 'abc123', 'codex', 'done'); + INSERT INTO review_jobs (id, repo_id, commit_id, git_ref, agent, status, agentic) + VALUES (1, 1, 1, 'abc123', 'codex', 'done', 1); INSERT INTO reviews (id, job_id, agent, prompt, output) VALUES (1, 1, 'codex', 'test', 'looks good'); `) @@ -623,6 +623,10 @@ func TestMigrationQuotedTableWithOrphanedFK(t *testing.T) { }, "Review output not preserved: got %q", review.Output) } + job, err := db.GetJobByID(1) + require.NoError(t, err) + assert.True(t, job.Agentic, "agentic flag should survive rebuild migration") + // Verify applied/rebased statuses work _, err = db.Exec( `UPDATE review_jobs SET status = 'applied' WHERE id = 1`, diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 3a517390b..bda83939b 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -561,10 +561,10 @@ func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, p status = EXCLUDED.status, finished_at = EXCLUDED.finished_at, error = EXCLUDED.error, - model = COALESCE(EXCLUDED.model, review_jobs.model), - provider = COALESCE(EXCLUDED.provider, review_jobs.provider), - requested_model = COALESCE(EXCLUDED.requested_model, review_jobs.requested_model), - requested_provider = COALESCE(EXCLUDED.requested_provider, review_jobs.requested_provider), + model = EXCLUDED.model, + provider = EXCLUDED.provider, + requested_model = EXCLUDED.requested_model, + requested_provider = EXCLUDED.requested_provider, git_ref = EXCLUDED.git_ref, session_id = COALESCE(EXCLUDED.session_id, review_jobs.session_id), commit_id = EXCLUDED.commit_id, diff --git a/internal/storage/postgres_test.go b/internal/storage/postgres_test.go index 1ea22add4..be74303fb 100644 --- a/internal/storage/postgres_test.go +++ b/internal/storage/postgres_test.go @@ -1069,36 +1069,44 @@ func TestIntegration_UpsertJob_BackfillsModel(t *testing.T) { // Upsert with a model value - should backfill job := SyncableJob{ - UUID: jobUUID, - RepoIdentity: repoIdentity, - GitRef: "HEAD", - Agent: "test-agent", - Model: "gpt-4", // Now providing a model - Status: "done", - SourceMachineID: machineID, - EnqueuedAt: time.Now(), + UUID: jobUUID, + RepoIdentity: repoIdentity, + GitRef: "HEAD", + Agent: "test-agent", + Model: "gpt-4", // Now providing a model + Provider: "openai", + RequestedModel: "gpt-4", + RequestedProvider: "openai", + Status: "done", + SourceMachineID: machineID, + EnqueuedAt: time.Now(), } err = pool.UpsertJob(ctx, job, repoID, nil) require.NoError(t, err, "UpsertJob failed: %v") - // Verify model was backfilled + // Verify model was backfilled. var modelAfter *string err = pool.pool.QueryRow(ctx, `SELECT model FROM review_jobs WHERE uuid = $1`, jobUUID).Scan(&modelAfter) require.NoError(t, err, "Failed to query model after: %v") - assert.NotNil(t, modelAfter, "Expected model to be backfilled, but it's still NULL") if modelAfter != nil { assert.Equal(t, "gpt-4", *modelAfter) } - // Also verify that upserting with empty model doesn't clear existing model - job.Model = "" // Empty model + // Upserting with empty values should clear the effective/requested fields so + // synced reruns can propagate implicit/default state. + job.Model = "" + job.Provider = "" + job.RequestedModel = "" + job.RequestedProvider = "" err = pool.UpsertJob(ctx, job, repoID, nil) - require.NoError(t, err, "UpsertJob (empty model) failed: %v") - - var modelPreserved *string - err = pool.pool.QueryRow(ctx, `SELECT model FROM review_jobs WHERE uuid = $1`, jobUUID).Scan(&modelPreserved) - require.NoError(t, err, "Failed to query model preserved: %v") - - assert.False(t, modelPreserved == nil || *modelPreserved != "gpt-4") + require.NoError(t, err, "UpsertJob (empty model/provider) failed: %v") + + var modelCleared, providerCleared, requestedModelCleared, requestedProviderCleared *string + err = pool.pool.QueryRow(ctx, `SELECT model, provider, requested_model, requested_provider FROM review_jobs WHERE uuid = $1`, jobUUID).Scan(&modelCleared, &providerCleared, &requestedModelCleared, &requestedProviderCleared) + require.NoError(t, err, "Failed to query cleared fields: %v") + assert.Nil(t, modelCleared, "Expected empty model upsert to clear existing model") + assert.Nil(t, providerCleared, "Expected empty provider upsert to clear existing provider") + assert.Nil(t, requestedModelCleared, "Expected empty requested_model upsert to clear existing requested model") + assert.Nil(t, requestedProviderCleared, "Expected empty requested_provider upsert to clear existing requested provider") } diff --git a/internal/storage/sync.go b/internal/storage/sync.go index dad9bcf93..9038f1c71 100644 --- a/internal/storage/sync.go +++ b/internal/storage/sync.go @@ -588,10 +588,10 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error status = excluded.status, finished_at = excluded.finished_at, error = excluded.error, - model = COALESCE(excluded.model, review_jobs.model), - provider = COALESCE(excluded.provider, review_jobs.provider), - requested_model = COALESCE(excluded.requested_model, review_jobs.requested_model), - requested_provider = COALESCE(excluded.requested_provider, review_jobs.requested_provider), + model = excluded.model, + provider = excluded.provider, + requested_model = excluded.requested_model, + requested_provider = excluded.requested_provider, git_ref = excluded.git_ref, session_id = COALESCE(excluded.session_id, review_jobs.session_id), commit_id = excluded.commit_id, diff --git a/internal/storage/sync_backfill_test.go b/internal/storage/sync_backfill_test.go index b9c167a61..bcfd5a68e 100644 --- a/internal/storage/sync_backfill_test.go +++ b/internal/storage/sync_backfill_test.go @@ -210,16 +210,16 @@ func TestUpsertPulledJob_BackfillsModel(t *testing.T) { } assert.Equal(t, "gpt-4", modelAfter.String) - // Also verify that upserting with empty model doesn't clear existing model - pulledJob.Model = "" // Empty model + // Upserting with empty model should clear the existing effective model so + // synced reruns can propagate implicit/default state. + pulledJob.Model = "" err = db.UpsertPulledJob(pulledJob, repo.ID, nil) require.NoError(t, err, "UpsertPulledJob (empty model) failed: %v") - var modelPreserved sql.NullString - err = db.QueryRow(`SELECT model FROM review_jobs WHERE uuid = ?`, jobUUID).Scan(&modelPreserved) - require.NoError(t, err, "Failed to query model preserved: %v") - - assert.False(t, !modelPreserved.Valid || modelPreserved.String != "gpt-4") + var modelCleared sql.NullString + err = db.QueryRow(`SELECT model FROM review_jobs WHERE uuid = ?`, jobUUID).Scan(&modelCleared) + require.NoError(t, err, "Failed to query model cleared: %v") + assert.False(t, modelCleared.Valid) } func TestGetJobsToSync_IncludesWorktreePath(t *testing.T) { @@ -301,3 +301,43 @@ func TestUpsertPulledJob_PreservesWorktreePath(t *testing.T) { require.NoError(t, err) assert.Equal(t, "/worktrees/my-branch", wt) } + +func TestUpsertPulledJob_ClearsModelAndProviderFields(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo, err := db.GetOrCreateRepo("/test/repo-sync-clear") + require.NoError(t, err) + + jobUUID := "test-uuid-clear-" + time.Now().Format("20060102150405") + pulledJob := PulledJob{ + UUID: jobUUID, + RepoIdentity: "/test/repo-sync-clear", + GitRef: "HEAD", + Agent: "test-agent", + Model: "gpt-4", + Provider: "openai", + RequestedModel: "gpt-4", + RequestedProvider: "openai", + Status: "done", + SourceMachineID: "test-machine", + EnqueuedAt: time.Now(), + UpdatedAt: time.Now(), + } + require.NoError(t, db.UpsertPulledJob(pulledJob, repo.ID, nil)) + + pulledJob.Model = "" + pulledJob.Provider = "" + pulledJob.RequestedModel = "" + pulledJob.RequestedProvider = "" + pulledJob.UpdatedAt = time.Now().Add(time.Second) + require.NoError(t, db.UpsertPulledJob(pulledJob, repo.ID, nil)) + + var model, provider, requestedModel, requestedProvider *string + err = db.QueryRow(`SELECT model, provider, requested_model, requested_provider FROM review_jobs WHERE uuid = ?`, jobUUID).Scan(&model, &provider, &requestedModel, &requestedProvider) + require.NoError(t, err) + assert.Nil(t, model) + assert.Nil(t, provider) + assert.Nil(t, requestedModel) + assert.Nil(t, requestedProvider) +} From b55b5f236d95b49b5c0cb14bd54f009f0047fe4e Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 29 Mar 2026 21:59:20 -0500 Subject: [PATCH 4/4] Address review feedback --- internal/daemon/server.go | 5 +++-- internal/daemon/server_actions_test.go | 8 ++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 8e152b86b..ac166f05f 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -644,9 +644,10 @@ func (s *Server) writeInternalError(w http.ResponseWriter, msg string) { func workflowForJob(jobType, reviewType string) string { // "default" uses the standard "review" workflow; others use their own name. - // Compact jobs use the "fix" workflow since they're part of that pipeline. + // Fix and compact jobs use the "fix" workflow since they're part of + // that pipeline. workflow := "review" - if jobType == storage.JobTypeCompact { + if jobType == storage.JobTypeFix || jobType == storage.JobTypeCompact { return "fix" } if !config.IsDefaultReviewType(reviewType) { diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index 01111e4de..c48e6adb3 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -448,6 +448,14 @@ func TestHandleRerunJob(t *testing.T) { }) } +func TestWorkflowForJobFixType(t *testing.T) { + assert := assert.New(t) + assert.Equal("fix", workflowForJob(storage.JobTypeFix, config.ReviewTypeDefault)) + assert.Equal("fix", workflowForJob(storage.JobTypeCompact, config.ReviewTypeDefault)) + assert.Equal("review", workflowForJob(storage.JobTypeReview, config.ReviewTypeDefault)) + assert.Equal("security", workflowForJob(storage.JobTypeReview, "security")) +} + func TestResolveRerunModelProviderUsesWorktreeConfig(t *testing.T) { mainRepo := t.TempDir() worktreeRepo := t.TempDir()