WIP [da-vinci][server][controller][router][fc][tc][test] Close async OTel instruments properly with a uniform lifecycle#2813
Open
m-nagarajan wants to merge 2 commits into
Conversation
instruments properly with a uniform lifecycle Stats classes that own ASYNC_GAUGE / ASYNC_COUNTER wrappers previously had no consistent way to deregister the SDK-side callbacks at shutdown, so the OTel SDK kept polling deleted stores and dropped instruments. This change introduces a single close pattern and applies it uniformly across every stats class that owns metric wrappers. Lifecycle primitives (venice-client-common): - CompositeCloseable: idempotent LIFO registry of Closeables. - AbstractStatsCloseable: base providing the statsCloseables field and a default close() that drains it. - MetricEntityStateUtils.closeQuietly / closeAndClear. - MetricEntityState* and AsyncMetricEntityState* factories now take a CompositeCloseable so each wrapper self-registers and gets closed (and async callbacks deregistered) when its owner shuts down. Uniform adoption across stats and owner classes: - AbstractVeniceAggVersionedStats extends AbstractStatsCloseable; close() unregisters the metadata listener then drains the registry. - Per-store PerStoreEntry inner classes (AggVersionedDIVStats, AggVersionedDaVinciRecordTransformerStats, ServerMetadataServiceStats, NativeMetadataRepositoryStats, BackupVersionOptimizationServiceStats) extend AbstractStatsCloseable so handleStoreDeleted closes the per-store wrappers and deregisters their SDK callbacks. - AbstractVeniceService-based owners (HelixParticipation, KafkaStoreIngestion, AggKafkaConsumer, AdaptiveThrottlerSignal, StoreBuffer, HeartbeatMonitoring) hold their own statsCloseables and drain it from stopInner / clear / shutdown. - VeniceServer, HelixVeniceClusterResources, PartitionedProducerExecutor, NativeMetadataRepository, ErrorPartitionResetTask, DaVinciBackend, and the two VeniceChangelogConsumer* classes extend AbstractStatsCloseable. Simplifications discovered while standardizing: - Drop the StoreDataChangedListener pattern from BackupVersionOptimizationServiceStats and ServerMetadataServiceStats (sync COUNTERs don't need SDK-callback deregistration). - Revert StoreVersionOtelStats to its original Map<String, AtomicReference<VersionInfo>> design; handleStoreDeleted resets to NON_EXISTING. Delete the now-unused StoreDataChangedListenerRegistration. - Remove the volatile boolean closed flags (4 classes) — bounded post-close leak is acceptable and matches sibling classes. Helper consolidation: - OpenTelemetryMetricsSetup.buildStoreDimensionsMap(baseDims, storeName) replaces 5 private helpers plus 3 inline patterns and sanitizes the store name consistently. - MetricsRepositoryUtils.createOtelEnabledRepository / createOtelDisabledRepository replace OTel test-setup boilerplate across ~13 test classes.
There was a problem hiding this comment.
Pull request overview
This PR introduces a uniform lifecycle for OpenTelemetry (OTel) metric wrapper instances across Venice components so that async instruments deregister their SDK callbacks on shutdown/store-deletion, preventing callback polling after resources are dropped.
Changes:
- Adds
CompositeCloseable/AbstractStatsCloseableand propagates a closeable registry intoMetricEntityState*/AsyncMetricEntityState*factories so wrappers self-register and are drained on owner shutdown. - Updates many stats owners (server/router/controller/clients) to implement
Closeableand to close per-store/per-cluster/per-component stats consistently during stop/clear/delete flows. - Consolidates OTel-related test boilerplate via
MetricsRepositoryUtilshelpers and adds lifecycle-focused regression tests.
Reviewed changes
Copilot reviewed 149 out of 149 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| services/venice-server/src/test/java/com/linkedin/venice/stats/ServerReadQuotaUsageStatsOtelTest.java | Uses shared OTel-enabled repo helper. |
| services/venice-server/src/test/java/com/linkedin/venice/stats/ServerLoadStatsTest.java | Uses repo helpers; adds OTel-disabled path helper. |
| services/venice-server/src/test/java/com/linkedin/venice/stats/RocksDBStatsOtelTest.java | Uses shared OTel-enabled repo helper. |
| services/venice-server/src/test/java/com/linkedin/venice/stats/DiskHealthStatsTest.java | Uses repo helpers for enabled/disabled OTel. |
| services/venice-server/src/test/java/com/linkedin/venice/stats/BackupVersionOptimizationServiceStatsTest.java | Uses repo helpers for enabled/disabled OTel. |
| services/venice-server/src/main/java/com/linkedin/venice/stats/ServerReadQuotaUsageStats.java | Registers async gauge + counters into lifecycle registry. |
| services/venice-server/src/main/java/com/linkedin/venice/stats/ServerLoadStats.java | Passes lifecycle registry into metric wrappers. |
| services/venice-server/src/main/java/com/linkedin/venice/stats/ServerConnectionStats.java | Passes lifecycle registry into metric wrappers. |
| services/venice-server/src/main/java/com/linkedin/venice/stats/RocksDBStats.java | Registers async wrapper instruments with lifecycle registry. |
| services/venice-server/src/main/java/com/linkedin/venice/stats/DiskHealthStats.java | Keeps/owns async gauge wrapper for closing. |
| services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java | Implements Closeable and closes stats on shutdown. |
| services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java | Tracks owned stats in CompositeCloseable on stop. |
| services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java | Implements Closeable; drains owned stats registry. |
| services/venice-server/src/main/java/com/linkedin/venice/cleaner/BackupVersionOptimizationService.java | Closes stats during service stop. |
| services/venice-router/src/main/java/com/linkedin/venice/router/stats/RouterStats.java | Implements Closeable; closes per-request-type stats. |
| services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java | Drains owned stats closeables during stop. |
| services/venice-router/src/main/java/com/linkedin/venice/router/api/VenicePathParser.java | Closes per-store retry managers on store delete. |
| services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java | Implements Closeable; drains owned stats registry. |
| services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupSelector.java | Implements Closeable; closes helix group stats. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java | Closes per-cluster stats maps on admin close. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java | Owns checker stats and closes them at stop. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreRepairService.java | Closes per-cluster stats on shutdown. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java | Closes per-cluster stats map on stop. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/VeniceAdminStats.java | Passes lifecycle registry into metric wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/TopicCleanupServiceStats.java | Passes lifecycle registry into metric wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/SystemStoreHealthCheckStats.java | Owns async wrappers via lifecycle registry. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/StoreBackupVersionCleanupServiceStats.java | Passes lifecycle registry into metric wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/SparkServerStats.java | Passes lifecycle registry into metric wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/PushJobStatusStats.java | Passes lifecycle registry into generic wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ProtocolVersionAutoDetectionStats.java | Passes lifecycle registry into metric wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/PartitionHealthStats.java | Passes lifecycle registry into metric wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/LogCompactionStats.java | Passes lifecycle registry into generic wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ErrorPartitionStats.java | Passes lifecycle registry into metric wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DisabledPartitionStats.java | Passes lifecycle registry into generic wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DeferredVersionSwapStats.java | Passes lifecycle registry into wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/AddVersionLatencyStats.java | Passes lifecycle registry into wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java | Closes per-cluster stats on stop. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/ProtocolVersionAutoDetectionService.java | Closes stats on stop. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/lingeringjob/HeartbeatBasedCheckerStats.java | Passes lifecycle registry into wrappers. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java | Closes stats on stop. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java | Closes stats on task close. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java | Extends stats closeable; closes owned stats on clear. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/ErrorPartitionResetTask.java | Extends stats closeable; closes stats on close. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/DeferredVersionSwapService.java | Tracks thread pool stats in registry and closes. |
| internal/venice-test-common/src/jmh/java/com/linkedin/venice/benchmark/VeniceOpenTelemetryPerfTest.java | Updates factory calls to pass registry sentinel. |
| internal/venice-common/src/main/java/com/linkedin/venice/stats/RetryManagerStats.java | Registers async wrappers with lifecycle registry. |
| internal/venice-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceAggStoreStats.java | Closes per-store stats on store delete. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/RetryManager.java | Implements Closeable; closes owned stats. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsRepositoryTest.java | Adds regression test for close try/finally behavior. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/MetricTypeTest.java | Passes registry sentinel into wrapper factories. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/utils/metrics/MetricsRepositoryUtils.java | Adds test helpers to build repos consistently. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceOpenTelemetryMetricsRepository.java | Implements Closeable; closes resources + bounded shutdown. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsRepository.java | Ensures OTel close runs even if Tehuti close throws. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/ThreadPoolStats.java | Registers async gauges into stats lifecycle registry. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/routing/HelixGroupStats.java | Uses lifecycle registry; clears maps on close. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/OpenTelemetryMetricsSetup.java | Adds shared per-store dimensions map helper. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/metrics/MetricEntityStateUtils.java | Adds close helpers and map-drain helper. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/metrics/MetricEntityStateTwoEnums.java | Adds registry parameter + safer attribute snapshotting. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/metrics/MetricEntityStateOneEnum.java | Adds registry parameter + safer attribute snapshotting. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/metrics/MetricEntityStateGeneric.java | Adds registry parameter to factories. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/metrics/MetricEntityStateFiveEnums.java | Adds registry parameter to factories. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/metrics/MetricEntityStateBase.java | Adds registry parameter to factories. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/metrics/CompositeCloseable.java | New closeable registry primitive (idempotent LIFO). |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/metrics/AsyncMetricEntityState.java | Implements Closeable; deregisters async instruments. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/metrics/AbstractStatsCloseable.java | New base class with statsCloseables registry. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java | Implements Closeable; closes registered wrapper resources. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceAggStats.java | Implements Closeable; closes total + per-store stats. |
| clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/StatTrackingStoreClient.java | Closes request-type stats instances on client close. |
| clients/venice-thin-client/src/main/java/com/linkedin/venice/client/stats/BasicClientStats.java | Passes lifecycle registry into wrapper creation. |
| clients/venice-producer/src/main/java/com/linkedin/venice/producer/PartitionedProducerExecutor.java | Extends stats closeable; closes thread pool stats on shutdown. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java | Owns and closes per-request stats via registry. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/FastClientStats.java | Passes lifecycle registry into wrapper creation. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/ClusterStats.java | Passes lifecycle registry into wrapper creation. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/ClusterRouteStats.java | Passes lifecycle registry into route stats wrappers. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java | Registers RetryManagers and closes them on client close. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/HelixGroupRoutingStrategy.java | Implements Closeable; closes owned HelixGroupStats. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/AbstractStoreMetadata.java | Closes old routing strategy on swap and on close. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/AbstractClientRoutingStrategy.java | Implements Closeable with default no-op close. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/StuckConsumerRepairStatsTest.java | Uses repo helpers for enabled/disabled OTel. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/StoreVersionOtelStatsTest.java | Updates store-delete semantics + uses repo helpers. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/StoreBufferServiceStatsOtelTest.java | Uses shared OTel-enabled repo helper. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/StorageEngineOtelStatsTest.java | Adds close/deregister lifecycle test; uses repo helper. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetadataServiceStatsOtelTest.java | Uses repo helper for OTel-disabled path. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/RocksDBMemoryStatsOtelTest.java | Uses shared OTel-enabled repo helper. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ParticipantStoreConsumptionStatsTest.java | Validates store-name sanitization; uses repo helpers. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ParticipantStateTransitionStatsOtelTest.java | Adds post-close no-op/idempotent close test. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/NativeMetadataRepositoryStatsTest.java | Uses store-delete hook for lifecycle cleanup. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/NativeMetadataRepositoryStatsOtelTest.java | Validates per-store wrapper close + re-add behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/KafkaConsumerServiceStatsOtelTest.java | Uses shared OTel-enabled repo helper. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStatsTest.java | Updates close semantics test (post-close drops). |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/DIVStatsOtelTest.java | Uses repo helper for OTel-disabled path. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferOtelStatsTest.java | Uses repo helper for OTel-disabled path. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedDaVinciRecordTransformerStatsOtelTest.java | Updates per-store cleanup assertions + repo helpers. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AdaptiveThrottlingServiceStatsTest.java | Uses repo helper for OTel-disabled path. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StuckConsumerRepairStats.java | Passes lifecycle registry into wrappers. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StoreBufferServiceStats.java | Registers async gauges + closes per-store maps. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StorageEngineOtelStats.java | Extends stats closeable; deregisters async gauges on close. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/RocksDBMemoryStats.java | Registers async gauges into lifecycle registry. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ParticipantStoreConsumptionStats.java | Uses shared store dims helper + lifecycle registry. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ParticipantStateTransitionStats.java | Passes registry into wrappers; adds null-state guards. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/KafkaConsumerServiceStats.java | Passes lifecycle registry into wrappers. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java | Extends stats closeable; closes per-region wrappers. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStats.java | Extends stats closeable; clears per-region map on close. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatOtelStats.java | Extends stats closeable; clears per-region map on close. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java | Owns stats registry; closes stats on stop. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HeartbeatMonitoringServiceStats.java | Passes lifecycle registry into wrappers. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferOtelStats.java | Extends stats closeable; registers wrappers in registry. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java | Closes per-store otel stats on deletion + on close. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AdaptiveThrottlingServiceStats.java | Registers wrapper states in lifecycle registry. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AbstractVeniceAggVersionedStats.java | Extends stats closeable; unregisters listener before drain. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java | Extends stats closeable; closes stats on clear/store remove. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java | Owns stats registry; closes stats on stop. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java | Owns stats registry; closes stats on stop. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java | Owns stats registry; closes stats on stop. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AdaptiveThrottlerSignalService.java | Owns stats registry; closes stats on stop. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java | Owns stats registry; closes stats on stop. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java | Extends stats closeable; closes stats in backend close. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java | Extends stats closeable; closes stats in finally. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerDaVinciRecordTransformerImpl.java | Extends stats closeable; closes stats in finally. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/stats/BasicConsumerStats.java | Passes lifecycle registry into wrapper creation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
1baad53 to
acb2e41
Compare
acb2e41 to
95cd467
Compare
95cd467 to
370bab8
Compare
…paths Cover the new close() entry points exercised by the prior commit so CI's diffCoverage gate (45% branch threshold) is satisfied: - RetryManagerTest.testCloseIsSafeWhenDisabled — exercises RetryManager.close() when no RetryManagerStats was wired in; verifies idempotency. - RouterStatsTest (new) — covers RouterStats.close() for both Closeable and non-Closeable STAT_TYPE parameterisations, idempotency, and the per-request-type getter. - TestVeniceDelegateMode.testCloseIsIdempotent — exercises VeniceDelegateMode.close().
370bab8 to
21b9df8
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem Statement
Stats classes that own ASYNC_GAUGE / ASYNC_COUNTER wrappers had no consistent way to deregister their SDK-side callbacks at shutdown, so the OTel SDK kept polling callbacks for deleted stores or dropped instruments. Each class invented its own ad-hoc close logic (or had none), which made it easy to forget cleanup for new metrics and led to a number of duplicated per-store dimension and test-setup helpers across the codebase.
Solution
Introduces a single lifecycle primitive for OTel metric wrappers and applies it uniformly across every stats class that owns wrappers.
Lifecycle primitives (
venice-client-common)CompositeCloseable— idempotent LIFO registry ofCloseables.AbstractStatsCloseable— base class providing astatsCloseablesfield and a defaultclose()that drains the registry.MetricEntityStateUtils.closeQuietly/closeAndClear— null-safe close helpers that swallow per-entry exceptions so a misbehaving wrapper cannot abort sibling cleanup.MetricEntityState*andAsyncMetricEntityState*factory methods now accept aCompositeCloseableso each wrapper self-registers on creation and is closed (with the SDK callback deregistered) when its owner shuts down.Uniform adoption across stats and owner classes
AbstractVeniceAggVersionedStatsextendsAbstractStatsCloseable;close()unregisters the metadata listener first, then drains the registry.PerStoreEntryinner classes (AggVersionedDIVStats,AggVersionedDaVinciRecordTransformerStats,NativeMetadataRepositoryStats, etc.) extendAbstractStatsCloseablesohandleStoreDeletedcloses the per-store wrappers and deregisters their SDK callbacks.AbstractVeniceService-based owners (HelixParticipationService,KafkaStoreIngestionService,AggKafkaConsumerService,AdaptiveThrottlerSignalService,StoreBufferService,HeartbeatMonitoringService) own astatsCloseablesfield and drain it fromstopInner/clear/shutdown.VeniceServer,HelixVeniceClusterResources,PartitionedProducerExecutor,NativeMetadataRepository,ErrorPartitionResetTask,DaVinciBackend, and the twoVeniceChangelogConsumer*classes extendAbstractStatsCloseable.Helper consolidation
OpenTelemetryMetricsSetup.buildStoreDimensionsMap(baseDims, storeName)— single helper that returns a per-store dimensions map with the store name sanitized. Replaces 5 private helpers and 3 inline patterns and fixes a latent issue where some classes did not sanitize empty/null store names.MetricsRepositoryUtils.createOtelEnabledRepository/createOtelDisabledRepository— centralise the OTel test-setup boilerplate (InMemoryMetricReader+ dedicatedAsyncGaugeExecutor+ builder wiring) used by ~13 test classes.Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
CompositeCloseablesynchronises register/close on its own monitor; new entries registered after close return as-is without being tracked.synchronized,RWLock) are used where needed.CompositeCloseableusessynchronized; per-store maps remainVeniceConcurrentHashMap.ConcurrentHashMap,CopyOnWriteArrayList). All per-store maps continue to useVeniceConcurrentHashMap.closeQuietlycatchesExceptionper entry so one failure cannot abort sibling cleanup.How was this PR tested?
CompositeCloseableTestcovers the registry semantics (register, close, register-after-close, idempotent close, exception isolation).MetricEntityState*EnumsTest(including the newMetricEntityStateFiveEnumsTest) to passCompositeCloseable.NONEto the new factory signatures; migrated ~13 OTel test classes to use the newMetricsRepositoryUtilshelpers; updatedParticipantStoreConsumptionStatsTestto assert sanitization of empty store names (now records underUNKNOWN_STORE_NAMEinstead of throwing).Does this PR introduce any user-facing or breaking changes?
OpenTelemetryMetricsSetup.buildStoreDimensionsMapsanitizes the store name (whitespace-trim + null/empty →unknown_store). Three classes (ServerMetadataServiceStats,ParticipantStoreConsumptionStats,StoreBufferServiceStats) previously passed the raw store name through. With this change, an empty or null store name no longer triggers OTel dimension validation; the metric emits under theunknown_storesentinel instead. This matches the behavior the other per-store stats classes already had and bounds dimension cardinality for unknown stores.