diff --git a/CHANGELOG.md b/CHANGELOG.md index 60fbd72b..e98153f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,18 @@ All notable changes to this project will be documented here. - **Bulk test result inserts**: Report ingestion now uses `pgx.Batch` to insert test results in bulk instead of one query per result. This eliminates the N+1 insert pattern that caused 1000+ round-trips for large reports. +### Added + +- **Sharding duration-balanced strategy activated**: `DurationStore.UpsertFromResults()` is now called during report ingestion (inside the same transaction), populating the `test_duration_history` table that the `duration_balanced` sharding strategy depends on. Previously, this strategy always fell back to no-history defaults because the duration store was never written to. + +- **Targeted duration query**: `GET /api/v1/sharding/durations/{testName}` now uses a `WHERE team_id = $1 AND test_name = $2` SQL query instead of fetching all team durations and filtering in Go (O(1) DB query vs O(N) in-memory scan). The endpoint returns a JSON array of duration entries across all suites for the named test, or 404 if no history exists. + +- **Composite-key duration map**: `DurationStore.GetByTeamMap` now uses composite keys (`testName\x00suite`) to preserve entries for the same test name across different suites, preventing data loss in `EnrichWithHistory`. + +- **DurationStore integration tests**: Added comprehensive integration tests for `DurationStore` covering `GetByTeam`, `GetByTeamAndTest`, `UpsertFromResults` (insert, rolling average, within-transaction, transaction rollback, empty input), `GetBySuite`, `GetByTeamMap`, p95 conflict behavior, and same-name-different-suite scenarios. + +- **Sharding API endpoints documented**: Added `POST /api/v1/sharding/plan`, `POST /api/v1/sharding/rebalance`, `GET /api/v1/sharding/durations`, and `GET /api/v1/sharding/durations/{testName}` to the README Key Endpoints table. + ### Fixed - **IDOR vulnerability in invitation handlers**: `Create`, `List`, and `Revoke` invitation endpoints (`POST/GET/DELETE /api/v1/teams/{teamID}/invitations`) now verify that the authenticated user's team matches the URL `teamID` before checking role permissions. Previously, any maintainer or owner could list, create, or revoke invitations for any team regardless of membership. diff --git a/README.md b/README.md index 6fedffaf..51529a9c 100644 --- a/README.md +++ b/README.md @@ -329,6 +329,10 @@ Tokens are prefixed `inv_`, valid for **7 days**, and stored as SHA-256 hashes. | `POST` | `/api/v1/auth/change-password` | Change password | | `GET` | `/api/v1/admin/users` | List all users (owner only) | | `GET` | `/api/v1/admin/audit-log` | Paginated audit log (`?limit=&offset=&action=`) (owner only) | +| `POST` | `/api/v1/sharding/plan` | Create a shard plan (duration-balanced or count-based) | +| `POST` | `/api/v1/sharding/rebalance` | Rebalance after worker failure | +| `GET` | `/api/v1/sharding/durations` | List all duration history for the team | +| `GET` | `/api/v1/sharding/durations/{testName}` | Get duration history for a specific test (returns array, 404 if none) | | `GET` | `/ws/executions` | WebSocket for live updates | ### Quality Gate Rules diff --git a/internal/handler/reports.go b/internal/handler/reports.go index 3f67b813..057babf0 100644 --- a/internal/handler/reports.go +++ b/internal/handler/reports.go @@ -57,6 +57,7 @@ type ReportsHandler struct { BaseURL string TriageStore triageAccessor TriageEnqueuer triage.Enqueuer + DurationStore *store.DurationStore AllowBackdate bool } @@ -200,6 +201,7 @@ func (h *ReportsHandler) Create(w http.ResponseWriter, r *http.Request) { Raw: rawJSON, CreatedAt: now, TriageGitHubStatus: triageGitHubStatus, + DurationStore: h.DurationStore, } if err := h.ReportStore.CreateWithResults(r.Context(), params, results); err != nil { if err == pgx.ErrNoRows { @@ -210,6 +212,8 @@ func (h *ReportsHandler) Create(w http.ResponseWriter, r *http.Request) { return } + // Enqueue async triage — non-blocking, best-effort. Must be called after + // the transaction commits so the triage job can read the persisted rows. if h.TriageEnqueuer != nil { h.TriageEnqueuer.Enqueue(claims.TeamID, reportID) } diff --git a/internal/handler/sharding.go b/internal/handler/sharding.go index 595ffd5c..2d218cd8 100644 --- a/internal/handler/sharding.go +++ b/internal/handler/sharding.go @@ -71,7 +71,7 @@ func (h *ShardingHandler) CreatePlan(w http.ResponseWriter, r *http.Request) { // RebalanceRequest is the request body for rebalancing after worker failure. type RebalanceRequest struct { ExecutionID string `json:"execution_id" validate:"required"` - FailedWorkerID string `json:"failed_worker_id" validate:"required"` + FailedWorkerID string `json:"failed_worker_id" validate:"required"` CurrentPlan model.ShardPlan `json:"current_plan" validate:"required"` CompletedTests []string `json:"completed_tests"` } @@ -155,6 +155,7 @@ func (h *ShardingHandler) ListDurations(w http.ResponseWriter, r *http.Request) } // GetDuration handles GET /api/v1/sharding/durations/{testName}. +// Always returns a JSON array of duration entries for consistent API shape. func (h *ShardingHandler) GetDuration(w http.ResponseWriter, r *http.Request) { claims := auth.GetClaims(r.Context()) if claims == nil { @@ -173,18 +174,16 @@ func (h *ShardingHandler) GetDuration(w http.ResponseWriter, r *http.Request) { return } - durations, err := h.DurationStore.GetByTeam(r.Context(), claims.TeamID) + durations, err := h.DurationStore.GetByTeamAndTest(r.Context(), claims.TeamID, testName) if err != nil { Error(w, http.StatusInternalServerError, "failed to query durations") return } - for _, d := range durations { - if d.TestName == testName { - JSON(w, http.StatusOK, d) - return - } + if len(durations) == 0 { + Error(w, http.StatusNotFound, "no duration history for test") + return } - Error(w, http.StatusNotFound, "no duration history for test") + JSON(w, http.StatusOK, durations) } diff --git a/internal/integration/reports_compare_test.go b/internal/integration/reports_compare_test.go index 7db35bb9..3552bc03 100644 --- a/internal/integration/reports_compare_test.go +++ b/internal/integration/reports_compare_test.go @@ -16,6 +16,7 @@ import ( "github.com/scaledtest/scaledtest/internal/auth" "github.com/scaledtest/scaledtest/internal/db" "github.com/scaledtest/scaledtest/internal/handler" + "github.com/scaledtest/scaledtest/internal/store" "github.com/scaledtest/scaledtest/internal/testutil" ) @@ -23,7 +24,7 @@ import ( func postCompare(t *testing.T, pool *db.Pool, teamID, baseID, headID string) *httptest.ResponseRecorder { t.Helper() h := &handler.ReportsHandler{ - DB: pool, + ReportStore: store.NewReportsStore(pool), } req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/v1/reports/compare?base=%s&head=%s", baseID, headID), @@ -49,7 +50,7 @@ func postCompare(t *testing.T, pool *db.Pool, teamID, baseID, headID string) *ht // // Before the fix, the fetchResults query scanned nullable TEXT columns directly // into string destinations. pgx v5 returns "cannot scan NULL into *string" for -// NULL TEXT values, causing a 500. The fix uses COALESCE to convert NULL to ''. +// NULL TEXT values, causing a 500. The fix uses COALESCE to convert NULL to ”. // // Given: two reports whose test results have no message, trace, file_path, or suite // When: the Compare endpoint is called with those report IDs diff --git a/internal/integration/testdb.go b/internal/integration/testdb.go index 7b8783f3..1542d8f3 100644 --- a/internal/integration/testdb.go +++ b/internal/integration/testdb.go @@ -97,6 +97,7 @@ var truncateTables = []string{ "triage_failure_classifications", "triage_clusters", "triage_results", + "test_duration_history", "test_results", "test_reports", "test_executions", diff --git a/internal/server/routes.go b/internal/server/routes.go index 5df650cc..94ca6729 100644 --- a/internal/server/routes.go +++ b/internal/server/routes.go @@ -177,6 +177,7 @@ func NewRouter(cfg *config.Config, pool ...*db.Pool) http.Handler { GitHubStatusPoster: ghClient, BaseURL: cfg.BaseURL, TriageEnqueuer: triageEnqueuer, + DurationStore: durStore, AllowBackdate: cfg.DisableRateLimit, } if dbPool != nil { diff --git a/internal/shard/shard.go b/internal/shard/shard.go index c27b5c42..f48b8f25 100644 --- a/internal/shard/shard.go +++ b/internal/shard/shard.go @@ -240,21 +240,40 @@ func suiteGrouped(groups []testGroup, numWorkers int) []model.Shard { return durationBalanced(suiteGroups, numWorkers) } +// durationAggregate holds per-test-name totals built from the composite-keyed history map. +type durationAggregate struct { + totalDurationMs int64 + suite string +} + // EnrichWithHistory takes a list of test names and enriches them with // historical duration data. Tests without history get the default estimate. +// The history map uses composite keys "testName\x00suite" to preserve entries +// across different suites. For each test name, we aggregate durations across +// all suites (summing avg_duration_ms). func EnrichWithHistory(testNames []string, history map[string]*model.TestDurationHistory) []TestInfo { + agg := make(map[string]*durationAggregate, len(testNames)) + for _, h := range history { + a := agg[h.TestName] + if a == nil { + agg[h.TestName] = &durationAggregate{totalDurationMs: h.AvgDurationMs, suite: h.Suite} + } else { + a.totalDurationMs += h.AvgDurationMs + if h.Suite < a.suite { + a.suite = h.Suite + } + } + } + tests := make([]TestInfo, len(testNames)) for i, name := range testNames { tests[i] = TestInfo{ Name: name, EstDurationMs: DefaultEstDurationMs, } - if h, ok := history[name]; ok { - tests[i].EstDurationMs = h.AvgDurationMs - tests[i].Suite = h.Suite - if tests[i].EstDurationMs <= 0 { - tests[i].EstDurationMs = DefaultEstDurationMs - } + if a, ok := agg[name]; ok && a.totalDurationMs > 0 { + tests[i].EstDurationMs = a.totalDurationMs + tests[i].Suite = a.suite } } return tests diff --git a/internal/shard/shard_test.go b/internal/shard/shard_test.go index 7e70a401..c996e5fd 100644 --- a/internal/shard/shard_test.go +++ b/internal/shard/shard_test.go @@ -238,8 +238,8 @@ func TestPlan_MoreWorkersThanTests(t *testing.T) { func TestEnrichWithHistory(t *testing.T) { names := []string{"test-a", "test-b", "test-c"} history := map[string]*model.TestDurationHistory{ - "test-a": {AvgDurationMs: 1500, Suite: "unit"}, - "test-c": {AvgDurationMs: 0, Suite: "e2e"}, // 0 should get default + "test-a\x00unit": {TestName: "test-a", AvgDurationMs: 1500, Suite: "unit"}, + "test-c\x00e2e": {TestName: "test-c", AvgDurationMs: 0, Suite: "e2e"}, } enriched := EnrichWithHistory(names, history) @@ -258,6 +258,26 @@ func TestEnrichWithHistory(t *testing.T) { } } +func TestEnrichWithHistory_SameNameDifferentSuites_DeterministicSuite(t *testing.T) { + names := []string{"test-a"} + history := map[string]*model.TestDurationHistory{ + "test-a\x00unit": {TestName: "test-a", AvgDurationMs: 100, Suite: "unit"}, + "test-a\x00integration": {TestName: "test-a", AvgDurationMs: 200, Suite: "integration"}, + } + + enriched := EnrichWithHistory(names, history) + + if len(enriched) != 1 { + t.Fatalf("len(enriched) = %d, want 1", len(enriched)) + } + if enriched[0].EstDurationMs != 300 { + t.Errorf("test-a duration across suites: got %d, want 300 (100+200)", enriched[0].EstDurationMs) + } + if enriched[0].Suite != "integration" { + t.Errorf("test-a suite: got %q, want alphabetically-first %q", enriched[0].Suite, "integration") + } +} + func TestRebalance_FailedWorker(t *testing.T) { plan := &model.ShardPlan{ Shards: []model.Shard{ diff --git a/internal/store/durations.go b/internal/store/durations.go index 021ecb18..a6facfe2 100644 --- a/internal/store/durations.go +++ b/internal/store/durations.go @@ -4,11 +4,20 @@ import ( "context" "fmt" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/scaledtest/scaledtest/internal/model" ) +// DBTX is the common interface satisfied by pgxpool.Pool and pgx.Tx. +type DBTX interface { + Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) + Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error) + QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) pgx.Row +} + // DurationStore handles test duration history persistence. type DurationStore struct { pool *pgxpool.Pool @@ -21,33 +30,33 @@ func NewDurationStore(pool *pgxpool.Pool) *DurationStore { // GetByTeam returns all duration history entries for a team. func (s *DurationStore) GetByTeam(ctx context.Context, teamID string) ([]model.TestDurationHistory, error) { - rows, err := s.pool.Query(ctx, + return queryDurations(ctx, s.pool, `SELECT id, team_id, test_name, suite, avg_duration_ms, p95_duration_ms, min_duration_ms, max_duration_ms, run_count, last_status, updated_at, created_at FROM test_duration_history WHERE team_id = $1 ORDER BY test_name`, teamID) - if err != nil { - return nil, fmt.Errorf("query duration history: %w", err) - } - defer rows.Close() +} - var results []model.TestDurationHistory - for rows.Next() { - var d model.TestDurationHistory - if err := rows.Scan( - &d.ID, &d.TeamID, &d.TestName, &d.Suite, - &d.AvgDurationMs, &d.P95DurationMs, &d.MinDurationMs, &d.MaxDurationMs, - &d.RunCount, &d.LastStatus, &d.UpdatedAt, &d.CreatedAt, - ); err != nil { - return nil, fmt.Errorf("scan duration history: %w", err) - } - results = append(results, d) - } - return results, rows.Err() +// GetByTeamAndTest returns duration history entries for a specific test name within a team. +func (s *DurationStore) GetByTeamAndTest(ctx context.Context, teamID, testName string) ([]model.TestDurationHistory, error) { + return queryDurations(ctx, s.pool, + `SELECT id, team_id, test_name, suite, avg_duration_ms, p95_duration_ms, + min_duration_ms, max_duration_ms, run_count, last_status, updated_at, created_at + FROM test_duration_history + WHERE team_id = $1 AND test_name = $2 + ORDER BY suite`, teamID, testName) +} + +// DurationMapKey returns a unique key for a test duration entry combining +// test name and suite. This avoids collisions when the same test name appears +// in multiple suites — the database UNIQUE constraint is (team_id, test_name, suite). +func DurationMapKey(testName, suite string) string { + return testName + "\x00" + suite } -// GetByTeamMap returns duration history as a map keyed by test_name. +// GetByTeamMap returns duration history as a map keyed by composite "testName\x00suite". +// This preserves all entries including same-named tests across different suites. func (s *DurationStore) GetByTeamMap(ctx context.Context, teamID string) (map[string]*model.TestDurationHistory, error) { entries, err := s.GetByTeam(ctx, teamID) if err != nil { @@ -55,23 +64,37 @@ func (s *DurationStore) GetByTeamMap(ctx context.Context, teamID string) (map[st } m := make(map[string]*model.TestDurationHistory, len(entries)) for i := range entries { - m[entries[i].TestName] = &entries[i] + m[DurationMapKey(entries[i].TestName, entries[i].Suite)] = &entries[i] } return m, nil } // UpsertFromResults updates duration history from a set of test results. // Uses a rolling average: new_avg = ((old_avg * old_count) + new_duration) / (old_count + 1). -func (s *DurationStore) UpsertFromResults(ctx context.Context, teamID string, results []model.TestResult) error { +// When db is a pgx.Tx the caller is responsible for committing the transaction. +// When db is a pgxpool.Pool this method creates and commits its own transaction. +func (s *DurationStore) UpsertFromResults(ctx context.Context, teamID string, results []model.TestResult, db DBTX) error { if len(results) == 0 { return nil } - tx, err := s.pool.Begin(ctx) - if err != nil { - return fmt.Errorf("begin tx: %w", err) + shouldCommit := false + var tx pgx.Tx + var err error + + switch d := db.(type) { + case *pgxpool.Pool: + tx, err = d.Begin(ctx) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer tx.Rollback(ctx) + shouldCommit = true + case pgx.Tx: + tx = d + default: + return fmt.Errorf("UpsertFromResults: unsupported DBTX type %T", db) } - defer tx.Rollback(ctx) for _, r := range results { _, err := tx.Exec(ctx, @@ -81,6 +104,7 @@ func (s *DurationStore) UpsertFromResults(ctx context.Context, teamID string, re ON CONFLICT (team_id, test_name, suite) DO UPDATE SET avg_duration_ms = (test_duration_history.avg_duration_ms * test_duration_history.run_count + $4) / (test_duration_history.run_count + 1), + p95_duration_ms = GREATEST(test_duration_history.p95_duration_ms, $4), min_duration_ms = LEAST(test_duration_history.min_duration_ms, $4), max_duration_ms = GREATEST(test_duration_history.max_duration_ms, $4), run_count = test_duration_history.run_count + 1, @@ -93,19 +117,26 @@ func (s *DurationStore) UpsertFromResults(ctx context.Context, teamID string, re } } - return tx.Commit(ctx) + if shouldCommit { + return tx.Commit(ctx) + } + return nil } // GetBySuite returns duration history for tests in a specific suite. func (s *DurationStore) GetBySuite(ctx context.Context, teamID, suite string) ([]model.TestDurationHistory, error) { - rows, err := s.pool.Query(ctx, + return queryDurations(ctx, s.pool, `SELECT id, team_id, test_name, suite, avg_duration_ms, p95_duration_ms, min_duration_ms, max_duration_ms, run_count, last_status, updated_at, created_at FROM test_duration_history WHERE team_id = $1 AND suite = $2 ORDER BY avg_duration_ms DESC`, teamID, suite) +} + +func queryDurations(ctx context.Context, db DBTX, query string, args ...interface{}) ([]model.TestDurationHistory, error) { + rows, err := db.Query(ctx, query, args...) if err != nil { - return nil, fmt.Errorf("query duration by suite: %w", err) + return nil, fmt.Errorf("query duration history: %w", err) } defer rows.Close() @@ -117,7 +148,7 @@ func (s *DurationStore) GetBySuite(ctx context.Context, teamID, suite string) ([ &d.AvgDurationMs, &d.P95DurationMs, &d.MinDurationMs, &d.MaxDurationMs, &d.RunCount, &d.LastStatus, &d.UpdatedAt, &d.CreatedAt, ); err != nil { - return nil, fmt.Errorf("scan duration: %w", err) + return nil, fmt.Errorf("scan duration history: %w", err) } results = append(results, d) } diff --git a/internal/store/durations_test.go b/internal/store/durations_test.go new file mode 100644 index 00000000..f504ec3b --- /dev/null +++ b/internal/store/durations_test.go @@ -0,0 +1,648 @@ +//go:build integration + +package store_test + +import ( + "context" + "testing" + "time" + + "github.com/scaledtest/scaledtest/internal/integration" + "github.com/scaledtest/scaledtest/internal/model" + "github.com/scaledtest/scaledtest/internal/store" +) + +func insertDurationReport(t *testing.T, tdb *integration.TestDB, teamID string) string { + t.Helper() + ctx := context.Background() + var id string + err := tdb.Pool.QueryRow(ctx, + `INSERT INTO test_reports (team_id, tool_name, summary, raw, created_at) + VALUES ($1, 'jest', '{}', '{}', now()) RETURNING id`, + teamID, + ).Scan(&id) + if err != nil { + t.Fatalf("insertDurationReport: %v", err) + } + return id +} + +func TestDurationStore_UpsertFromResults_InsertsNew(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-insert") + s := store.NewDurationStore(tdb.Pool) + + results := []model.TestResult{ + {Name: "TestAlpha", Suite: "unit", DurationMs: 100, Status: "passed", TeamID: teamID}, + {Name: "TestBeta", Suite: "unit", DurationMs: 200, Status: "failed", TeamID: teamID}, + } + + err := s.UpsertFromResults(ctx, teamID, results, tdb.Pool) + if err != nil { + t.Fatalf("UpsertFromResults: %v", err) + } + + entries, err := s.GetByTeam(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeam: %v", err) + } + if len(entries) != 2 { + t.Fatalf("len(entries) = %d, want 2", len(entries)) + } + + found := map[string]*model.TestDurationHistory{} + for i := range entries { + found[entries[i].TestName] = &entries[i] + } + + if a, ok := found["TestAlpha"]; !ok { + t.Error("TestAlpha not found") + } else { + if a.AvgDurationMs != 100 { + t.Errorf("TestAlpha AvgDurationMs = %d, want 100", a.AvgDurationMs) + } + if a.RunCount != 1 { + t.Errorf("TestAlpha RunCount = %d, want 1", a.RunCount) + } + if a.LastStatus != "passed" { + t.Errorf("TestAlpha LastStatus = %q, want %q", a.LastStatus, "passed") + } + if a.Suite != "unit" { + t.Errorf("TestAlpha Suite = %q, want %q", a.Suite, "unit") + } + } + + if b, ok := found["TestBeta"]; !ok { + t.Error("TestBeta not found") + } else { + if b.LastStatus != "failed" { + t.Errorf("TestBeta LastStatus = %q, want %q", b.LastStatus, "failed") + } + } +} + +func TestDurationStore_UpsertFromResults_RollingAverage(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-rolling") + s := store.NewDurationStore(tdb.Pool) + + results1 := []model.TestResult{ + {Name: "TestAvg", Suite: "unit", DurationMs: 100, Status: "passed", TeamID: teamID}, + } + if err := s.UpsertFromResults(ctx, teamID, results1, tdb.Pool); err != nil { + t.Fatalf("first upsert: %v", err) + } + + results2 := []model.TestResult{ + {Name: "TestAvg", Suite: "unit", DurationMs: 300, Status: "failed", TeamID: teamID}, + } + if err := s.UpsertFromResults(ctx, teamID, results2, tdb.Pool); err != nil { + t.Fatalf("second upsert: %v", err) + } + + entries, err := s.GetByTeam(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeam: %v", err) + } + if len(entries) != 1 { + t.Fatalf("len(entries) = %d, want 1", len(entries)) + } + + d := entries[0] + if d.AvgDurationMs != 200 { + t.Errorf("AvgDurationMs = %d, want 200 (rolling average of 100 and 300)", d.AvgDurationMs) + } + if d.MinDurationMs != 100 { + t.Errorf("MinDurationMs = %d, want 100", d.MinDurationMs) + } + if d.MaxDurationMs != 300 { + t.Errorf("MaxDurationMs = %d, want 300", d.MaxDurationMs) + } + if d.RunCount != 2 { + t.Errorf("RunCount = %d, want 2", d.RunCount) + } + if d.LastStatus != "failed" { + t.Errorf("LastStatus = %q, want %q", d.LastStatus, "failed") + } +} + +func TestDurationStore_UpsertFromResults_EmptySlice(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-empty") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamID, nil, tdb.Pool) + if err != nil { + t.Fatalf("UpsertFromResults with nil: %v", err) + } + + err = s.UpsertFromResults(ctx, teamID, []model.TestResult{}, tdb.Pool) + if err != nil { + t.Fatalf("UpsertFromResults with empty slice: %v", err) + } +} + +func TestDurationStore_UpsertFromResults_WithinTransaction(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-tx") + s := store.NewDurationStore(tdb.Pool) + + tx, err := tdb.Pool.Begin(ctx) + if err != nil { + t.Fatalf("begin tx: %v", err) + } + defer tx.Rollback(ctx) + + results := []model.TestResult{ + {Name: "TestTx", Suite: "integration", DurationMs: 500, Status: "passed", TeamID: teamID}, + } + if err := s.UpsertFromResults(ctx, teamID, results, tx); err != nil { + t.Fatalf("UpsertFromResults within tx: %v", err) + } + + if err := tx.Commit(ctx); err != nil { + t.Fatalf("commit: %v", err) + } + + entries, err := s.GetByTeam(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeam: %v", err) + } + if len(entries) != 1 { + t.Fatalf("len(entries) = %d, want 1", len(entries)) + } + if entries[0].TestName != "TestTx" { + t.Errorf("TestName = %q, want %q", entries[0].TestName, "TestTx") + } + if entries[0].AvgDurationMs != 500 { + t.Errorf("AvgDurationMs = %d, want 500", entries[0].AvgDurationMs) + } +} + +func TestDurationStore_UpsertFromResults_TransactionRollback(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-rollback") + s := store.NewDurationStore(tdb.Pool) + + tx, err := tdb.Pool.Begin(ctx) + if err != nil { + t.Fatalf("begin tx: %v", err) + } + + results := []model.TestResult{ + {Name: "TestRollback", Suite: "unit", DurationMs: 999, Status: "failed", TeamID: teamID}, + } + if err := s.UpsertFromResults(ctx, teamID, results, tx); err != nil { + t.Fatalf("UpsertFromResults within tx: %v", err) + } + + tx.Rollback(ctx) + + entries, err := s.GetByTeam(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeam: %v", err) + } + if len(entries) != 0 { + t.Errorf("len(entries) = %d, want 0 after rollback", len(entries)) + } +} + +func TestDurationStore_GetByTeam_ReturnsOnlyTeamData(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamA := tdb.CreateTeam(t, "dur-team-a") + teamB := tdb.CreateTeam(t, "dur-team-b") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamA, []model.TestResult{ + {Name: "TestA", Suite: "unit", DurationMs: 100, Status: "passed", TeamID: teamA}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert teamA: %v", err) + } + + err = s.UpsertFromResults(ctx, teamB, []model.TestResult{ + {Name: "TestB", Suite: "unit", DurationMs: 200, Status: "passed", TeamID: teamB}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert teamB: %v", err) + } + + entriesA, err := s.GetByTeam(ctx, teamA) + if err != nil { + t.Fatalf("GetByTeam teamA: %v", err) + } + if len(entriesA) != 1 || entriesA[0].TestName != "TestA" { + t.Errorf("teamA entries: got %v, want 1 entry with TestA", entriesA) + } + + entriesB, err := s.GetByTeam(ctx, teamB) + if err != nil { + t.Fatalf("GetByTeam teamB: %v", err) + } + if len(entriesB) != 1 || entriesB[0].TestName != "TestB" { + t.Errorf("teamB entries: got %v, want 1 entry with TestB", entriesB) + } +} + +func TestDurationStore_GetByTeam_Empty(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-empty-get") + s := store.NewDurationStore(tdb.Pool) + + entries, err := s.GetByTeam(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeam: %v", err) + } + if len(entries) != 0 { + t.Errorf("len(entries) = %d, want 0 for team with no history", len(entries)) + } +} + +func TestDurationStore_GetByTeamAndTest_ReturnsMatchingRows(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-bytest") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestAlpha", Suite: "unit", DurationMs: 100, Status: "passed", TeamID: teamID}, + {Name: "TestAlpha", Suite: "integration", DurationMs: 500, Status: "passed", TeamID: teamID}, + {Name: "TestBeta", Suite: "unit", DurationMs: 200, Status: "failed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert: %v", err) + } + + entries, err := s.GetByTeamAndTest(ctx, teamID, "TestAlpha") + if err != nil { + t.Fatalf("GetByTeamAndTest: %v", err) + } + if len(entries) != 2 { + t.Fatalf("len(entries) = %d, want 2 (two suites for TestAlpha)", len(entries)) + } + + for _, e := range entries { + if e.TestName != "TestAlpha" { + t.Errorf("TestName = %q, want %q", e.TestName, "TestAlpha") + } + } + + notFound, err := s.GetByTeamAndTest(ctx, teamID, "Nonexistent") + if err != nil { + t.Fatalf("GetByTeamAndTest nonexistent: %v", err) + } + if len(notFound) != 0 { + t.Errorf("len(notFound) = %d, want 0", len(notFound)) + } +} + +func TestDurationStore_GetByTeamAndTest_TeamScoped(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamA := tdb.CreateTeam(t, "dur-team-a-bytest") + teamB := tdb.CreateTeam(t, "dur-team-b-bytest") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamA, []model.TestResult{ + {Name: "SharedTest", Suite: "unit", DurationMs: 100, Status: "passed", TeamID: teamA}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert teamA: %v", err) + } + err = s.UpsertFromResults(ctx, teamB, []model.TestResult{ + {Name: "SharedTest", Suite: "unit", DurationMs: 999, Status: "failed", TeamID: teamB}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert teamB: %v", err) + } + + entriesA, err := s.GetByTeamAndTest(ctx, teamA, "SharedTest") + if err != nil { + t.Fatalf("GetByTeamAndTest teamA: %v", err) + } + if len(entriesA) != 1 { + t.Fatalf("teamA len = %d, want 1", len(entriesA)) + } + if entriesA[0].AvgDurationMs != 100 { + t.Errorf("teamA AvgDurationMs = %d, want 100", entriesA[0].AvgDurationMs) + } + + entriesB, err := s.GetByTeamAndTest(ctx, teamB, "SharedTest") + if err != nil { + t.Fatalf("GetByTeamAndTest teamB: %v", err) + } + if len(entriesB) != 1 { + t.Fatalf("teamB len = %d, want 1", len(entriesB)) + } + if entriesB[0].AvgDurationMs != 999 { + t.Errorf("teamB AvgDurationMs = %d, want 999", entriesB[0].AvgDurationMs) + } +} + +func TestDurationStore_GetBySuite_ReturnsOnlyMatchingSuite(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-suite") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestX", Suite: "unit", DurationMs: 100, Status: "passed", TeamID: teamID}, + {Name: "TestY", Suite: "integration", DurationMs: 500, Status: "passed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert: %v", err) + } + + entries, err := s.GetBySuite(ctx, teamID, "unit") + if err != nil { + t.Fatalf("GetBySuite: %v", err) + } + if len(entries) != 1 { + t.Fatalf("len(entries) = %d, want 1", len(entries)) + } + if entries[0].TestName != "TestX" { + t.Errorf("TestName = %q, want %q", entries[0].TestName, "TestX") + } + if entries[0].Suite != "unit" { + t.Errorf("Suite = %q, want %q", entries[0].Suite, "unit") + } + + none, err := s.GetBySuite(ctx, teamID, "e2e") + if err != nil { + t.Fatalf("GetBySuite empty: %v", err) + } + if len(none) != 0 { + t.Errorf("len(none) = %d, want 0 for nonexistent suite", len(none)) + } +} + +func TestDurationStore_GetByTeamMap(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-map") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestM1", Suite: "unit", DurationMs: 150, Status: "passed", TeamID: teamID}, + {Name: "TestM2", Suite: "unit", DurationMs: 250, Status: "failed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert: %v", err) + } + + m, err := s.GetByTeamMap(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeamMap: %v", err) + } + if len(m) != 2 { + t.Fatalf("len(m) = %d, want 2", len(m)) + } + + key1 := store.DurationMapKey("TestM1", "unit") + if m[key1] == nil { + t.Error("TestM1/unit not in map") + } else if m[key1].AvgDurationMs != 150 { + t.Errorf("TestM1/unit AvgDurationMs = %d, want 150", m[key1].AvgDurationMs) + } + + key2 := store.DurationMapKey("TestM2", "unit") + if m[key2] == nil { + t.Error("TestM2/unit not in map") + } else if m[key2].AvgDurationMs != 250 { + t.Errorf("TestM2/unit AvgDurationMs = %d, want 250", m[key2].AvgDurationMs) + } +} + +func TestDurationStore_GetByTeamMap_PreservesSameNameDifferentSuites(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-map-dup") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestDup", Suite: "unit", DurationMs: 100, Status: "passed", TeamID: teamID}, + {Name: "TestDup", Suite: "integration", DurationMs: 500, Status: "passed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert: %v", err) + } + + m, err := s.GetByTeamMap(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeamMap: %v", err) + } + if len(m) != 2 { + t.Fatalf("len(m) = %d, want 2 (same test name, different suites should not collide)", len(m)) + } + + keyUnit := store.DurationMapKey("TestDup", "unit") + keyInteg := store.DurationMapKey("TestDup", "integration") + if m[keyUnit] == nil { + t.Error("TestDup/unit not in map") + } else if m[keyUnit].AvgDurationMs != 100 { + t.Errorf("TestDup/unit AvgDurationMs = %d, want 100", m[keyUnit].AvgDurationMs) + } + if m[keyInteg] == nil { + t.Error("TestDup/integration not in map") + } else if m[keyInteg].AvgDurationMs != 500 { + t.Errorf("TestDup/integration AvgDurationMs = %d, want 500", m[keyInteg].AvgDurationMs) + } +} + +func TestDurationStore_UpsertFromResults_ThreeRunsMinMaxAvg(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-3runs") + s := store.NewDurationStore(tdb.Pool) + + for _, dur := range []int64{100, 300, 200} { + err := s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestMulti", Suite: "unit", DurationMs: dur, Status: "passed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert dur=%d: %v", dur, err) + } + } + + entries, err := s.GetByTeam(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeam: %v", err) + } + if len(entries) != 1 { + t.Fatalf("len(entries) = %d, want 1", len(entries)) + } + + d := entries[0] + if d.RunCount != 3 { + t.Errorf("RunCount = %d, want 3", d.RunCount) + } + if d.AvgDurationMs != 200 { + t.Errorf("AvgDurationMs = %d, want 200 ((100+300+200)/3)", d.AvgDurationMs) + } + if d.MinDurationMs != 100 { + t.Errorf("MinDurationMs = %d, want 100", d.MinDurationMs) + } + if d.MaxDurationMs != 300 { + t.Errorf("MaxDurationMs = %d, want 300", d.MaxDurationMs) + } +} + +func TestDurationStore_UpsertFromResults_P95UpdatesOnConflict(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-p95") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestP95", Suite: "unit", DurationMs: 100, Status: "passed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("first upsert: %v", err) + } + + entries, _ := s.GetByTeam(ctx, teamID) + if len(entries) != 1 { + t.Fatalf("len(entries) = %d, want 1", len(entries)) + } + if entries[0].P95DurationMs != 100 { + t.Errorf("initial p95 = %d, want 100 (equals duration on insert)", entries[0].P95DurationMs) + } + + err = s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestP95", Suite: "unit", DurationMs: 300, Status: "passed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("second upsert: %v", err) + } + + entries, _ = s.GetByTeam(ctx, teamID) + if entries[0].P95DurationMs != 300 { + t.Errorf("p95 after conflict = %d, want 300 (GREATEST of 100 and 300)", entries[0].P95DurationMs) + } + + err = s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestP95", Suite: "unit", DurationMs: 200, Status: "passed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("third upsert: %v", err) + } + + entries, _ = s.GetByTeam(ctx, teamID) + if entries[0].P95DurationMs != 300 { + t.Errorf("p95 after smaller value = %d, want 300 (GREATEST keeps max)", entries[0].P95DurationMs) + } +} + +func TestDurationStore_UpsertFromResults_SameNameDifferentSuite(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-diff-suite") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestDup", Suite: "unit", DurationMs: 100, Status: "passed", TeamID: teamID}, + {Name: "TestDup", Suite: "integration", DurationMs: 500, Status: "passed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert: %v", err) + } + + entries, err := s.GetByTeam(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeam: %v", err) + } + if len(entries) != 2 { + t.Fatalf("len(entries) = %d, want 2 (same name, different suite)", len(entries)) + } + + suites := map[string]bool{} + for _, e := range entries { + suites[e.Suite] = true + } + if !suites["unit"] || !suites["integration"] { + t.Errorf("expected both 'unit' and 'integration' suites, got %v", suites) + } +} + +func TestDurationStore_FieldsPopulated(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-fields") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestFields", Suite: "smoke", DurationMs: 42, Status: "passed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert: %v", err) + } + + entries, err := s.GetByTeam(ctx, teamID) + if err != nil { + t.Fatalf("GetByTeam: %v", err) + } + if len(entries) != 1 { + t.Fatalf("len(entries) = %d, want 1", len(entries)) + } + + d := entries[0] + if d.ID == "" { + t.Error("ID should not be empty") + } + if d.TeamID != teamID { + t.Errorf("TeamID = %q, want %q", d.TeamID, teamID) + } + if d.TestName != "TestFields" { + t.Errorf("TestName = %q, want %q", d.TestName, "TestFields") + } + if d.CreatedAt.IsZero() { + t.Error("CreatedAt should not be zero") + } + if d.UpdatedAt.IsZero() { + t.Error("UpdatedAt should not be zero") + } + if time.Since(d.CreatedAt) > 10*time.Second { + t.Errorf("CreatedAt seems too old: %v", d.CreatedAt) + } +} + +func TestDurationStore_GetByTeamAndTest_SuiteOrdering(t *testing.T) { + tdb := integration.Setup(t) + ctx := context.Background() + teamID := tdb.CreateTeam(t, "dur-team-ordering") + s := store.NewDurationStore(tdb.Pool) + + err := s.UpsertFromResults(ctx, teamID, []model.TestResult{ + {Name: "TestOrder", Suite: "z-suite", DurationMs: 100, Status: "passed", TeamID: teamID}, + {Name: "TestOrder", Suite: "a-suite", DurationMs: 200, Status: "passed", TeamID: teamID}, + {Name: "TestOrder", Suite: "m-suite", DurationMs: 300, Status: "passed", TeamID: teamID}, + }, tdb.Pool) + if err != nil { + t.Fatalf("upsert: %v", err) + } + + entries, err := s.GetByTeamAndTest(ctx, teamID, "TestOrder") + if err != nil { + t.Fatalf("GetByTeamAndTest: %v", err) + } + if len(entries) != 3 { + t.Fatalf("len(entries) = %d, want 3", len(entries)) + } + + if entries[0].Suite != "a-suite" { + t.Errorf("entries[0].Suite = %q, want %q (alphabetical)", entries[0].Suite, "a-suite") + } + if entries[1].Suite != "m-suite" { + t.Errorf("entries[1].Suite = %q, want %q", entries[1].Suite, "m-suite") + } + if entries[2].Suite != "z-suite" { + t.Errorf("entries[2].Suite = %q, want %q", entries[2].Suite, "z-suite") + } +} diff --git a/internal/store/reports.go b/internal/store/reports.go index 5e3288c3..0ebfa3d6 100644 --- a/internal/store/reports.go +++ b/internal/store/reports.go @@ -3,6 +3,7 @@ package store import ( "context" "encoding/json" + "fmt" "strconv" "time" @@ -120,6 +121,7 @@ type CreateReportParams struct { Raw json.RawMessage CreatedAt time.Time TriageGitHubStatus bool + DurationStore *DurationStore } func (s *ReportsStore) CreateWithResults(ctx context.Context, p CreateReportParams, results []model.TestResult) error { @@ -174,6 +176,12 @@ func (s *ReportsStore) CreateWithResults(ctx context.Context, p CreateReportPara } } + if p.DurationStore != nil && len(results) > 0 { + if err := p.DurationStore.UpsertFromResults(ctx, p.TeamID, results, tx); err != nil { + return fmt.Errorf("upsert duration history: %w", err) + } + } + return tx.Commit(ctx) } diff --git a/internal/store/reports_store_integration_test.go b/internal/store/reports_store_integration_test.go index f17d1ab0..37b620e8 100644 --- a/internal/store/reports_store_integration_test.go +++ b/internal/store/reports_store_integration_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/google/uuid" + "github.com/scaledtest/scaledtest/internal/integration" "github.com/scaledtest/scaledtest/internal/model" "github.com/scaledtest/scaledtest/internal/store" @@ -19,11 +21,12 @@ func TestReportsStore_CreateWithResults_BulkInsert(t *testing.T) { teamID := tdb.CreateTeam(t, "reports-bulk-test-team") s := store.NewReportsStore(tdb.Pool) + reportID := uuid.New().String() summary, _ := json.Marshal(map[string]int{"tests": 100, "passed": 90, "failed": 8, "skipped": 2}) raw, _ := json.Marshal(map[string]interface{}{"results": map[string]interface{}{"tool": map[string]interface{}{"name": "jest"}}}) params := store.CreateReportParams{ - ID: "rpt-bulk-001", + ID: reportID, TeamID: teamID, ToolName: "jest", ToolVersion: "1.0.0", @@ -42,7 +45,7 @@ func TestReportsStore_CreateWithResults_BulkInsert(t *testing.T) { status = "skipped" } results[i] = model.TestResult{ - ReportID: "rpt-bulk-001", + ReportID: reportID, TeamID: teamID, Name: "test-bulk-" + string(rune('A'+i%26)) + string(rune('0'+i%10)), Status: status, @@ -55,7 +58,7 @@ func TestReportsStore_CreateWithResults_BulkInsert(t *testing.T) { t.Fatalf("CreateWithResults with 100 results: %v", err) } - rpt, found, err := s.GetReportAndResults(ctx, "rpt-bulk-001", teamID) + rpt, found, err := s.GetReportAndResults(ctx, reportID, teamID) if err != nil { t.Fatalf("GetReportAndResults: %v", err) } @@ -73,11 +76,12 @@ func TestReportsStore_CreateWithResults_BulkInsert_1000Results(t *testing.T) { teamID := tdb.CreateTeam(t, "reports-bulk-1k-test-team") s := store.NewReportsStore(tdb.Pool) + reportID := uuid.New().String() summary, _ := json.Marshal(map[string]int{"tests": 1000, "passed": 900, "failed": 50, "skipped": 50}) raw, _ := json.Marshal(map[string]interface{}{"results": map[string]interface{}{"tool": map[string]interface{}{"name": "jest"}}}) params := store.CreateReportParams{ - ID: "rpt-bulk-1k", + ID: reportID, TeamID: teamID, ToolName: "jest", ToolVersion: "1.0.0", @@ -96,7 +100,7 @@ func TestReportsStore_CreateWithResults_BulkInsert_1000Results(t *testing.T) { status = "skipped" } results[i] = model.TestResult{ - ReportID: "rpt-bulk-1k", + ReportID: reportID, TeamID: teamID, Name: "test-1k-" + string(rune('A'+i%26)) + string(rune('0'+i%10)) + string(rune('0'+i/10%10)), Status: status, @@ -109,7 +113,7 @@ func TestReportsStore_CreateWithResults_BulkInsert_1000Results(t *testing.T) { t.Fatalf("CreateWithResults with 1000 results: %v", err) } - rpt, found, err := s.GetReportAndResults(ctx, "rpt-bulk-1k", teamID) + rpt, found, err := s.GetReportAndResults(ctx, reportID, teamID) if err != nil { t.Fatalf("GetReportAndResults: %v", err) }