Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/api/jobartifacts/apitypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ type MatchedContent struct {
*ContentLineMatches `json:"line_matches,omitempty"`
}

// HasMatches reports whether this MatchedContent holds at least one match.
// Line matches are the only matcher type today; this may grow with new matchers.
func (m MatchedContent) HasMatches() bool {
	if m.ContentLineMatches == nil {
		return false
	}
	return len(m.ContentLineMatches.Matches) > 0
}

type ContentLineMatches struct {
// NOTE: limited per maxFileMatches, sets Truncated if file has more matches
Matches []ContentLineMatch `json:"matches"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/jobartifacts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (m *Manager) artifactWorker(managerCtx context.Context) {
artLog.Debug("Received request from artifactChan")
response := artifactResponse{
artifactPath: request.artifactAttrs.Name,
artifact: request.query.getFileContentMatches(request.jobRunID, request.artifactAttrs),
artifact: request.query.getFileContentMatches(request.ctx, request.jobRunID, request.artifactAttrs),
}

expired = sendViaChannel(request.ctx, request.artifactsChan, response)
Expand Down
63 changes: 44 additions & 19 deletions pkg/api/jobartifacts/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type JobArtifactQuery struct {
JobRunIDs []int64
PathGlob string // A simple glob to match files in the artifact bucket for each queried run
ContentMatcher // An interface to match in the content of the files
// TODO: regex, jq, xpath support for matching content
// TODO: jq, xpath support for matching content
}

func (q *JobArtifactQuery) queryJobArtifacts(ctx context.Context, jobRunID int64, mgr *Manager, logger *log.Entry) (JobRun, error) {
Expand Down Expand Up @@ -170,42 +170,67 @@ func relativeArtifactPath(bucketPath, jobRunID string) string {
return bucketPath[start+len(marker):]
}

func (q *JobArtifactQuery) getFileContentMatches(jobRunID int64, file *storage.ObjectAttrs) (artifact JobRunArtifact) {
func (q *JobArtifactQuery) getFileContentMatches(ctx context.Context, jobRunID int64, attrs *storage.ObjectAttrs) (artifact JobRunArtifact) {
artifact.JobRunID = strconv.FormatInt(jobRunID, 10)
artifact.ArtifactPath = relativeArtifactPath(file.Name, artifact.JobRunID)
artifact.ArtifactContentType = file.ContentType
artifact.ArtifactURL = fmt.Sprintf(artifactURLFmt, util.GcsBucketRoot, file.Name)
artifact.ArtifactPath = relativeArtifactPath(attrs.Name, artifact.JobRunID)
artifact.ArtifactContentType = attrs.ContentType
artifact.ArtifactURL = fmt.Sprintf(artifactURLFmt, util.GcsBucketRoot, attrs.Name)
if q.ContentMatcher == nil { // no matching requested
return
}

gcsReader, err := q.GcsBucket.Object(file.Name).NewReader(context.Background())
reader, closer, err := OpenArtifactReader(ctx, q.GcsBucket.Object(attrs.Name), attrs.ContentType)
defer closer()
if err != nil {
artifact.Error = err.Error()
return
}
defer gcsReader.Close()

matches, err := q.ContentMatcher.GetMatches(reader)
if err != nil {
artifact.Error = err.Error()
}
artifact.MatchedContent = matches // even if scanning hit an error, we may still want to see incomplete matches
return
}

// OpenArtifactReader opens a reader on an artifact, transparently handling compressed archives.
// In addition to the reader, it returns a closer function which can and should be called in a defer -
// regardless of whether there was an error.
func OpenArtifactReader(ctx context.Context, file *storage.ObjectHandle, contentType string) (*bufio.Reader, func(), error) {
var gcsReader *storage.Reader
var gzipReader *gzip.Reader

var reader *bufio.Reader
if file.ContentType == "application/gzip" {
closer := func() {
if gzipReader != nil {
_ = gzipReader.Close()
}
if gcsReader != nil {
_ = gcsReader.Close()
}
}
var err error

gcsReader, err = file.NewReader(ctx)
if err != nil {
gcsReader = nil // will not need closing
return nil, closer, err
}

if contentType == "application/gzip" {
// if it's gzipped, decompress it in the stream
gzipReader, err := gzip.NewReader(gcsReader)
gzipReader, err = gzip.NewReader(gcsReader)
if err != nil {
artifact.Error = err.Error()
return
gzipReader = nil // will not need closing
return nil, closer, err
}
defer gzipReader.Close()
reader = bufio.NewReader(gzipReader)
} else { // just read it as a normal text file
} else { // otherwise read it as a normal text file
reader = bufio.NewReader(gcsReader)
}

matches, err := q.ContentMatcher.GetMatches(reader)
if err != nil {
artifact.Error = err.Error()
}
artifact.MatchedContent = matches // even if scanning hit an error, we may still want to see incomplete matches
return
return reader, closer, nil
}

// ContentMatcher is a generic interface for matching content in artifact files
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/jobartifacts/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,16 @@ func TestFunctional_FilterFiles(t *testing.T) {

func TestFunctional_FilterContent(t *testing.T) {
const filePath = "logs/periodic-ci-openshift-release-master-ci-4.19-e2e-azure-ovn/1898704060324777984/artifacts/e2e-azure-ovn/gather-extra/build-log.txt"
ctx := context.Background()

query := baseTestingJAQ(t, "", NewStringMatcher("ClusterVersion:", 0, 0, maxFileMatches))
artifact := query.getFileContentMatches(1898704060324777984, &storage.ObjectAttrs{Name: filePath})
artifact := query.getFileContentMatches(ctx, 1898704060324777984, &storage.ObjectAttrs{Name: filePath})
assert.Empty(t, artifact.Error)
assert.False(t, artifact.MatchedContent.ContentLineMatches.Truncated, "expected no need for truncating the content matches")
assert.Equal(t, 2, len(artifact.MatchedContent.ContentLineMatches.Matches), "expected content to match with two lines")

query.ContentMatcher = NewStringMatcher("error:", 0, 0, maxFileMatches)
artifact = query.getFileContentMatches(1898704060324777984, &storage.ObjectAttrs{Name: filePath})
artifact = query.getFileContentMatches(ctx, 1898704060324777984, &storage.ObjectAttrs{Name: filePath})
assert.Empty(t, artifact.Error)
assert.True(t, artifact.MatchedContent.ContentLineMatches.Truncated, "expected to truncate content matches")
assert.Equal(t, maxFileMatches, len(artifact.MatchedContent.ContentLineMatches.Matches), "expected content to match with many lines")
Expand All @@ -65,7 +66,7 @@ func TestFunctional_GzipContent(t *testing.T) {
const filePath = "logs/periodic-ci-openshift-release-master-ci-4.19-e2e-aws-ovn-techpreview/1909930323508989952/artifacts/e2e-aws-ovn-techpreview/gather-extra/artifacts/nodes/ip-10-0-59-177.us-east-2.compute.internal/journal"

query := baseTestingJAQ(t, "", NewStringMatcher("error", 0, 0, maxFileMatches))
artifact := query.getFileContentMatches(1909930323508989952, &storage.ObjectAttrs{Name: filePath, ContentType: "application/gzip"})
artifact := query.getFileContentMatches(context.Background(), 1909930323508989952, &storage.ObjectAttrs{Name: filePath, ContentType: "application/gzip"})
assert.Empty(t, artifact.Error)
assert.True(t, artifact.MatchedContent.ContentLineMatches.Truncated, "expected a lot of matches")
assert.Contains(t,
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/jobrunscan/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

var (
validIdentifierRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
ValidIdentifierRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
wordCharRegex = regexp.MustCompile(`[a-zA-Z0-9]+`) // no underscores or non-word
removeLeadingRegex = regexp.MustCompile(`^\d+`)
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/jobrunscan/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func validateLabel(label jobrunscan.Label) error {
if label.LabelTitle == "" {
return fmt.Errorf("label_title is required for a label")
}
if !validIdentifierRegex.MatchString(label.ID) {
if !ValidIdentifierRegex.MatchString(label.ID) {
return fmt.Errorf("invalid id for a label: %s", label.ID)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/jobrunscan/symptoms.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// validateSymptom ensures the Symptom record coming into the API appears valid.
func validateSymptom(dbc *gorm.DB, symptom jobrunscan.Symptom) error {
if !validIdentifierRegex.MatchString(symptom.ID) {
if !ValidIdentifierRegex.MatchString(symptom.ID) {
return fmt.Errorf("invalid id for a symptom: %s", symptom.ID)
}

Expand Down
64 changes: 37 additions & 27 deletions pkg/componentreadiness/jobrunannotator/jobrunannotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/openshift/sippy/pkg/apis/cache"
bqclient "github.com/openshift/sippy/pkg/bigquery"
"github.com/openshift/sippy/pkg/db"
"github.com/openshift/sippy/pkg/db/models"
"github.com/openshift/sippy/pkg/util"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -173,15 +174,6 @@ type jobRun struct {
URL string `bigquery:"prowjob_url"`
}

type jobRunAnnotation struct {
ID string `bigquery:"prowjob_build_id"`
StartTime civil.DateTime `bigquery:"prowjob_start"`
Label string `bigquery:"label"`
Comment string `bigquery:"comment"`
User bigquery.NullString `bigquery:"user"`
url string
}

func (j JobRunAnnotator) getJobRunsFromBigQuery(ctx context.Context) (map[int64]jobRun, error) { //lint:ignore
now := time.Now()
var jobRuns map[int64]jobRun
Expand Down Expand Up @@ -392,13 +384,13 @@ func (j JobRunAnnotator) filterJobRunByArtifact(ctx context.Context, jobRunIDs [
}

// bulkInsertVariants inserts all new job variants in batches.
func (j JobRunAnnotator) bulkInsertJobRunAnnotations(ctx context.Context, inserts []jobRunAnnotation) error {
func (j JobRunAnnotator) bulkInsertJobRunAnnotations(ctx context.Context, inserts []models.JobRunLabel) error {
var batchSize = 500

if !j.execute {
jobsStr := ""
for _, jobRun := range inserts {
jobsStr += fmt.Sprintf("StartTime: %v; URL: %s\n", jobRun.StartTime, jobRun.url)
jobsStr += fmt.Sprintf("StartTime: %v; URL: %s\n", jobRun.StartTime, jobRun.URL)
}
log.Infof("\n===========================================================\nDry run mode enabled\nBulk inserting\n%s\n\nTo write the label to DB, please use --execute argument\n", jobsStr)
return nil
Expand All @@ -421,16 +413,29 @@ func (j JobRunAnnotator) bulkInsertJobRunAnnotations(ctx context.Context, insert
return nil
}

// LabelComment is what gets serialized in the DB to provide context for applying the label.
// Each tool that produces job_labels can specify whatever context is relevant for it;
// but each should do so in a json object with a unique key for its own schema.
type LabelComment struct {
	// Comment is the free-form, human-entered comment for the label.
	Comment string `json:"comment"`
	// V1 records the full annotator configuration that applied the label,
	// under a versioned key so the schema can evolve without ambiguity.
	V1 JobRunAnnotator `json:"job_run_annotator_v1"`
}

func (j JobRunAnnotator) generateComment() string {
comment := j.comment
annotatorComment, err := json.MarshalIndent(j, "", " ")
if err == nil {
comment += fmt.Sprintf("\nAnnotator\n%s", annotatorComment)
comment := LabelComment{
Comment: j.comment, // apparently separated out in order to stand out from the object
V1: j,
}
str, err := json.MarshalIndent(comment, "", " ")
if err != nil {
log.WithError(err).Error("error generating JobAnnotator comment")
// fallback comment that will not parse as json, but will be visible in the DB
return j.comment + "\nError generating JobAnnotator comment: " + err.Error()
}
return comment
return string(str)
}

func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) (map[int64]jobRunAnnotation, error) {
func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) (map[int64]models.JobRunLabel, error) {
now := time.Now()
queryStr := fmt.Sprintf(`
SELECT
Expand All @@ -443,7 +448,7 @@ func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) (
q := j.bqClient.BQ.Query(queryStr)

errs := []error{}
result := make(map[int64]jobRunAnnotation)
result := make(map[int64]models.JobRunLabel)
log.Debugf("Fetching job run annotations with:\n%s\n", q.Q)

it, err := q.Read(ctx)
Expand All @@ -453,7 +458,7 @@ func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) (
}

for {
row := jobRunAnnotation{}
row := models.JobRunLabel{}
err := it.Next(&row)
if errors.Is(err, iterator.Done) {
break
Expand Down Expand Up @@ -486,25 +491,30 @@ func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) (
}

func (j JobRunAnnotator) annotateJobRuns(ctx context.Context, jobRunIDs []int64, jobRuns map[int64]jobRun) error {
jobRunAnnotations := make([]jobRunAnnotation, 0, len(jobRunIDs))
jobRunAnnotations := make([]models.JobRunLabel, 0, len(jobRunIDs))
existingAnnotations, err := j.getJobRunAnnotationsFromBigQuery(ctx)
if err != nil {
return err
}
log.Infof("Found %d existing job run annotations.", len(existingAnnotations))
now := civil.DateTimeOf(time.Now())
for _, jobRunID := range jobRunIDs {
if jobRun, ok := jobRuns[jobRunID]; ok {
// Skip if the same label already exists
if annotation, existing := existingAnnotations[jobRunID]; existing && annotation.Label == j.Label {
continue
}
jobRunAnnotations = append(jobRunAnnotations, jobRunAnnotation{
ID: jobRun.ID,
StartTime: jobRun.StartTime,
Label: j.Label,
Comment: j.generateComment(),
User: bigquery.NullString{Valid: true, StringVal: j.user},
url: jobRun.URL,
jobRunAnnotations = append(jobRunAnnotations, models.JobRunLabel{
ID: jobRun.ID,
StartTime: jobRun.StartTime,
Label: j.Label,
Comment: j.generateComment(),
User: j.user,
CreatedAt: now,
UpdatedAt: now,
SourceTool: "sippy annotate-job-runs",
SymptomID: "", // Empty for manual annotations; will be populated when symptom detection is implemented
URL: jobRun.URL,
})
}
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/db/models/job_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package models

import (
"cloud.google.com/go/civil"
)

// JobRunLabel represents a label applied to a job run, stored in BigQuery's job_labels table.
type JobRunLabel struct {
	// ID is the prow job build ID of the labeled run.
	ID string `bigquery:"prowjob_build_id"`
	// StartTime is when the labeled prow job run started.
	StartTime civil.DateTime `bigquery:"prowjob_start"`
	// Label is the label text applied to the run.
	Label string `bigquery:"label"`
	// Comment provides context for why the label was applied
	// (serialized JSON when produced by the annotator tooling).
	Comment string `bigquery:"comment"`
	// User identifies who applied the label.
	User string `bigquery:"user"`
	// CreatedAt/UpdatedAt track when this label row was written and last modified.
	CreatedAt civil.DateTime `bigquery:"created_at"`
	UpdatedAt civil.DateTime `bigquery:"updated_at"`
	// SourceTool names the tool that produced the label.
	SourceTool string `bigquery:"source_tool"`
	// SymptomID links the label to a detected symptom, when applicable.
	SymptomID string `bigquery:"symptom_id"`
	// URL is the prow job run URL; excluded from BigQuery serialization ("-"),
	// carried only for logging/dry-run output.
	URL string `bigquery:"-"`
}

// [2025-12-22] To update the BigQuery table schema, use:
//
// bq update <project>:<dataset>.job_labels \
// created_at:DATETIME,updated_at:DATETIME,source_tool:STRING,symptom_id:STRING
//
// Or via SQL:
//
// ALTER TABLE `<project>.<dataset>.job_labels`
// ADD COLUMN IF NOT EXISTS created_at DATETIME,
// ADD COLUMN IF NOT EXISTS updated_at DATETIME,
// ADD COLUMN IF NOT EXISTS source_tool STRING,
// ADD COLUMN IF NOT EXISTS symptom_id STRING;
Comment on lines +21 to +32
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use this to update the schema before this merges.
just in case, best to make a copy of the table first.

Loading