diff --git a/cmd/roborev/tui/queue_test.go b/cmd/roborev/tui/queue_test.go index 9ce85f50..e72bc95c 100644 --- a/cmd/roborev/tui/queue_test.go +++ b/cmd/roborev/tui/queue_test.go @@ -2200,12 +2200,22 @@ func TestParseColumnOrderAppendsMissing(t *testing.T) { assert.True(t, slices.Equal(got[:len(wantPrefix)], wantPrefix)) pfCount := 0 + requestedModelCount := 0 + requestedProviderCount := 0 for _, c := range got { if c == colPF { pfCount++ } + if c == colRequestedModel { + requestedModelCount++ + } + if c == colRequestedProvider { + requestedProviderCount++ + } } assert.Equal(t, 1, pfCount) + assert.Equal(t, 1, requestedModelCount) + assert.Equal(t, 1, requestedProviderCount) } func TestDefaultColumnOrderDetection(t *testing.T) { @@ -2221,3 +2231,10 @@ func TestDefaultColumnOrderDetection(t *testing.T) { assert.False(t, slices.Equal(customOrder, toggleableColumns)) } + +func TestDefaultHiddenColumnsIncludeRequestedFields(t *testing.T) { + hidden := parseHiddenColumns(nil) + assert.True(t, hidden[colSessionID]) + assert.True(t, hidden[colRequestedModel]) + assert.True(t, hidden[colRequestedProvider]) +} diff --git a/cmd/roborev/tui/render_queue.go b/cmd/roborev/tui/render_queue.go index 238edfb3..a76246c7 100644 --- a/cmd/roborev/tui/render_queue.go +++ b/cmd/roborev/tui/render_queue.go @@ -100,19 +100,21 @@ func (m model) getVisibleSelectedIdx() int { // Queue table column indices. const ( - colSel = iota // "> " selection indicator - colJobID // Job ID - colRef // Git ref (short SHA or range) - colBranch // Branch name - colRepo // Repository display name - colAgent // Agent name - colQueued // Enqueue timestamp - colElapsed // Elapsed time - colStatus // Job status - colPF // Pass/Fail verdict - colHandled // Done status - colSessionID // Session ID - colCount // total number of columns + colSel = iota // "> " selection indicator + colJobID // Job ID + colRef // Git ref (short SHA or range) + colBranch // Branch name + colRepo // Repository display name + colAgent // Agent name + colQueued // Enqueue timestamp + colElapsed // Elapsed time + colStatus // Job status + colPF // Pass/Fail verdict + colHandled // Done status + colSessionID // Session ID + colRequestedModel // Explicitly requested model + colRequestedProvider // Explicitly requested provider + colCount // total number of columns ) func (m model) renderQueueView() string { @@ -309,15 +311,17 @@ func (m model) renderQueueView() string { // Fixed-width columns: exact sizes (content + padding, not counting inter-column spacing) fixedWidth := map[int]int{ - colSel: 2, - colJobID: idWidth, - colStatus: max(contentWidth[colStatus], 6), // "Status" header = 6, auto-sizes to content - colQueued: 12, - colElapsed: 8, - colPF: 3, // "P/F" header = 3 - colHandled: max(contentWidth[colHandled], 6), // "Closed" header = 6 - colAgent: min(max(contentWidth[colAgent], 5), 12), // "Agent" header = 5, cap at 12 - colSessionID: min(max(contentWidth[colSessionID], 7), 12), // "Session" header = 7, cap at 12 + colSel: 2, + colJobID: idWidth, + colStatus: max(contentWidth[colStatus], 6), // "Status" header = 6, auto-sizes to content + colQueued: 12, + colElapsed: 8, + colPF: 3, // "P/F" header = 3 + colHandled: max(contentWidth[colHandled], 6), // "Closed" header = 6 + colAgent: min(max(contentWidth[colAgent], 5), 12), // "Agent" header = 5, cap at 12 + colSessionID: min(max(contentWidth[colSessionID], 7), 12), // "Session" header = 7, cap at 12 + colRequestedModel: min(max(contentWidth[colRequestedModel], 15), 24), // "Req Model" header = 9 + colRequestedProvider: min(max(contentWidth[colRequestedProvider], 18), 24), // "Req Provider" header = 12 } // Flexible columns absorb excess space @@ -593,8 +597,8 @@ func (m model) renderQueueView() string { } // jobCells returns plain text cell values for a job row. -// Order: ref, branch, repo, agent, queued, elapsed, status, pf, handled -// (colRef through colHandled, 9 values). +// Order: ref, branch, repo, agent, queued, elapsed, status, pf, handled, +// session, requested model, requested provider. func (m model) jobCells(job storage.ReviewJob) []string { ref := shortJobRef(job) if !config.IsDefaultReviewType(job.ReviewType) { @@ -645,7 +649,10 @@ func (m model) jobCells(job storage.ReviewJob) []string { sessionID = string(runes[:12]) } - return []string{ref, branch, repo, agentName, enqueued, elapsed, status, verdict, handled, sessionID} + requestedModel := stripControlChars(job.RequestedModel) + requestedProvider := stripControlChars(job.RequestedProvider) + + return []string{ref, branch, repo, agentName, enqueued, elapsed, status, verdict, handled, sessionID, requestedModel, requestedProvider} } // statusLabel returns a capitalized display label for the job status. @@ -770,34 +777,38 @@ func migrateColumnConfig(cfg *config.Config) bool { // toggleableColumns is the ordered list of columns the user can show/hide. // colSel and colJobID are always visible and not included here. -var toggleableColumns = []int{colRef, colBranch, colRepo, colAgent, colQueued, colElapsed, colStatus, colPF, colHandled, colSessionID} +var toggleableColumns = []int{colRef, colBranch, colRepo, colAgent, colQueued, colElapsed, colStatus, colPF, colHandled, colSessionID, colRequestedModel, colRequestedProvider} // columnNames maps column constants to display names. var columnNames = map[int]string{ - colRef: "Ref", - colBranch: "Branch", - colRepo: "Repo", - colAgent: "Agent", - colStatus: "Status", - colQueued: "Queued", - colElapsed: "Elapsed", - colPF: "P/F", - colHandled: "Closed", - colSessionID: "Session", + colRef: "Ref", + colBranch: "Branch", + colRepo: "Repo", + colAgent: "Agent", + colStatus: "Status", + colQueued: "Queued", + colElapsed: "Elapsed", + colPF: "P/F", + colHandled: "Closed", + colSessionID: "Session", + colRequestedModel: "Req Model", + colRequestedProvider: "Req Provider", } // columnConfigNames maps column constants to config file names (lowercase). var columnConfigNames = map[int]string{ - colRef: "ref", - colBranch: "branch", - colRepo: "repo", - colAgent: "agent", - colStatus: "status", - colQueued: "queued", - colElapsed: "elapsed", - colPF: "pf", - colHandled: "closed", - colSessionID: "session_id", + colRef: "ref", + colBranch: "branch", + colRepo: "repo", + colAgent: "agent", + colStatus: "status", + colQueued: "queued", + colElapsed: "elapsed", + colPF: "pf", + colHandled: "closed", + colSessionID: "session_id", + colRequestedModel: "requested_model", + colRequestedProvider: "requested_provider", } // drainFlexOverflow reduces flex column widths to absorb overflow, @@ -838,7 +849,9 @@ func columnDisplayName(col int) string { // defaultHiddenColumns lists columns that are hidden by default. // Users can enable them via the column options modal. var defaultHiddenColumns = map[int]bool{ - colSessionID: true, + colSessionID: true, + colRequestedModel: true, + colRequestedProvider: true, } // parseHiddenColumns converts config hidden_columns strings to column ID set. diff --git a/cmd/roborev/tui/tui.go b/cmd/roborev/tui/tui.go index 44e9256a..e4e10e19 100644 --- a/cmd/roborev/tui/tui.go +++ b/cmd/roborev/tui/tui.go @@ -453,7 +453,7 @@ func newModel(ep daemon.DaemonEndpoint, opts ...option) model { tabWidth := 2 columnBorders := false tasksEnabled := false - hiddenCols := map[int]bool{} + hiddenCols := parseHiddenColumns(nil) colOrder := parseColumnOrder(nil) taskColOrder := parseTaskColumnOrder(nil) var cwdRepoRoot, cwdBranch string diff --git a/cmd/roborev/tui/types.go b/cmd/roborev/tui/types.go index 78a6c4bb..bc82a5ae 100644 --- a/cmd/roborev/tui/types.go +++ b/cmd/roborev/tui/types.go @@ -89,7 +89,7 @@ const ( ) // columnOption represents an item in the column options modal. -// id is a column constant (colRef..colHandled) or a sentinel option ID. +// id is a queue column constant or a sentinel option ID. type columnOption struct { id int // column constant or sentinel option ID name string // display label diff --git a/internal/daemon/server.go b/internal/daemon/server.go index eaa7f928..ac166f05 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -642,6 +642,32 @@ func (s *Server) writeInternalError(w http.ResponseWriter, msg string) { } } +func workflowForJob(jobType, reviewType string) string { + // "default" uses the standard "review" workflow; others use their own name. + // Fix and compact jobs use the "fix" workflow since they're part of + // that pipeline. + workflow := "review" + if jobType == storage.JobTypeFix || jobType == storage.JobTypeCompact { + return "fix" + } + if !config.IsDefaultReviewType(reviewType) { + return reviewType + } + return workflow +} + +func resolveRerunModelProvider(job *storage.ReviewJob, cfg *config.Config) (string, string) { + workflow := workflowForJob(job.JobType, job.ReviewType) + resolutionPath := job.RepoPath + if strings.TrimSpace(job.WorktreePath) != "" { + resolutionPath = job.WorktreePath + } + resolution := agent.ResolveWorkflowConfig("", resolutionPath, cfg, workflow, job.Reasoning) + model := resolution.ModelForSelectedAgent(job.Agent, job.RequestedModel) + provider := strings.TrimSpace(job.RequestedProvider) + return model, provider +} + func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed") @@ -754,15 +780,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } - // Map review_type to config workflow for agent/model resolution. - // "default" uses the standard "review" workflow; others use their own name. - // Compact jobs use the "fix" workflow since they're part of that pipeline. - workflow := "review" - if req.JobType == "compact" { - workflow = "fix" - } else if !config.IsDefaultReviewType(req.ReviewType) { - workflow = req.ReviewType - } + workflow := workflowForJob(req.JobType, req.ReviewType) // Resolve reasoning level for the determined workflow. // Compact jobs use fix reasoning (default "standard"), not review @@ -778,6 +796,9 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } + requestedModel := strings.TrimSpace(req.Model) + requestedProvider := strings.TrimSpace(req.Provider) + // Resolve agent for workflow at this reasoning level cfg := s.configWatcher.Config() resolution := agent.ResolveWorkflowConfig( @@ -802,7 +823,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { agentName = resolved.Name() } - model := resolution.ModelForSelectedAgent(agentName, req.Model) + model := resolution.ModelForSelectedAgent(agentName, requestedModel) if req.JobType == storage.JobTypeInsights { if req.Since == "" { @@ -854,19 +875,21 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { if isPrompt { // Custom prompt job - use provided prompt directly job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - Branch: req.Branch, - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - Prompt: req.CustomPrompt, - OutputPrefix: req.OutputPrefix, - Agentic: req.Agentic, - Label: gitRef, // Use git_ref as TUI label (run, analyze type, custom) - JobType: req.JobType, - Provider: req.Provider, - WorktreePath: worktreePath, + RepoID: repo.ID, + Branch: req.Branch, + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + Prompt: req.CustomPrompt, + OutputPrefix: req.OutputPrefix, + Agentic: req.Agentic, + Label: gitRef, // Use git_ref as TUI label (run, analyze type, custom) + JobType: req.JobType, + Provider: requestedProvider, + RequestedModel: requestedModel, + RequestedProvider: requestedProvider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue prompt job: %v", err)) @@ -876,17 +899,19 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Dirty review - use pre-captured diff targetSHA, _ := git.ResolveSHA(checkoutRoot, "HEAD") job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: gitRef, - Branch: req.Branch, - SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, targetSHA), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - DiffContent: req.DiffContent, - Provider: req.Provider, - WorktreePath: worktreePath, + RepoID: repo.ID, + GitRef: gitRef, + Branch: req.Branch, + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, targetSHA), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + DiffContent: req.DiffContent, + Provider: requestedProvider, + RequestedModel: requestedModel, + RequestedProvider: requestedProvider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue dirty job: %v", err)) @@ -951,16 +976,18 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: fullRef, - Branch: req.Branch, - SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, endSHA), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - Provider: req.Provider, - WorktreePath: worktreePath, + RepoID: repo.ID, + GitRef: fullRef, + Branch: req.Branch, + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, endSHA), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + Provider: requestedProvider, + RequestedModel: requestedModel, + RequestedProvider: requestedProvider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) @@ -1001,18 +1028,20 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { patchID := git.GetPatchID(checkoutRoot, sha) job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: sha, - Branch: req.Branch, - SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, sha), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - PatchID: patchID, - Provider: req.Provider, - WorktreePath: worktreePath, + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: sha, + Branch: req.Branch, + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, sha), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + PatchID: patchID, + Provider: requestedProvider, + RequestedModel: requestedModel, + RequestedProvider: requestedProvider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) @@ -1698,7 +1727,19 @@ func (s *Server) handleRerunJob(w http.ResponseWriter, r *http.Request) { return } - if err := s.db.ReenqueueJob(req.JobID); err != nil { + job, err := s.db.GetJobByID(req.JobID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + writeError(w, http.StatusNotFound, "job not found or not rerunnable") + return + } + writeError(w, http.StatusInternalServerError, fmt.Sprintf("load job: %v", err)) + return + } + + model, provider := resolveRerunModelProvider(job, s.configWatcher.Config()) + + if err := s.db.ReenqueueJob(req.JobID, storage.ReenqueueOpts{Model: model, Provider: provider}); err != nil { if errors.Is(err, sql.ErrNoRows) { writeError(w, http.StatusNotFound, "job not found or not rerunnable") return diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index 8f30239b..c48e6adb 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "net/http" "net/http/httptest" + "os" "path/filepath" "strings" "testing" @@ -356,6 +357,41 @@ func TestHandleRerunJob(t *testing.T) { } }) + t.Run("rerun reevaluates implicit effective model", func(t *testing.T) { + isolatedDB, isolatedDir := testutil.OpenTestDBWithDir(t) + server := NewServer(isolatedDB, config.DefaultConfig(), "") + + repo, err := isolatedDB.GetOrCreateRepo(isolatedDir) + require.NoError(t, err) + commit, err := isolatedDB.GetOrCreateCommit(repo.ID, "rerun-implicit-model", "Author", "Subject", time.Now()) + require.NoError(t, err) + job, err := isolatedDB.EnqueueJob(storage.EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "rerun-implicit-model", + Agent: "opencode", + Model: "minimax-m2.5-free", + }) + require.NoError(t, err) + + claimed, err := isolatedDB.ClaimJob("worker-1") + require.NoError(t, err) + require.NotNil(t, claimed) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, isolatedDB.CompleteJob(job.ID, "opencode", "prompt", "output")) + + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) + w := httptest.NewRecorder() + + server.handleRerunJob(w, req) + testutil.AssertStatusCode(t, w, http.StatusOK) + + updated, err := isolatedDB.GetJobByID(job.ID) + require.NoError(t, err) + assert.Equal(t, storage.JobStatusQueued, updated.Status) + assert.Empty(t, updated.Model, "rerun should recompute implicit model instead of preserving stale effective value") + }) + t.Run("rerun queued job fails", func(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-queued", "Author", "Subject", time.Now()) job, _ := db.EnqueueJob(storage.EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "rerun-queued", Agent: "test"}) @@ -412,6 +448,35 @@ func TestHandleRerunJob(t *testing.T) { }) } +func TestWorkflowForJobFixType(t *testing.T) { + assert := assert.New(t) + assert.Equal("fix", workflowForJob(storage.JobTypeFix, config.ReviewTypeDefault)) + assert.Equal("fix", workflowForJob(storage.JobTypeCompact, config.ReviewTypeDefault)) + assert.Equal("review", workflowForJob(storage.JobTypeReview, config.ReviewTypeDefault)) + assert.Equal("security", workflowForJob(storage.JobTypeReview, "security")) +} + +func TestResolveRerunModelProviderUsesWorktreeConfig(t *testing.T) { + mainRepo := t.TempDir() + worktreeRepo := t.TempDir() + + require.NoError(t, os.WriteFile(filepath.Join(mainRepo, ".roborev.toml"), []byte("review_model = \"main-model\"\n"), 0o644)) + require.NoError(t, os.WriteFile(filepath.Join(worktreeRepo, ".roborev.toml"), []byte("review_model = \"worktree-model\"\n"), 0o644)) + + job := &storage.ReviewJob{ + Agent: "test", + JobType: storage.JobTypeReview, + ReviewType: config.ReviewTypeDefault, + Reasoning: "thorough", + RepoPath: mainRepo, + WorktreePath: worktreeRepo, + } + + model, provider := resolveRerunModelProvider(job, config.DefaultConfig()) + assert.Equal(t, "worktree-model", model) + assert.Empty(t, provider) +} + // TestHandleAddCommentToJobStates tests that comments can be added to jobs // in any state: queued, running, done, failed, and canceled. func TestHandleAddCommentToJobStates(t *testing.T) { diff --git a/internal/daemon/server_jobs_test.go b/internal/daemon/server_jobs_test.go index 034ad81c..2b4b9906 100644 --- a/internal/daemon/server_jobs_test.go +++ b/internal/daemon/server_jobs_test.go @@ -2569,6 +2569,7 @@ func TestHandleEnqueueAgentOverrideModel(t *testing.T) { return false }, "model = %q, want %q", job.Model, tt.wantModel) } + assert.Equal(t, tt.reqModel, job.RequestedModel) }) } } diff --git a/internal/storage/db.go b/internal/storage/db.go index f0219eed..620383f0 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -41,6 +41,8 @@ CREATE TABLE IF NOT EXISTS review_jobs ( session_id TEXT, agent TEXT NOT NULL DEFAULT 'codex', model TEXT, + requested_model TEXT, + requested_provider TEXT, reasoning TEXT NOT NULL DEFAULT 'thorough', status TEXT NOT NULL CHECK(status IN ('queued','running','done','failed','canceled','applied','rebased')) DEFAULT 'queued', enqueued_at TEXT NOT NULL DEFAULT (datetime('now')), @@ -334,6 +336,9 @@ func (db *DB) migrate() error { session_id TEXT, agent TEXT NOT NULL DEFAULT 'codex', model TEXT, + provider TEXT, + requested_model TEXT, + requested_provider TEXT, reasoning TEXT NOT NULL DEFAULT 'thorough', status TEXT NOT NULL CHECK(status IN ('queued','running','done','failed','canceled','applied','rebased')) DEFAULT 'queued', enqueued_at TEXT NOT NULL DEFAULT (datetime('now')), @@ -352,8 +357,8 @@ func (db *DB) migrate() error { } // Check which optional columns exist in source table - var hasDiffContent, hasReasoning, hasAgentic, hasModel, hasBranch, hasSessionID bool - checkRows, checkErr := tx.Query(`SELECT name FROM pragma_table_info('review_jobs') WHERE name IN ('diff_content', 'reasoning', 'agentic', 'model', 'branch', 'session_id')`) + var hasDiffContent, hasReasoning, hasAgentic, hasModel, hasProvider, hasRequestedModel, hasRequestedProvider, hasBranch, hasSessionID bool + checkRows, checkErr := tx.Query(`SELECT name FROM pragma_table_info('review_jobs') WHERE name IN ('diff_content', 'reasoning', 'agentic', 'model', 'provider', 'requested_model', 'requested_provider', 'branch', 'session_id')`) if checkErr == nil { for checkRows.Next() { var colName string @@ -367,6 +372,12 @@ func (db *DB) migrate() error { hasAgentic = true case "model": hasModel = true + case "provider": + hasProvider = true + case "requested_model": + hasRequestedModel = true + case "requested_provider": + hasRequestedProvider = true case "branch": hasBranch = true case "session_id": @@ -391,6 +402,15 @@ func (db *DB) migrate() error { if hasModel { baseCols = append(baseCols, "model") } + if hasProvider { + baseCols = append(baseCols, "provider") + } + if hasRequestedModel { + baseCols = append(baseCols, "requested_model") + } + if hasRequestedProvider { + baseCols = append(baseCols, "requested_provider") + } if hasReasoning { baseCols = append(baseCols, "reasoning") } @@ -695,6 +715,30 @@ func (db *DB) migrate() error { } } + // Migration: add requested_model column to review_jobs if missing + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'requested_model'`).Scan(&count) + if err != nil { + return fmt.Errorf("check requested_model column: %w", err) + } + if count == 0 { + _, err = db.Exec(`ALTER TABLE review_jobs ADD COLUMN requested_model TEXT`) + if err != nil { + return fmt.Errorf("add requested_model column: %w", err) + } + } + + // Migration: add requested_provider column to review_jobs if missing + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'requested_provider'`).Scan(&count) + if err != nil { + return fmt.Errorf("check requested_provider column: %w", err) + } + if count == 0 { + _, err = db.Exec(`ALTER TABLE review_jobs ADD COLUMN requested_provider TEXT`) + if err != nil { + return fmt.Errorf("add requested_provider column: %w", err) + } + } + // Migration: add token_usage column to review_jobs if missing err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'token_usage'`).Scan(&count) if err != nil { diff --git a/internal/storage/db_job_test.go b/internal/storage/db_job_test.go index 55697b01..02fed826 100644 --- a/internal/storage/db_job_test.go +++ b/internal/storage/db_job_test.go @@ -808,7 +808,7 @@ func TestReenqueueJob(t *testing.T) { db.ClaimJob("worker-1") db.FailJob(job.ID, "", "some error") - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob failed: %v") updated, _ := db.GetJobByID(job.ID) @@ -822,7 +822,7 @@ func TestReenqueueJob(t *testing.T) { _, _, job := createJobChain(t, db, "/tmp/test-repo", "rerun-canceled") db.CancelJob(job.ID) - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob failed: %v") updated, _ := db.GetJobByID(job.ID) @@ -844,7 +844,7 @@ func TestReenqueueJob(t *testing.T) { } db.CompleteJob(job.ID, "codex", "prompt", "output") - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob failed: %v") updated, _ := db.GetJobByID(job.ID) @@ -854,7 +854,7 @@ func TestReenqueueJob(t *testing.T) { t.Run("rerun queued job fails", func(t *testing.T) { _, _, job := createJobChain(t, db, "/tmp/test-repo", "rerun-queued") - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.Error(t, err) }) @@ -862,15 +862,52 @@ func TestReenqueueJob(t *testing.T) { _, _, job := createJobChain(t, db, "/tmp/test-repo", "rerun-running") db.ClaimJob("worker-1") - err := db.ReenqueueJob(job.ID) + err := db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.Error(t, err) }) t.Run("rerun nonexistent job fails", func(t *testing.T) { - err := db.ReenqueueJob(99999) + err := db.ReenqueueJob(99999, ReenqueueOpts{}) require.Error(t, err) }) + t.Run("rerun updates effective model and preserves requested model", func(t *testing.T) { + isolatedDB := openTestDB(t) + defer isolatedDB.Close() + + repo := createRepo(t, isolatedDB, "/tmp/rerun-requested-model") + commit := createCommit(t, isolatedDB, repo.ID, "rerun-requested-model-sha") + + job, err := isolatedDB.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "rerun-requested-model-sha", + Agent: "opencode", + Model: "minimax-m2.5-free", + RequestedModel: "minimax-m2.5-free", + RequestedProvider: "anthropic", + Provider: "anthropic", + }) + require.NoError(t, err) + + claimed, err := isolatedDB.ClaimJob("worker-1") + require.NoError(t, err) + require.NotNil(t, claimed) + assert.Equal(t, job.ID, claimed.ID) + require.NoError(t, isolatedDB.CompleteJob(job.ID, "opencode", "prompt", "output")) + + err = isolatedDB.ReenqueueJob(job.ID, ReenqueueOpts{Model: "openai/gpt-5", Provider: "openai"}) + require.NoError(t, err) + + updated, err := isolatedDB.GetJobByID(job.ID) + require.NoError(t, err) + assert.Equal(t, JobStatusQueued, updated.Status) + assert.Equal(t, "openai/gpt-5", updated.Model) + assert.Equal(t, "openai", updated.Provider) + assert.Equal(t, "minimax-m2.5-free", updated.RequestedModel) + assert.Equal(t, "anthropic", updated.RequestedProvider) + }) + t.Run("rerun preserves worktree_path", func(t *testing.T) { isolatedDB := openTestDB(t) defer isolatedDB.Close() @@ -895,7 +932,7 @@ func TestReenqueueJob(t *testing.T) { err = isolatedDB.CompleteJob(job.ID, "test", "prompt", "output") require.NoError(t, err) - err = isolatedDB.ReenqueueJob(job.ID) + err = isolatedDB.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err) updated, err := isolatedDB.GetJobByID(job.ID) @@ -924,7 +961,7 @@ func TestReenqueueJob(t *testing.T) { assert.Equal(t, "first output", review1.Output) // Re-enqueue the done job - err = isolatedDB.ReenqueueJob(job.ID) + err = isolatedDB.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob failed: %v") // Verify review was deleted @@ -1113,7 +1150,7 @@ func TestSaveJobSessionID_StaleWorkerIgnored(t *testing.T) { err = db.CancelJob(job.ID) require.NoError(t, err, "CancelJob: %v", err) - err = db.ReenqueueJob(job.ID) + err = db.ReenqueueJob(job.ID, ReenqueueOpts{}) require.NoError(t, err, "ReenqueueJob: %v", err) j, err = db.GetJobByID(job.ID) diff --git a/internal/storage/db_migration_test.go b/internal/storage/db_migration_test.go index bbfbaca5..5a6352ce 100644 --- a/internal/storage/db_migration_test.go +++ b/internal/storage/db_migration_test.go @@ -575,8 +575,8 @@ func TestMigrationQuotedTableWithOrphanedFK(t *testing.T) { VALUES (1, '/tmp/test', 'test'); INSERT INTO commits (id, repo_id, sha, author, subject, timestamp) VALUES (1, 1, 'abc123', 'Author', 'Subject', '2024-01-01'); - INSERT INTO review_jobs (id, repo_id, commit_id, git_ref, agent, status) - VALUES (1, 1, 1, 'abc123', 'codex', 'done'); + INSERT INTO review_jobs (id, repo_id, commit_id, git_ref, agent, status, agentic) + VALUES (1, 1, 1, 'abc123', 'codex', 'done', 1); INSERT INTO reviews (id, job_id, agent, prompt, output) VALUES (1, 1, 'codex', 'test', 'looks good'); `) @@ -623,6 +623,10 @@ func TestMigrationQuotedTableWithOrphanedFK(t *testing.T) { }, "Review output not preserved: got %q", review.Output) } + job, err := db.GetJobByID(1) + require.NoError(t, err) + assert.True(t, job.Agentic, "agentic flag should survive rebuild migration") + // Verify applied/rebased statuses work _, err = db.Exec( `UPDATE review_jobs SET status = 'applied' WHERE id = 1`, diff --git a/internal/storage/hydration.go b/internal/storage/hydration.go index 804324f3..bde009c0 100644 --- a/internal/storage/hydration.go +++ b/internal/storage/hydration.go @@ -7,31 +7,33 @@ type sqlScanner interface { } type reviewJobScanFields struct { - EnqueuedAt string - StartedAt sql.NullString - FinishedAt sql.NullString - WorkerID sql.NullString - Error sql.NullString - Prompt sql.NullString - SourceMachineID sql.NullString - UUID sql.NullString - Model sql.NullString - Provider sql.NullString - Branch sql.NullString - SessionID sql.NullString - CommitID sql.NullInt64 - CommitSubject sql.NullString - JobType sql.NullString - ReviewType sql.NullString - PatchID sql.NullString - ParentJobID sql.NullInt64 - Patch sql.NullString - DiffContent sql.NullString - OutputPrefix sql.NullString - TokenUsage sql.NullString - Agentic int - Closed sql.NullInt64 - WorktreePath string + EnqueuedAt string + StartedAt sql.NullString + FinishedAt sql.NullString + WorkerID sql.NullString + Error sql.NullString + Prompt sql.NullString + SourceMachineID sql.NullString + UUID sql.NullString + Model sql.NullString + Provider sql.NullString + RequestedModel sql.NullString + RequestedProvider sql.NullString + Branch sql.NullString + SessionID sql.NullString + CommitID sql.NullInt64 + CommitSubject sql.NullString + JobType sql.NullString + ReviewType sql.NullString + PatchID sql.NullString + ParentJobID sql.NullInt64 + Patch sql.NullString + DiffContent sql.NullString + OutputPrefix sql.NullString + TokenUsage sql.NullString + Agentic int + Closed sql.NullInt64 + WorktreePath string } func applyReviewJobScan(job *ReviewJob, fields reviewJobScanFields) { @@ -53,6 +55,12 @@ func applyReviewJobScan(job *ReviewJob, fields reviewJobScanFields) { if fields.Provider.Valid { job.Provider = fields.Provider.String } + if fields.RequestedModel.Valid { + job.RequestedModel = fields.RequestedModel.String + } + if fields.RequestedProvider.Valid { + job.RequestedProvider = fields.RequestedProvider.String + } if fields.JobType.Valid { job.JobType = fields.JobType.String } diff --git a/internal/storage/jobs.go b/internal/storage/jobs.go index cf4f69cd..09f5a65a 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -40,25 +40,27 @@ func parseSQLiteTime(s string) time.Time { // - CommitID > 0 → "review" (single commit) // - otherwise → "range" (commit range) type EnqueueOpts struct { - RepoID int64 - CommitID int64 // >0 for single-commit reviews - GitRef string // SHA, "start..end" range, or "dirty" - Branch string - SessionID string - Agent string - Model string - Provider string // e.g. "anthropic", "openai" - Reasoning string - ReviewType string // e.g. "security" — changes which system prompt is used - PatchID string // Stable patch-id for rebase tracking - DiffContent string // For dirty reviews (captured at enqueue time) - Prompt string // For task jobs (pre-stored prompt) - OutputPrefix string // Prefix to prepend to review output - Agentic bool // Allow file edits and command execution - Label string // Display label in TUI for task jobs (default: "prompt") - JobType string // Explicit job type (review/range/dirty/task/compact/fix); inferred if empty - ParentJobID int64 // Parent job being fixed (for fix jobs) - WorktreePath string // Worktree checkout path (empty = use main repo root) + RepoID int64 + CommitID int64 // >0 for single-commit reviews + GitRef string // SHA, "start..end" range, or "dirty" + Branch string + SessionID string + Agent string + Model string // Effective model for this run + Provider string // Effective provider for this run (e.g. "anthropic", "openai") + RequestedModel string // Explicitly requested model; empty means reevaluate on rerun + RequestedProvider string // Explicitly requested provider; empty means reevaluate on rerun + Reasoning string + ReviewType string // e.g. "security" — changes which system prompt is used + PatchID string // Stable patch-id for rebase tracking + DiffContent string // For dirty reviews (captured at enqueue time) + Prompt string // For task jobs (pre-stored prompt) + OutputPrefix string // Prefix to prepend to review output + Agentic bool // Allow file edits and command execution + Label string // Display label in TUI for task jobs (default: "prompt") + JobType string // Explicit job type (review/range/dirty/task/compact/fix); inferred if empty + ParentJobID int64 // Parent job being fixed (for fix jobs) + WorktreePath string // Worktree checkout path (empty = use main repo root) } // EnqueueJob creates a new review job. The job type is inferred from opts. @@ -118,12 +120,12 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { } result, err := db.Exec(` - INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, session_id, agent, model, provider, reasoning, + INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, session_id, agent, model, provider, requested_model, requested_provider, reasoning, status, job_type, review_type, patch_id, diff_content, prompt, agentic, output_prefix, parent_job_id, uuid, source_machine_id, updated_at, worktree_path) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, opts.RepoID, commitIDParam, gitRef, nullString(opts.Branch), nullString(opts.SessionID), - opts.Agent, nullString(opts.Model), nullString(opts.Provider), reasoning, + opts.Agent, nullString(opts.Model), nullString(opts.Provider), nullString(opts.RequestedModel), nullString(opts.RequestedProvider), reasoning, jobType, opts.ReviewType, nullString(opts.PatchID), nullString(opts.DiffContent), nullString(opts.Prompt), agenticInt, nullString(opts.OutputPrefix), parentJobIDParam, @@ -134,27 +136,29 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { id, _ := result.LastInsertId() job := &ReviewJob{ - ID: id, - RepoID: opts.RepoID, - GitRef: gitRef, - Branch: opts.Branch, - SessionID: opts.SessionID, - Agent: opts.Agent, - Model: opts.Model, - Provider: opts.Provider, - Reasoning: reasoning, - JobType: jobType, - ReviewType: opts.ReviewType, - PatchID: opts.PatchID, - Status: JobStatusQueued, - EnqueuedAt: now, - Prompt: opts.Prompt, - Agentic: opts.Agentic, - OutputPrefix: opts.OutputPrefix, - UUID: uid, - SourceMachineID: machineID, - UpdatedAt: &now, - WorktreePath: opts.WorktreePath, + ID: id, + RepoID: opts.RepoID, + GitRef: gitRef, + Branch: opts.Branch, + SessionID: opts.SessionID, + Agent: opts.Agent, + Model: opts.Model, + Provider: opts.Provider, + RequestedModel: opts.RequestedModel, + RequestedProvider: opts.RequestedProvider, + Reasoning: reasoning, + JobType: jobType, + ReviewType: opts.ReviewType, + PatchID: opts.PatchID, + Status: JobStatusQueued, + EnqueuedAt: now, + Prompt: opts.Prompt, + Agentic: opts.Agentic, + OutputPrefix: opts.OutputPrefix, + UUID: uid, + SourceMachineID: machineID, + UpdatedAt: &now, + WorktreePath: opts.WorktreePath, } if opts.ParentJobID > 0 { job.ParentJobID = &opts.ParentJobID @@ -202,7 +206,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { var job ReviewJob var fields reviewJobScanFields err = db.QueryRow(` - SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.model, j.provider, j.reasoning, j.status, j.enqueued_at, + SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.model, j.provider, j.requested_model, j.requested_provider, j.reasoning, j.status, j.enqueued_at, r.root_path, r.name, c.subject, j.diff_content, j.prompt, COALESCE(j.agentic, 0), j.job_type, j.review_type, j.output_prefix, j.patch_id, j.parent_job_id, COALESCE(j.worktree_path, '') FROM review_jobs j @@ -211,7 +215,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { WHERE j.worker_id = ? AND j.status = 'running' ORDER BY j.started_at DESC LIMIT 1 - `, workerID).Scan(&job.ID, &job.RepoID, &fields.CommitID, &job.GitRef, &fields.Branch, &fields.SessionID, &job.Agent, &fields.Model, &fields.Provider, &job.Reasoning, &job.Status, &fields.EnqueuedAt, + `, workerID).Scan(&job.ID, &job.RepoID, &fields.CommitID, &job.GitRef, &fields.Branch, &fields.SessionID, &job.Agent, &fields.Model, &fields.Provider, &fields.RequestedModel, &fields.RequestedProvider, &job.Reasoning, &job.Status, &fields.EnqueuedAt, &job.RepoPath, &job.RepoName, &fields.CommitSubject, &fields.DiffContent, &fields.Prompt, &fields.Agentic, &fields.JobType, &fields.ReviewType, &fields.OutputPrefix, &fields.PatchID, &fields.ParentJobID, &fields.WorktreePath) if err != nil { @@ -514,10 +518,15 @@ func (db *DB) MarkJobRebased(jobID int64) error { return nil } +type ReenqueueOpts struct { + Model string + Provider string +} + // ReenqueueJob resets a completed, failed, or canceled job back to queued status. // This allows manual re-running of jobs to get a fresh review. // For done jobs, the existing review is deleted to avoid unique constraint violations. -func (db *DB) ReenqueueJob(jobID int64) error { +func (db *DB) ReenqueueJob(jobID int64, opts ReenqueueOpts) error { ctx := context.Background() conn, err := db.Conn(ctx) if err != nil { @@ -543,12 +552,15 @@ func (db *DB) ReenqueueJob(jobID int64) error { return err } - // Reset job status + nowStr := time.Now().Format(time.RFC3339) + + // Reset job status and replace effective execution settings with the + // newly resolved values for this rerun. result, err := conn.ExecContext(ctx, ` UPDATE review_jobs - SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = 0, patch = NULL, session_id = NULL + SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = 0, patch = NULL, session_id = NULL, model = ?, provider = ?, updated_at = ? WHERE id = ? AND status IN ('done', 'failed', 'canceled') - `, jobID) + `, nullString(opts.Model), nullString(opts.Provider), nowStr, jobID) if err != nil { return err } @@ -771,7 +783,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, j.retry_count, COALESCE(j.agentic, 0), r.root_path, r.name, c.subject, rv.closed, rv.output, rv.verdict_bool, j.source_machine_id, j.uuid, j.model, j.job_type, j.review_type, j.patch_id, - j.parent_job_id, j.provider, j.token_usage, COALESCE(j.worktree_path, '') + j.parent_job_id, j.provider, j.requested_model, j.requested_provider, j.token_usage, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -809,7 +821,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int &fields.StartedAt, &fields.FinishedAt, &fields.WorkerID, &fields.Error, &fields.Prompt, &j.RetryCount, &fields.Agentic, &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Closed, &output, &verdictBool, &fields.SourceMachineID, &fields.UUID, &fields.Model, &fields.JobType, &fields.ReviewType, &fields.PatchID, - &fields.ParentJobID, &fields.Provider, &fields.TokenUsage, &fields.WorktreePath) + &fields.ParentJobID, &fields.Provider, &fields.RequestedModel, &fields.RequestedProvider, &fields.TokenUsage, &fields.WorktreePath) if err != nil { return nil, err } @@ -858,7 +870,7 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { err := db.QueryRow(` SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, COALESCE(j.agentic, 0), - r.root_path, r.name, c.subject, j.model, j.provider, j.job_type, j.review_type, j.patch_id, + r.root_path, r.name, c.subject, j.model, j.provider, j.requested_model, j.requested_provider, j.job_type, j.review_type, j.patch_id, j.parent_job_id, j.patch, j.token_usage, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id @@ -866,7 +878,7 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { WHERE j.id = ? `, id).Scan(&j.ID, &j.RepoID, &fields.CommitID, &j.GitRef, &fields.Branch, &fields.SessionID, &j.Agent, &j.Reasoning, &j.Status, &fields.EnqueuedAt, &fields.StartedAt, &fields.FinishedAt, &fields.WorkerID, &fields.Error, &fields.Prompt, &fields.Agentic, - &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Model, &fields.Provider, &fields.JobType, &fields.ReviewType, &fields.PatchID, + &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Model, &fields.Provider, &fields.RequestedModel, &fields.RequestedProvider, &fields.JobType, &fields.ReviewType, &fields.PatchID, &fields.ParentJobID, &fields.Patch, &fields.TokenUsage, &fields.WorktreePath) if err != nil { return nil, err diff --git a/internal/storage/models.go b/internal/storage/models.go index 91caa51f..5a08e38a 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -47,34 +47,36 @@ const ( ) type ReviewJob struct { - ID int64 `json:"id"` - RepoID int64 `json:"repo_id"` - CommitID *int64 `json:"commit_id,omitempty"` // nil for ranges - GitRef string `json:"git_ref"` // SHA or "start..end" for ranges - Branch string `json:"branch,omitempty"` // Branch name at time of job creation - SessionID string `json:"session_id,omitempty"` // Reused prior session or captured current session ID - Agent string `json:"agent"` - Model string `json:"model,omitempty"` // Model to use (for opencode: provider/model format) - Provider string `json:"provider,omitempty"` // Provider to use (e.g., anthropic, openai) - Reasoning string `json:"reasoning,omitempty"` // thorough, standard, fast (default: thorough) - JobType string `json:"job_type"` // review, range, dirty, task, insights, compact, fix - Status JobStatus `json:"status"` - EnqueuedAt time.Time `json:"enqueued_at"` - StartedAt *time.Time `json:"started_at,omitempty"` - FinishedAt *time.Time `json:"finished_at,omitempty"` - WorkerID string `json:"worker_id,omitempty"` - Error string `json:"error,omitempty"` - Prompt string `json:"prompt,omitempty"` - RetryCount int `json:"retry_count"` - DiffContent *string `json:"diff_content,omitempty"` // For dirty reviews (uncommitted changes) - Agentic bool `json:"agentic"` // Enable agentic mode (allow file edits) - ReviewType string `json:"review_type,omitempty"` // Review type (e.g., "security") - changes system prompt - PatchID string `json:"patch_id,omitempty"` // Stable patch-id for rebase tracking - OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output - ParentJobID *int64 `json:"parent_job_id,omitempty"` // Job being fixed (for fix jobs) - Patch *string `json:"patch,omitempty"` // Generated diff patch (for completed fix jobs) - WorktreePath string `json:"worktree_path,omitempty"` // Worktree checkout path (empty = use RepoPath) - TokenUsage string `json:"token_usage,omitempty"` // JSON blob from agentsview (token consumption) + ID int64 `json:"id"` + RepoID int64 `json:"repo_id"` + CommitID *int64 `json:"commit_id,omitempty"` // nil for ranges + GitRef string `json:"git_ref"` // SHA or "start..end" for ranges + Branch string `json:"branch,omitempty"` // Branch name at time of job creation + SessionID string `json:"session_id,omitempty"` // Reused prior session or captured current session ID + Agent string `json:"agent"` + Model string `json:"model,omitempty"` // Effective model for this run (for opencode: provider/model format) + Provider string `json:"provider,omitempty"` // Effective provider for this run (e.g., anthropic, openai) + RequestedModel string `json:"requested_model,omitempty"` // Explicitly requested model; empty means reevaluate on rerun + RequestedProvider string `json:"requested_provider,omitempty"` // Explicitly requested provider; empty means reevaluate on rerun + Reasoning string `json:"reasoning,omitempty"` // thorough, standard, fast (default: thorough) + JobType string `json:"job_type"` // review, range, dirty, task, insights, compact, fix + Status JobStatus `json:"status"` + EnqueuedAt time.Time `json:"enqueued_at"` + StartedAt *time.Time `json:"started_at,omitempty"` + FinishedAt *time.Time `json:"finished_at,omitempty"` + WorkerID string `json:"worker_id,omitempty"` + Error string `json:"error,omitempty"` + Prompt string `json:"prompt,omitempty"` + RetryCount int `json:"retry_count"` + DiffContent *string `json:"diff_content,omitempty"` // For dirty reviews (uncommitted changes) + Agentic bool `json:"agentic"` // Enable agentic mode (allow file edits) + ReviewType string `json:"review_type,omitempty"` // Review type (e.g., "security") - changes system prompt + PatchID string `json:"patch_id,omitempty"` // Stable patch-id for rebase tracking + OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output + ParentJobID *int64 `json:"parent_job_id,omitempty"` // Job being fixed (for fix jobs) + Patch *string `json:"patch,omitempty"` // Generated diff patch (for completed fix jobs) + WorktreePath string `json:"worktree_path,omitempty"` // Worktree checkout path (empty = use RepoPath) + TokenUsage string `json:"token_usage,omitempty"` // JSON blob from agentsview (token consumption) // Sync fields UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync SourceMachineID string `json:"source_machine_id,omitempty"` // Machine that created this job diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index ac919336..bda83939 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -14,12 +14,12 @@ import ( ) // PostgreSQL schema version - increment when schema changes -const pgSchemaVersion = 9 +const pgSchemaVersion = 10 // pgSchemaName is the PostgreSQL schema used to isolate roborev tables const pgSchemaName = "roborev" -//go:embed schemas/postgres_v9.sql +//go:embed schemas/postgres_v10.sql var pgSchemaSQL string // pgSchemaStatements returns the individual DDL statements for schema creation. @@ -288,6 +288,20 @@ func (p *PgPool) EnsureSchema(ctx context.Context) error { return fmt.Errorf("migrate to v9 (add worktree_path column): %w", err) } } + if currentVersion < 10 { + _, err = p.pool.Exec(ctx, `ALTER TABLE review_jobs ADD COLUMN IF NOT EXISTS provider TEXT`) + if err != nil { + return fmt.Errorf("migrate to v10 (add provider column): %w", err) + } + _, err = p.pool.Exec(ctx, `ALTER TABLE review_jobs ADD COLUMN IF NOT EXISTS requested_model TEXT`) + if err != nil { + return fmt.Errorf("migrate to v10 (add requested_model column): %w", err) + } + _, err = p.pool.Exec(ctx, `ALTER TABLE review_jobs ADD COLUMN IF NOT EXISTS requested_provider TEXT`) + if err != nil { + return fmt.Errorf("migrate to v10 (add requested_provider column): %w", err) + } + } // Update version _, err = p.pool.Exec(ctx, `INSERT INTO schema_version (version) VALUES ($1) ON CONFLICT (version) DO NOTHING`, pgSchemaVersion) if err != nil { @@ -539,15 +553,18 @@ func (p *PgPool) Tx(ctx context.Context, fn func(tx pgx.Tx) error) error { func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, pgCommitID *int64) error { _, err := p.pool.Exec(ctx, ` INSERT INTO review_jobs ( - uuid, repo_id, commit_id, git_ref, session_id, agent, model, reasoning, job_type, review_type, patch_id, status, agentic, + uuid, repo_id, commit_id, git_ref, session_id, agent, model, provider, requested_model, requested_provider, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, worktree_path, source_machine_id, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, NOW()) + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, NOW()) ON CONFLICT (uuid) DO UPDATE SET status = EXCLUDED.status, finished_at = EXCLUDED.finished_at, error = EXCLUDED.error, - model = COALESCE(EXCLUDED.model, review_jobs.model), + model = EXCLUDED.model, + provider = EXCLUDED.provider, + requested_model = EXCLUDED.requested_model, + requested_provider = EXCLUDED.requested_provider, git_ref = EXCLUDED.git_ref, session_id = COALESCE(EXCLUDED.session_id, review_jobs.session_id), commit_id = EXCLUDED.commit_id, @@ -555,7 +572,7 @@ func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, p token_usage = COALESCE(EXCLUDED.token_usage, review_jobs.token_usage), worktree_path = COALESCE(EXCLUDED.worktree_path, review_jobs.worktree_path), updated_at = NOW() - `, j.UUID, pgRepoID, pgCommitID, j.GitRef, nullString(j.SessionID), j.Agent, nullString(j.Model), nullString(j.Reasoning), + `, j.UUID, pgRepoID, pgCommitID, j.GitRef, nullString(j.SessionID), j.Agent, nullString(j.Model), nullString(j.Provider), nullString(j.RequestedModel), nullString(j.RequestedProvider), nullString(j.Reasoning), defaultStr(j.JobType, "review"), j.ReviewType, nullString(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt, j.StartedAt, j.FinishedAt, nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), nullString(j.WorktreePath), j.SourceMachineID) return err @@ -590,32 +607,35 @@ func (p *PgPool) InsertResponse(ctx context.Context, r SyncableResponse) error { // PulledJob represents a job pulled from PostgreSQL type PulledJob struct { - UUID string - RepoIdentity string - CommitSHA string - CommitAuthor string - CommitSubject string - CommitTimestamp time.Time - GitRef string - SessionID string - Agent string - Model string - Reasoning string - JobType string - ReviewType string - PatchID string - Status string - Agentic bool - EnqueuedAt time.Time - StartedAt *time.Time - FinishedAt *time.Time - Prompt string - DiffContent *string - Error string - TokenUsage string - WorktreePath string - SourceMachineID string - UpdatedAt time.Time + UUID string + RepoIdentity string + CommitSHA string + CommitAuthor string + CommitSubject string + CommitTimestamp time.Time + GitRef string + SessionID string + Agent string + Model string + Provider string + RequestedModel string + RequestedProvider string + Reasoning string + JobType string + ReviewType string + PatchID string + Status string + Agentic bool + EnqueuedAt time.Time + StartedAt *time.Time + FinishedAt *time.Time + Prompt string + DiffContent *string + Error string + TokenUsage string + WorktreePath string + SourceMachineID string + UpdatedAt time.Time } // PullJobs fetches jobs from PostgreSQL updated after the given cursor. @@ -636,7 +656,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s rows, err := p.pool.Query(ctx, ` SELECT j.uuid, r.identity, COALESCE(c.sha, ''), COALESCE(c.author, ''), COALESCE(c.subject, ''), COALESCE(c.timestamp, '1970-01-01'::timestamptz), - j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, + j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.provider, ''), COALESCE(j.requested_model, ''), COALESCE(j.requested_provider, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, j.enqueued_at, j.started_at, j.finished_at, COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), COALESCE(j.token_usage, ''), COALESCE(j.worktree_path, ''), j.source_machine_id, j.updated_at, j.id @@ -663,7 +683,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s err := rows.Scan( &j.UUID, &j.RepoIdentity, &j.CommitSHA, &j.CommitAuthor, &j.CommitSubject, &j.CommitTimestamp, - &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, + &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Provider, &j.RequestedModel, &j.RequestedProvider, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, &j.EnqueuedAt, &j.StartedAt, &j.FinishedAt, &j.Prompt, &diffContent, &j.Error, &j.TokenUsage, &j.WorktreePath, &j.SourceMachineID, &j.UpdatedAt, &lastID, diff --git a/internal/storage/postgres_test.go b/internal/storage/postgres_test.go index 1ea22add..be74303f 100644 --- a/internal/storage/postgres_test.go +++ b/internal/storage/postgres_test.go @@ -1069,36 +1069,44 @@ func TestIntegration_UpsertJob_BackfillsModel(t *testing.T) { // Upsert with a model value - should backfill job := SyncableJob{ - UUID: jobUUID, - RepoIdentity: repoIdentity, - GitRef: "HEAD", - Agent: "test-agent", - Model: "gpt-4", // Now providing a model - Status: "done", - SourceMachineID: machineID, - EnqueuedAt: time.Now(), + UUID: jobUUID, + RepoIdentity: repoIdentity, + GitRef: "HEAD", + Agent: "test-agent", + Model: "gpt-4", // Now providing a model + Provider: "openai", + RequestedModel: "gpt-4", + RequestedProvider: "openai", + Status: "done", + SourceMachineID: machineID, + EnqueuedAt: time.Now(), } err = pool.UpsertJob(ctx, job, repoID, nil) require.NoError(t, err, "UpsertJob failed: %v") - // Verify model was backfilled + // Verify model was backfilled. var modelAfter *string err = pool.pool.QueryRow(ctx, `SELECT model FROM review_jobs WHERE uuid = $1`, jobUUID).Scan(&modelAfter) require.NoError(t, err, "Failed to query model after: %v") - assert.NotNil(t, modelAfter, "Expected model to be backfilled, but it's still NULL") if modelAfter != nil { assert.Equal(t, "gpt-4", *modelAfter) } - // Also verify that upserting with empty model doesn't clear existing model - job.Model = "" // Empty model + // Upserting with empty values should clear the effective/requested fields so + // synced reruns can propagate implicit/default state. + job.Model = "" + job.Provider = "" + job.RequestedModel = "" + job.RequestedProvider = "" err = pool.UpsertJob(ctx, job, repoID, nil) - require.NoError(t, err, "UpsertJob (empty model) failed: %v") - - var modelPreserved *string - err = pool.pool.QueryRow(ctx, `SELECT model FROM review_jobs WHERE uuid = $1`, jobUUID).Scan(&modelPreserved) - require.NoError(t, err, "Failed to query model preserved: %v") - - assert.False(t, modelPreserved == nil || *modelPreserved != "gpt-4") + require.NoError(t, err, "UpsertJob (empty model/provider) failed: %v") + + var modelCleared, providerCleared, requestedModelCleared, requestedProviderCleared *string + err = pool.pool.QueryRow(ctx, `SELECT model, provider, requested_model, requested_provider FROM review_jobs WHERE uuid = $1`, jobUUID).Scan(&modelCleared, &providerCleared, &requestedModelCleared, &requestedProviderCleared) + require.NoError(t, err, "Failed to query cleared fields: %v") + assert.Nil(t, modelCleared, "Expected empty model upsert to clear existing model") + assert.Nil(t, providerCleared, "Expected empty provider upsert to clear existing provider") + assert.Nil(t, requestedModelCleared, "Expected empty requested_model upsert to clear existing requested model") + assert.Nil(t, requestedProviderCleared, "Expected empty requested_provider upsert to clear existing requested provider") } diff --git a/internal/storage/reviews.go b/internal/storage/reviews.go index 850ff663..7805e30c 100644 --- a/internal/storage/reviews.go +++ b/internal/storage/reviews.go @@ -18,7 +18,7 @@ func (db *DB) GetReviewByJobID(jobID int64) (*Review, error) { err := db.QueryRow(` SELECT rv.id, rv.job_id, rv.agent, rv.prompt, rv.output, rv.created_at, rv.closed, rv.uuid, rv.verdict_bool, j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.reasoning, j.status, j.enqueued_at, - j.started_at, j.finished_at, j.worker_id, j.error, j.model, j.job_type, j.review_type, j.patch_id, + j.started_at, j.finished_at, j.worker_id, j.error, j.model, j.provider, j.requested_model, j.requested_provider, j.job_type, j.review_type, j.patch_id, rp.root_path, rp.name, c.subject, j.token_usage FROM reviews rv JOIN review_jobs j ON j.id = rv.job_id @@ -27,7 +27,7 @@ func (db *DB) GetReviewByJobID(jobID int64) (*Review, error) { WHERE rv.job_id = ? `, jobID).Scan(&r.ID, &r.JobID, &r.Agent, &r.Prompt, &r.Output, &reviewFields.CreatedAt, &reviewFields.Closed, &reviewFields.UUID, &reviewFields.VerdictBool, &job.ID, &job.RepoID, &jobFields.CommitID, &job.GitRef, &jobFields.Branch, &jobFields.SessionID, &job.Agent, &job.Reasoning, &job.Status, &jobFields.EnqueuedAt, - &jobFields.StartedAt, &jobFields.FinishedAt, &jobFields.WorkerID, &jobFields.Error, &jobFields.Model, &jobFields.JobType, &jobFields.ReviewType, &jobFields.PatchID, + &jobFields.StartedAt, &jobFields.FinishedAt, &jobFields.WorkerID, &jobFields.Error, &jobFields.Model, &jobFields.Provider, &jobFields.RequestedModel, &jobFields.RequestedProvider, &jobFields.JobType, &jobFields.ReviewType, &jobFields.PatchID, &job.RepoPath, &job.RepoName, &jobFields.CommitSubject, &jobFields.TokenUsage) if err != nil { return nil, err @@ -50,7 +50,7 @@ func (db *DB) GetReviewByCommitSHA(sha string) (*Review, error) { err := db.QueryRow(` SELECT rv.id, rv.job_id, rv.agent, rv.prompt, rv.output, rv.created_at, rv.closed, rv.uuid, rv.verdict_bool, j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.reasoning, j.status, j.enqueued_at, - j.started_at, j.finished_at, j.worker_id, j.error, j.model, j.job_type, j.review_type, j.patch_id, + j.started_at, j.finished_at, j.worker_id, j.error, j.model, j.provider, j.requested_model, j.requested_provider, j.job_type, j.review_type, j.patch_id, rp.root_path, rp.name, c.subject, j.token_usage FROM reviews rv JOIN review_jobs j ON j.id = rv.job_id @@ -61,7 +61,7 @@ func (db *DB) GetReviewByCommitSHA(sha string) (*Review, error) { LIMIT 1 `, sha).Scan(&r.ID, &r.JobID, &r.Agent, &r.Prompt, &r.Output, &reviewFields.CreatedAt, &reviewFields.Closed, &reviewFields.UUID, &reviewFields.VerdictBool, &job.ID, &job.RepoID, &jobFields.CommitID, &job.GitRef, &jobFields.Branch, &jobFields.SessionID, &job.Agent, &job.Reasoning, &job.Status, &jobFields.EnqueuedAt, - &jobFields.StartedAt, &jobFields.FinishedAt, &jobFields.WorkerID, &jobFields.Error, &jobFields.Model, &jobFields.JobType, &jobFields.ReviewType, &jobFields.PatchID, + &jobFields.StartedAt, &jobFields.FinishedAt, &jobFields.WorkerID, &jobFields.Error, &jobFields.Model, &jobFields.Provider, &jobFields.RequestedModel, &jobFields.RequestedProvider, &jobFields.JobType, &jobFields.ReviewType, &jobFields.PatchID, &job.RepoPath, &job.RepoName, &jobFields.CommitSubject, &jobFields.TokenUsage) if err != nil { return nil, err diff --git a/internal/storage/schemas/postgres_v10.sql b/internal/storage/schemas/postgres_v10.sql new file mode 100644 index 00000000..b8157ae1 --- /dev/null +++ b/internal/storage/schemas/postgres_v10.sql @@ -0,0 +1,105 @@ +-- PostgreSQL schema version 10 +-- Adds provider plus requested model/provider provenance to review_jobs. +-- Note: Version is managed by EnsureSchema(), not this file. + +CREATE SCHEMA IF NOT EXISTS roborev; + +CREATE TABLE IF NOT EXISTS roborev.schema_version ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.machines ( + id SERIAL PRIMARY KEY, + machine_id UUID UNIQUE NOT NULL, + name TEXT, + last_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.repos ( + id SERIAL PRIMARY KEY, + identity TEXT UNIQUE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.commits ( + id SERIAL PRIMARY KEY, + repo_id INTEGER REFERENCES roborev.repos(id), + sha TEXT NOT NULL, + author TEXT NOT NULL, + subject TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(repo_id, sha) +); + +CREATE TABLE IF NOT EXISTS roborev.review_jobs ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + repo_id INTEGER NOT NULL REFERENCES roborev.repos(id), + commit_id INTEGER REFERENCES roborev.commits(id), + git_ref TEXT NOT NULL, + branch TEXT, + session_id TEXT, + agent TEXT NOT NULL, + model TEXT, + provider TEXT, + requested_model TEXT, + requested_provider TEXT, + reasoning TEXT, + job_type TEXT NOT NULL DEFAULT 'review', + review_type TEXT NOT NULL DEFAULT '', + patch_id TEXT, + status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'done', 'failed', 'canceled', 'applied', 'rebased')), + agentic BOOLEAN DEFAULT FALSE, + enqueued_at TIMESTAMP WITH TIME ZONE NOT NULL, + started_at TIMESTAMP WITH TIME ZONE, + finished_at TIMESTAMP WITH TIME ZONE, + prompt TEXT, + diff_content TEXT, + error TEXT, + token_usage TEXT, + worktree_path TEXT, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.reviews ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + agent TEXT NOT NULL, + prompt TEXT NOT NULL, + output TEXT NOT NULL, + closed BOOLEAN NOT NULL DEFAULT FALSE, + updated_by_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.responses ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + responder TEXT NOT NULL, + response TEXT NOT NULL, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_review_jobs_source ON roborev.review_jobs(source_machine_id); +CREATE INDEX IF NOT EXISTS idx_review_jobs_updated ON roborev.review_jobs(updated_at); +-- Note: idx_review_jobs_branch, idx_review_jobs_job_type, and +-- idx_review_jobs_patch_id are created by migration code, not here +-- (to support upgrades from older versions where those columns +-- don't exist yet). +CREATE INDEX IF NOT EXISTS idx_reviews_job_uuid ON roborev.reviews(job_uuid); +CREATE INDEX IF NOT EXISTS idx_reviews_updated ON roborev.reviews(updated_at); +CREATE INDEX IF NOT EXISTS idx_responses_job_uuid ON roborev.responses(job_uuid); +CREATE INDEX IF NOT EXISTS idx_responses_id ON roborev.responses(id); + +CREATE TABLE IF NOT EXISTS roborev.sync_metadata ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); diff --git a/internal/storage/sync.go b/internal/storage/sync.go index 6a54d951..9038f1c7 100644 --- a/internal/storage/sync.go +++ b/internal/storage/sync.go @@ -260,35 +260,38 @@ func (db *DB) GetRepoByIdentityCaseInsensitive(identity string) (*Repo, error) { // SyncableJob contains job data needed for sync type SyncableJob struct { - ID int64 - UUID string - RepoID int64 - RepoIdentity string - CommitID *int64 - CommitSHA string - CommitAuthor string - CommitSubject string - CommitTimestamp time.Time - GitRef string - SessionID string - Agent string - Model string - Reasoning string - JobType string - ReviewType string - PatchID string - Status string - Agentic bool - EnqueuedAt time.Time - StartedAt *time.Time - FinishedAt *time.Time - Prompt string - DiffContent *string - Error string - TokenUsage string - WorktreePath string - SourceMachineID string - UpdatedAt time.Time + ID int64 + UUID string + RepoID int64 + RepoIdentity string + CommitID *int64 + CommitSHA string + CommitAuthor string + CommitSubject string + CommitTimestamp time.Time + GitRef string + SessionID string + Agent string + Model string + Provider string + RequestedModel string + RequestedProvider string + Reasoning string + JobType string + ReviewType string + PatchID string + Status string + Agentic bool + EnqueuedAt time.Time + StartedAt *time.Time + FinishedAt *time.Time + Prompt string + DiffContent *string + Error string + TokenUsage string + WorktreePath string + SourceMachineID string + UpdatedAt time.Time } // GetJobsToSync returns terminal jobs that need to be pushed to PostgreSQL. @@ -298,7 +301,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) SELECT j.id, j.uuid, j.repo_id, COALESCE(r.identity, ''), j.commit_id, COALESCE(c.sha, ''), COALESCE(c.author, ''), COALESCE(c.subject, ''), COALESCE(c.timestamp, ''), - j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, + j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.provider, ''), COALESCE(j.requested_model, ''), COALESCE(j.requested_provider, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, j.enqueued_at, COALESCE(j.started_at, ''), COALESCE(j.finished_at, ''), COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), COALESCE(j.token_usage, ''), COALESCE(j.worktree_path, ''), j.source_machine_id, j.updated_at @@ -333,7 +336,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) err := rows.Scan( &j.ID, &j.UUID, &j.RepoID, &j.RepoIdentity, &commitID, &j.CommitSHA, &j.CommitAuthor, &j.CommitSubject, &commitTimestamp, - &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, + &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Provider, &j.RequestedModel, &j.RequestedProvider, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, &enqueuedAt, &startedAt, &finishedAt, &j.Prompt, &diffContent, &j.Error, &j.TokenUsage, &j.WorktreePath, &j.SourceMachineID, &updatedAt, @@ -577,15 +580,18 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error now := time.Now().UTC().Format(time.RFC3339) _, err := db.Exec(` INSERT INTO review_jobs ( - uuid, repo_id, commit_id, git_ref, session_id, agent, model, reasoning, job_type, review_type, patch_id, status, agentic, + uuid, repo_id, commit_id, git_ref, session_id, agent, model, provider, requested_model, requested_provider, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, worktree_path, source_machine_id, updated_at, synced_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(uuid) DO UPDATE SET status = excluded.status, finished_at = excluded.finished_at, error = excluded.error, - model = COALESCE(excluded.model, review_jobs.model), + model = excluded.model, + provider = excluded.provider, + requested_model = excluded.requested_model, + requested_provider = excluded.requested_provider, git_ref = excluded.git_ref, session_id = COALESCE(excluded.session_id, review_jobs.session_id), commit_id = excluded.commit_id, @@ -594,7 +600,7 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error worktree_path = COALESCE(excluded.worktree_path, review_jobs.worktree_path), updated_at = excluded.updated_at, synced_at = ? - `, j.UUID, repoID, commitID, j.GitRef, nullStr(j.SessionID), j.Agent, nullStr(j.Model), j.Reasoning, j.JobType, + `, j.UUID, repoID, commitID, j.GitRef, nullStr(j.SessionID), j.Agent, nullStr(j.Model), nullStr(j.Provider), nullStr(j.RequestedModel), nullStr(j.RequestedProvider), j.Reasoning, j.JobType, j.ReviewType, nullStr(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt.Format(time.RFC3339), nullTimeStr(j.StartedAt), nullTimeStr(j.FinishedAt), nullStr(j.Prompt), j.DiffContent, nullStr(j.Error), nullStr(j.TokenUsage), diff --git a/internal/storage/sync_backfill_test.go b/internal/storage/sync_backfill_test.go index b9c167a6..bcfd5a68 100644 --- a/internal/storage/sync_backfill_test.go +++ b/internal/storage/sync_backfill_test.go @@ -210,16 +210,16 @@ func TestUpsertPulledJob_BackfillsModel(t *testing.T) { } assert.Equal(t, "gpt-4", modelAfter.String) - // Also verify that upserting with empty model doesn't clear existing model - pulledJob.Model = "" // Empty model + // Upserting with empty model should clear the existing effective model so + // synced reruns can propagate implicit/default state. + pulledJob.Model = "" err = db.UpsertPulledJob(pulledJob, repo.ID, nil) require.NoError(t, err, "UpsertPulledJob (empty model) failed: %v") - var modelPreserved sql.NullString - err = db.QueryRow(`SELECT model FROM review_jobs WHERE uuid = ?`, jobUUID).Scan(&modelPreserved) - require.NoError(t, err, "Failed to query model preserved: %v") - - assert.False(t, !modelPreserved.Valid || modelPreserved.String != "gpt-4") + var modelCleared sql.NullString + err = db.QueryRow(`SELECT model FROM review_jobs WHERE uuid = ?`, jobUUID).Scan(&modelCleared) + require.NoError(t, err, "Failed to query model cleared: %v") + assert.False(t, modelCleared.Valid) } func TestGetJobsToSync_IncludesWorktreePath(t *testing.T) { @@ -301,3 +301,43 @@ func TestUpsertPulledJob_PreservesWorktreePath(t *testing.T) { require.NoError(t, err) assert.Equal(t, "/worktrees/my-branch", wt) } + +func TestUpsertPulledJob_ClearsModelAndProviderFields(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo, err := db.GetOrCreateRepo("/test/repo-sync-clear") + require.NoError(t, err) + + jobUUID := "test-uuid-clear-" + time.Now().Format("20060102150405") + pulledJob := PulledJob{ + UUID: jobUUID, + RepoIdentity: "/test/repo-sync-clear", + GitRef: "HEAD", + Agent: "test-agent", + Model: "gpt-4", + Provider: "openai", + RequestedModel: "gpt-4", + RequestedProvider: "openai", + Status: "done", + SourceMachineID: "test-machine", + EnqueuedAt: time.Now(), + UpdatedAt: time.Now(), + } + require.NoError(t, db.UpsertPulledJob(pulledJob, repo.ID, nil)) + + pulledJob.Model = "" + pulledJob.Provider = "" + pulledJob.RequestedModel = "" + pulledJob.RequestedProvider = "" + pulledJob.UpdatedAt = time.Now().Add(time.Second) + require.NoError(t, db.UpsertPulledJob(pulledJob, repo.ID, nil)) + + var model, provider, requestedModel, requestedProvider *string + err = db.QueryRow(`SELECT model, provider, requested_model, requested_provider FROM review_jobs WHERE uuid = ?`, jobUUID).Scan(&model, &provider, &requestedModel, &requestedProvider) + require.NoError(t, err) + assert.Nil(t, model) + assert.Nil(t, provider) + assert.Nil(t, requestedModel) + assert.Nil(t, requestedProvider) +}