From 518a582c73f558dfea0c8ad165ec2aa73263b710 Mon Sep 17 00:00:00 2001 From: Luke Meyer Date: Wed, 17 Dec 2025 15:48:22 -0500 Subject: [PATCH 1/7] =?UTF-8?q?=F0=9F=A4=96=20create=20sippyclient=20for?= =?UTF-8?q?=20accessing=20sippy=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Also, use it to implement such a client for symptoms. 🤖 Assisted by Claude Code --- pkg/api/jobrunscan/helpers.go | 2 +- pkg/api/jobrunscan/labels.go | 2 +- pkg/api/jobrunscan/symptoms.go | 2 +- pkg/sippyclient/client.go | 178 +++++++++++++++++++++++++ pkg/sippyclient/jobrunscan/symptoms.go | 88 ++++++++++++ 5 files changed, 269 insertions(+), 3 deletions(-) create mode 100644 pkg/sippyclient/client.go create mode 100644 pkg/sippyclient/jobrunscan/symptoms.go diff --git a/pkg/api/jobrunscan/helpers.go b/pkg/api/jobrunscan/helpers.go index 37c8dc341..b4b6b771a 100644 --- a/pkg/api/jobrunscan/helpers.go +++ b/pkg/api/jobrunscan/helpers.go @@ -10,7 +10,7 @@ import ( ) var ( - validIdentifierRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) + ValidIdentifierRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) wordCharRegex = regexp.MustCompile(`[a-zA-Z0-9]+`) // no underscores or non-word removeLeadingRegex = regexp.MustCompile(`^\d+`) ) diff --git a/pkg/api/jobrunscan/labels.go b/pkg/api/jobrunscan/labels.go index 41866e7ee..c19cd1439 100644 --- a/pkg/api/jobrunscan/labels.go +++ b/pkg/api/jobrunscan/labels.go @@ -18,7 +18,7 @@ func validateLabel(label jobrunscan.Label) error { if label.LabelTitle == "" { return fmt.Errorf("label_title is required for a label") } - if !validIdentifierRegex.MatchString(label.ID) { + if !ValidIdentifierRegex.MatchString(label.ID) { return fmt.Errorf("invalid id for a label: %s", label.ID) } diff --git a/pkg/api/jobrunscan/symptoms.go b/pkg/api/jobrunscan/symptoms.go index 18677bdae..f5943f8d7 100644 --- a/pkg/api/jobrunscan/symptoms.go +++ b/pkg/api/jobrunscan/symptoms.go @@ -14,7 +14,7 @@ import ( // validateSymptom ensures the Symptom record coming into the API appears valid. func validateSymptom(dbc *gorm.DB, symptom jobrunscan.Symptom) error { - if !validIdentifierRegex.MatchString(symptom.ID) { + if !ValidIdentifierRegex.MatchString(symptom.ID) { return fmt.Errorf("invalid id for a symptom: %s", symptom.ID) } diff --git a/pkg/sippyclient/client.go b/pkg/sippyclient/client.go new file mode 100644 index 000000000..61b2e24b7 --- /dev/null +++ b/pkg/sippyclient/client.go @@ -0,0 +1,178 @@ +package sippyclient + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +const ( + DefaultServerURL = "http://localhost:8080" +) + +// Client is a client for the Sippy API. It is intended to be imported by golang-based clients +// (e.g. cloud functions and cmdline tools) that need to access sippy APIs, +// which is why there may be no uses of these methods within sippy itself. 
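+//
+// A minimal usage sketch (illustrative only; the server URL and token are placeholders,
+// and the symptoms endpoint is the one consumed by pkg/sippyclient/jobrunscan below):
+//
+//	c := sippyclient.New(
+//		sippyclient.WithServerURL("https://sippy.example.com"),
+//		sippyclient.WithToken(token),
+//	)
+//	var symptoms []jobrunscan.Symptom
+//	if err := c.Get(ctx, "/api/jobs/symptoms", &symptoms); err != nil {
+//		// handle error
+//	}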
+type Client struct { + BaseURL string + HTTPClient *http.Client + Token string +} + +// Option is a functional option for configuring the client +type Option func(*Client) + +// WithServerURL sets the server URL for the client +func WithServerURL(url string) Option { + return func(c *Client) { + c.BaseURL = strings.TrimSuffix(url, "/") + } +} + +// WithToken sets the authentication token for the client +func WithToken(token string) Option { + return func(c *Client) { + c.Token = token + } +} + +// WithHTTPClient sets a custom HTTP client +func WithHTTPClient(httpClient *http.Client) Option { + return func(c *Client) { + c.HTTPClient = httpClient + } +} + +// New creates a new Sippy API client +func New(opts ...Option) *Client { + client := &Client{ + BaseURL: DefaultServerURL, + HTTPClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } + + for _, opt := range opts { + opt(client) + } + + return client +} + +// Get performs a GET request to the specified path and decodes the JSON response +func (c *Client) Get(ctx context.Context, path string, result interface{}) error { + url := c.BaseURL + path + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + if c.Token != "" { + req.Header.Set("Authorization", "Bearer "+c.Token) + } + req.Header.Set("Accept", "application/json") + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to execute request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body)) + } + + if err := json.NewDecoder(resp.Body).Decode(result); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + + return nil +} + +// Post performs a POST request to the specified path with the given body +func (c *Client) Post(ctx context.Context, path string, body, result interface{}) error { + return c.doJSONRequest(ctx, http.MethodPost, path, body, result) +} + +// Put performs a PUT request to the specified path with the given body +func (c *Client) Put(ctx context.Context, path string, body, result interface{}) error { + return c.doJSONRequest(ctx, http.MethodPut, path, body, result) +} + +// Delete performs a DELETE request to the specified path +func (c *Client) Delete(ctx context.Context, path string) error { + url := c.BaseURL + path + + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + if c.Token != "" { + req.Header.Set("Authorization", "Bearer "+c.Token) + } + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to execute request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +// doJSONRequest performs a request with a JSON body +func (c *Client) doJSONRequest(ctx context.Context, method, path string, body, result interface{}) error { + url := c.BaseURL + path + + var bodyReader io.Reader + if body != nil { + bodyBytes, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("failed to marshal request body: %w", err) + } + bodyReader = bytes.NewReader(bodyBytes) + } + + req, err := 
http.NewRequestWithContext(ctx, method, url, bodyReader) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + if c.Token != "" { + req.Header.Set("Authorization", "Bearer "+c.Token) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to execute request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(respBody)) + } + + if result != nil { + if err := json.NewDecoder(resp.Body).Decode(result); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + } + + return nil +} diff --git a/pkg/sippyclient/jobrunscan/symptoms.go b/pkg/sippyclient/jobrunscan/symptoms.go new file mode 100644 index 000000000..f28175898 --- /dev/null +++ b/pkg/sippyclient/jobrunscan/symptoms.go @@ -0,0 +1,88 @@ +package jobrunscan + +import ( + "context" + "fmt" + + api "github.com/openshift/sippy/pkg/api/jobrunscan" + "github.com/openshift/sippy/pkg/db/models/jobrunscan" + "github.com/openshift/sippy/pkg/sippyclient" +) + +// SymptomsClient provides methods for interacting with the symptoms API +type SymptomsClient struct { + client *sippyclient.Client +} + +// NewSymptomsClient creates a new symptoms client +func NewSymptomsClient(client *sippyclient.Client) *SymptomsClient { + return &SymptomsClient{ + client: client, + } +} + +// List retrieves all symptoms from the API +func (sc *SymptomsClient) List(ctx context.Context) ([]jobrunscan.Symptom, error) { + var symptoms []jobrunscan.Symptom + if err := sc.client.Get(ctx, "/api/jobs/symptoms", &symptoms); err != nil { + return nil, fmt.Errorf("failed to list symptoms: %w", err) + } + return symptoms, nil +} + +// Get retrieves a single symptom by ID +func (sc *SymptomsClient) Get(ctx context.Context, id string) (*jobrunscan.Symptom, error) { + if !api.ValidIdentifierRegex.MatchString(id) { + return nil, fmt.Errorf("invalid symptom ID: '%s'", id) + } + + var symptom jobrunscan.Symptom + path := fmt.Sprintf("/api/jobs/symptoms/%s", id) + if err := sc.client.Get(ctx, path, &symptom); err != nil { + return nil, fmt.Errorf("failed to get symptom %s: %w", id, err) + } + return &symptom, nil +} + +// Create creates a new symptom +func (sc *SymptomsClient) Create(ctx context.Context, symptom jobrunscan.Symptom) (*jobrunscan.Symptom, error) { + if !api.ValidIdentifierRegex.MatchString(symptom.ID) { + return nil, fmt.Errorf("invalid symptom ID: '%s'", symptom.ID) + } + + var result jobrunscan.Symptom + if err := sc.client.Post(ctx, "/api/jobs/symptoms", symptom, &result); err != nil { + return nil, fmt.Errorf("failed to create symptom: %w", err) + } + return &result, nil +} + +// Update updates an existing symptom +func (sc *SymptomsClient) Update(ctx context.Context, id string, symptom jobrunscan.Symptom) (*jobrunscan.Symptom, error) { + if !api.ValidIdentifierRegex.MatchString(id) { + return nil, fmt.Errorf("invalid lookup symptom ID: '%s'", id) + } + if !api.ValidIdentifierRegex.MatchString(symptom.ID) { + return nil, fmt.Errorf("invalid update symptom ID: '%s'", symptom.ID) + } + + var result jobrunscan.Symptom + path := fmt.Sprintf("/api/jobs/symptoms/%s", id) + if err := sc.client.Put(ctx, path, symptom, &result); err != nil { + return nil, fmt.Errorf("failed to update symptom %s: %w", id, err) + } + return &result, nil +} + +// 
Delete deletes a symptom by ID +func (sc *SymptomsClient) Delete(ctx context.Context, id string) error { + if !api.ValidIdentifierRegex.MatchString(id) { + return fmt.Errorf("invalid symptom ID: '%s'", id) + } + + path := fmt.Sprintf("/api/jobs/symptoms/%s", id) + if err := sc.client.Delete(ctx, path); err != nil { + return fmt.Errorf("failed to delete symptom %s: %w", id, err) + } + return nil +} From 846291aa264e384435d7b46afc33a631896550a7 Mon Sep 17 00:00:00 2001 From: Luke Meyer Date: Fri, 19 Dec 2025 19:02:45 -0500 Subject: [PATCH 2/7] jobartifacts: factor out OpenArtifactReader for reuse --- pkg/api/jobartifacts/query.go | 63 ++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/pkg/api/jobartifacts/query.go b/pkg/api/jobartifacts/query.go index 52c75e84f..6b848bca6 100644 --- a/pkg/api/jobartifacts/query.go +++ b/pkg/api/jobartifacts/query.go @@ -30,7 +30,7 @@ type JobArtifactQuery struct { JobRunIDs []int64 PathGlob string // A simple glob to match files in the artifact bucket for each queried run ContentMatcher // An interface to match in the content of the files - // TODO: regex, jq, xpath support for matching content + // TODO: jq, xpath support for matching content } func (q *JobArtifactQuery) queryJobArtifacts(ctx context.Context, jobRunID int64, mgr *Manager, logger *log.Entry) (JobRun, error) { @@ -170,42 +170,67 @@ func relativeArtifactPath(bucketPath, jobRunID string) string { return bucketPath[start+len(marker):] } -func (q *JobArtifactQuery) getFileContentMatches(jobRunID int64, file *storage.ObjectAttrs) (artifact JobRunArtifact) { +func (q *JobArtifactQuery) getFileContentMatches(jobRunID int64, attrs *storage.ObjectAttrs) (artifact JobRunArtifact) { artifact.JobRunID = strconv.FormatInt(jobRunID, 10) - artifact.ArtifactPath = relativeArtifactPath(file.Name, artifact.JobRunID) - artifact.ArtifactContentType = file.ContentType - artifact.ArtifactURL = fmt.Sprintf(artifactURLFmt, util.GcsBucketRoot, file.Name) + artifact.ArtifactPath = relativeArtifactPath(attrs.Name, artifact.JobRunID) + artifact.ArtifactContentType = attrs.ContentType + artifact.ArtifactURL = fmt.Sprintf(artifactURLFmt, util.GcsBucketRoot, attrs.Name) if q.ContentMatcher == nil { // no matching requested return } - gcsReader, err := q.GcsBucket.Object(file.Name).NewReader(context.Background()) + reader, closer, err := OpenArtifactReader(context.Background(), q.GcsBucket.Object(attrs.Name), attrs.ContentType) + defer closer() if err != nil { artifact.Error = err.Error() return } - defer gcsReader.Close() + + matches, err := q.ContentMatcher.GetMatches(reader) + if err != nil { + artifact.Error = err.Error() + } + artifact.MatchedContent = matches // even if scanning hit an error, we may still want to see incomplete matches + return +} + +// OpenArtifactReader opens a reader on an artifact, transparently handling compressed archives. +// In addition to the reader, it returns a closer function which can and should be called in a defer - +// regardless of whether there was an error. 
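+//
+// Intended call pattern, sketched from getFileContentMatches above (bucket, name, and attrs
+// stand in for whatever the caller already has):
+//
+//	reader, closer, err := OpenArtifactReader(ctx, bucket.Object(name), attrs.ContentType)
+//	defer closer()
+//	if err != nil {
+//		return err
+//	}
+//	// ... consume reader ...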
+func OpenArtifactReader(ctx context.Context, file *storage.ObjectHandle, contentType string) (*bufio.Reader, func(), error) { + var gcsReader *storage.Reader + var gzipReader *gzip.Reader var reader *bufio.Reader - if file.ContentType == "application/gzip" { + closer := func() { + if gzipReader != nil { + _ = gzipReader.Close() + } + if gcsReader != nil { + _ = gcsReader.Close() + } + } + var err error + + gcsReader, err = file.NewReader(ctx) + if err != nil { + gcsReader = nil // will not need closing + return nil, closer, err + } + + if contentType == "application/gzip" { // if it's gzipped, decompress it in the stream - gzipReader, err := gzip.NewReader(gcsReader) + gzipReader, err = gzip.NewReader(gcsReader) if err != nil { - artifact.Error = err.Error() - return + gzipReader = nil // will not need closing + return nil, closer, err } - defer gzipReader.Close() reader = bufio.NewReader(gzipReader) - } else { // just read it as a normal text file + } else { // otherwise read it as a normal text file reader = bufio.NewReader(gcsReader) } - matches, err := q.ContentMatcher.GetMatches(reader) - if err != nil { - artifact.Error = err.Error() - } - artifact.MatchedContent = matches // even if scanning hit an error, we may still want to see incomplete matches - return + return reader, closer, nil } // ContentMatcher is a generic interface for matching content in artifact files From 4e92c41af7aa5017634a79c2265f4edcbac81b57 Mon Sep 17 00:00:00 2001 From: Luke Meyer Date: Mon, 22 Dec 2025 09:18:59 -0500 Subject: [PATCH 3/7] jobartifacts: add MatchedContent.HasMatches() --- pkg/api/jobartifacts/apitypes.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/api/jobartifacts/apitypes.go b/pkg/api/jobartifacts/apitypes.go index 14c8d318a..b5f40e5bb 100644 --- a/pkg/api/jobartifacts/apitypes.go +++ b/pkg/api/jobartifacts/apitypes.go @@ -41,6 +41,12 @@ type MatchedContent struct { *ContentLineMatches `json:"line_matches,omitempty"` } +// HasMatches returns true if there are any matches in the MatchedContent. +// This could get more complicated with more matcher types, but for now we only have line matches. +func (m MatchedContent) HasMatches() bool { + return m.ContentLineMatches != nil && len(m.ContentLineMatches.Matches) > 0 +} + type ContentLineMatches struct { // NOTE: limited per maxFileMatches, sets Truncated if file has more matches Matches []ContentLineMatch `json:"matches"` From fd5ea78d0c312724907aaa4b617a06df74b1006e Mon Sep 17 00:00:00 2001 From: Luke Meyer Date: Tue, 23 Dec 2025 22:16:05 -0500 Subject: [PATCH 4/7] jobartifacts: pass request ctx to GCS lookup This was a suggestion from CodeRabbit that seemed to make sense. If the request context gets canceled, then the results are going to be abandoned anyway, so it makes sense to let that cancellation pass to the GCS reader and give it a chance to abort. It seems right... It should be a minor optimization... But check if anything breaks. 
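
To illustrate (a sketch of the resulting flow, not additional code in this
patch): the worker now threads the request context straight through to the
GCS reader, so a canceled request can abort an in-flight read:

    artifact := request.query.getFileContentMatches(request.ctx, request.jobRunID, request.artifactAttrs)
    // ...which in turn opens the object with that same context:
    reader, closer, err := OpenArtifactReader(ctx, q.GcsBucket.Object(attrs.Name), attrs.ContentType)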
--- pkg/api/jobartifacts/manager.go | 2 +- pkg/api/jobartifacts/query.go | 4 ++-- pkg/api/jobartifacts/query_test.go | 7 ++++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/api/jobartifacts/manager.go b/pkg/api/jobartifacts/manager.go index 9b5eed253..c7068928d 100644 --- a/pkg/api/jobartifacts/manager.go +++ b/pkg/api/jobartifacts/manager.go @@ -266,7 +266,7 @@ func (m *Manager) artifactWorker(managerCtx context.Context) { artLog.Debug("Received request from artifactChan") response := artifactResponse{ artifactPath: request.artifactAttrs.Name, - artifact: request.query.getFileContentMatches(request.jobRunID, request.artifactAttrs), + artifact: request.query.getFileContentMatches(request.ctx, request.jobRunID, request.artifactAttrs), } expired = sendViaChannel(request.ctx, request.artifactsChan, response) diff --git a/pkg/api/jobartifacts/query.go b/pkg/api/jobartifacts/query.go index 6b848bca6..89c74a6af 100644 --- a/pkg/api/jobartifacts/query.go +++ b/pkg/api/jobartifacts/query.go @@ -170,7 +170,7 @@ func relativeArtifactPath(bucketPath, jobRunID string) string { return bucketPath[start+len(marker):] } -func (q *JobArtifactQuery) getFileContentMatches(jobRunID int64, attrs *storage.ObjectAttrs) (artifact JobRunArtifact) { +func (q *JobArtifactQuery) getFileContentMatches(ctx context.Context, jobRunID int64, attrs *storage.ObjectAttrs) (artifact JobRunArtifact) { artifact.JobRunID = strconv.FormatInt(jobRunID, 10) artifact.ArtifactPath = relativeArtifactPath(attrs.Name, artifact.JobRunID) artifact.ArtifactContentType = attrs.ContentType @@ -179,7 +179,7 @@ func (q *JobArtifactQuery) getFileContentMatches(jobRunID int64, attrs *storage. return } - reader, closer, err := OpenArtifactReader(context.Background(), q.GcsBucket.Object(attrs.Name), attrs.ContentType) + reader, closer, err := OpenArtifactReader(ctx, q.GcsBucket.Object(attrs.Name), attrs.ContentType) defer closer() if err != nil { artifact.Error = err.Error() diff --git a/pkg/api/jobartifacts/query_test.go b/pkg/api/jobartifacts/query_test.go index 0ac1e97a4..4dd474474 100644 --- a/pkg/api/jobartifacts/query_test.go +++ b/pkg/api/jobartifacts/query_test.go @@ -47,15 +47,16 @@ func TestFunctional_FilterFiles(t *testing.T) { func TestFunctional_FilterContent(t *testing.T) { const filePath = "logs/periodic-ci-openshift-release-master-ci-4.19-e2e-azure-ovn/1898704060324777984/artifacts/e2e-azure-ovn/gather-extra/build-log.txt" + ctx := context.Background() query := baseTestingJAQ(t, "", NewStringMatcher("ClusterVersion:", 0, 0, maxFileMatches)) - artifact := query.getFileContentMatches(1898704060324777984, &storage.ObjectAttrs{Name: filePath}) + artifact := query.getFileContentMatches(ctx, 1898704060324777984, &storage.ObjectAttrs{Name: filePath}) assert.Empty(t, artifact.Error) assert.False(t, artifact.MatchedContent.ContentLineMatches.Truncated, "expected no need for truncating the content matches") assert.Equal(t, 2, len(artifact.MatchedContent.ContentLineMatches.Matches), "expected content to match with two lines") query.ContentMatcher = NewStringMatcher("error:", 0, 0, maxFileMatches) - artifact = query.getFileContentMatches(1898704060324777984, &storage.ObjectAttrs{Name: filePath}) + artifact = query.getFileContentMatches(ctx, 1898704060324777984, &storage.ObjectAttrs{Name: filePath}) assert.Empty(t, artifact.Error) assert.True(t, artifact.MatchedContent.ContentLineMatches.Truncated, "expected to truncate content matches") assert.Equal(t, maxFileMatches, 
len(artifact.MatchedContent.ContentLineMatches.Matches), "expected content to match with many lines") @@ -65,7 +66,7 @@ func TestFunctional_GzipContent(t *testing.T) { const filePath = "logs/periodic-ci-openshift-release-master-ci-4.19-e2e-aws-ovn-techpreview/1909930323508989952/artifacts/e2e-aws-ovn-techpreview/gather-extra/artifacts/nodes/ip-10-0-59-177.us-east-2.compute.internal/journal" query := baseTestingJAQ(t, "", NewStringMatcher("error", 0, 0, maxFileMatches)) - artifact := query.getFileContentMatches(1909930323508989952, &storage.ObjectAttrs{Name: filePath, ContentType: "application/gzip"}) + artifact := query.getFileContentMatches(context.Background(), 1909930323508989952, &storage.ObjectAttrs{Name: filePath, ContentType: "application/gzip"}) assert.Empty(t, artifact.Error) assert.True(t, artifact.MatchedContent.ContentLineMatches.Truncated, "expected a lot of matches") assert.Contains(t, From 9a1ed923ce996e8683f1596fecb9dc3bc58eb57d Mon Sep 17 00:00:00 2001 From: Luke Meyer Date: Mon, 22 Dec 2025 18:23:54 -0500 Subject: [PATCH 5/7] jobrunannotator: factor jobRunAnnotation out to db.models for reuse --- .../jobrunannotator/jobrunannotator.go | 26 +++++++------------ pkg/db/models/job_labels.go | 15 +++++++++++ 2 files changed, 24 insertions(+), 17 deletions(-) create mode 100644 pkg/db/models/job_labels.go diff --git a/pkg/componentreadiness/jobrunannotator/jobrunannotator.go b/pkg/componentreadiness/jobrunannotator/jobrunannotator.go index a190c54f3..55238d089 100644 --- a/pkg/componentreadiness/jobrunannotator/jobrunannotator.go +++ b/pkg/componentreadiness/jobrunannotator/jobrunannotator.go @@ -18,6 +18,7 @@ import ( "github.com/openshift/sippy/pkg/apis/cache" bqclient "github.com/openshift/sippy/pkg/bigquery" "github.com/openshift/sippy/pkg/db" + "github.com/openshift/sippy/pkg/db/models" "github.com/openshift/sippy/pkg/util" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -173,15 +174,6 @@ type jobRun struct { URL string `bigquery:"prowjob_url"` } -type jobRunAnnotation struct { - ID string `bigquery:"prowjob_build_id"` - StartTime civil.DateTime `bigquery:"prowjob_start"` - Label string `bigquery:"label"` - Comment string `bigquery:"comment"` - User bigquery.NullString `bigquery:"user"` - url string -} - func (j JobRunAnnotator) getJobRunsFromBigQuery(ctx context.Context) (map[int64]jobRun, error) { //lint:ignore now := time.Now() var jobRuns map[int64]jobRun @@ -392,13 +384,13 @@ func (j JobRunAnnotator) filterJobRunByArtifact(ctx context.Context, jobRunIDs [ } // bulkInsertVariants inserts all new job variants in batches. 
-func (j JobRunAnnotator) bulkInsertJobRunAnnotations(ctx context.Context, inserts []jobRunAnnotation) error { +func (j JobRunAnnotator) bulkInsertJobRunAnnotations(ctx context.Context, inserts []models.JobRunLabel) error { var batchSize = 500 if !j.execute { jobsStr := "" for _, jobRun := range inserts { - jobsStr += fmt.Sprintf("StartTime: %v; URL: %s\n", jobRun.StartTime, jobRun.url) + jobsStr += fmt.Sprintf("StartTime: %v; URL: %s\n", jobRun.StartTime, jobRun.Url) } log.Infof("\n===========================================================\nDry run mode enabled\nBulk inserting\n%s\n\nTo write the label to DB, please use --execute argument\n", jobsStr) return nil @@ -430,7 +422,7 @@ func (j JobRunAnnotator) generateComment() string { return comment } -func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) (map[int64]jobRunAnnotation, error) { +func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) (map[int64]models.JobRunLabel, error) { now := time.Now() queryStr := fmt.Sprintf(` SELECT @@ -443,7 +435,7 @@ func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) ( q := j.bqClient.BQ.Query(queryStr) errs := []error{} - result := make(map[int64]jobRunAnnotation) + result := make(map[int64]models.JobRunLabel) log.Debugf("Fetching job run annotations with:\n%s\n", q.Q) it, err := q.Read(ctx) @@ -453,7 +445,7 @@ func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) ( } for { - row := jobRunAnnotation{} + row := models.JobRunLabel{} err := it.Next(&row) if errors.Is(err, iterator.Done) { break @@ -486,7 +478,7 @@ func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) ( } func (j JobRunAnnotator) annotateJobRuns(ctx context.Context, jobRunIDs []int64, jobRuns map[int64]jobRun) error { - jobRunAnnotations := make([]jobRunAnnotation, 0, len(jobRunIDs)) + jobRunAnnotations := make([]models.JobRunLabel, 0, len(jobRunIDs)) existingAnnotations, err := j.getJobRunAnnotationsFromBigQuery(ctx) if err != nil { return err @@ -498,13 +490,13 @@ func (j JobRunAnnotator) annotateJobRuns(ctx context.Context, jobRunIDs []int64, if annotation, existing := existingAnnotations[jobRunID]; existing && annotation.Label == j.Label { continue } - jobRunAnnotations = append(jobRunAnnotations, jobRunAnnotation{ + jobRunAnnotations = append(jobRunAnnotations, models.JobRunLabel{ ID: jobRun.ID, StartTime: jobRun.StartTime, Label: j.Label, Comment: j.generateComment(), User: bigquery.NullString{Valid: true, StringVal: j.user}, - url: jobRun.URL, + Url: jobRun.URL, }) } } diff --git a/pkg/db/models/job_labels.go b/pkg/db/models/job_labels.go new file mode 100644 index 000000000..2eedcc970 --- /dev/null +++ b/pkg/db/models/job_labels.go @@ -0,0 +1,15 @@ +package models + +import ( + "cloud.google.com/go/bigquery" + "cloud.google.com/go/civil" +) + +type JobRunLabel struct { + ID string `bigquery:"prowjob_build_id"` + StartTime civil.DateTime `bigquery:"prowjob_start"` + Label string `bigquery:"label"` + Comment string `bigquery:"comment"` + User bigquery.NullString `bigquery:"user"` + Url string `bigquery:"-"` +} From 9b2e3ff27917b02efe3bc11c79795af2e2d209a1 Mon Sep 17 00:00:00 2001 From: Luke Meyer Date: Mon, 22 Dec 2025 18:39:03 -0500 Subject: [PATCH 6/7] jobrunannotator: add new job_labels fields --- .../jobrunannotator/jobrunannotator.go | 19 +++++++----- pkg/db/models/job_labels.go | 31 ++++++++++++++----- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git 
a/pkg/componentreadiness/jobrunannotator/jobrunannotator.go b/pkg/componentreadiness/jobrunannotator/jobrunannotator.go index 55238d089..4258856a0 100644 --- a/pkg/componentreadiness/jobrunannotator/jobrunannotator.go +++ b/pkg/componentreadiness/jobrunannotator/jobrunannotator.go @@ -390,7 +390,7 @@ func (j JobRunAnnotator) bulkInsertJobRunAnnotations(ctx context.Context, insert if !j.execute { jobsStr := "" for _, jobRun := range inserts { - jobsStr += fmt.Sprintf("StartTime: %v; URL: %s\n", jobRun.StartTime, jobRun.Url) + jobsStr += fmt.Sprintf("StartTime: %v; URL: %s\n", jobRun.StartTime, jobRun.URL) } log.Infof("\n===========================================================\nDry run mode enabled\nBulk inserting\n%s\n\nTo write the label to DB, please use --execute argument\n", jobsStr) return nil @@ -484,6 +484,7 @@ func (j JobRunAnnotator) annotateJobRuns(ctx context.Context, jobRunIDs []int64, return err } log.Infof("Found %d existing job run annotations.", len(existingAnnotations)) + now := civil.DateTimeOf(time.Now()) for _, jobRunID := range jobRunIDs { if jobRun, ok := jobRuns[jobRunID]; ok { // Skip if the same label already exists @@ -491,12 +492,16 @@ func (j JobRunAnnotator) annotateJobRuns(ctx context.Context, jobRunIDs []int64, continue } jobRunAnnotations = append(jobRunAnnotations, models.JobRunLabel{ - ID: jobRun.ID, - StartTime: jobRun.StartTime, - Label: j.Label, - Comment: j.generateComment(), - User: bigquery.NullString{Valid: true, StringVal: j.user}, - Url: jobRun.URL, + ID: jobRun.ID, + StartTime: jobRun.StartTime, + Label: j.Label, + Comment: j.generateComment(), + User: j.user, + CreatedAt: now, + UpdatedAt: now, + SourceTool: "sippy annotate-job-runs", + SymptomID: "", // Empty for manual annotations; will be populated when symptom detection is implemented + URL: jobRun.URL, }) } } diff --git a/pkg/db/models/job_labels.go b/pkg/db/models/job_labels.go index 2eedcc970..bdcd695ff 100644 --- a/pkg/db/models/job_labels.go +++ b/pkg/db/models/job_labels.go @@ -1,15 +1,32 @@ package models import ( - "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" ) +// JobRunLabel represents a label applied to a job run, stored in BigQuery's job_labels table. 
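+//
+// The BigQuery write path is not shown in this file; going by the field tags, one plausible
+// way to stream rows (illustrative only; bq is a *bigquery.Client and dataset a placeholder):
+//
+//	ins := bq.Dataset(dataset).Table("job_labels").Inserter()
+//	err := ins.Put(ctx, []JobRunLabel{label})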
type JobRunLabel struct { - ID string `bigquery:"prowjob_build_id"` - StartTime civil.DateTime `bigquery:"prowjob_start"` - Label string `bigquery:"label"` - Comment string `bigquery:"comment"` - User bigquery.NullString `bigquery:"user"` - Url string `bigquery:"-"` + ID string `bigquery:"prowjob_build_id"` + StartTime civil.DateTime `bigquery:"prowjob_start"` + Label string `bigquery:"label"` + Comment string `bigquery:"comment"` + User string `bigquery:"user"` + CreatedAt civil.DateTime `bigquery:"created_at"` + UpdatedAt civil.DateTime `bigquery:"updated_at"` + SourceTool string `bigquery:"source_tool"` + SymptomID string `bigquery:"symptom_id"` + URL string `bigquery:"-"` } + +// [2025-12-22] To update the BigQuery table schema, use: +// +// bq update :.job_labels \ +// created_at:DATETIME,updated_at:DATETIME,source_tool:STRING,symptom_id:STRING +// +// Or via SQL: +// +// ALTER TABLE `..job_labels` +// ADD COLUMN IF NOT EXISTS created_at DATETIME, +// ADD COLUMN IF NOT EXISTS updated_at DATETIME, +// ADD COLUMN IF NOT EXISTS source_tool STRING, +// ADD COLUMN IF NOT EXISTS symptom_id STRING; From f487f04783e5d870e5f015cec7940b758458a8a5 Mon Sep 17 00:00:00 2001 From: Luke Meyer Date: Mon, 22 Dec 2025 20:06:52 -0500 Subject: [PATCH 7/7] jobrunannotator: adjust label comment to uniform format Label comments should be json with a tool-specific schema. --- .../jobrunannotator/jobrunannotator.go | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/pkg/componentreadiness/jobrunannotator/jobrunannotator.go b/pkg/componentreadiness/jobrunannotator/jobrunannotator.go index 4258856a0..5a450f7f3 100644 --- a/pkg/componentreadiness/jobrunannotator/jobrunannotator.go +++ b/pkg/componentreadiness/jobrunannotator/jobrunannotator.go @@ -413,13 +413,26 @@ func (j JobRunAnnotator) bulkInsertJobRunAnnotations(ctx context.Context, insert return nil } +// LabelComment is what gets serialized in the DB to provide context for applying the label. +// each tool that produces job_labels can specify whatever context is relevant for it; +// but each should do so in a json object with a unique key for its own schema. +type LabelComment struct { + Comment string `json:"comment"` + V1 JobRunAnnotator `json:"job_run_annotator_v1"` +} + func (j JobRunAnnotator) generateComment() string { - comment := j.comment - annotatorComment, err := json.MarshalIndent(j, "", " ") - if err == nil { - comment += fmt.Sprintf("\nAnnotator\n%s", annotatorComment) + comment := LabelComment{ + Comment: j.comment, // apparently separated out in order to stand out from the object + V1: j, + } + str, err := json.MarshalIndent(comment, "", " ") + if err != nil { + log.WithError(err).Error("error generating JobAnnotator comment") + // fallback comment that will not parse as json, but will be visible in the DB + return j.comment + "\nError generating JobAnnotator comment: " + err.Error() } - return comment + return string(str) } func (j JobRunAnnotator) getJobRunAnnotationsFromBigQuery(ctx context.Context) (map[int64]models.JobRunLabel, error) {