Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 162 additions & 10 deletions pkg/debuginfo/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/go-kit/log"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -82,6 +83,29 @@ type Store struct {
timeNow func() time.Time
}


// DebuginfoStore is the gRPC-facing surface of the debuginfo service: it
// decides whether an upload is needed, hands out upload destinations, and
// records completed uploads.
type DebuginfoStore interface {
	// ShouldInitiateUpload reports whether the client should bother
	// extracting and uploading debuginfo for the build ID in req.
	ShouldInitiateUpload(ctx context.Context, req *debuginfopb.ShouldInitiateUploadRequest) (*debuginfopb.ShouldInitiateUploadResponse, error)
	// InitiateUpload begins an upload for the build ID in req.
	InitiateUpload(ctx context.Context, req *debuginfopb.InitiateUploadRequest) (*debuginfopb.InitiateUploadResponse, error)
	// MarkUploadFinished records that a previously initiated upload completed.
	MarkUploadFinished(ctx context.Context, req *debuginfopb.MarkUploadFinishedRequest) (*debuginfopb.MarkUploadFinishedResponse, error)
	// Upload receives debuginfo bytes over a gRPC client stream.
	Upload(stream debuginfopb.DebuginfoService_UploadServer) error
}

// DebuginfoStoreMetrics decorates a DebuginfoStore with Prometheus
// instrumentation: every interface method is delegated to store while call
// durations and success/failure counts are recorded.
type DebuginfoStoreMetrics struct {
	store DebuginfoStore // wrapped implementation all calls are forwarded to

	uploadTotal            *prometheus.CounterVec // upload outcomes, labeled "success" ("true"/"false")
	uploadDuration         prometheus.Histogram   // wall-clock duration of InitiateUpload/Upload calls
	existsDuration         prometheus.Histogram   // wall-clock duration of ShouldInitiateUpload calls
	metadataUpdateTotal    *prometheus.CounterVec // MarkUploadFinished outcomes, labeled "success"
	metadataUpdateDuration prometheus.Histogram   // wall-clock duration of MarkUploadFinished calls
}






// SignedUploadClient produces pre-signed PUT URLs so clients can upload
// objects directly to object storage instead of streaming bytes through this
// service.
type SignedUploadClient interface {
	// SignedPUT returns a URL that permits a PUT of size bytes to objectKey
	// until expiry.
	SignedPUT(ctx context.Context, objectKey string, size int64, expiry time.Time) (signedURL string, err error)
}
Expand All @@ -102,7 +126,7 @@ func NewStore(
maxUploadDuration time.Duration,
maxUploadSize int64,
) (*Store, error) {
return &Store{
store := &Store{
tracer: tracer,
logger: log.With(logger, "component", "debuginfo"),
bucket: bucket,
Expand All @@ -112,9 +136,12 @@ func NewStore(
maxUploadDuration: maxUploadDuration,
maxUploadSize: maxUploadSize,
timeNow: time.Now,
}, nil
}

return store, nil
}


const (
ReasonDebuginfoInDebuginfod = "Debuginfo exists in debuginfod, therefore no upload is necessary."
ReasonFirstTimeSeen = "First time we see this Build ID, and it does not exist in debuginfod, therefore please upload!"
Expand All @@ -134,12 +161,66 @@ const (
// ShouldInitiateUpload returns whether an upload should be initiated for the
// given build ID. Checking if an upload should even be initiated allows the
// parca-agent to avoid extracting debuginfos unnecessarily from a binary.
// NewDebuginfoStoreMetrics wraps store with Prometheus instrumentation and
// registers every metric on reg. Registration panics (MustRegister) if a
// metric with the same name is already registered.
func NewDebuginfoStoreMetrics(store DebuginfoStore, reg prometheus.Registerer) *DebuginfoStoreMetrics {
	m := &DebuginfoStoreMetrics{
		store: store,
		uploadTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "parca_debuginfo_store_upload_total",
			Help: "Total number of uploads attempted.",
		}, []string{"success"}),
		uploadDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:    "parca_debuginfo_store_upload_duration_seconds",
			Help:    "Time taken for upload operations.",
			Buckets: prometheus.ExponentialBuckets(0.1, 2, 15),
		}),
		existsDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:    "parca_debuginfo_store_exists_duration_seconds",
			Help:    "Time taken for exists operations.",
			Buckets: prometheus.ExponentialBuckets(0.1, 2, 15),
		}),
		metadataUpdateTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "parca_debuginfo_store_metadata_update_total",
			Help: "Total number of metadata updates attempted.",
		}, []string{"success"}),
		metadataUpdateDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:    "parca_debuginfo_store_metadata_update_duration_seconds",
			Help:    "Time taken for metadata update operations.",
			Buckets: prometheus.ExponentialBuckets(0.1, 2, 15),
		}),
	}

	// One variadic call registers all collectors; equivalent to registering
	// them individually.
	reg.MustRegister(
		m.uploadTotal,
		m.uploadDuration,
		m.existsDuration,
		m.metadataUpdateTotal,
		m.metadataUpdateDuration,
	)

	return m
}

// ShouldInitiateUpload forwards to the wrapped store and records the call's
// wall-clock duration in the exists-duration histogram. No success counter is
// recorded for this method.
func (m *DebuginfoStoreMetrics) ShouldInitiateUpload(ctx context.Context, req *debuginfopb.ShouldInitiateUploadRequest) (*debuginfopb.ShouldInitiateUploadResponse, error) {
	begin := time.Now()
	resp, err := m.store.ShouldInitiateUpload(ctx, req)
	m.existsDuration.Observe(time.Since(begin).Seconds())
	return resp, err
}

func (s *Store) ShouldInitiateUpload(ctx context.Context, req *debuginfopb.ShouldInitiateUploadRequest) (*debuginfopb.ShouldInitiateUploadResponse, error) {
span := trace.SpanFromContext(ctx)
span.SetAttributes(attribute.String("build_id", req.BuildId))

buildID := req.BuildId
if err := validateInput(buildID); err != nil {
if err := s.validateInput(buildID); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

Expand Down Expand Up @@ -267,6 +348,21 @@ func (s *Store) ShouldInitiateUpload(ctx context.Context, req *debuginfopb.Shoul
}
}

// InitiateUpload forwards to the wrapped store, recording the call's duration
// in the upload-duration histogram and its outcome in the upload counter
// (success label "true"/"false").
func (m *DebuginfoStoreMetrics) InitiateUpload(ctx context.Context, req *debuginfopb.InitiateUploadRequest) (*debuginfopb.InitiateUploadResponse, error) {
	begin := time.Now()
	resp, err := m.store.InitiateUpload(ctx, req)
	m.uploadDuration.Observe(time.Since(begin).Seconds())

	outcome := "true"
	if err != nil {
		outcome = "false"
	}
	m.uploadTotal.WithLabelValues(outcome).Inc()

	return resp, err
}

func (s *Store) InitiateUpload(ctx context.Context, req *debuginfopb.InitiateUploadRequest) (*debuginfopb.InitiateUploadResponse, error) {
span := trace.SpanFromContext(ctx)
span.SetAttributes(attribute.String("build_id", req.BuildId))
Expand Down Expand Up @@ -340,13 +436,28 @@ func (s *Store) InitiateUpload(ctx context.Context, req *debuginfopb.InitiateUpl
}, nil
}

// MarkUploadFinished forwards to the wrapped store, recording the call's
// duration and outcome under the metadata-update metrics (success label
// "true"/"false").
func (m *DebuginfoStoreMetrics) MarkUploadFinished(ctx context.Context, req *debuginfopb.MarkUploadFinishedRequest) (*debuginfopb.MarkUploadFinishedResponse, error) {
	begin := time.Now()
	resp, err := m.store.MarkUploadFinished(ctx, req)
	m.metadataUpdateDuration.Observe(time.Since(begin).Seconds())

	outcome := "true"
	if err != nil {
		outcome = "false"
	}
	m.metadataUpdateTotal.WithLabelValues(outcome).Inc()

	return resp, err
}

func (s *Store) MarkUploadFinished(ctx context.Context, req *debuginfopb.MarkUploadFinishedRequest) (*debuginfopb.MarkUploadFinishedResponse, error) {
span := trace.SpanFromContext(ctx)
span.SetAttributes(attribute.String("build_id", req.BuildId))
span.SetAttributes(attribute.String("upload_id", req.UploadId))

buildID := req.BuildId
if err := validateInput(buildID); err != nil {
if err := s.validateInput(buildID); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

Expand All @@ -367,6 +478,21 @@ func (s *Store) MarkUploadFinished(ctx context.Context, req *debuginfopb.MarkUpl
return &debuginfopb.MarkUploadFinishedResponse{}, nil
}

// Upload forwards the gRPC upload stream to the wrapped store, recording the
// call's duration in the upload-duration histogram and its outcome in the
// upload counter (success label "true"/"false").
func (m *DebuginfoStoreMetrics) Upload(stream debuginfopb.DebuginfoService_UploadServer) error {
	begin := time.Now()
	err := m.store.Upload(stream)
	m.uploadDuration.Observe(time.Since(begin).Seconds())

	outcome := "true"
	if err != nil {
		outcome = "false"
	}
	m.uploadTotal.WithLabelValues(outcome).Inc()

	return err
}

func (s *Store) Upload(stream debuginfopb.DebuginfoService_UploadServer) error {
if s.signedUpload.Enabled {
return status.Error(codes.Unimplemented, "signed URL uploads are the only supported upload strategy for this service")
Expand Down Expand Up @@ -399,8 +525,9 @@ func (s *Store) Upload(stream debuginfopb.DebuginfoService_UploadServer) error {
})
}


func (s *Store) upload(ctx context.Context, buildID, uploadID string, typ debuginfopb.DebuginfoType, r io.Reader) error {
if err := validateInput(buildID); err != nil {
if err := s.validateInput(buildID); err != nil {
return status.Errorf(codes.InvalidArgument, "invalid build ID: %q", err)
}

Expand All @@ -427,18 +554,38 @@ func (s *Store) upload(ctx context.Context, buildID, uploadID string, typ debugi
return nil
}

// uploadIsStale reports whether upload started so long ago that it can no
// longer complete, using the wrapped store's maxUploadDuration and clock.
//
// NOTE(review): this assumes the wrapped DebuginfoStore is always a *Store;
// the type assertion panics for any other implementation. Consider storing
// maxUploadDuration/timeNow on DebuginfoStoreMetrics directly instead of
// reaching through the interface.
func (m *DebuginfoStoreMetrics) uploadIsStale(upload *debuginfopb.DebuginfoUpload) bool {
	// Assert once instead of twice: same behavior, one panic site, no
	// redundant dynamic type check.
	s := m.store.(*Store)
	return uploadIsStale(upload, s.maxUploadDuration, s.timeNow)
}

// uploadIsStale reports whether an in-flight upload has outlived its window:
// the upload's start time plus the maximum upload duration and a fixed
// two-minute grace period lies strictly before the current time.
func uploadIsStale(upload *debuginfopb.DebuginfoUpload, maxUploadDuration time.Duration, timeNow func() time.Time) bool {
	deadline := upload.StartedAt.AsTime().Add(maxUploadDuration).Add(2 * time.Minute)
	return deadline.Before(timeNow())
}

// uploadIsStale reports whether upload exceeded this store's maximum upload
// duration (plus the package-level grace period), as judged by s.timeNow.
func (s *Store) uploadIsStale(upload *debuginfopb.DebuginfoUpload) bool {
	maxDur, clock := s.maxUploadDuration, s.timeNow
	return uploadIsStale(upload, maxDur, clock)
}

// validateInput checks that id is an acceptable build ID; see the
// package-level validateInput for the actual rule.
func (s *Store) validateInput(id string) error {
	return validateInput(id)
}

// validateInput delegates build-ID validation to the package-level
// validateInput helper.
//
// NOTE(review): this method is not part of the DebuginfoStore interface and
// has no callers visible in this chunk — confirm it is actually needed.
func (m *DebuginfoStoreMetrics) validateInput(id string) error {
	return validateInput(id)
}

// validateInput rejects build IDs that are implausibly short: anything of
// length two or less yields an error, everything else passes.
//
// NOTE(review): this only checks length; it does not verify the ID is hex —
// confirm whether stricter validation is wanted.
func validateInput(id string) error {
	// The original contained this identical check twice back to back (an
	// apparent merge artifact); a single check is equivalent.
	if len(id) <= 2 {
		return errors.New("unexpectedly short input")
	}
	return nil
}

return nil
func (m *DebuginfoStoreMetrics) objectPath(buildID string, typ debuginfopb.DebuginfoType) string {
return objectPath(buildID, typ)
}

// objectPath returns the object path for a build ID and type.
func objectPath(buildID string, typ debuginfopb.DebuginfoType) string {
switch typ {
case debuginfopb.DebuginfoType_DEBUGINFO_TYPE_EXECUTABLE:
Expand All @@ -454,6 +601,11 @@ func objectPath(buildID string, typ debuginfopb.DebuginfoType) string {

// in a debuginfod server the source path is directly in the URL in the form of
// debuginfod.example.com/buildid/<build-id>/source/<file>.
func (m *DebuginfoStoreMetrics) debuginfodSourcePath(buildID, file string) string {
return debuginfodSourcePath(buildID, file)
}

// debuginfodSourcePath builds the debuginfod-style source path
// "<build-id>/source/<file>" for the given build ID and source file,
// using path.Join's slash-separated (URL-style) semantics.
func debuginfodSourcePath(buildID, file string) string {
	elements := []string{buildID, "source", file}
	return path.Join(elements...)
}