Skip to content
Merged
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ All notable changes to this project will be documented here.

- **Bulk test result inserts**: Report ingestion now uses `pgx.Batch` to insert test results in bulk instead of one query per result. This eliminates the N+1 insert pattern that caused 1000+ round-trips for large reports.

### Added

- **Sharding duration-balanced strategy activated**: `DurationStore.UpsertFromResults()` is now called during report ingestion (inside the same transaction), populating the `test_duration_history` table that the `duration_balanced` sharding strategy depends on. Previously, this strategy always fell back to no-history defaults because the duration store was never written to.

- **Targeted duration query**: `GET /api/v1/sharding/durations/{testName}` now uses a `WHERE team_id = $1 AND test_name = $2` SQL query instead of fetching all team durations and filtering in Go (O(1) DB query vs O(N) in-memory scan). The endpoint returns a JSON array of duration entries across all suites for the named test, or 404 if no history exists.

- **Composite-key duration map**: `DurationStore.GetByTeamMap` now uses composite keys (`testName\x00suite`) to preserve entries for the same test name across different suites, preventing data loss in `EnrichWithHistory`.

- **DurationStore integration tests**: Added comprehensive integration tests for `DurationStore` covering `GetByTeam`, `GetByTeamAndTest`, `UpsertFromResults` (insert, rolling average, within-transaction, transaction rollback, empty input), `GetBySuite`, `GetByTeamMap`, p95 conflict behavior, and same-name-different-suite scenarios.

- **Sharding API endpoints documented**: Added `POST /api/v1/sharding/plan`, `POST /api/v1/sharding/rebalance`, `GET /api/v1/sharding/durations`, and `GET /api/v1/sharding/durations/{testName}` to the README Key Endpoints table.

### Fixed

- **IDOR vulnerability in invitation handlers**: `Create`, `List`, and `Revoke` invitation endpoints (`POST/GET/DELETE /api/v1/teams/{teamID}/invitations`) now verify that the authenticated user's team matches the URL `teamID` before checking role permissions. Previously, any maintainer or owner could list, create, or revoke invitations for any team regardless of membership.
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ Tokens are prefixed `inv_`, valid for **7 days**, and stored as SHA-256 hashes.
| `POST` | `/api/v1/auth/change-password` | Change password |
| `GET` | `/api/v1/admin/users` | List all users (owner only) |
| `GET` | `/api/v1/admin/audit-log` | Paginated audit log (`?limit=&offset=&action=`) (owner only) |
| `POST` | `/api/v1/sharding/plan` | Create a shard plan (duration-balanced or count-based) |
| `POST` | `/api/v1/sharding/rebalance` | Rebalance after worker failure |
| `GET` | `/api/v1/sharding/durations` | List all duration history for the team |
| `GET` | `/api/v1/sharding/durations/{testName}` | Get duration history for a specific test (returns array, 404 if none) |
| `GET` | `/ws/executions` | WebSocket for live updates |

### Quality Gate Rules
Expand Down
4 changes: 4 additions & 0 deletions internal/handler/reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type ReportsHandler struct {
BaseURL string
TriageStore triageAccessor
TriageEnqueuer triage.Enqueuer
DurationStore *store.DurationStore
AllowBackdate bool
}

Expand Down Expand Up @@ -200,6 +201,7 @@ func (h *ReportsHandler) Create(w http.ResponseWriter, r *http.Request) {
Raw: rawJSON,
CreatedAt: now,
TriageGitHubStatus: triageGitHubStatus,
DurationStore: h.DurationStore,
}
if err := h.ReportStore.CreateWithResults(r.Context(), params, results); err != nil {
if err == pgx.ErrNoRows {
Expand All @@ -210,6 +212,8 @@ func (h *ReportsHandler) Create(w http.ResponseWriter, r *http.Request) {
return
}

// Enqueue async triage — non-blocking, best-effort. Must be called after
// the transaction commits so the triage job can read the persisted rows.
if h.TriageEnqueuer != nil {
h.TriageEnqueuer.Enqueue(claims.TeamID, reportID)
}
Expand Down
15 changes: 7 additions & 8 deletions internal/handler/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (h *ShardingHandler) CreatePlan(w http.ResponseWriter, r *http.Request) {
// RebalanceRequest is the request body for rebalancing after worker failure.
type RebalanceRequest struct {
ExecutionID string `json:"execution_id" validate:"required"`
FailedWorkerID string `json:"failed_worker_id" validate:"required"`
FailedWorkerID string `json:"failed_worker_id" validate:"required"`
CurrentPlan model.ShardPlan `json:"current_plan" validate:"required"`
CompletedTests []string `json:"completed_tests"`
}
Expand Down Expand Up @@ -155,6 +155,7 @@ func (h *ShardingHandler) ListDurations(w http.ResponseWriter, r *http.Request)
}

// GetDuration handles GET /api/v1/sharding/durations/{testName}.
// Always returns a JSON array of duration entries for consistent API shape.
func (h *ShardingHandler) GetDuration(w http.ResponseWriter, r *http.Request) {
claims := auth.GetClaims(r.Context())
if claims == nil {
Expand All @@ -173,18 +174,16 @@ func (h *ShardingHandler) GetDuration(w http.ResponseWriter, r *http.Request) {
return
}

durations, err := h.DurationStore.GetByTeam(r.Context(), claims.TeamID)
durations, err := h.DurationStore.GetByTeamAndTest(r.Context(), claims.TeamID, testName)
if err != nil {
Error(w, http.StatusInternalServerError, "failed to query durations")
return
}

for _, d := range durations {
if d.TestName == testName {
JSON(w, http.StatusOK, d)
return
}
if len(durations) == 0 {
Error(w, http.StatusNotFound, "no duration history for test")
return
}

Error(w, http.StatusNotFound, "no duration history for test")
JSON(w, http.StatusOK, durations)
}
5 changes: 3 additions & 2 deletions internal/integration/reports_compare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
"github.com/scaledtest/scaledtest/internal/auth"
"github.com/scaledtest/scaledtest/internal/db"
"github.com/scaledtest/scaledtest/internal/handler"
"github.com/scaledtest/scaledtest/internal/store"
"github.com/scaledtest/scaledtest/internal/testutil"
)

// postCompare calls ReportsHandler.Compare via httptest with the given report IDs.
func postCompare(t *testing.T, pool *db.Pool, teamID, baseID, headID string) *httptest.ResponseRecorder {
t.Helper()
h := &handler.ReportsHandler{
DB: pool,
ReportStore: store.NewReportsStore(pool),
}
req := httptest.NewRequest(http.MethodGet,
fmt.Sprintf("/api/v1/reports/compare?base=%s&head=%s", baseID, headID),
Expand All @@ -49,7 +50,7 @@ func postCompare(t *testing.T, pool *db.Pool, teamID, baseID, headID string) *ht
//
// Before the fix, the fetchResults query scanned nullable TEXT columns directly
// into string destinations. pgx v5 returns "cannot scan NULL into *string" for
// NULL TEXT values, causing a 500. The fix uses COALESCE to convert NULL to ''.
// NULL TEXT values, causing a 500. The fix uses COALESCE to convert NULL to .
//
// Given: two reports whose test results have no message, trace, file_path, or suite
// When: the Compare endpoint is called with those report IDs
Expand Down
1 change: 1 addition & 0 deletions internal/integration/testdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var truncateTables = []string{
"triage_failure_classifications",
"triage_clusters",
"triage_results",
"test_duration_history",
"test_results",
"test_reports",
"test_executions",
Expand Down
1 change: 1 addition & 0 deletions internal/server/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func NewRouter(cfg *config.Config, pool ...*db.Pool) http.Handler {
GitHubStatusPoster: ghClient,
BaseURL: cfg.BaseURL,
TriageEnqueuer: triageEnqueuer,
DurationStore: durStore,
AllowBackdate: cfg.DisableRateLimit,
}
if dbPool != nil {
Expand Down
31 changes: 25 additions & 6 deletions internal/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,21 +240,40 @@ func suiteGrouped(groups []testGroup, numWorkers int) []model.Shard {
return durationBalanced(suiteGroups, numWorkers)
}

// durationAggregate holds per-test-name totals built from the composite-keyed history map.
type durationAggregate struct {
totalDurationMs int64
suite string
}

// EnrichWithHistory takes a list of test names and enriches them with
// historical duration data. Tests without history get the default estimate.
// The history map uses composite keys "testName\x00suite" to preserve entries
// across different suites. For each test name, we aggregate durations across
// all suites (summing avg_duration_ms).
func EnrichWithHistory(testNames []string, history map[string]*model.TestDurationHistory) []TestInfo {
agg := make(map[string]*durationAggregate, len(testNames))
for _, h := range history {
a := agg[h.TestName]
if a == nil {
agg[h.TestName] = &durationAggregate{totalDurationMs: h.AvgDurationMs, suite: h.Suite}
} else {
a.totalDurationMs += h.AvgDurationMs
if h.Suite < a.suite {
a.suite = h.Suite
}
}
}

tests := make([]TestInfo, len(testNames))
for i, name := range testNames {
tests[i] = TestInfo{
Name: name,
EstDurationMs: DefaultEstDurationMs,
}
if h, ok := history[name]; ok {
tests[i].EstDurationMs = h.AvgDurationMs
tests[i].Suite = h.Suite
if tests[i].EstDurationMs <= 0 {
tests[i].EstDurationMs = DefaultEstDurationMs
}
if a, ok := agg[name]; ok && a.totalDurationMs > 0 {
tests[i].EstDurationMs = a.totalDurationMs
tests[i].Suite = a.suite
}
}
return tests
Expand Down
24 changes: 22 additions & 2 deletions internal/shard/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ func TestPlan_MoreWorkersThanTests(t *testing.T) {
func TestEnrichWithHistory(t *testing.T) {
names := []string{"test-a", "test-b", "test-c"}
history := map[string]*model.TestDurationHistory{
"test-a": {AvgDurationMs: 1500, Suite: "unit"},
"test-c": {AvgDurationMs: 0, Suite: "e2e"}, // 0 should get default
"test-a\x00unit": {TestName: "test-a", AvgDurationMs: 1500, Suite: "unit"},
"test-c\x00e2e": {TestName: "test-c", AvgDurationMs: 0, Suite: "e2e"},
}

enriched := EnrichWithHistory(names, history)
Expand All @@ -258,6 +258,26 @@ func TestEnrichWithHistory(t *testing.T) {
}
}

func TestEnrichWithHistory_SameNameDifferentSuites_DeterministicSuite(t *testing.T) {
names := []string{"test-a"}
history := map[string]*model.TestDurationHistory{
"test-a\x00unit": {TestName: "test-a", AvgDurationMs: 100, Suite: "unit"},
"test-a\x00integration": {TestName: "test-a", AvgDurationMs: 200, Suite: "integration"},
}

enriched := EnrichWithHistory(names, history)

if len(enriched) != 1 {
t.Fatalf("len(enriched) = %d, want 1", len(enriched))
}
if enriched[0].EstDurationMs != 300 {
t.Errorf("test-a duration across suites: got %d, want 300 (100+200)", enriched[0].EstDurationMs)
}
if enriched[0].Suite != "integration" {
t.Errorf("test-a suite: got %q, want alphabetically-first %q", enriched[0].Suite, "integration")
}
}

func TestRebalance_FailedWorker(t *testing.T) {
plan := &model.ShardPlan{
Shards: []model.Shard{
Expand Down
89 changes: 60 additions & 29 deletions internal/store/durations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,20 @@ import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/scaledtest/scaledtest/internal/model"
)

// DBTX is the common interface satisfied by pgxpool.Pool and pgx.Tx.
type DBTX interface {
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) pgx.Row
}

// DurationStore handles test duration history persistence.
type DurationStore struct {
pool *pgxpool.Pool
Expand All @@ -21,57 +30,71 @@ func NewDurationStore(pool *pgxpool.Pool) *DurationStore {

// GetByTeam returns all duration history entries for a team.
func (s *DurationStore) GetByTeam(ctx context.Context, teamID string) ([]model.TestDurationHistory, error) {
rows, err := s.pool.Query(ctx,
return queryDurations(ctx, s.pool,
`SELECT id, team_id, test_name, suite, avg_duration_ms, p95_duration_ms,
min_duration_ms, max_duration_ms, run_count, last_status, updated_at, created_at
FROM test_duration_history
WHERE team_id = $1
ORDER BY test_name`, teamID)
if err != nil {
return nil, fmt.Errorf("query duration history: %w", err)
}
defer rows.Close()
}

var results []model.TestDurationHistory
for rows.Next() {
var d model.TestDurationHistory
if err := rows.Scan(
&d.ID, &d.TeamID, &d.TestName, &d.Suite,
&d.AvgDurationMs, &d.P95DurationMs, &d.MinDurationMs, &d.MaxDurationMs,
&d.RunCount, &d.LastStatus, &d.UpdatedAt, &d.CreatedAt,
); err != nil {
return nil, fmt.Errorf("scan duration history: %w", err)
}
results = append(results, d)
}
return results, rows.Err()
// GetByTeamAndTest returns duration history entries for a specific test name within a team.
func (s *DurationStore) GetByTeamAndTest(ctx context.Context, teamID, testName string) ([]model.TestDurationHistory, error) {
return queryDurations(ctx, s.pool,
`SELECT id, team_id, test_name, suite, avg_duration_ms, p95_duration_ms,
min_duration_ms, max_duration_ms, run_count, last_status, updated_at, created_at
FROM test_duration_history
WHERE team_id = $1 AND test_name = $2
ORDER BY suite`, teamID, testName)
}

// DurationMapKey returns a unique key for a test duration entry combining
// test name and suite. This avoids collisions when the same test name appears
// in multiple suites — the database UNIQUE constraint is (team_id, test_name, suite).
func DurationMapKey(testName, suite string) string {
return testName + "\x00" + suite
}

// GetByTeamMap returns duration history as a map keyed by test_name.
// GetByTeamMap returns duration history as a map keyed by composite "testName\x00suite".
// This preserves all entries including same-named tests across different suites.
func (s *DurationStore) GetByTeamMap(ctx context.Context, teamID string) (map[string]*model.TestDurationHistory, error) {
entries, err := s.GetByTeam(ctx, teamID)
if err != nil {
return nil, err
}
m := make(map[string]*model.TestDurationHistory, len(entries))
for i := range entries {
m[entries[i].TestName] = &entries[i]
m[DurationMapKey(entries[i].TestName, entries[i].Suite)] = &entries[i]
}
return m, nil
}

// UpsertFromResults updates duration history from a set of test results.
// Uses a rolling average: new_avg = ((old_avg * old_count) + new_duration) / (old_count + 1).
func (s *DurationStore) UpsertFromResults(ctx context.Context, teamID string, results []model.TestResult) error {
// When db is a pgx.Tx the caller is responsible for committing the transaction.
// When db is a pgxpool.Pool this method creates and commits its own transaction.
func (s *DurationStore) UpsertFromResults(ctx context.Context, teamID string, results []model.TestResult, db DBTX) error {
if len(results) == 0 {
return nil
}

tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
shouldCommit := false
var tx pgx.Tx
var err error

switch d := db.(type) {
case *pgxpool.Pool:
tx, err = d.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
shouldCommit = true
case pgx.Tx:
tx = d
default:
return fmt.Errorf("UpsertFromResults: unsupported DBTX type %T", db)
}
defer tx.Rollback(ctx)

for _, r := range results {
_, err := tx.Exec(ctx,
Expand All @@ -81,6 +104,7 @@ func (s *DurationStore) UpsertFromResults(ctx context.Context, teamID string, re
ON CONFLICT (team_id, test_name, suite)
DO UPDATE SET
avg_duration_ms = (test_duration_history.avg_duration_ms * test_duration_history.run_count + $4) / (test_duration_history.run_count + 1),
p95_duration_ms = GREATEST(test_duration_history.p95_duration_ms, $4),
min_duration_ms = LEAST(test_duration_history.min_duration_ms, $4),
max_duration_ms = GREATEST(test_duration_history.max_duration_ms, $4),
run_count = test_duration_history.run_count + 1,
Expand All @@ -93,19 +117,26 @@ func (s *DurationStore) UpsertFromResults(ctx context.Context, teamID string, re
}
}

return tx.Commit(ctx)
if shouldCommit {
return tx.Commit(ctx)
}
return nil
}

// GetBySuite returns duration history for tests in a specific suite.
func (s *DurationStore) GetBySuite(ctx context.Context, teamID, suite string) ([]model.TestDurationHistory, error) {
rows, err := s.pool.Query(ctx,
return queryDurations(ctx, s.pool,
`SELECT id, team_id, test_name, suite, avg_duration_ms, p95_duration_ms,
min_duration_ms, max_duration_ms, run_count, last_status, updated_at, created_at
FROM test_duration_history
WHERE team_id = $1 AND suite = $2
ORDER BY avg_duration_ms DESC`, teamID, suite)
}

func queryDurations(ctx context.Context, db DBTX, query string, args ...interface{}) ([]model.TestDurationHistory, error) {
rows, err := db.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("query duration by suite: %w", err)
return nil, fmt.Errorf("query duration history: %w", err)
}
defer rows.Close()

Expand All @@ -117,7 +148,7 @@ func (s *DurationStore) GetBySuite(ctx context.Context, teamID, suite string) ([
&d.AvgDurationMs, &d.P95DurationMs, &d.MinDurationMs, &d.MaxDurationMs,
&d.RunCount, &d.LastStatus, &d.UpdatedAt, &d.CreatedAt,
); err != nil {
return nil, fmt.Errorf("scan duration: %w", err)
return nil, fmt.Errorf("scan duration history: %w", err)
}
results = append(results, d)
}
Expand Down
Loading
Loading