Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 107 additions & 25 deletions pkg/dataloader/prowloader/prow.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/jackc/pgtype"
"github.com/lib/pq"
"github.com/openshift/sippy/pkg/bigquery/bqlabel"
"github.com/openshift/sippy/pkg/db/partitions"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -167,11 +167,64 @@ func (pl *ProwLoader) Errors() []error {
return pl.errors
}

// PartitionManagementConfig describes the partition lifecycle settings that are
// applied to a single partitioned table.
type PartitionManagementConfig struct {
	// TableName is the name of the partitioned table.
	TableName string
	// FuturePartitionWindow is how far into the future partitions are created.
	FuturePartitionWindow time.Duration
	// DetachAfter is the age in days after which partitions are detached.
	DetachAfter int
	// DropDetachedAfter is the age in days after which detached partitions are dropped.
	DropDetachedAfter int
	// InitialLookbackDays is how many days to look back when initializing a new table.
	InitialLookbackDays int
}

// partitionConfigs lists the partitioned tables whose partitions are maintained
// at the start of Load, in the order they are processed.
var partitionConfigs = []PartitionManagementConfig{
	{
		TableName:             "test_analysis_by_job_by_dates",
		FuturePartitionWindow: 2 * 24 * time.Hour, // two days ahead
		DetachAfter:           90,
		DropDetachedAfter:     100,
		InitialLookbackDays:   15,
	},
	{
		TableName:             "prow_job_run_tests",
		FuturePartitionWindow: 2 * 24 * time.Hour, // two days ahead
		DetachAfter:           90,
		DropDetachedAfter:     100,
		InitialLookbackDays:   15,
	},
}

// updatePartitions runs partition maintenance for the table described by config.
// It first ages out old partitions and then prepares the partitions needed for
// upcoming data. The first failure is returned, wrapped with the table name.
func (pl *ProwLoader) updatePartitions(config PartitionManagementConfig) error {
	if agingErr := pl.agePartitions(config); agingErr != nil {
		return errors.Wrapf(agingErr, "error aging %s", config.TableName)
	}

	if prepErr := pl.preparePartitions(config); prepErr != nil {
		return errors.Wrapf(prepErr, "error preparing %s", config.TableName)
	}

	return nil
}

func (pl *ProwLoader) Load() {
start := time.Now()

log.Infof("started loading prow jobs to DB...")

for _, config := range partitionConfigs {
err := pl.updatePartitions(config)
if err != nil {
pl.errors = append(pl.errors, err)

// If partition management fails, we cannot be sure the partitions needed for
// loading exist. We could distinguish between failures removing old partitions
// and failures creating new ones, but for now any failure here blocks all loading.
return
}
}

// Update unmerged PR statuses in case any have merged
if err := pl.syncPRStatus(); err != nil {
pl.errors = append(pl.errors, errors.Wrap(err, "error in syncPRStatus"))
Expand Down Expand Up @@ -331,19 +384,62 @@ func DaysBetween(start, end time.Time) []string {
return days
}

// NextDay takes a date string in YYYY-MM-DD format and returns the date string for the following day.
func NextDay(dateStr string) (string, error) {
// Parse the input date string
date, err := time.Parse("2006-01-02", dateStr)
// agePartitions retires old partitions of the configured table in two steps:
// partitions older than config.DetachAfter days are detached, then detached
// partitions older than config.DropDetachedAfter days are dropped. A failure in
// either step is logged and returned as-is.
func (pl *ProwLoader) agePartitions(config PartitionManagementConfig) error {
	table := config.TableName

	// NOTE(review): the trailing false is presumably a dry-run flag — confirm
	// against the partitions package.
	detachedCount, detachErr := partitions.DetachOldPartitions(pl.dbc, table, config.DetachAfter, false)
	if detachErr != nil {
		log.WithError(detachErr).Errorf("error detaching partitions for %s", table)
		return detachErr
	}
	log.Infof("detached %d partitions from %s", detachedCount, table)

	droppedCount, dropErr := partitions.DropOldDetachedPartitions(pl.dbc, table, config.DropDetachedAfter, false)
	if dropErr != nil {
		log.WithError(dropErr).Errorf("error dropping detached partitions for %s", table)
		return dropErr
	}
	log.Infof("dropped %d detached partitions from %s", droppedCount, table)

	return nil
}

// preparePartitions creates any partitions the configured table is missing.
//
// The range to cover starts at the oldest attached partition, or at
// config.InitialLookbackDays before now when the table has no partitions yet,
// and ends config.FuturePartitionWindow after now. Errors from the partitions
// package are logged and returned unchanged.
func (pl *ProwLoader) preparePartitions(config PartitionManagementConfig) error {
	log.Infof("preparing partitions for %s", config.TableName)

	stats, err := partitions.GetAttachedPartitionStats(pl.dbc, config.TableName)
	if err != nil {
		log.WithError(err).Errorf("error getting partition stats for %s", config.TableName)
		return err
	}
	log.Infof("%s has %d attached partitions (%s)", config.TableName, stats.TotalPartitions, stats.TotalSizePretty)

	// Capture the current time once so both ends of the range share one reference.
	now := time.Now()

	// When initializing a new table, look back the configured number of days.
	oldestDate := now.Add(-time.Duration(config.InitialLookbackDays) * 24 * time.Hour)
	if stats.TotalPartitions > 0 {
		var startRange, endRange string
		if stats.OldestDate.Valid {
			startRange = stats.OldestDate.Time.Format("2006-01-02")
			// Start from the oldest existing partition rather than the lookback default.
			oldestDate = stats.OldestDate.Time
		}
		if stats.NewestDate.Valid {
			// Report the newest partition date here; the oldest date is the range start.
			endRange = stats.NewestDate.Time.Format("2006-01-02")
		}
		log.Infof("%s attached partition range: %s to %s", config.TableName, startRange, endRange)
	}

	futureDate := now.Add(config.FuturePartitionWindow)
	// NOTE(review): the trailing false is presumably a dry-run flag — confirm
	// against the partitions package.
	created, err := partitions.CreateMissingPartitions(pl.dbc, config.TableName, oldestDate, futureDate, false)
	if err != nil {
		log.WithError(err).Errorf("error creating partitions for %s", config.TableName)
		return err
	}

	log.Infof("created %d partitions for %s", created, config.TableName)
	return nil
}

// loadDailyTestAnalysisByJob loads test analysis data into partitioned tables in postgres, one per
Expand Down Expand Up @@ -382,21 +478,6 @@ func (pl *ProwLoader) loadDailyTestAnalysisByJob(ctx context.Context) error {
dLog := log.WithField("date", dateToImport)

dLog.Infof("Loading test analysis by job daily summaries")
nextDay, err := NextDay(dateToImport)
if err != nil {
return errors.Wrapf(err, "error parsing next day from %s", dateToImport)
}

// create a partition for this date
partitionSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS test_analysis_by_job_by_dates_%s PARTITION OF test_analysis_by_job_by_dates
FOR VALUES FROM ('%s') TO ('%s');`, strings.ReplaceAll(dateToImport, "-", "_"), dateToImport, nextDay)
dLog.Info(partitionSQL)

if res := pl.dbc.DB.Exec(partitionSQL); res.Error != nil {
log.WithError(res.Error).Error("error creating partition")
return res.Error
}
dLog.Warnf("partition created for releases %v", pl.releases)

q := pl.bigQueryClient.Query(ctx, bqlabel.ProwLoaderTestAnalysis, fmt.Sprintf(`WITH
deduped_testcases AS (
Expand Down Expand Up @@ -1241,6 +1322,7 @@ func (pl *ProwLoader) extractTestCases(suite *junit.TestSuite, suiteID *uint, te
continue
}

// interesting that we rely on created_at here which is when we imported the test, not when the test ran
testCases[testCacheKey] = &models.ProwJobRunTest{
TestID: testID,
SuiteID: suiteID,
Expand Down
Loading