From d12a8e0a09907a96a3b078da9d913a3dd57c6eee Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Thu, 26 Mar 2026 17:38:28 +0000 Subject: [PATCH 1/8] Add KV store operation duration metrics to Vault plugin Instrument all public KVStore methods with a histogram metric (platform_vault_plugin_kv_operation_duration_seconds) to help diagnose high P95 Observation latencies in production. Also adds a max-batch Observation test exercising 10 GetSecrets requests with 10 encryption keys each plus 20 pending queue items. --- core/services/ocr2/plugins/vault/kvstore.go | 83 ++-- .../ocr2/plugins/vault/kvstore_test.go | 95 ++-- core/services/ocr2/plugins/vault/metrics.go | 23 +- core/services/ocr2/plugins/vault/plugin.go | 32 +- .../ocr2/plugins/vault/plugin_test.go | 432 ++++++++++++++---- 5 files changed, 475 insertions(+), 190 deletions(-) diff --git a/core/services/ocr2/plugins/vault/kvstore.go b/core/services/ocr2/plugins/vault/kvstore.go index 466101e90a5..05988178cec 100644 --- a/core/services/ocr2/plugins/vault/kvstore.go +++ b/core/services/ocr2/plugins/vault/kvstore.go @@ -1,9 +1,11 @@ package vault import ( + "context" "errors" "fmt" "strconv" + "time" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" "google.golang.org/protobuf/proto" @@ -20,38 +22,44 @@ const ( ) type KVStore struct { - reader ocr3_1types.KeyValueStateReader - writer ocr3_1types.KeyValueStateReadWriter + reader ocr3_1types.KeyValueStateReader + writer ocr3_1types.KeyValueStateReadWriter + metrics *pluginMetrics +} + +func (s *KVStore) trackDuration(ctx context.Context, method string, start time.Time) { + s.metrics.trackKVOperation(ctx, method, time.Since(start).Seconds()) } type ReadKVStore interface { - GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, error) - GetMetadata(owner string) (*vault.StoredMetadata, error) - GetSecretIdentifiersCountForOwner(owner string) (int, error) - GetPendingQueue() ([]*vault.StoredPendingQueueItem, error) + GetSecret(ctx context.Context, id *vault.SecretIdentifier) (*vault.StoredSecret, error) + GetMetadata(ctx context.Context, owner string) (*vault.StoredMetadata, error) + GetSecretIdentifiersCountForOwner(ctx context.Context, owner string) (int, error) + GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error) } type WriteKVStore interface { ReadKVStore - WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSecret) error - WriteMetadata(owner string, metadata *vault.StoredMetadata) error - DeleteSecret(id *vault.SecretIdentifier) error - WritePendingQueue(pending []*vault.StoredPendingQueueItem) error + WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret) error + WriteMetadata(ctx context.Context, owner string, metadata *vault.StoredMetadata) error + DeleteSecret(ctx context.Context, id *vault.SecretIdentifier) error + WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error } -func NewReadStore(reader ocr3_1types.KeyValueStateReader) *KVStore { - return &KVStore{reader: reader} +func NewReadStore(reader ocr3_1types.KeyValueStateReader, metrics *pluginMetrics) *KVStore { + return &KVStore{reader: reader, metrics: metrics} } -func NewWriteStore(writer ocr3_1types.KeyValueStateReadWriter) *KVStore { - return &KVStore{reader: writer, writer: writer} +func NewWriteStore(writer ocr3_1types.KeyValueStateReadWriter, metrics *pluginMetrics) *KVStore { + return &KVStore{reader: writer, writer: writer, metrics: metrics} } -func (s *KVStore) GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, error) { +func (s *KVStore) GetSecret(ctx context.Context, id *vault.SecretIdentifier) (*vault.StoredSecret, error) { + defer s.trackDuration(ctx, "GetSecret", time.Now()) if id == nil { return nil, errors.New("id cannot be nil") } - found, err := s.metadataContainsID(id) + found, err := s.metadataContainsID(ctx, id) if err != nil { return nil, fmt.Errorf("failed to check if metadata contains id: %w", err) } @@ -77,7 +85,8 @@ func (s *KVStore) GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, er return secret, nil } -func (s *KVStore) GetMetadata(owner string) (*vault.StoredMetadata, error) { +func (s *KVStore) GetMetadata(ctx context.Context, owner string) (*vault.StoredMetadata, error) { + defer s.trackDuration(ctx, "GetMetadata", time.Now()) b, err := s.reader.Read([]byte(metadataPrefix + owner)) if err != nil { return nil, fmt.Errorf("failed to read metadata: %w", err) @@ -95,8 +104,9 @@ func (s *KVStore) GetMetadata(owner string) (*vault.StoredMetadata, error) { return md, nil } -func (s *KVStore) GetSecretIdentifiersCountForOwner(owner string) (int, error) { - md, err := s.GetMetadata(owner) +func (s *KVStore) GetSecretIdentifiersCountForOwner(ctx context.Context, owner string) (int, error) { + defer s.trackDuration(ctx, "GetSecretIdentifiersCountForOwner", time.Now()) + md, err := s.GetMetadata(ctx, owner) if err != nil { return 0, fmt.Errorf("failed to get metadata for owner %s: %w", owner, err) } @@ -108,7 +118,8 @@ func (s *KVStore) GetSecretIdentifiersCountForOwner(owner string) (int, error) { return count, nil } -func (s *KVStore) WriteMetadata(owner string, metadata *vault.StoredMetadata) error { +func (s *KVStore) WriteMetadata(ctx context.Context, owner string, metadata *vault.StoredMetadata) error { + defer s.trackDuration(ctx, "WriteMetadata", time.Now()) if metadata == nil { return errors.New("metadata cannot be nil") } @@ -125,11 +136,11 @@ func (s *KVStore) WriteMetadata(owner string, metadata *vault.StoredMetadata) er return nil } -func (s *KVStore) metadataContainsID(id *vault.SecretIdentifier) (bool, error) { +func (s *KVStore) metadataContainsID(ctx context.Context, id *vault.SecretIdentifier) (bool, error) { if id == nil { return false, errors.New("id cannot be nil") } - md, err := s.GetMetadata(id.Owner) + md, err := s.GetMetadata(ctx, id.Owner) if err != nil { return false, fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err) } @@ -147,11 +158,11 @@ func (s *KVStore) metadataContainsID(id *vault.SecretIdentifier) (bool, error) { return false, nil } -func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error { +func (s *KVStore) addIDToMetadata(ctx context.Context, id *vault.SecretIdentifier) error { if id == nil { return errors.New("id cannot be nil") } - md, err := s.GetMetadata(id.Owner) + md, err := s.GetMetadata(ctx, id.Owner) if err != nil { return fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err) } @@ -171,7 +182,7 @@ func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error { md.SecretIdentifiers = append(md.SecretIdentifiers, id) } - err = s.WriteMetadata(id.Owner, md) + err = s.WriteMetadata(ctx, id.Owner, md) if err != nil { return fmt.Errorf("failed to write metadata for owner %s: %w", id.Owner, err) } @@ -179,11 +190,11 @@ func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error { return nil } -func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error { +func (s *KVStore) removeIDFromMetadata(ctx context.Context, id *vault.SecretIdentifier) error { if id == nil { return errors.New("id cannot be nil") } - md, err := s.GetMetadata(id.Owner) + md, err := s.GetMetadata(ctx, id.Owner) if err != nil { return fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err) } @@ -209,7 +220,7 @@ func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error { newMd := &vault.StoredMetadata{ SecretIdentifiers: si, } - err = s.WriteMetadata(id.Owner, newMd) + err = s.WriteMetadata(ctx, id.Owner, newMd) if err != nil { return fmt.Errorf("failed to write metadata for owner %s: %w", id.Owner, err) } @@ -217,7 +228,8 @@ func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error { return nil } -func (s *KVStore) WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSecret) error { +func (s *KVStore) WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret) error { + defer s.trackDuration(ctx, "WriteSecret", time.Now()) if id == nil { return errors.New("id cannot be nil") } @@ -231,18 +243,19 @@ func (s *KVStore) WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSe return fmt.Errorf("failed to write secret: %w", err) } - if err := s.addIDToMetadata(id); err != nil { + if err := s.addIDToMetadata(ctx, id); err != nil { return fmt.Errorf("failed to add id to metadata: %w", err) } return nil } -func (s *KVStore) DeleteSecret(id *vault.SecretIdentifier) error { +func (s *KVStore) DeleteSecret(ctx context.Context, id *vault.SecretIdentifier) error { + defer s.trackDuration(ctx, "DeleteSecret", time.Now()) if id == nil { return errors.New("id cannot be nil") } - err := s.removeIDFromMetadata(id) + err := s.removeIDFromMetadata(ctx, id) if err != nil { return fmt.Errorf("failed to remove id from metadata: %w", err) } @@ -255,7 +268,8 @@ func (s *KVStore) DeleteSecret(id *vault.SecretIdentifier) error { return nil } -func (s *KVStore) GetPendingQueue() ([]*vault.StoredPendingQueueItem, error) { +func (s *KVStore) GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error) { + defer s.trackDuration(ctx, "GetPendingQueue", time.Now()) indexBytes, err := s.reader.Read([]byte(pendingQueueIndex)) if err != nil { return nil, fmt.Errorf("failed to read pending queue index: %w", err) @@ -320,7 +334,8 @@ func (s *KVStore) deletePendingQueue() error { return nil } -func (s *KVStore) WritePendingQueue(pending []*vault.StoredPendingQueueItem) error { +func (s *KVStore) WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error { + defer s.trackDuration(ctx, "WritePendingQueue", time.Now()) err := s.deletePendingQueue() if err != nil { return fmt.Errorf("failed to delete pending requests: %w", err) diff --git a/core/services/ocr2/plugins/vault/kvstore_test.go b/core/services/ocr2/plugins/vault/kvstore_test.go index a98742f7122..b46b3be9994 100644 --- a/core/services/ocr2/plugins/vault/kvstore_test.go +++ b/core/services/ocr2/plugins/vault/kvstore_test.go @@ -15,6 +15,27 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/actions/vault" ) +func newTestMetrics(t *testing.T) *pluginMetrics { + t.Helper() + m, err := newPluginMetrics("test") + require.NoError(t, err) + return m +} + +func newTestWriteStore(t *testing.T, writer ocr3_1types.KeyValueStateReadWriter) *KVStore { + t.Helper() + m, err := newPluginMetrics("test") + require.NoError(t, err) + return NewWriteStore(writer, m) +} + +func newTestReadStore(t *testing.T, reader ocr3_1types.KeyValueStateReader) *KVStore { + t.Helper() + m, err := newPluginMetrics("test") + require.NoError(t, err) + return NewReadStore(reader, m) +} + type response struct { data []byte err error @@ -80,7 +101,7 @@ func TestKVStore_Secrets(t *testing.T) { kv.m["Metadata::owner"] = response{ err: errors.New("not found"), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) id := &vault.SecretIdentifier{ Owner: "owner", @@ -88,7 +109,7 @@ func TestKVStore_Secrets(t *testing.T) { Key: "secret1", } - _, err := store.GetSecret(id) + _, err := store.GetSecret(t.Context(), id) require.ErrorContains(t, err, "not found") d, err := proto.Marshal(&vault.StoredSecret{ @@ -105,12 +126,12 @@ func TestKVStore_Secrets(t *testing.T) { kv.m["Metadata::owner"] = response{ data: d, } - s, err := store.GetSecret(id) + s, err := store.GetSecret(t.Context(), id) require.NoError(t, err) assert.Equal(t, s.EncryptedSecret, []byte("encrypted data")) delete(kv.m, "Metadata::owner") - s, err = store.GetSecret(id) + s, err = store.GetSecret(t.Context(), id) assert.Nil(t, s) require.NoError(t, err) @@ -118,10 +139,10 @@ func TestKVStore_Secrets(t *testing.T) { ss := &vault.StoredSecret{ EncryptedSecret: newData, } - err = store.WriteSecret(id, ss) + err = store.WriteSecret(t.Context(), id,ss) require.NoError(t, err) - s, err = store.GetSecret(id) + s, err = store.GetSecret(t.Context(), id) require.NoError(t, err) assert.Equal(t, newData, s.EncryptedSecret) } @@ -130,22 +151,22 @@ func TestKVStore_DeleteSecrets(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) id := &vault.SecretIdentifier{ Owner: "owner", Namespace: "main", Key: "secret1", } - err := store.WriteSecret(id, &vault.StoredSecret{ + err := store.WriteSecret(t.Context(), id,&vault.StoredSecret{ EncryptedSecret: []byte("encrypted data"), }) require.NoError(t, err) - err = store.DeleteSecret(id) + err = store.DeleteSecret(t.Context(), id) require.NoError(t, err) - md, err := store.GetMetadata("owner") + md, err := store.GetMetadata(t.Context(), "owner") require.NoError(t, err) assert.Empty(t, md.SecretIdentifiers) @@ -159,9 +180,9 @@ func TestKVStore_Metadata(t *testing.T) { kv.m["Metadata::"+owner] = response{ err: errors.New("not found"), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) - _, err := store.GetMetadata(owner) + _, err := store.GetMetadata(t.Context(), owner) require.ErrorContains(t, err, "not found") id := &vault.SecretIdentifier{ @@ -176,13 +197,13 @@ func TestKVStore_Metadata(t *testing.T) { kv.m["Metadata::owner"] = response{ data: d, } - m, err := store.GetMetadata(owner) + m, err := store.GetMetadata(t.Context(), owner) require.NoError(t, err) assert.Len(t, m.SecretIdentifiers, 1) assert.True(t, proto.Equal(m.SecretIdentifiers[0], id)) delete(kv.m, "Metadata::"+owner) - m, err = store.GetMetadata(owner) + m, err = store.GetMetadata(t.Context(), owner) assert.Nil(t, m) require.NoError(t, err) @@ -200,10 +221,10 @@ func TestKVStore_Metadata(t *testing.T) { }, }, } - err = store.WriteMetadata(owner, m) + err = store.WriteMetadata(t.Context(), owner, m) require.NoError(t, err) - gotM, err := store.GetMetadata(owner) + gotM, err := store.GetMetadata(t.Context(), owner) require.NoError(t, err) assert.True(t, proto.Equal(m, gotM)) @@ -212,10 +233,10 @@ func TestKVStore_Metadata(t *testing.T) { Namespace: "main", Key: "secret3", } - err = store.addIDToMetadata(newKey) + err = store.addIDToMetadata(t.Context(), newKey) require.NoError(t, err) - gotM, err = store.GetMetadata(owner) + gotM, err = store.GetMetadata(t.Context(), owner) require.NoError(t, err) assert.Len(t, gotM.SecretIdentifiers, 2) } @@ -225,7 +246,7 @@ func TestKVStore_Metadata_Delete(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) id := &vault.SecretIdentifier{ Owner: "owner", @@ -240,20 +261,20 @@ func TestKVStore_Metadata_Delete(t *testing.T) { data: d, } - err = store.removeIDFromMetadata(id) + err = store.removeIDFromMetadata(t.Context(), id) require.NoError(t, err) - m, err := store.GetMetadata(owner) + m, err := store.GetMetadata(t.Context(), owner) require.NoError(t, err) assert.Empty(t, m.SecretIdentifiers) - err = store.removeIDFromMetadata(id) + err = store.removeIDFromMetadata(t.Context(), id) require.ErrorContains(t, err, "not found in metadata for owner owner") delete(kv.m, "Metadata::owner") - err = store.removeIDFromMetadata(id) + err = store.removeIDFromMetadata(t.Context(), id) require.ErrorContains(t, err, "no metadata found for owner owner") } @@ -261,7 +282,7 @@ func TestKVStore_InconsistentWrites(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) id := &vault.SecretIdentifier{ Owner: "owner", @@ -283,7 +304,7 @@ func TestKVStore_InconsistentWrites(t *testing.T) { kv.m["Metadata::owner"] = response{ data: d, } - s, err := store.GetSecret(id) + s, err := store.GetSecret(t.Context(), id) require.NoError(t, err) assert.Equal(t, s.EncryptedSecret, []byte("encrypted data")) @@ -292,21 +313,21 @@ func TestKVStore_InconsistentWrites(t *testing.T) { delete(kv.m, "Metadata::owner") // Now fetching the secret should fail - s, err = store.GetSecret(id) + s, err = store.GetSecret(t.Context(), id) assert.Nil(t, s) require.NoError(t, err) // We can recreate it without an already exists error. - err = store.WriteSecret(id, &vault.StoredSecret{ + err = store.WriteSecret(t.Context(), id,&vault.StoredSecret{ EncryptedSecret: []byte("encrypted data 2"), }) require.NoError(t, err) - md, err := store.GetMetadata("owner") + md, err := store.GetMetadata(t.Context(), "owner") require.NoError(t, err) assert.Len(t, md.SecretIdentifiers, 1) - s, err = store.GetSecret(id) + s, err = store.GetSecret(t.Context(), id) assert.NotNil(t, s) require.NoError(t, err) @@ -318,10 +339,10 @@ func TestKVStore_GetPendingRequests(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) // Expect no pending requests on empty store. - requests, err := store.GetPendingQueue() + requests, err := store.GetPendingQueue(t.Context()) require.NoError(t, err) assert.Empty(t, requests) @@ -350,7 +371,7 @@ func TestKVStore_GetPendingRequests(t *testing.T) { kv.m[pendingQueueIndex] = response{data: indexBytes} // Validate retrieval of one pending request. - requests, err = store.GetPendingQueue() + requests, err = store.GetPendingQueue(t.Context()) require.NoError(t, err) assert.Len(t, requests, 2) assert.Equal(t, "test-request-id-123", requests[0].Id) @@ -358,7 +379,7 @@ func TestKVStore_GetPendingRequests(t *testing.T) { // Validate behaviour when the index item is missing. delete(kv.m, pendingQueueIndex) - requests, err = store.GetPendingQueue() + requests, err = store.GetPendingQueue(t.Context()) require.NoError(t, err) assert.Empty(t, requests) @@ -368,7 +389,7 @@ func TestKVStore_GetPendingRequests(t *testing.T) { require.NoError(t, err) kv.m[pendingQueueIndex] = response{data: indexBytes} - requests, err = store.GetPendingQueue() + requests, err = store.GetPendingQueue(t.Context()) require.ErrorContains(t, err, "pending queue item at index 2 not found") assert.Empty(t, requests) } @@ -378,7 +399,7 @@ func TestKVStore_WritePendingRequests(t *testing.T) { kv := &kv{ m: make(map[string]response), } - store := NewWriteStore(kv) + store := newTestWriteStore(t, kv) // Writing mock pending requests. empty, err := anypb.New(&emptypb.Empty{}) @@ -395,7 +416,7 @@ func TestKVStore_WritePendingRequests(t *testing.T) { Id: "test-request-id-3", Item: empty, } - err = store.WritePendingQueue([]*vault.StoredPendingQueueItem{item, item2, item3}) + err = store.WritePendingQueue(t.Context(),[]*vault.StoredPendingQueueItem{item, item2, item3}) require.NoError(t, err) // Ensure index is correctly written. @@ -427,7 +448,7 @@ func TestKVStore_WritePendingRequests(t *testing.T) { assert.Equal(t, "test-request-id-3", item2.Id) // Writing a shorter list deletes the old one. - err = store.WritePendingQueue([]*vault.StoredPendingQueueItem{item, item2}) + err = store.WritePendingQueue(t.Context(),[]*vault.StoredPendingQueueItem{item, item2}) require.NoError(t, err) _, exists = kv.m[pendingQueueItemPrefix+"3"] diff --git a/core/services/ocr2/plugins/vault/metrics.go b/core/services/ocr2/plugins/vault/metrics.go index 5e4f68e73b6..bab4ef4eaa3 100644 --- a/core/services/ocr2/plugins/vault/metrics.go +++ b/core/services/ocr2/plugins/vault/metrics.go @@ -13,7 +13,8 @@ import ( type pluginMetrics struct { configDigest string - queueOverflow metric.Int64Counter + queueOverflow metric.Int64Counter + kvOperationDuration metric.Float64Histogram } func newPluginMetrics(configDigest string) (*pluginMetrics, error) { @@ -22,12 +23,28 @@ func newPluginMetrics(configDigest string) (*pluginMetrics, error) { return nil, fmt.Errorf("failed to create queue overflow counter: %w", err) } + kvOperationDuration, err := beholder.GetMeter().Float64Histogram( + "platform_vault_plugin_kv_operation_duration_seconds", + metric.WithUnit("s"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create kv operation duration histogram: %w", err) + } + return &pluginMetrics{ - configDigest: configDigest, - queueOverflow: queueOverflow, + configDigest: configDigest, + queueOverflow: queueOverflow, + kvOperationDuration: kvOperationDuration, }, nil } +func (m *pluginMetrics) trackKVOperation(ctx context.Context, method string, durationSeconds float64) { + m.kvOperationDuration.Record(ctx, durationSeconds, metric.WithAttributes( + attribute.String("configDigest", m.configDigest), + attribute.String("method", method), + )) +} + func (m *pluginMetrics) trackQueueOverflow(ctx context.Context, queueSize int, batchSize int) { m.queueOverflow.Add(ctx, 1, metric.WithAttributes( attribute.String("configDigest", m.configDigest), diff --git a/core/services/ocr2/plugins/vault/plugin.go b/core/services/ocr2/plugins/vault/plugin.go index 92c8e83b8af..65a7c133762 100644 --- a/core/services/ocr2/plugins/vault/plugin.go +++ b/core/services/ocr2/plugins/vault/plugin.go @@ -383,9 +383,9 @@ func generateRandomNonce() ([]byte, error) { } func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Observation, error) { - readStore := NewReadStore(keyValueReader) + readStore := NewReadStore(keyValueReader, r.metrics) - batch, err := readStore.GetPendingQueue() + batch, err := readStore.GetPendingQueue(ctx) if err != nil { return nil, fmt.Errorf("could not fetch batch of requests: %w", err) } @@ -457,7 +457,7 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type // Next, get the current pending queue. We'll use this to dedupe // requests when generating an observation for the next state of the // pending queue. - pendingQueue, ierr := readStore.GetPendingQueue() + pendingQueue, ierr := readStore.GetPendingQueue(ctx) if ierr != nil { return nil, ierr } @@ -625,7 +625,7 @@ func (r *ReportingPlugin) observeGetSecretsRequest(ctx context.Context, reader R return nil, err } - secret, err := reader.GetSecret(id) + secret, err := reader.GetSecret(ctx, id) if err != nil { return nil, fmt.Errorf("failed to read secret from key-value store: %w", err) } @@ -833,7 +833,7 @@ func (r *ReportingPlugin) processListSecretIdentifiersRequest(ctx context.Contex return nil, errors.New("invalid request: owner cannot be empty") } - md, err := reader.GetMetadata(req.Owner) + md, err := reader.GetMetadata(ctx, req.Owner) if err != nil { return nil, fmt.Errorf("failed to get metadata for owner: %w", err) } @@ -931,7 +931,7 @@ func (r *ReportingPlugin) observeDeleteSecretRequest(ctx context.Context, reader return id, newUserError("duplicate request for secret identifier " + vaulttypes.KeyFor(id)) } - ss, err := reader.GetSecret(id) + ss, err := reader.GetSecret(ctx, id) if err != nil { return id, fmt.Errorf("failed to read secret from key-value store: %w", err) } @@ -1060,8 +1060,8 @@ func (r *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, // This is because honest nodes will all be reading from // the same deterministic key-value store-based queue. // - that all pending queue items can be fetched as blobs. - store := NewReadStore(keyValueReader) - pendingQueueItems, err := store.GetPendingQueue() + store := NewReadStore(keyValueReader, r.metrics) + pendingQueueItems, err := store.GetPendingQueue(ctx) if err != nil { return fmt.Errorf("could not fetch pending queue from store: %w", err) } @@ -1412,7 +1412,7 @@ func (r *ReportingPlugin) validateListSecretIdentifiersObservation(ctx context.C } func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReadWriter ocr3_1types.KeyValueStateReadWriter, blobFetcher ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { - store := NewWriteStore(keyValueReadWriter) + store := NewWriteStore(keyValueReadWriter, r.metrics) marshalledObs := map[uint8]*vaultcommon.Observations{} for _, ao := range aos { @@ -1656,7 +1656,7 @@ func (r *ReportingPlugin) stateTransitionPendingQueue(ctx context.Context, store keptItems = keptItems[:errBoundLimited.Limit] } - return store.WritePendingQueue(keptItems) + return store.WritePendingQueue(ctx, keptItems) } func sortKey(id string, nonce []byte) []byte { @@ -1845,7 +1845,7 @@ func (r *ReportingPlugin) stateTransitionCreateSecretsRequest(ctx context.Contex return nil, newUserError("could not decode secret value: invalid hex" + err.Error()) } - secret, err := store.GetSecret(req.Id) + secret, err := store.GetSecret(ctx, req.Id) if err != nil { return nil, fmt.Errorf("failed to read secret from key-value store: %w", err) } @@ -1854,7 +1854,7 @@ func (r *ReportingPlugin) stateTransitionCreateSecretsRequest(ctx context.Contex return nil, newUserError("could not write to key value store: key already exists") } - count, err := store.GetSecretIdentifiersCountForOwner(req.Id.Owner) + count, err := store.GetSecretIdentifiersCountForOwner(ctx, req.Id.Owner) if err != nil { return nil, fmt.Errorf("failed to read secret identifiers count for owner: %w", err) } @@ -1868,7 +1868,7 @@ func (r *ReportingPlugin) stateTransitionCreateSecretsRequest(ctx context.Contex return nil, fmt.Errorf("failed to check max secrets per owner limit: %w", ierr) } - err = store.WriteSecret(req.Id, &vaultcommon.StoredSecret{ + err = store.WriteSecret(ctx, req.Id, &vaultcommon.StoredSecret{ EncryptedSecret: encryptedSecret, }) if err != nil { @@ -1961,7 +1961,7 @@ func (r *ReportingPlugin) stateTransitionUpdateSecretsRequest(ctx context.Contex return nil, newUserError("could not decode secret value: invalid hex" + err.Error()) } - secret, err := store.GetSecret(req.Id) + secret, err := store.GetSecret(ctx, req.Id) if err != nil { return nil, fmt.Errorf("failed to read secret from key-value store: %w", err) } @@ -1970,7 +1970,7 @@ func (r *ReportingPlugin) stateTransitionUpdateSecretsRequest(ctx context.Contex return nil, newUserError("could not write update to key value store: key does not exist") } - err = store.WriteSecret(req.Id, &vaultcommon.StoredSecret{ + err = store.WriteSecret(ctx, req.Id, &vaultcommon.StoredSecret{ EncryptedSecret: encryptedSecret, }) if err != nil { @@ -2058,7 +2058,7 @@ func (r *ReportingPlugin) stateTransitionDeleteSecretsRequest(ctx context.Contex return resp, newUserError(resp.GetError()) } - err := store.DeleteSecret(id) + err := store.DeleteSecret(ctx, id) if err != nil { return nil, fmt.Errorf("failed to delete secret from key value store: %w", err) } diff --git a/core/services/ocr2/plugins/vault/plugin_test.go b/core/services/ocr2/plugins/vault/plugin_test.go index b755fef86db..45be4e3cced 100644 --- a/core/services/ocr2/plugins/vault/plugin_test.go +++ b/core/services/ocr2/plugins/vault/plugin_test.go @@ -8,6 +8,7 @@ import ( "fmt" "strings" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/libocr/commontypes" @@ -291,8 +292,9 @@ func TestPlugin_Observation_NothingInBatch(t *testing.T) { lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -325,8 +327,9 @@ func TestPlugin_Observation_PendingQueueEnabled_EmptyPendingQueue(t *testing.T) lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -400,8 +403,9 @@ func TestPlugin_Observation_PendingQueueEnabled_WithPendingQueueProvided(t *test lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -456,7 +460,7 @@ func TestPlugin_Observation_PendingQueueEnabled_WithPendingQueueProvided(t *test } anyd, err := anypb.New(d) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-3", Item: anyd}, }, @@ -495,8 +499,9 @@ func TestPlugin_Observation_PendingQueueEnabled_ItemBothInPendingQueueAndLocalQu lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -548,7 +553,7 @@ func TestPlugin_Observation_PendingQueueEnabled_ItemBothInPendingQueueAndLocalQu anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-2", Item: anyp}, }, @@ -641,6 +646,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretIdentifierInvalid(t *testing r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -670,7 +676,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretIdentifierInvalid(t *testing } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -706,8 +712,9 @@ func TestPlugin_Observation_GetSecretsRequest_FillsInNamespace(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -743,7 +750,7 @@ func TestPlugin_Observation_GetSecretsRequest_FillsInNamespace(t *testing.T) { Namespace: "main", Key: "my_secret", } - err = NewWriteStore(rdr).WriteSecret(createdID, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), createdID,&vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -763,7 +770,7 @@ func TestPlugin_Observation_GetSecretsRequest_FillsInNamespace(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -795,8 +802,9 @@ func TestPlugin_Observation_GetSecretsRequest_SecretDoesNotExist(t *testing.T) { lggr := logger.TestLogger(t) store := requests.NewStore[*vaulttypes.Request]() r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -832,7 +840,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretDoesNotExist(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -869,6 +877,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretExistsButIsIncorrect(t *test r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -894,7 +903,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretExistsButIsIncorrect(t *test m: make(map[string]response), } - err = NewWriteStore(rdr).WriteSecret(id, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ EncryptedSecret: []byte("invalid-ciphertext"), }) require.NoError(t, err) @@ -909,7 +918,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretExistsButIsIncorrect(t *test } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -955,6 +964,7 @@ func TestPlugin_Observation_GetSecretsRequest_PublicKeyIsInvalid(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -986,7 +996,7 @@ func TestPlugin_Observation_GetSecretsRequest_PublicKeyIsInvalid(t *testing.T) { ciphertextBytes, err := ciphertext.Marshal() require.NoError(t, err) - err = NewWriteStore(rdr).WriteSecret(id, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1001,7 +1011,7 @@ func TestPlugin_Observation_GetSecretsRequest_PublicKeyIsInvalid(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1039,6 +1049,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretLabelIsInvalid(t *testing.T) r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1073,7 +1084,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretLabelIsInvalid(t *testing.T) ciphertextBytes, err := ciphertext.Marshal() require.NoError(t, err) - err = NewWriteStore(rdr).WriteSecret(id, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1093,7 +1104,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretLabelIsInvalid(t *testing.T) } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1131,6 +1142,7 @@ func TestPlugin_Observation_GetSecretsRequest_Success(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1166,7 +1178,7 @@ func TestPlugin_Observation_GetSecretsRequest_Success(t *testing.T) { ciphertextBytes, err := ciphertext.Marshal() require.NoError(t, err) - err = NewWriteStore(rdr).WriteSecret(id, &vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1186,7 +1198,7 @@ func TestPlugin_Observation_GetSecretsRequest_Success(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1241,6 +1253,173 @@ func TestPlugin_Observation_GetSecretsRequest_Success(t *testing.T) { assert.Equal(t, plaintext, gotSecret) } +func TestPlugin_Observation_MaxBatchGetSecretsWithEncryptionKeys(t *testing.T) { + lggr, _ := logger.TestLoggerObserved(t, zapcore.DebugLevel) + store := requests.NewStore[*vaulttypes.Request]() + _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) + require.NoError(t, err) + + batchSize := 10 + maxRequestBatchSize := 10 + numEncryptionKeys := 10 + + r := &ReportingPlugin{ + lggr: lggr, + store: store, + metrics: newTestMetrics(t), + marshalBlob: mockMarshalBlob, + unmarshalBlob: mockUnmarshalBlob, + cfg: makeReportingPluginConfig( + t, + batchSize, + pk, + shares[0], + 100, + 2048, + 64, + 64, + 64, + maxRequestBatchSize, + ), + } + + rdr := &kv{ + m: make(map[string]response), + } + ws := newTestWriteStore(t, rdr) + + // Generate encryption keys (NaCl box public keys). + encryptionKeys := make([]string, numEncryptionKeys) + for i := range encryptionKeys { + pubK, _, kerr := box.GenerateKey(rand.Reader) + require.NoError(t, kerr) + encryptionKeys[i] = hex.EncodeToString(pubK[:]) + } + + // Build batchSize pending queue items, each a GetSecretsRequest with 1 secret and numEncryptionKeys encryption keys. + pendingItems := make([]*vaultcommon.StoredPendingQueueItem, batchSize) + for i := 0; i < batchSize; i++ { + owner := fmt.Sprintf("0x%040d", i+1) + id := &vaultcommon.SecretIdentifier{ + Owner: owner, + Namespace: "main", + Key: fmt.Sprintf("secret_%d", i), + } + + // Encrypt a secret with the correct label for this owner. + var label [32]byte + ownerAddress := common.HexToAddress(owner) + copy(label[12:], ownerAddress.Bytes()) + ciphertext, cerr := tdh2easy.EncryptWithLabel(pk, []byte(fmt.Sprintf("plaintext-%d", i)), label) + require.NoError(t, cerr) + ciphertextBytes, cerr := ciphertext.Marshal() + require.NoError(t, cerr) + + // Store the secret in KV. + err = ws.WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ + EncryptedSecret: ciphertextBytes, + }) + require.NoError(t, err) + + p := &vaultcommon.GetSecretsRequest{ + Requests: []*vaultcommon.SecretRequest{ + { + Id: id, + EncryptionKeys: encryptionKeys, + }, + }, + } + anyp, aerr := anypb.New(p) + require.NoError(t, aerr) + pendingItems[i] = &vaultcommon.StoredPendingQueueItem{ + Id: fmt.Sprintf("request-%d", i), + Item: anyp, + } + } + + err = ws.WritePendingQueue(t.Context(), pendingItems) + require.NoError(t, err) + + // Add 2*batchSize items to the local store (with different IDs) so that + // the observation also includes the maximum number of pending queue items + // to be broadcast as blobs. + numLocalItems := 2 * batchSize + for i := range numLocalItems { + owner := fmt.Sprintf("0x%040d", batchSize+i+1) + id := &vaultcommon.SecretIdentifier{ + Owner: owner, + Namespace: "main", + Key: fmt.Sprintf("local_secret_%d", i), + } + p := &vaultcommon.GetSecretsRequest{ + Requests: []*vaultcommon.SecretRequest{ + { + Id: id, + EncryptionKeys: encryptionKeys, + }, + }, + } + err = store.Add(&vaulttypes.Request{Payload: p, IDVal: fmt.Sprintf("local-request-%d", i)}) + require.NoError(t, err) + } + + seqNr := uint64(1) + bf := &blobber{} + start := time.Now() + data, err := r.Observation(t.Context(), seqNr, types.AttributedQuery{}, rdr, bf) + elapsed := time.Since(start) + require.NoError(t, err) + t.Logf("Observation took %s, output size: %d bytes", elapsed, len(data)) + + obs := &vaultcommon.Observations{} + err = proto.Unmarshal(data, obs) + require.NoError(t, err) + + // Verify all pending queue requests were observed. + require.Len(t, obs.Observations, batchSize) + + for i, o := range obs.Observations { + assert.Equal(t, fmt.Sprintf("request-%d", i), o.Id) + assert.Equal(t, vaultcommon.RequestType_GET_SECRETS, o.RequestType) + + batchResp := o.GetGetSecretsResponse() + require.Len(t, batchResp.Responses, 1) + + resp := batchResp.Responses[0] + assert.Empty(t, resp.GetError()) + assert.NotEmpty(t, resp.GetData().EncryptedValue) + assert.Len(t, resp.GetData().EncryptedDecryptionKeyShares, numEncryptionKeys) + + for _, share := range resp.GetData().EncryptedDecryptionKeyShares { + assert.Len(t, share.Shares, 1) + assert.NotEmpty(t, share.Shares[0]) + } + } + + // Verify all local queue items were broadcast as pending queue observations. + // The local queue is sorted lexicographically by ID before broadcasting, + // so we collect the IDs into a set rather than asserting on order. + assert.Len(t, obs.PendingQueueItems, numLocalItems) + require.Len(t, bf.blobs, numLocalItems) + gotLocalIDs := map[string]bool{} + for _, blob := range bf.blobs { + gotMsg := &vaultcommon.StoredPendingQueueItem{} + err = proto.Unmarshal(blob, gotMsg) + require.NoError(t, err) + gotLocalIDs[gotMsg.Id] = true + } + for i := range numLocalItems { + assert.True(t, gotLocalIDs[fmt.Sprintf("local-request-%d", i)], "missing local-request-%d", i) + } + + assert.NotEmpty(t, obs.SortNonce) + + // Verify the serialized observation fits within the max observation size limit (512 KB). + maxObservationBytes := 512 * 1000 + assert.LessOrEqual(t, len(data), maxObservationBytes, + "observation size %d exceeds max observation limit %d", len(data), maxObservationBytes) +} + func TestPlugin_Observation_CreateSecretsRequest_SecretIdentifierInvalid(t *testing.T) { tcs := []struct { name string @@ -1288,6 +1467,7 @@ func TestPlugin_Observation_CreateSecretsRequest_SecretIdentifierInvalid(t *test r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1318,7 +1498,7 @@ func TestPlugin_Observation_CreateSecretsRequest_SecretIdentifierInvalid(t *test } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1354,6 +1534,7 @@ func TestPlugin_Observation_CreateSecretsRequest_DisallowsDuplicateRequests(t *t r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1393,7 +1574,7 @@ func TestPlugin_Observation_CreateSecretsRequest_DisallowsDuplicateRequests(t *t } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1431,8 +1612,9 @@ func TestPlugin_StateTransition_CreateSecretsRequest_CorrectlyTracksLimits(t *te _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -1546,6 +1728,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext(t *testing.T) r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1582,7 +1765,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext(t *testing.T) } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1616,6 +1799,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext_TooLong(t *te r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1653,7 +1837,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext_TooLong(t *te } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1693,6 +1877,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext_EncryptedWith r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1735,7 +1920,7 @@ func TestPlugin_Observation_CreateSecretsRequest_InvalidCiphertext_EncryptedWith } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1771,6 +1956,7 @@ func TestPlugin_Observation_CreateSecretsRequest_SecretLabelIsInvalid(t *testing r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1820,7 +2006,7 @@ func TestPlugin_Observation_CreateSecretsRequest_SecretLabelIsInvalid(t *testing } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1852,6 +2038,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_SecretLabelIsInvalid(t *testing r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -1901,7 +2088,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_SecretLabelIsInvalid(t *testing } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -1931,8 +2118,9 @@ func TestPlugin_StateTransition_CreateSecretsRequest_TooManySecretsForOwner(t *t _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -1956,8 +2144,8 @@ func TestPlugin_StateTransition_CreateSecretsRequest_TooManySecretsForOwner(t *t Namespace: "main", Key: "secret", } - kvstore := NewWriteStore(rdr) - err = kvstore.WriteMetadata(id.Owner, &vaultcommon.StoredMetadata{ + kvstore := newTestWriteStore(t, rdr) + err = kvstore.WriteMetadata(t.Context(), id.Owner,&vaultcommon.StoredMetadata{ SecretIdentifiers: []*vaultcommon.SecretIdentifier{ { Owner: "owner", @@ -2022,8 +2210,9 @@ func TestPlugin_StateTransition_CreateSecretsRequest_SecretExistsForKey(t *testi _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2047,8 +2236,8 @@ func TestPlugin_StateTransition_CreateSecretsRequest_SecretExistsForKey(t *testi Namespace: "main", Key: "secret", } - kvstore := NewWriteStore(rdr) - err = kvstore.WriteSecret(id, &vaultcommon.StoredSecret{ + kvstore := newTestWriteStore(t, rdr) + err = kvstore.WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ EncryptedSecret: []byte("some-ciphertext"), }) require.NoError(t, err) @@ -2109,6 +2298,7 @@ func TestPlugin_Observation_CreateSecretsRequest_Success(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -2150,7 +2340,7 @@ func TestPlugin_Observation_CreateSecretsRequest_Success(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -2333,6 +2523,7 @@ func TestPlugin_StateTransition_InsufficientObservations(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2409,6 +2600,7 @@ func TestPlugin_StateTransition_GetSecretsRequest_ResponseSizeWithinLimit(t *tes F: 3, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2481,6 +2673,7 @@ func TestPlugin_ValidateObservations_InvalidObservations(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2570,6 +2763,7 @@ func TestPlugin_ValidateObservations_IncludesAllItemsInPendingQueue(t *testing.T F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2614,7 +2808,7 @@ func TestPlugin_ValidateObservations_IncludesAllItemsInPendingQueue(t *testing.T } anyg, err := anypb.New(g) require.NoError(t, err) - err = NewWriteStore(kv).WritePendingQueue( + err = newTestWriteStore(t, kv).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: vaulttypes.KeyFor(id), Item: anyd}, {Id: vaulttypes.KeyFor(id2), Item: anyg}, @@ -2673,6 +2867,7 @@ func TestPlugin_ValidateObservations_DisallowsDuplicateBlobHandles(t *testing.T) F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2732,6 +2927,7 @@ func TestPlugin_StateTransition_ShasDontMatch(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2815,6 +3011,7 @@ func TestPlugin_StateTransition_AggregatesValidationErrors(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -2894,6 +3091,7 @@ func TestPlugin_StateTransition_GetSecretsRequest_CombinesShares(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3038,6 +3236,7 @@ func TestPlugin_StateTransition_CreateSecretsRequest_WritesSecrets(t *testing.T) F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3056,7 +3255,7 @@ func TestPlugin_StateTransition_CreateSecretsRequest_WritesSecrets(t *testing.T) kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -3115,7 +3314,7 @@ func TestPlugin_StateTransition_CreateSecretsRequest_WritesSecrets(t *testing.T) } assert.True(t, proto.Equal(expectedResp, o.GetCreateSecretsResponse()), o.GetCreateSecretsResponse()) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) assert.Equal(t, ss.EncryptedSecret, []byte("encrypted-value")) @@ -3209,6 +3408,7 @@ func TestPlugin_Reports(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3304,6 +3504,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_SecretIdentifierInvalid(t *test r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3334,7 +3535,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_SecretIdentifierInvalid(t *test } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3370,6 +3571,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_DisallowsDuplicateRequests(t *t r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3409,7 +3611,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_DisallowsDuplicateRequests(t *t } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3447,6 +3649,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext(t *testing.T) r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3483,7 +3686,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext(t *testing.T) } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3517,6 +3720,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext_TooLong(t *te r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3554,7 +3758,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext_TooLong(t *te } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3594,6 +3798,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext_EncryptedWith r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3636,7 +3841,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_InvalidCiphertext_EncryptedWith } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -3676,6 +3881,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_SecretDoesntExist(t *testin F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3694,7 +3900,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_SecretDoesntExist(t *testin kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -3753,7 +3959,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_SecretDoesntExist(t *testin } assert.True(t, proto.Equal(expectedResp, o.GetUpdateSecretsResponse()), o.GetUpdateSecretsResponse()) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -3772,6 +3978,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_WritesSecrets(t *testing.T) F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3809,7 +4016,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_WritesSecrets(t *testing.T) }, }, } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) value := []byte("encrypted-value") enc := hex.EncodeToString(value) @@ -3864,7 +4071,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_WritesSecrets(t *testing.T) } assert.True(t, proto.Equal(expectedResp, o.GetUpdateSecretsResponse()), o.GetUpdateSecretsResponse()) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.NotNil(t, ss) @@ -3928,6 +4135,7 @@ func TestPlugin_Reports_UpdateSecretsRequest(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -3968,6 +4176,7 @@ func TestPlugin_Observation_DeleteSecrets(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4022,7 +4231,7 @@ func TestPlugin_Observation_DeleteSecrets(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4054,6 +4263,7 @@ func TestPlugin_Observation_DeleteSecrets_IdDoesntExist(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4087,7 +4297,7 @@ func TestPlugin_Observation_DeleteSecrets_IdDoesntExist(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4119,6 +4329,7 @@ func TestPlugin_Observation_DeleteSecrets_InvalidRequestDuplicateIds(t *testing. r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4153,7 +4364,7 @@ func TestPlugin_Observation_DeleteSecrets_InvalidRequestDuplicateIds(t *testing. } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4195,6 +4406,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4239,7 +4451,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest(t *testing.T) { }, }, } - rs := NewReadStore(rdr) + rs := newTestReadStore(t, rdr) req := &vaultcommon.DeleteSecretsRequest{ RequestId: "request-id", @@ -4286,7 +4498,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest(t *testing.T) { } assert.True(t, proto.Equal(expectedResp, o.GetDeleteSecretsResponse())) - ss, err = rs.GetSecret(id) + ss, err = rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -4305,6 +4517,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest_SecretDoesNotExist(t *testi F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4338,7 +4551,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest_SecretDoesNotExist(t *testi }, }, } - rs := NewReadStore(rdr) + rs := newTestReadStore(t, rdr) req := &vaultcommon.DeleteSecretsRequest{ RequestId: "request-id", @@ -4385,7 +4598,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest_SecretDoesNotExist(t *testi } assert.True(t, proto.Equal(expectedResp, o.GetDeleteSecretsResponse()), o.GetDeleteSecretsResponse()) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -4442,6 +4655,7 @@ func TestPlugin_Reports_DeleteSecretsRequest(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4482,6 +4696,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_OwnerRequired(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4508,7 +4723,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_OwnerRequired(t *testing.T) { } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4539,6 +4754,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_NoNamespaceProvided(t *testing r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4591,7 +4807,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_NoNamespaceProvided(t *testing } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4642,6 +4858,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_FilterByNamespace(t *testing.T r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -4695,7 +4912,7 @@ func TestPlugin_Observation_ListSecretIdentifiers_FilterByNamespace(t *testing.T } anyp, err := anypb.New(p) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -4781,6 +4998,7 @@ func TestPlugin_Reports_ListSecretIdentifiersRequest(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4827,6 +5045,7 @@ func TestPlugin_StateTransition_ListSecretIdentifiers(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -4845,7 +5064,7 @@ func TestPlugin_StateTransition_ListSecretIdentifiers(t *testing.T) { kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -4884,7 +5103,7 @@ func TestPlugin_StateTransition_ListSecretIdentifiers(t *testing.T) { assert.True(t, proto.Equal(resp, o.GetListSecretIdentifiersResponse())) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -4912,8 +5131,9 @@ func TestPlugin_StateTransition_StoresPendingQueue(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5053,7 +5273,7 @@ func TestPlugin_StateTransition_StoresPendingQueue(t *testing.T) { assert.Empty(t, os.Outcomes) - pq, err := NewReadStore(rdr).GetPendingQueue() + pq, err := newTestReadStore(t, rdr).GetPendingQueue(t.Context()) require.NoError(t, err) assert.Len(t, pq, 3) @@ -5071,8 +5291,9 @@ func TestPlugin_StateTransition_StoresPendingQueue_LimitedToBatchSize(t *testing _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5204,7 +5425,7 @@ func TestPlugin_StateTransition_StoresPendingQueue_LimitedToBatchSize(t *testing assert.Empty(t, os.Outcomes) - pq, err := NewReadStore(rdr).GetPendingQueue() + pq, err := newTestReadStore(t, rdr).GetPendingQueue(t.Context()) require.NoError(t, err) assert.Len(t, pq, 1) @@ -5223,8 +5444,9 @@ func TestPlugin_StateTransition_StoresPendingQueue_DoesntDoubleCountObservations _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5295,7 +5517,7 @@ func TestPlugin_StateTransition_StoresPendingQueue_DoesntDoubleCountObservations assert.Empty(t, os.Outcomes) - pq, err := NewReadStore(rdr).GetPendingQueue() + pq, err := newTestReadStore(t, rdr).GetPendingQueue(t.Context()) require.NoError(t, err) assert.Empty(t, pq, 0) @@ -5314,8 +5536,9 @@ func TestPlugin_ValidateObservation_RejectsIfMoreThan2xBatchSize(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5404,8 +5627,9 @@ func TestPlugin_ValidateObservation_AcceptsFullPendingQueueObservation(t *testin batchSize := 1 // MaxBatchSize=1, so 2*batchSize=2 is the intended max pending queue items r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5481,8 +5705,9 @@ func TestPlugin_ValidateObservation_GetSecretsRequest(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5544,7 +5769,7 @@ func TestPlugin_ValidateObservation_GetSecretsRequest(t *testing.T) { anyp, err := anypb.New(req) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -5760,8 +5985,9 @@ func TestPlugin_ValidateObservation_PanicsOnEmptyShares(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -5825,7 +6051,7 @@ func TestPlugin_ValidateObservation_PanicsOnEmptyShares(t *testing.T) { anyp, err := anypb.New(req) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -5872,8 +6098,9 @@ func TestPlugin_ValidateObservation_NilSecretIdentifier(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -6073,7 +6300,7 @@ func TestPlugin_ValidateObservation_NilSecretIdentifier(t *testing.T) { } require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -6110,8 +6337,9 @@ func TestPlugin_ValidateObservation_CiphertextSize(t *testing.T) { // maxCipherTextLengthBytes = 10 bytes, so any ciphertext > 10 decoded bytes should be rejected r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -6296,7 +6524,7 @@ func TestPlugin_ValidateObservation_CiphertextSize(t *testing.T) { } require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyp}, }, @@ -6341,6 +6569,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_NewQuora_NotGetRequest(t *te F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -6359,7 +6588,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_NewQuora_NotGetRequest(t *te kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -6396,7 +6625,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_NewQuora_NotGetRequest(t *te assert.True(t, proto.Equal(req, o.GetListSecretIdentifiersRequest())) assert.True(t, proto.Equal(resp, o.GetListSecretIdentifiersResponse())) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -6415,6 +6644,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_GetRequest(t *testing.T) { F: 1, }, store: store, + metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, 10, @@ -6433,7 +6663,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_GetRequest(t *testing.T) { kv := &kv{ m: make(map[string]response), } - rs := NewReadStore(kv) + rs := newTestReadStore(t, kv) id := &vaultcommon.SecretIdentifier{ Owner: "owner", @@ -6484,7 +6714,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_GetRequest(t *testing.T) { assert.True(t, proto.Equal(req, o.GetGetSecretsRequest())) assert.True(t, proto.Equal(resp, o.GetGetSecretsResponse())) - ss, err := rs.GetSecret(id) + ss, err := rs.GetSecret(t.Context(), id) require.NoError(t, err) require.Nil(t, ss) @@ -6630,6 +6860,7 @@ func TestPlugin_ValidateObservation_RequestBatchLimit(t *testing.T) { r := &ReportingPlugin{ lggr: lggr, store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -6683,8 +6914,9 @@ func TestPlugin_ValidateObservation_ListSecretIdentifiersExceedsMaxSecretsPerOwn _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, + metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, F: 1, @@ -6732,7 +6964,7 @@ func TestPlugin_ValidateObservation_ListSecretIdentifiersExceedsMaxSecretsPerOwn rdr := &kv{m: make(map[string]response)} anyReq, err := anypb.New(listReq) require.NoError(t, err) - err = NewWriteStore(rdr).WritePendingQueue( + err = newTestWriteStore(t, rdr).WritePendingQueue(t.Context(), []*vaultcommon.StoredPendingQueueItem{ {Id: "request-1", Item: anyReq}, }, From eb65caf9811d472143b45a18a575671631ade4b1 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Thu, 26 Mar 2026 17:38:57 +0000 Subject: [PATCH 2/8] Increase buckets to account for production usage --- core/services/ocr3_1/beholderwrapper/types.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/services/ocr3_1/beholderwrapper/types.go b/core/services/ocr3_1/beholderwrapper/types.go index 0c8c16d93a3..124b0b2c294 100644 --- a/core/services/ocr3_1/beholderwrapper/types.go +++ b/core/services/ocr3_1/beholderwrapper/types.go @@ -113,16 +113,16 @@ func MetricViews() []sdkmetric.View { sdkmetric.NewView( sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_duration_ms"}, sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - // 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560 - Boundaries: prometheus.ExponentialBuckets(5, 2, 10), + // 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960 + Boundaries: prometheus.ExponentialBuckets(5, 2, 14), }}, ), sdkmetric.NewView( sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_data_sizes"}, sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ // 512KB is the max value possible - // 1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB - Boundaries: prometheus.ExponentialBuckets(1024, 2, 10), + // 1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB, 1024KB, 2048KB, 4096KB, 8192KB + Boundaries: prometheus.ExponentialBuckets(1024, 2, 14), }}, ), } From cd79b586c5ce18269734cbc6812f4a1b9175d6f0 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Thu, 26 Mar 2026 18:21:44 +0000 Subject: [PATCH 3/8] Instrument KV and BlobBroadcaster/Fetcher interfaces --- .../beholderwrapper/instrumented_blob.go | 39 ++ .../ocr3_1/beholderwrapper/instrumented_kv.go | 40 ++ .../services/ocr3_1/beholderwrapper/plugin.go | 43 +- .../ocr3_1/beholderwrapper/plugin_test.go | 405 ++++++++++++++++++ core/services/ocr3_1/beholderwrapper/types.go | 46 ++ 5 files changed, 567 insertions(+), 6 deletions(-) create mode 100644 core/services/ocr3_1/beholderwrapper/instrumented_blob.go create mode 100644 core/services/ocr3_1/beholderwrapper/instrumented_kv.go diff --git a/core/services/ocr3_1/beholderwrapper/instrumented_blob.go b/core/services/ocr3_1/beholderwrapper/instrumented_blob.go new file mode 100644 index 00000000000..f2ccdb374ba --- /dev/null +++ b/core/services/ocr3_1/beholderwrapper/instrumented_blob.go @@ -0,0 +1,39 @@ +package beholderwrapper + +import ( + "context" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" +) + +type instrumentedBlobBroadcastFetcher struct { + inner ocr3_1types.BlobBroadcastFetcher + metrics *pluginMetrics +} + +func (i *instrumentedBlobBroadcastFetcher) BroadcastBlob(ctx context.Context, payload []byte, expirationHint ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) { + start := time.Now() + handle, err := i.inner.BroadcastBlob(ctx, payload, expirationHint) + i.metrics.recordBlobDuration(ctx, "BroadcastBlob", time.Since(start), err == nil) + return handle, err +} + +func (i *instrumentedBlobBroadcastFetcher) FetchBlob(ctx context.Context, handle ocr3_1types.BlobHandle) ([]byte, error) { + start := time.Now() + data, err := i.inner.FetchBlob(ctx, handle) + i.metrics.recordBlobDuration(ctx, "FetchBlob", time.Since(start), err == nil) + return data, err +} + +type instrumentedBlobFetcher struct { + inner ocr3_1types.BlobFetcher + metrics *pluginMetrics +} + +func (i *instrumentedBlobFetcher) FetchBlob(ctx context.Context, handle ocr3_1types.BlobHandle) ([]byte, error) { + start := time.Now() + data, err := i.inner.FetchBlob(ctx, handle) + i.metrics.recordBlobDuration(ctx, "FetchBlob", time.Since(start), err == nil) + return data, err +} diff --git a/core/services/ocr3_1/beholderwrapper/instrumented_kv.go b/core/services/ocr3_1/beholderwrapper/instrumented_kv.go new file mode 100644 index 00000000000..5550cb2c839 --- /dev/null +++ b/core/services/ocr3_1/beholderwrapper/instrumented_kv.go @@ -0,0 +1,40 @@ +package beholderwrapper + +import ( + "context" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" +) + +type instrumentedKVStateReader struct { + inner ocr3_1types.KeyValueStateReader + ctx context.Context + metrics *pluginMetrics +} + +func (i *instrumentedKVStateReader) Read(key []byte) ([]byte, error) { + start := time.Now() + data, err := i.inner.Read(key) + i.metrics.recordKVDuration(i.ctx, "Read", time.Since(start), err == nil) + return data, err +} + +type instrumentedKVStateReadWriter struct { + instrumentedKVStateReader + writer ocr3_1types.KeyValueStateReadWriter +} + +func (i *instrumentedKVStateReadWriter) Write(key []byte, value []byte) error { + start := time.Now() + err := i.writer.Write(key, value) + i.metrics.recordKVDuration(i.ctx, "Write", time.Since(start), err == nil) + return err +} + +func (i *instrumentedKVStateReadWriter) Delete(key []byte) error { + start := time.Now() + err := i.writer.Delete(key) + i.metrics.recordKVDuration(i.ctx, "Delete", time.Since(start), err == nil) + return err +} diff --git a/core/services/ocr3_1/beholderwrapper/plugin.go b/core/services/ocr3_1/beholderwrapper/plugin.go index 356ac62be57..ec08da2c4a6 100644 --- a/core/services/ocr3_1/beholderwrapper/plugin.go +++ b/core/services/ocr3_1/beholderwrapper/plugin.go @@ -26,15 +26,46 @@ func newReportingPlugin[RI any]( } } +func (p *reportingPlugin[RI]) wrapReader(ctx context.Context, r ocr3_1types.KeyValueStateReader) ocr3_1types.KeyValueStateReader { + if r == nil { + return nil + } + return &instrumentedKVStateReader{inner: r, ctx: ctx, metrics: p.metrics} +} + +func (p *reportingPlugin[RI]) wrapReadWriter(ctx context.Context, rw ocr3_1types.KeyValueStateReadWriter) ocr3_1types.KeyValueStateReadWriter { + if rw == nil { + return nil + } + return &instrumentedKVStateReadWriter{ + instrumentedKVStateReader: instrumentedKVStateReader{inner: rw, ctx: ctx, metrics: p.metrics}, + writer: rw, + } +} + +func (p *reportingPlugin[RI]) wrapBroadcastFetcher(bbf ocr3_1types.BlobBroadcastFetcher) ocr3_1types.BlobBroadcastFetcher { + if bbf == nil { + return nil + } + return &instrumentedBlobBroadcastFetcher{inner: bbf, metrics: p.metrics} +} + +func (p *reportingPlugin[RI]) wrapFetcher(bf ocr3_1types.BlobFetcher) ocr3_1types.BlobFetcher { + if bf == nil { + return nil + } + return &instrumentedBlobFetcher{inner: bf, metrics: p.metrics} +} + func (p *reportingPlugin[RI]) Query(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Query, error) { return withObservedExecution(ctx, p.metrics, query, func() (ocrtypes.Query, error) { - return p.ReportingPlugin.Query(ctx, seqNr, keyValueReader, blobBroadcastFetcher) + return p.ReportingPlugin.Query(ctx, seqNr, p.wrapReader(ctx, keyValueReader), p.wrapBroadcastFetcher(blobBroadcastFetcher)) }) } func (p *reportingPlugin[RI]) Observation(ctx context.Context, seqNr uint64, aq ocrtypes.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Observation, error) { result, err := withObservedExecution(ctx, p.metrics, observation, func() (ocrtypes.Observation, error) { - return p.ReportingPlugin.Observation(ctx, seqNr, aq, keyValueReader, blobBroadcastFetcher) + return p.ReportingPlugin.Observation(ctx, seqNr, aq, p.wrapReader(ctx, keyValueReader), p.wrapBroadcastFetcher(blobBroadcastFetcher)) }) if err == nil { p.metrics.trackSize(ctx, observation, len(result)) @@ -44,7 +75,7 @@ func (p *reportingPlugin[RI]) Observation(ctx context.Context, seqNr uint64, aq func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, seqNr uint64, aq ocrtypes.AttributedQuery, ao ocrtypes.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) error { _, err := withObservedExecution(ctx, p.metrics, validateObservation, func() (any, error) { - err := p.ReportingPlugin.ValidateObservation(ctx, seqNr, aq, ao, keyValueReader, blobFetcher) + err := p.ReportingPlugin.ValidateObservation(ctx, seqNr, aq, ao, p.wrapReader(ctx, keyValueReader), p.wrapFetcher(blobFetcher)) return nil, err }) return err @@ -52,13 +83,13 @@ func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, seqNr uin func (p *reportingPlugin[RI]) ObservationQuorum(ctx context.Context, seqNr uint64, aq ocrtypes.AttributedQuery, aos []ocrtypes.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) (bool, error) { return withObservedExecution(ctx, p.metrics, observationQuorum, func() (bool, error) { - return p.ReportingPlugin.ObservationQuorum(ctx, seqNr, aq, aos, keyValueReader, blobFetcher) + return p.ReportingPlugin.ObservationQuorum(ctx, seqNr, aq, aos, p.wrapReader(ctx, keyValueReader), p.wrapFetcher(blobFetcher)) }) } func (p *reportingPlugin[RI]) StateTransition(ctx context.Context, seqNr uint64, aq ocrtypes.AttributedQuery, aos []ocrtypes.AttributedObservation, keyValueReadWriter ocr3_1types.KeyValueStateReadWriter, blobFetcher ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { result, err := withObservedExecution(ctx, p.metrics, stateTransition, func() (ocr3_1types.ReportsPlusPrecursor, error) { - return p.ReportingPlugin.StateTransition(ctx, seqNr, aq, aos, keyValueReadWriter, blobFetcher) + return p.ReportingPlugin.StateTransition(ctx, seqNr, aq, aos, p.wrapReadWriter(ctx, keyValueReadWriter), p.wrapFetcher(blobFetcher)) }) if err == nil { p.metrics.trackSize(ctx, stateTransition, len(result)) @@ -68,7 +99,7 @@ func (p *reportingPlugin[RI]) StateTransition(ctx context.Context, seqNr uint64, func (p *reportingPlugin[RI]) Committed(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader) error { _, err := withObservedExecution(ctx, p.metrics, committed, func() (any, error) { - err := p.ReportingPlugin.Committed(ctx, seqNr, keyValueReader) + err := p.ReportingPlugin.Committed(ctx, seqNr, p.wrapReader(ctx, keyValueReader)) return nil, err }) return err diff --git a/core/services/ocr3_1/beholderwrapper/plugin_test.go b/core/services/ocr3_1/beholderwrapper/plugin_test.go index d0b5b520780..ec0b74d0fa9 100644 --- a/core/services/ocr3_1/beholderwrapper/plugin_test.go +++ b/core/services/ocr3_1/beholderwrapper/plugin_test.go @@ -109,6 +109,411 @@ func Test_ReportingPlugin_PropagatesErrors(t *testing.T) { require.ErrorIs(t, err, expectedErr) } +func Test_InstrumentedBlobBroadcastFetcher(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + inner := &fakeBlobBroadcastFetcher{ + broadcastPayload: []byte("broadcast-handle"), + fetchPayload: []byte("fetched-data"), + } + + wrapped := &instrumentedBlobBroadcastFetcher{inner: inner, metrics: metrics} + + // BroadcastBlob delegates and records metrics + handle, err := wrapped.BroadcastBlob(t.Context(), []byte("payload"), ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: 1}) + require.NoError(t, err) + require.Equal(t, 1, inner.broadcastCalls) + + // FetchBlob delegates and records metrics + data, err := wrapped.FetchBlob(t.Context(), handle) + require.NoError(t, err) + require.Equal(t, []byte("fetched-data"), data) + require.Equal(t, 1, inner.fetchCalls) +} + +func Test_InstrumentedBlobBroadcastFetcher_PropagatesErrors(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + expectedErr := errors.New("blob error") + inner := &fakeBlobBroadcastFetcher{err: expectedErr} + wrapped := &instrumentedBlobBroadcastFetcher{inner: inner, metrics: metrics} + + _, err = wrapped.BroadcastBlob(t.Context(), []byte("payload"), ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: 1}) + require.ErrorIs(t, err, expectedErr) + + _, err = wrapped.FetchBlob(t.Context(), ocr3_1types.BlobHandle{}) + require.ErrorIs(t, err, expectedErr) +} + +func Test_InstrumentedBlobFetcher(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + inner := &fakeBlobFetcher{fetchPayload: []byte("fetched-data")} + wrapped := &instrumentedBlobFetcher{inner: inner, metrics: metrics} + + data, err := wrapped.FetchBlob(t.Context(), ocr3_1types.BlobHandle{}) + require.NoError(t, err) + require.Equal(t, []byte("fetched-data"), data) + require.Equal(t, 1, inner.fetchCalls) +} + +func Test_InstrumentedBlobFetcher_PropagatesErrors(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + expectedErr := errors.New("fetch error") + inner := &fakeBlobFetcher{err: expectedErr} + wrapped := &instrumentedBlobFetcher{inner: inner, metrics: metrics} + + _, err = wrapped.FetchBlob(t.Context(), ocr3_1types.BlobHandle{}) + require.ErrorIs(t, err, expectedErr) +} + +func Test_ReportingPlugin_WrapsBlobs(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + innerBlob := &fakeBlobBroadcastFetcher{ + broadcastPayload: []byte("handle"), + fetchPayload: []byte("data"), + } + innerFetcher := &fakeBlobFetcher{fetchPayload: []byte("data")} + + capturingPlugin := &blobCapturingPlugin[uint]{} + plugin := newReportingPlugin[uint](capturingPlugin, metrics) + + // Query wraps BlobBroadcastFetcher + _, _ = plugin.Query(t.Context(), 1, nil, innerBlob) + require.IsType(t, &instrumentedBlobBroadcastFetcher{}, capturingPlugin.lastBroadcastFetcher) + + // Observation wraps BlobBroadcastFetcher + _, _ = plugin.Observation(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, innerBlob) + require.IsType(t, &instrumentedBlobBroadcastFetcher{}, capturingPlugin.lastBroadcastFetcher) + + // ValidateObservation wraps BlobFetcher + _ = plugin.ValidateObservation(t.Context(), 1, ocrtypes.AttributedQuery{}, ocrtypes.AttributedObservation{}, nil, innerFetcher) + require.IsType(t, &instrumentedBlobFetcher{}, capturingPlugin.lastFetcher) + + // ObservationQuorum wraps BlobFetcher + _, _ = plugin.ObservationQuorum(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, nil, innerFetcher) + require.IsType(t, &instrumentedBlobFetcher{}, capturingPlugin.lastFetcher) + + // StateTransition wraps BlobFetcher + _, _ = plugin.StateTransition(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, nil, innerFetcher) + require.IsType(t, &instrumentedBlobFetcher{}, capturingPlugin.lastFetcher) + + // nil is preserved + _, _ = plugin.Query(t.Context(), 1, nil, nil) + require.Nil(t, capturingPlugin.lastBroadcastFetcher) + + _ = plugin.ValidateObservation(t.Context(), 1, ocrtypes.AttributedQuery{}, ocrtypes.AttributedObservation{}, nil, nil) + require.Nil(t, capturingPlugin.lastFetcher) +} + +func Test_InstrumentedKVStateReader(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + inner := &fakeKVStateReader{data: map[string][]byte{"key1": []byte("value1")}} + wrapped := &instrumentedKVStateReader{inner: inner, ctx: t.Context(), metrics: metrics} + + data, err := wrapped.Read([]byte("key1")) + require.NoError(t, err) + require.Equal(t, []byte("value1"), data) + require.Equal(t, 1, inner.readCalls) + + // Missing key returns nil + data, err = wrapped.Read([]byte("missing")) + require.NoError(t, err) + require.Nil(t, data) + require.Equal(t, 2, inner.readCalls) +} + +func Test_InstrumentedKVStateReader_PropagatesErrors(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + expectedErr := errors.New("read error") + inner := &fakeKVStateReader{err: expectedErr} + wrapped := &instrumentedKVStateReader{inner: inner, ctx: t.Context(), metrics: metrics} + + _, err = wrapped.Read([]byte("key")) + require.ErrorIs(t, err, expectedErr) +} + +func Test_InstrumentedKVStateReadWriter(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + inner := &fakeKVStateReadWriter{fakeKVStateReader: fakeKVStateReader{data: map[string][]byte{}}} + wrapped := &instrumentedKVStateReadWriter{ + instrumentedKVStateReader: instrumentedKVStateReader{inner: inner, ctx: t.Context(), metrics: metrics}, + writer: inner, + } + + // Write + err = wrapped.Write([]byte("key1"), []byte("value1")) + require.NoError(t, err) + require.Equal(t, 1, inner.writeCalls) + + // Read back through the wrapper + data, err := wrapped.Read([]byte("key1")) + require.NoError(t, err) + require.Equal(t, []byte("value1"), data) + + // Delete + err = wrapped.Delete([]byte("key1")) + require.NoError(t, err) + require.Equal(t, 1, inner.deleteCalls) + + // Read returns nil after delete + data, err = wrapped.Read([]byte("key1")) + require.NoError(t, err) + require.Nil(t, data) +} + +func Test_InstrumentedKVStateReadWriter_PropagatesErrors(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + expectedErr := errors.New("write error") + inner := &fakeKVStateReadWriter{fakeKVStateReader: fakeKVStateReader{err: expectedErr}} + wrapped := &instrumentedKVStateReadWriter{ + instrumentedKVStateReader: instrumentedKVStateReader{inner: inner, ctx: t.Context(), metrics: metrics}, + writer: inner, + } + + _, err = wrapped.Read([]byte("key")) + require.ErrorIs(t, err, expectedErr) + + err = wrapped.Write([]byte("key"), []byte("value")) + require.ErrorIs(t, err, expectedErr) + + err = wrapped.Delete([]byte("key")) + require.ErrorIs(t, err, expectedErr) +} + +func Test_ReportingPlugin_WrapsKV(t *testing.T) { + metrics, err := newPluginMetrics("test", "abc") + require.NoError(t, err) + + innerReader := &fakeKVStateReader{data: map[string][]byte{}} + innerReadWriter := &fakeKVStateReadWriter{fakeKVStateReader: fakeKVStateReader{data: map[string][]byte{}}} + + capturingPlugin := &kvCapturingPlugin[uint]{} + plugin := newReportingPlugin[uint](capturingPlugin, metrics) + + // Query wraps KeyValueStateReader + _, _ = plugin.Query(t.Context(), 1, innerReader, nil) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // Observation wraps KeyValueStateReader + _, _ = plugin.Observation(t.Context(), 1, ocrtypes.AttributedQuery{}, innerReader, nil) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // ValidateObservation wraps KeyValueStateReader + _ = plugin.ValidateObservation(t.Context(), 1, ocrtypes.AttributedQuery{}, ocrtypes.AttributedObservation{}, innerReader, nil) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // ObservationQuorum wraps KeyValueStateReader + _, _ = plugin.ObservationQuorum(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, innerReader, nil) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // StateTransition wraps KeyValueStateReadWriter + _, _ = plugin.StateTransition(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, innerReadWriter, nil) + require.IsType(t, &instrumentedKVStateReadWriter{}, capturingPlugin.lastReadWriter) + + // Committed wraps KeyValueStateReader + _ = plugin.Committed(t.Context(), 1, innerReader) + require.IsType(t, &instrumentedKVStateReader{}, capturingPlugin.lastReader) + + // nil is preserved + _, _ = plugin.Query(t.Context(), 1, nil, nil) + require.Nil(t, capturingPlugin.lastReader) + + _, _ = plugin.StateTransition(t.Context(), 1, ocrtypes.AttributedQuery{}, nil, nil, nil) + require.Nil(t, capturingPlugin.lastReadWriter) +} + +type fakeKVStateReader struct { + data map[string][]byte + err error + readCalls int +} + +func (f *fakeKVStateReader) Read(key []byte) ([]byte, error) { + f.readCalls++ + if f.err != nil { + return nil, f.err + } + return f.data[string(key)], nil +} + +type fakeKVStateReadWriter struct { + fakeKVStateReader + writeCalls int + deleteCalls int +} + +func (f *fakeKVStateReadWriter) Write(key []byte, value []byte) error { + f.writeCalls++ + if f.err != nil { + return f.err + } + f.data[string(key)] = value + return nil +} + +func (f *fakeKVStateReadWriter) Delete(key []byte) error { + f.deleteCalls++ + if f.err != nil { + return f.err + } + delete(f.data, string(key)) + return nil +} + +// kvCapturingPlugin captures the KV reader/writer it receives so tests can assert on wrapping. +type kvCapturingPlugin[RI any] struct { + lastReader ocr3_1types.KeyValueStateReader + lastReadWriter ocr3_1types.KeyValueStateReadWriter +} + +func (p *kvCapturingPlugin[RI]) Query(_ context.Context, _ uint64, r ocr3_1types.KeyValueStateReader, _ ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Query, error) { + p.lastReader = r + return ocrtypes.Query{}, nil +} + +func (p *kvCapturingPlugin[RI]) Observation(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, r ocr3_1types.KeyValueStateReader, _ ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Observation, error) { + p.lastReader = r + return ocrtypes.Observation{}, nil +} + +func (p *kvCapturingPlugin[RI]) ValidateObservation(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ ocrtypes.AttributedObservation, r ocr3_1types.KeyValueStateReader, _ ocr3_1types.BlobFetcher) error { + p.lastReader = r + return nil +} + +func (p *kvCapturingPlugin[RI]) ObservationQuorum(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ []ocrtypes.AttributedObservation, r ocr3_1types.KeyValueStateReader, _ ocr3_1types.BlobFetcher) (bool, error) { + p.lastReader = r + return true, nil +} + +func (p *kvCapturingPlugin[RI]) StateTransition(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ []ocrtypes.AttributedObservation, rw ocr3_1types.KeyValueStateReadWriter, _ ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { + p.lastReadWriter = rw + return nil, nil +} + +func (p *kvCapturingPlugin[RI]) Committed(_ context.Context, _ uint64, r ocr3_1types.KeyValueStateReader) error { + p.lastReader = r + return nil +} + +func (p *kvCapturingPlugin[RI]) Reports(context.Context, uint64, ocr3_1types.ReportsPlusPrecursor) ([]ocr3types.ReportPlus[RI], error) { + return nil, nil +} + +func (p *kvCapturingPlugin[RI]) ShouldAcceptAttestedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + return true, nil +} + +func (p *kvCapturingPlugin[RI]) ShouldTransmitAcceptedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + return true, nil +} + +func (p *kvCapturingPlugin[RI]) Close() error { + return nil +} + +type fakeBlobBroadcastFetcher struct { + broadcastPayload []byte + fetchPayload []byte + err error + broadcastCalls int + fetchCalls int +} + +func (f *fakeBlobBroadcastFetcher) BroadcastBlob(_ context.Context, _ []byte, _ ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) { + f.broadcastCalls++ + return ocr3_1types.BlobHandle{}, f.err +} + +func (f *fakeBlobBroadcastFetcher) FetchBlob(_ context.Context, _ ocr3_1types.BlobHandle) ([]byte, error) { + f.fetchCalls++ + if f.err != nil { + return nil, f.err + } + return f.fetchPayload, nil +} + +type fakeBlobFetcher struct { + fetchPayload []byte + err error + fetchCalls int +} + +func (f *fakeBlobFetcher) FetchBlob(_ context.Context, _ ocr3_1types.BlobHandle) ([]byte, error) { + f.fetchCalls++ + if f.err != nil { + return nil, f.err + } + return f.fetchPayload, nil +} + +// blobCapturingPlugin captures the blob fetcher/broadcaster it receives so tests can assert on wrapping. +type blobCapturingPlugin[RI any] struct { + lastBroadcastFetcher ocr3_1types.BlobBroadcastFetcher + lastFetcher ocr3_1types.BlobFetcher +} + +func (p *blobCapturingPlugin[RI]) Query(_ context.Context, _ uint64, _ ocr3_1types.KeyValueStateReader, bbf ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Query, error) { + p.lastBroadcastFetcher = bbf + return ocrtypes.Query{}, nil +} + +func (p *blobCapturingPlugin[RI]) Observation(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ ocr3_1types.KeyValueStateReader, bbf ocr3_1types.BlobBroadcastFetcher) (ocrtypes.Observation, error) { + p.lastBroadcastFetcher = bbf + return ocrtypes.Observation{}, nil +} + +func (p *blobCapturingPlugin[RI]) ValidateObservation(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ ocrtypes.AttributedObservation, _ ocr3_1types.KeyValueStateReader, bf ocr3_1types.BlobFetcher) error { + p.lastFetcher = bf + return nil +} + +func (p *blobCapturingPlugin[RI]) ObservationQuorum(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ []ocrtypes.AttributedObservation, _ ocr3_1types.KeyValueStateReader, bf ocr3_1types.BlobFetcher) (bool, error) { + p.lastFetcher = bf + return true, nil +} + +func (p *blobCapturingPlugin[RI]) StateTransition(_ context.Context, _ uint64, _ ocrtypes.AttributedQuery, _ []ocrtypes.AttributedObservation, _ ocr3_1types.KeyValueStateReadWriter, bf ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { + p.lastFetcher = bf + return nil, nil +} + +func (p *blobCapturingPlugin[RI]) Committed(context.Context, uint64, ocr3_1types.KeyValueStateReader) error { + return nil +} + +func (p *blobCapturingPlugin[RI]) Reports(context.Context, uint64, ocr3_1types.ReportsPlusPrecursor) ([]ocr3types.ReportPlus[RI], error) { + return nil, nil +} + +func (p *blobCapturingPlugin[RI]) ShouldAcceptAttestedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + return true, nil +} + +func (p *blobCapturingPlugin[RI]) ShouldTransmitAcceptedReport(context.Context, uint64, ocr3types.ReportWithInfo[RI]) (bool, error) { + return true, nil +} + +func (p *blobCapturingPlugin[RI]) Close() error { + return nil +} + type fakePlugin[RI any] struct { reports []ocr3types.ReportPlus[RI] observationSize int diff --git a/core/services/ocr3_1/beholderwrapper/types.go b/core/services/ocr3_1/beholderwrapper/types.go index 124b0b2c294..2addc692cd8 100644 --- a/core/services/ocr3_1/beholderwrapper/types.go +++ b/core/services/ocr3_1/beholderwrapper/types.go @@ -36,6 +36,8 @@ type pluginMetrics struct { reportsGenerated metric.Int64Counter sizes metric.Int64Histogram status metric.Int64Gauge + blobDurations metric.Int64Histogram + kvDurations metric.Int64Histogram } func newPluginMetrics(plugin, configDigest string) (*pluginMetrics, error) { @@ -59,6 +61,16 @@ func newPluginMetrics(plugin, configDigest string) (*pluginMetrics, error) { return nil, fmt.Errorf("failed to create status gauge: %w", err) } + blobDurations, err := beholder.GetMeter().Int64Histogram("platform_ocr3_1_reporting_plugin_blob_duration_ms", metric.WithUnit("ms")) + if err != nil { + return nil, fmt.Errorf("failed to create blob duration histogram: %w", err) + } + + kvDurations, err := beholder.GetMeter().Int64Histogram("platform_ocr3_1_reporting_plugin_kv_duration_ms", metric.WithUnit("ms")) + if err != nil { + return nil, fmt.Errorf("failed to create kv duration histogram: %w", err) + } + return &pluginMetrics{ plugin: plugin, configDigest: configDigest, @@ -66,6 +78,8 @@ func newPluginMetrics(plugin, configDigest string) (*pluginMetrics, error) { reportsGenerated: reportsGenerated, sizes: sizes, status: status, + blobDurations: blobDurations, + kvDurations: kvDurations, }, nil } @@ -95,6 +109,24 @@ func (m *pluginMetrics) trackSize(ctx context.Context, function functionType, si )) } +func (m *pluginMetrics) recordKVDuration(ctx context.Context, method string, d time.Duration, success bool) { + m.kvDurations.Record(ctx, d.Milliseconds(), metric.WithAttributes( + attribute.String("plugin", m.plugin), + attribute.String("method", method), + attribute.String("success", strconv.FormatBool(success)), + attribute.String("configDigest", m.configDigest), + )) +} + +func (m *pluginMetrics) recordBlobDuration(ctx context.Context, method string, d time.Duration, success bool) { + m.blobDurations.Record(ctx, d.Milliseconds(), metric.WithAttributes( + attribute.String("plugin", m.plugin), + attribute.String("method", method), + attribute.String("success", strconv.FormatBool(success)), + attribute.String("configDigest", m.configDigest), + )) +} + func (m *pluginMetrics) updateStatus(ctx context.Context, up bool) { val := int64(0) if up { @@ -117,6 +149,20 @@ func MetricViews() []sdkmetric.View { Boundaries: prometheus.ExponentialBuckets(5, 2, 14), }}, ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_kv_duration_ms"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + // 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960 + Boundaries: prometheus.ExponentialBuckets(5, 2, 14), + }}, + ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_blob_duration_ms"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + // 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960 + Boundaries: prometheus.ExponentialBuckets(5, 2, 14), + }}, + ), sdkmetric.NewView( sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_data_sizes"}, sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ From 6b8e18c8cf34b879f5521c783bf49cce60007553 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Thu, 26 Mar 2026 18:21:44 +0000 Subject: [PATCH 4/8] Instrument KV and BlobBroadcaster/Fetcher interfaces --- .../beholderwrapper/instrumented_blob.go | 8 +------- core/services/ocr3_1/beholderwrapper/plugin.go | 6 +++++- .../ocr3_1/beholderwrapper/plugin_test.go | 18 ++++++++++++++++-- core/services/ocr3_1/beholderwrapper/types.go | 1 - 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/core/services/ocr3_1/beholderwrapper/instrumented_blob.go b/core/services/ocr3_1/beholderwrapper/instrumented_blob.go index f2ccdb374ba..c59b17f1bcc 100644 --- a/core/services/ocr3_1/beholderwrapper/instrumented_blob.go +++ b/core/services/ocr3_1/beholderwrapper/instrumented_blob.go @@ -10,6 +10,7 @@ import ( type instrumentedBlobBroadcastFetcher struct { inner ocr3_1types.BlobBroadcastFetcher metrics *pluginMetrics + instrumentedBlobFetcher } func (i *instrumentedBlobBroadcastFetcher) BroadcastBlob(ctx context.Context, payload []byte, expirationHint ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) { @@ -19,13 +20,6 @@ func (i *instrumentedBlobBroadcastFetcher) BroadcastBlob(ctx context.Context, pa return handle, err } -func (i *instrumentedBlobBroadcastFetcher) FetchBlob(ctx context.Context, handle ocr3_1types.BlobHandle) ([]byte, error) { - start := time.Now() - data, err := i.inner.FetchBlob(ctx, handle) - i.metrics.recordBlobDuration(ctx, "FetchBlob", time.Since(start), err == nil) - return data, err -} - type instrumentedBlobFetcher struct { inner ocr3_1types.BlobFetcher metrics *pluginMetrics diff --git a/core/services/ocr3_1/beholderwrapper/plugin.go b/core/services/ocr3_1/beholderwrapper/plugin.go index ec08da2c4a6..f55095ef739 100644 --- a/core/services/ocr3_1/beholderwrapper/plugin.go +++ b/core/services/ocr3_1/beholderwrapper/plugin.go @@ -47,7 +47,11 @@ func (p *reportingPlugin[RI]) wrapBroadcastFetcher(bbf ocr3_1types.BlobBroadcast if bbf == nil { return nil } - return &instrumentedBlobBroadcastFetcher{inner: bbf, metrics: p.metrics} + return &instrumentedBlobBroadcastFetcher{ + inner: bbf, + metrics: p.metrics, + instrumentedBlobFetcher: instrumentedBlobFetcher{inner: bbf, metrics: p.metrics}, + } } func (p *reportingPlugin[RI]) wrapFetcher(bf ocr3_1types.BlobFetcher) ocr3_1types.BlobFetcher { diff --git a/core/services/ocr3_1/beholderwrapper/plugin_test.go b/core/services/ocr3_1/beholderwrapper/plugin_test.go index ec0b74d0fa9..845a8edc3fd 100644 --- a/core/services/ocr3_1/beholderwrapper/plugin_test.go +++ b/core/services/ocr3_1/beholderwrapper/plugin_test.go @@ -118,7 +118,14 @@ func Test_InstrumentedBlobBroadcastFetcher(t *testing.T) { fetchPayload: []byte("fetched-data"), } - wrapped := &instrumentedBlobBroadcastFetcher{inner: inner, metrics: metrics} + wrapped := &instrumentedBlobBroadcastFetcher{ + inner: inner, + metrics: metrics, + instrumentedBlobFetcher: instrumentedBlobFetcher{ + inner: inner, + metrics: metrics, + }, + } // BroadcastBlob delegates and records metrics handle, err := wrapped.BroadcastBlob(t.Context(), []byte("payload"), ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: 1}) @@ -138,7 +145,14 @@ func Test_InstrumentedBlobBroadcastFetcher_PropagatesErrors(t *testing.T) { expectedErr := errors.New("blob error") inner := &fakeBlobBroadcastFetcher{err: expectedErr} - wrapped := &instrumentedBlobBroadcastFetcher{inner: inner, metrics: metrics} + wrapped := &instrumentedBlobBroadcastFetcher{ + inner: inner, + metrics: metrics, + instrumentedBlobFetcher: instrumentedBlobFetcher{ + inner: inner, + metrics: metrics, + }, + } _, err = wrapped.BroadcastBlob(t.Context(), []byte("payload"), ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: 1}) require.ErrorIs(t, err, expectedErr) diff --git a/core/services/ocr3_1/beholderwrapper/types.go b/core/services/ocr3_1/beholderwrapper/types.go index 2addc692cd8..cbde5670fe3 100644 --- a/core/services/ocr3_1/beholderwrapper/types.go +++ b/core/services/ocr3_1/beholderwrapper/types.go @@ -166,7 +166,6 @@ func MetricViews() []sdkmetric.View { sdkmetric.NewView( sdkmetric.Instrument{Name: "platform_ocr3_1_reporting_plugin_data_sizes"}, sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - // 512KB is the max value possible // 1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB, 1024KB, 2048KB, 4096KB, 8192KB Boundaries: prometheus.ExponentialBuckets(1024, 2, 14), }}, From 9b156c983991c2077f9e0667581677356f6abc26 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Fri, 27 Mar 2026 11:38:01 +0000 Subject: [PATCH 5/8] Go fmt --- .../ocr2/plugins/vault/plugin_test.go | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/core/services/ocr2/plugins/vault/plugin_test.go b/core/services/ocr2/plugins/vault/plugin_test.go index 45be4e3cced..dff5a8d472c 100644 --- a/core/services/ocr2/plugins/vault/plugin_test.go +++ b/core/services/ocr2/plugins/vault/plugin_test.go @@ -644,8 +644,8 @@ func TestPlugin_Observation_GetSecretsRequest_SecretIdentifierInvalid(t *testing maxIDLen = tc.maxIDLen } r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -750,7 +750,7 @@ func TestPlugin_Observation_GetSecretsRequest_FillsInNamespace(t *testing.T) { Namespace: "main", Key: "my_secret", } - err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), createdID,&vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), createdID, &vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -903,7 +903,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretExistsButIsIncorrect(t *test m: make(map[string]response), } - err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: []byte("invalid-ciphertext"), }) require.NoError(t, err) @@ -996,7 +996,7 @@ func TestPlugin_Observation_GetSecretsRequest_PublicKeyIsInvalid(t *testing.T) { ciphertextBytes, err := ciphertext.Marshal() require.NoError(t, err) - err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1084,7 +1084,7 @@ func TestPlugin_Observation_GetSecretsRequest_SecretLabelIsInvalid(t *testing.T) ciphertextBytes, err := ciphertext.Marshal() require.NoError(t, err) - err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1178,7 +1178,7 @@ func TestPlugin_Observation_GetSecretsRequest_Success(t *testing.T) { ciphertextBytes, err := ciphertext.Marshal() require.NoError(t, err) - err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ + err = newTestWriteStore(t, rdr).WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1316,7 +1316,7 @@ func TestPlugin_Observation_MaxBatchGetSecretsWithEncryptionKeys(t *testing.T) { require.NoError(t, cerr) // Store the secret in KV. - err = ws.WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ + err = ws.WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: ciphertextBytes, }) require.NoError(t, err) @@ -1467,7 +1467,7 @@ func TestPlugin_Observation_CreateSecretsRequest_SecretIdentifierInvalid(t *test r := &ReportingPlugin{ lggr: lggr, store: store, - metrics: newTestMetrics(t), + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -2145,7 +2145,7 @@ func TestPlugin_StateTransition_CreateSecretsRequest_TooManySecretsForOwner(t *t Key: "secret", } kvstore := newTestWriteStore(t, rdr) - err = kvstore.WriteMetadata(t.Context(), id.Owner,&vaultcommon.StoredMetadata{ + err = kvstore.WriteMetadata(t.Context(), id.Owner, &vaultcommon.StoredMetadata{ SecretIdentifiers: []*vaultcommon.SecretIdentifier{ { Owner: "owner", @@ -2237,7 +2237,7 @@ func TestPlugin_StateTransition_CreateSecretsRequest_SecretExistsForKey(t *testi Key: "secret", } kvstore := newTestWriteStore(t, rdr) - err = kvstore.WriteSecret(t.Context(), id,&vaultcommon.StoredSecret{ + err = kvstore.WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ EncryptedSecret: []byte("some-ciphertext"), }) require.NoError(t, err) @@ -2522,7 +2522,7 @@ func TestPlugin_StateTransition_InsufficientObservations(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -2599,7 +2599,7 @@ func TestPlugin_StateTransition_GetSecretsRequest_ResponseSizeWithinLimit(t *tes N: 10, F: 3, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -2672,7 +2672,7 @@ func TestPlugin_ValidateObservations_InvalidObservations(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -2762,7 +2762,7 @@ func TestPlugin_ValidateObservations_IncludesAllItemsInPendingQueue(t *testing.T N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -2866,7 +2866,7 @@ func TestPlugin_ValidateObservations_DisallowsDuplicateBlobHandles(t *testing.T) N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -2926,7 +2926,7 @@ func TestPlugin_StateTransition_ShasDontMatch(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -3010,7 +3010,7 @@ func TestPlugin_StateTransition_AggregatesValidationErrors(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -3090,7 +3090,7 @@ func TestPlugin_StateTransition_GetSecretsRequest_CombinesShares(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -3235,7 +3235,7 @@ func TestPlugin_StateTransition_CreateSecretsRequest_WritesSecrets(t *testing.T) N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -3407,7 +3407,7 @@ func TestPlugin_Reports(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -3504,7 +3504,7 @@ func TestPlugin_Observation_UpdateSecretsRequest_SecretIdentifierInvalid(t *test r := &ReportingPlugin{ lggr: lggr, store: store, - metrics: newTestMetrics(t), + metrics: newTestMetrics(t), marshalBlob: mockMarshalBlob, unmarshalBlob: mockUnmarshalBlob, cfg: makeReportingPluginConfig( @@ -3880,7 +3880,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_SecretDoesntExist(t *testin N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -3977,7 +3977,7 @@ func TestPlugin_StateTransition_UpdateSecretsRequest_WritesSecrets(t *testing.T) N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -4134,7 +4134,7 @@ func TestPlugin_Reports_UpdateSecretsRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -4405,7 +4405,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -4516,7 +4516,7 @@ func TestPlugin_StateTransition_DeleteSecretsRequest_SecretDoesNotExist(t *testi N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -4654,7 +4654,7 @@ func TestPlugin_Reports_DeleteSecretsRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -4997,7 +4997,7 @@ func TestPlugin_Reports_ListSecretIdentifiersRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -5044,7 +5044,7 @@ func TestPlugin_StateTransition_ListSecretIdentifiers(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -6568,7 +6568,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_NewQuora_NotGetRequest(t *te N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -6643,7 +6643,7 @@ func TestPlugin_StateTransition_PendingQueueEnabled_GetRequest(t *testing.T) { N: 4, F: 1, }, - store: store, + store: store, metrics: newTestMetrics(t), cfg: makeReportingPluginConfig( t, @@ -6858,8 +6858,8 @@ func TestPlugin_ValidateObservation_RequestBatchLimit(t *testing.T) { _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) require.NoError(t, err) r := &ReportingPlugin{ - lggr: lggr, - store: store, + lggr: lggr, + store: store, metrics: newTestMetrics(t), onchainCfg: ocr3types.ReportingPluginConfig{ N: 4, From 54c84f5d985f69b5b6d38661a6bd9438ed07ce7d Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Fri, 27 Mar 2026 13:47:18 +0000 Subject: [PATCH 6/8] Track KV durations as ms --- core/services/ocr2/plugins/vault/kvstore.go | 2 +- core/services/ocr2/plugins/vault/metrics.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/services/ocr2/plugins/vault/kvstore.go b/core/services/ocr2/plugins/vault/kvstore.go index 05988178cec..1e27d263d10 100644 --- a/core/services/ocr2/plugins/vault/kvstore.go +++ b/core/services/ocr2/plugins/vault/kvstore.go @@ -28,7 +28,7 @@ type KVStore struct { } func (s *KVStore) trackDuration(ctx context.Context, method string, start time.Time) { - s.metrics.trackKVOperation(ctx, method, time.Since(start).Seconds()) + s.metrics.trackKVOperation(ctx, method, time.Since(start).Milliseconds()) } type ReadKVStore interface { diff --git a/core/services/ocr2/plugins/vault/metrics.go b/core/services/ocr2/plugins/vault/metrics.go index bab4ef4eaa3..7773d469188 100644 --- a/core/services/ocr2/plugins/vault/metrics.go +++ b/core/services/ocr2/plugins/vault/metrics.go @@ -14,7 +14,7 @@ type pluginMetrics struct { configDigest string queueOverflow metric.Int64Counter - kvOperationDuration metric.Float64Histogram + kvOperationDuration metric.Int64Histogram } func newPluginMetrics(configDigest string) (*pluginMetrics, error) { @@ -23,9 +23,9 @@ func newPluginMetrics(configDigest string) (*pluginMetrics, error) { return nil, fmt.Errorf("failed to create queue overflow counter: %w", err) } - kvOperationDuration, err := beholder.GetMeter().Float64Histogram( - "platform_vault_plugin_kv_operation_duration_seconds", - metric.WithUnit("s"), + kvOperationDuration, err := beholder.GetMeter().Int64Histogram( + "platform_vault_plugin_kv_operation_duration_ms", + metric.WithUnit("ms"), ) if err != nil { return nil, fmt.Errorf("failed to create kv operation duration histogram: %w", err) @@ -38,8 +38,8 @@ func newPluginMetrics(configDigest string) (*pluginMetrics, error) { }, nil } -func (m *pluginMetrics) trackKVOperation(ctx context.Context, method string, durationSeconds float64) { - m.kvOperationDuration.Record(ctx, durationSeconds, metric.WithAttributes( +func (m *pluginMetrics) trackKVOperation(ctx context.Context, method string, durationMs int64) { + m.kvOperationDuration.Record(ctx, durationMs, metric.WithAttributes( attribute.String("configDigest", m.configDigest), attribute.String("method", method), )) From 771570711ee9656df15ffce338e0fb535f5167c1 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Fri, 27 Mar 2026 14:04:50 +0000 Subject: [PATCH 7/8] Remove test --- .../ocr2/plugins/vault/plugin_test.go | 168 ------------------ 1 file changed, 168 deletions(-) diff --git a/core/services/ocr2/plugins/vault/plugin_test.go b/core/services/ocr2/plugins/vault/plugin_test.go index dff5a8d472c..d40af6061bf 100644 --- a/core/services/ocr2/plugins/vault/plugin_test.go +++ b/core/services/ocr2/plugins/vault/plugin_test.go @@ -8,7 +8,6 @@ import ( "fmt" "strings" "testing" - "time" "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/libocr/commontypes" @@ -1253,173 +1252,6 @@ func TestPlugin_Observation_GetSecretsRequest_Success(t *testing.T) { assert.Equal(t, plaintext, gotSecret) } -func TestPlugin_Observation_MaxBatchGetSecretsWithEncryptionKeys(t *testing.T) { - lggr, _ := logger.TestLoggerObserved(t, zapcore.DebugLevel) - store := requests.NewStore[*vaulttypes.Request]() - _, pk, shares, err := tdh2easy.GenerateKeys(1, 3) - require.NoError(t, err) - - batchSize := 10 - maxRequestBatchSize := 10 - numEncryptionKeys := 10 - - r := &ReportingPlugin{ - lggr: lggr, - store: store, - metrics: newTestMetrics(t), - marshalBlob: mockMarshalBlob, - unmarshalBlob: mockUnmarshalBlob, - cfg: makeReportingPluginConfig( - t, - batchSize, - pk, - shares[0], - 100, - 2048, - 64, - 64, - 64, - maxRequestBatchSize, - ), - } - - rdr := &kv{ - m: make(map[string]response), - } - ws := newTestWriteStore(t, rdr) - - // Generate encryption keys (NaCl box public keys). - encryptionKeys := make([]string, numEncryptionKeys) - for i := range encryptionKeys { - pubK, _, kerr := box.GenerateKey(rand.Reader) - require.NoError(t, kerr) - encryptionKeys[i] = hex.EncodeToString(pubK[:]) - } - - // Build batchSize pending queue items, each a GetSecretsRequest with 1 secret and numEncryptionKeys encryption keys. - pendingItems := make([]*vaultcommon.StoredPendingQueueItem, batchSize) - for i := 0; i < batchSize; i++ { - owner := fmt.Sprintf("0x%040d", i+1) - id := &vaultcommon.SecretIdentifier{ - Owner: owner, - Namespace: "main", - Key: fmt.Sprintf("secret_%d", i), - } - - // Encrypt a secret with the correct label for this owner. - var label [32]byte - ownerAddress := common.HexToAddress(owner) - copy(label[12:], ownerAddress.Bytes()) - ciphertext, cerr := tdh2easy.EncryptWithLabel(pk, []byte(fmt.Sprintf("plaintext-%d", i)), label) - require.NoError(t, cerr) - ciphertextBytes, cerr := ciphertext.Marshal() - require.NoError(t, cerr) - - // Store the secret in KV. - err = ws.WriteSecret(t.Context(), id, &vaultcommon.StoredSecret{ - EncryptedSecret: ciphertextBytes, - }) - require.NoError(t, err) - - p := &vaultcommon.GetSecretsRequest{ - Requests: []*vaultcommon.SecretRequest{ - { - Id: id, - EncryptionKeys: encryptionKeys, - }, - }, - } - anyp, aerr := anypb.New(p) - require.NoError(t, aerr) - pendingItems[i] = &vaultcommon.StoredPendingQueueItem{ - Id: fmt.Sprintf("request-%d", i), - Item: anyp, - } - } - - err = ws.WritePendingQueue(t.Context(), pendingItems) - require.NoError(t, err) - - // Add 2*batchSize items to the local store (with different IDs) so that - // the observation also includes the maximum number of pending queue items - // to be broadcast as blobs. - numLocalItems := 2 * batchSize - for i := range numLocalItems { - owner := fmt.Sprintf("0x%040d", batchSize+i+1) - id := &vaultcommon.SecretIdentifier{ - Owner: owner, - Namespace: "main", - Key: fmt.Sprintf("local_secret_%d", i), - } - p := &vaultcommon.GetSecretsRequest{ - Requests: []*vaultcommon.SecretRequest{ - { - Id: id, - EncryptionKeys: encryptionKeys, - }, - }, - } - err = store.Add(&vaulttypes.Request{Payload: p, IDVal: fmt.Sprintf("local-request-%d", i)}) - require.NoError(t, err) - } - - seqNr := uint64(1) - bf := &blobber{} - start := time.Now() - data, err := r.Observation(t.Context(), seqNr, types.AttributedQuery{}, rdr, bf) - elapsed := time.Since(start) - require.NoError(t, err) - t.Logf("Observation took %s, output size: %d bytes", elapsed, len(data)) - - obs := &vaultcommon.Observations{} - err = proto.Unmarshal(data, obs) - require.NoError(t, err) - - // Verify all pending queue requests were observed. - require.Len(t, obs.Observations, batchSize) - - for i, o := range obs.Observations { - assert.Equal(t, fmt.Sprintf("request-%d", i), o.Id) - assert.Equal(t, vaultcommon.RequestType_GET_SECRETS, o.RequestType) - - batchResp := o.GetGetSecretsResponse() - require.Len(t, batchResp.Responses, 1) - - resp := batchResp.Responses[0] - assert.Empty(t, resp.GetError()) - assert.NotEmpty(t, resp.GetData().EncryptedValue) - assert.Len(t, resp.GetData().EncryptedDecryptionKeyShares, numEncryptionKeys) - - for _, share := range resp.GetData().EncryptedDecryptionKeyShares { - assert.Len(t, share.Shares, 1) - assert.NotEmpty(t, share.Shares[0]) - } - } - - // Verify all local queue items were broadcast as pending queue observations. - // The local queue is sorted lexicographically by ID before broadcasting, - // so we collect the IDs into a set rather than asserting on order. - assert.Len(t, obs.PendingQueueItems, numLocalItems) - require.Len(t, bf.blobs, numLocalItems) - gotLocalIDs := map[string]bool{} - for _, blob := range bf.blobs { - gotMsg := &vaultcommon.StoredPendingQueueItem{} - err = proto.Unmarshal(blob, gotMsg) - require.NoError(t, err) - gotLocalIDs[gotMsg.Id] = true - } - for i := range numLocalItems { - assert.True(t, gotLocalIDs[fmt.Sprintf("local-request-%d", i)], "missing local-request-%d", i) - } - - assert.NotEmpty(t, obs.SortNonce) - - // Verify the serialized observation fits within the max observation size limit (512 KB). - maxObservationBytes := 512 * 1000 - assert.LessOrEqual(t, len(data), maxObservationBytes, - "observation size %d exceeds max observation limit %d", len(data), maxObservationBytes) -} - func TestPlugin_Observation_CreateSecretsRequest_SecretIdentifierInvalid(t *testing.T) { tcs := []struct { name string From dc5ceaf8a89e3e48701905fcf338db34d4cec357 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Fri, 27 Mar 2026 14:16:21 +0000 Subject: [PATCH 8/8] Go fmt --- core/services/ocr2/plugins/vault/kvstore_test.go | 10 +++++----- .../services/ocr3_1/beholderwrapper/instrumented_kv.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/services/ocr2/plugins/vault/kvstore_test.go b/core/services/ocr2/plugins/vault/kvstore_test.go index b46b3be9994..a0786fe2efc 100644 --- a/core/services/ocr2/plugins/vault/kvstore_test.go +++ b/core/services/ocr2/plugins/vault/kvstore_test.go @@ -139,7 +139,7 @@ func TestKVStore_Secrets(t *testing.T) { ss := &vault.StoredSecret{ EncryptedSecret: newData, } - err = store.WriteSecret(t.Context(), id,ss) + err = store.WriteSecret(t.Context(), id, ss) require.NoError(t, err) s, err = store.GetSecret(t.Context(), id) @@ -158,7 +158,7 @@ func TestKVStore_DeleteSecrets(t *testing.T) { Namespace: "main", Key: "secret1", } - err := store.WriteSecret(t.Context(), id,&vault.StoredSecret{ + err := store.WriteSecret(t.Context(), id, &vault.StoredSecret{ EncryptedSecret: []byte("encrypted data"), }) require.NoError(t, err) @@ -318,7 +318,7 @@ func TestKVStore_InconsistentWrites(t *testing.T) { require.NoError(t, err) // We can recreate it without an already exists error. - err = store.WriteSecret(t.Context(), id,&vault.StoredSecret{ + err = store.WriteSecret(t.Context(), id, &vault.StoredSecret{ EncryptedSecret: []byte("encrypted data 2"), }) require.NoError(t, err) @@ -416,7 +416,7 @@ func TestKVStore_WritePendingRequests(t *testing.T) { Id: "test-request-id-3", Item: empty, } - err = store.WritePendingQueue(t.Context(),[]*vault.StoredPendingQueueItem{item, item2, item3}) + err = store.WritePendingQueue(t.Context(), []*vault.StoredPendingQueueItem{item, item2, item3}) require.NoError(t, err) // Ensure index is correctly written. @@ -448,7 +448,7 @@ func TestKVStore_WritePendingRequests(t *testing.T) { assert.Equal(t, "test-request-id-3", item2.Id) // Writing a shorter list deletes the old one. - err = store.WritePendingQueue(t.Context(),[]*vault.StoredPendingQueueItem{item, item2}) + err = store.WritePendingQueue(t.Context(), []*vault.StoredPendingQueueItem{item, item2}) require.NoError(t, err) _, exists = kv.m[pendingQueueItemPrefix+"3"] diff --git a/core/services/ocr3_1/beholderwrapper/instrumented_kv.go b/core/services/ocr3_1/beholderwrapper/instrumented_kv.go index 5550cb2c839..e215fa9d0b7 100644 --- a/core/services/ocr3_1/beholderwrapper/instrumented_kv.go +++ b/core/services/ocr3_1/beholderwrapper/instrumented_kv.go @@ -9,7 +9,7 @@ import ( type instrumentedKVStateReader struct { inner ocr3_1types.KeyValueStateReader - ctx context.Context + ctx context.Context //nolint:containedctx // libocr 3.1's API doesn't support passing in ctx via the Read/Write method. metrics *pluginMetrics }