Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.stats.metrics.AbstractStatsCloseable;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.LogContext;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -100,7 +100,7 @@
* the shared behavior of this class. Regular clients participate in version swaps while version-specific
* clients subscribe to a fixed version and ignore version swap events.
*/
public class DaVinciBackend implements Closeable {
public class DaVinciBackend extends AbstractStatsCloseable {
private static final Logger LOGGER = LogManager.getLogger(DaVinciBackend.class);

// Client type tracking for version-specific vs regular clients
Expand Down Expand Up @@ -189,16 +189,18 @@ public DaVinciBackend(
configLoader.getVeniceClusterConfig().getClusterName());

// OTel per-store version gauge
storeVersionOtelStats = StoreVersionOtelStats
.create(metricsRepository, configLoader.getVeniceClusterConfig().getClusterName(), storeRepository);

rocksDBMemoryStats = backendConfig.isDatabaseMemoryStatsEnabled()
? new RocksDBMemoryStats(
metricsRepository,
"RocksDBMemoryStats",
backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled(),
configLoader.getVeniceClusterConfig().getClusterName())
: null;
storeVersionOtelStats = statsCloseables.register(
StoreVersionOtelStats
.create(metricsRepository, configLoader.getVeniceClusterConfig().getClusterName(), storeRepository));

rocksDBMemoryStats = statsCloseables.register(
backendConfig.isDatabaseMemoryStatsEnabled()
? new RocksDBMemoryStats(
metricsRepository,
"RocksDBMemoryStats",
backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled(),
configLoader.getVeniceClusterConfig().getClusterName())
: null);

/**
* The constructor of {@link #storageService} will take care of unused store/store version cleanup.
Expand Down Expand Up @@ -298,8 +300,11 @@ public DaVinciBackend(
ingestionService.start();

if (BlobTransferUtils.isBlobTransferManagerEnabled(backendConfig)) {
aggVersionedBlobTransferStats =
new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig());
aggVersionedBlobTransferStats = statsCloseables.register(
new AggVersionedBlobTransferStats(
metricsRepository,
storeRepository,
configLoader.getVeniceServerConfig()));
aggBlobTransferStats =
new AggBlobTransferStats(aggVersionedBlobTransferStats, ingestionService.getHostLevelIngestionStats());
P2PBlobTransferConfig p2PBlobTransferConfig = new P2PBlobTransferConfig(
Expand Down Expand Up @@ -434,9 +439,7 @@ public synchronized void close() {
cacheBackend.ifPresent(
objectCacheBackend -> storeRepository
.unregisterStoreDataChangedListener(objectCacheBackend.getCacheInvalidatingStoreChangeListener()));
if (storeVersionOtelStats != null) {
storeVersionOtelStats.close();
}
super.close();
ExecutorService storeBackendCloseExecutor = Executors.newCachedThreadPool(
new DaemonThreadFactory(
"DaVinciBackend-StoreBackend-Close",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.stats.metrics.AbstractStatsCloseable;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LogContext;
Expand Down Expand Up @@ -64,7 +65,7 @@
import org.apache.logging.log4j.Logger;


public class VeniceChangelogConsumerDaVinciRecordTransformerImpl<K, V>
public class VeniceChangelogConsumerDaVinciRecordTransformerImpl<K, V> extends AbstractStatsCloseable
implements StatefulVeniceChangelogConsumer<K, V>, VeniceChangelogConsumer<K, V> {
private static final Logger LOGGER = LogManager.getLogger(VeniceChangelogConsumerDaVinciRecordTransformerImpl.class);
private long START_TIMEOUT_IN_SECONDS = 60;
Expand Down Expand Up @@ -184,10 +185,11 @@ public VeniceChangelogConsumerDaVinciRecordTransformerImpl(
}

if (changelogClientConfig.getInnerClientConfig().getMetricsRepository() != null) {
this.changeCaptureStats = new BasicConsumerStats(
changelogClientConfig.getInnerClientConfig().getMetricsRepository(),
"vcc-" + changelogClientConfig.getConsumerName(),
storeName);
this.changeCaptureStats = statsCloseables.register(
new BasicConsumerStats(
changelogClientConfig.getInnerClientConfig().getMetricsRepository(),
"vcc-" + changelogClientConfig.getConsumerName(),
storeName));
} else {
changeCaptureStats = null;
}
Expand Down Expand Up @@ -439,12 +441,15 @@ public void resume() {
this.resume(Collections.emptySet());
}

@Override
public void close() {
try {
this.stop();
} catch (Exception e) {
LOGGER.error("Close failed for VeniceChangelogConsumer", e);
throw new RuntimeException(e);
} finally {
super.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.linkedin.venice.serialization.avro.AvroSpecificStoreDeserializerCache;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.stats.metrics.AbstractStatsCloseable;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.DaemonThreadFactory;
Expand Down Expand Up @@ -102,7 +103,7 @@
import org.apache.logging.log4j.Logger;


public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsumer<K, V> {
public class VeniceChangelogConsumerImpl<K, V> extends AbstractStatsCloseable implements VeniceChangelogConsumer<K, V> {
private static final Logger LOGGER = LogManager.getLogger(VeniceChangelogConsumerImpl.class);
private static final int MAX_SUBSCRIBE_RETRIES = 5;
private static final String ROCKSDB_BUFFER_FOLDER = "rocksdb-chunk-buffer";
Expand Down Expand Up @@ -250,10 +251,11 @@ public VeniceChangelogConsumerImpl(
}

if (changelogClientConfig.getInnerClientConfig().getMetricsRepository() != null) {
this.changeCaptureStats = new BasicConsumerStats(
changelogClientConfig.getInnerClientConfig().getMetricsRepository(),
"vcc-" + changelogClientConfig.getConsumerName(),
storeName);
this.changeCaptureStats = statsCloseables.register(
new BasicConsumerStats(
changelogClientConfig.getInnerClientConfig().getMetricsRepository(),
"vcc-" + changelogClientConfig.getConsumerName(),
storeName));
} else {
changeCaptureStats = null;
}
Expand Down Expand Up @@ -1169,18 +1171,22 @@ public void close() {
LOGGER.info("Closing Changelog Consumer with name: {}", changelogClientConfig.getConsumerName());
subscriptionLock.writeLock().lock();
try {
this.unsubscribeAll();
pubSubConsumer.close();
heartbeatReporterThread.interrupt();
seekExecutorService.shutdown();
compressorFactory.close();

if (rocksDBStorageEngineFactory != null) {
rocksDBStorageEngineFactory.close();
}
try {
this.unsubscribeAll();
pubSubConsumer.close();
heartbeatReporterThread.interrupt();
seekExecutorService.shutdown();
compressorFactory.close();

if (rocksDBStorageEngineFactory != null) {
rocksDBStorageEngineFactory.close();
}

veniceChangelogConsumerClientFactory.deregisterClient(changelogClientConfig.getConsumerName());
LOGGER.info("Closed Changelog Consumer with name: {}", changelogClientConfig.getConsumerName());
veniceChangelogConsumerClientFactory.deregisterClient(changelogClientConfig.getConsumerName());
LOGGER.info("Closed Changelog Consumer with name: {}", changelogClientConfig.getConsumerName());
} finally {
super.close();
}
} finally {
subscriptionLock.writeLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.MAX_PARTITION_LAG,
Collections.singletonList(new Max()),
baseDimensionsMap,
baseAttributes);
baseAttributes,
resources);

minimumConsumingVersionMetric = MetricEntityStateBase.create(
BasicConsumerMetricEntity.CURRENT_CONSUMING_VERSION.getMetricEntity(),
Expand All @@ -83,7 +84,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.MINIMUM_CONSUMING_VERSION,
Collections.singletonList(new Gauge()),
baseDimensionsMap,
baseAttributes);
baseAttributes,
resources);

maximumConsumingVersionMetric = MetricEntityStateBase.create(
BasicConsumerMetricEntity.CURRENT_CONSUMING_VERSION.getMetricEntity(),
Expand All @@ -92,7 +94,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.MAXIMUM_CONSUMING_VERSION,
Collections.singletonList(new Gauge()),
baseDimensionsMap,
baseAttributes);
baseAttributes,
resources);

recordsConsumedCountMetric = MetricEntityStateBase.create(
BasicConsumerMetricEntity.RECORDS_CONSUMED_COUNT.getMetricEntity(),
Expand All @@ -101,7 +104,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.RECORDS_CONSUMED,
Arrays.asList(new Avg(), new Max(), new Rate()),
baseDimensionsMap,
baseAttributes);
baseAttributes,
resources);

pollSuccessCountMetric = MetricEntityStateOneEnum.create(
BasicConsumerMetricEntity.POLL_COUNT.getMetricEntity(),
Expand All @@ -110,7 +114,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.POLL_SUCCESS_COUNT,
Collections.singletonList(new Rate()),
baseDimensionsMap,
VeniceResponseStatusCategory.class);
VeniceResponseStatusCategory.class,
resources);

pollFailCountMetric = MetricEntityStateOneEnum.create(
BasicConsumerMetricEntity.POLL_COUNT.getMetricEntity(),
Expand All @@ -119,7 +124,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.POLL_FAIL_COUNT,
Collections.singletonList(new Rate()),
baseDimensionsMap,
VeniceResponseStatusCategory.class);
VeniceResponseStatusCategory.class,
resources);

versionSwapSuccessCountMetric = MetricEntityStateOneEnum.create(
BasicConsumerMetricEntity.VERSION_SWAP_COUNT.getMetricEntity(),
Expand All @@ -128,7 +134,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.VERSION_SWAP_SUCCESS_COUNT,
Collections.singletonList(new Total()),
baseDimensionsMap,
VeniceResponseStatusCategory.class);
VeniceResponseStatusCategory.class,
resources);

versionSwapFailCountMetric = MetricEntityStateOneEnum.create(
BasicConsumerMetricEntity.VERSION_SWAP_COUNT.getMetricEntity(),
Expand All @@ -137,7 +144,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.VERSION_SWAP_FAIL_COUNT,
Collections.singletonList(new Total()),
baseDimensionsMap,
VeniceResponseStatusCategory.class);
VeniceResponseStatusCategory.class,
resources);

chunkedRecordSuccessCountMetric = MetricEntityStateOneEnum.create(
BasicConsumerMetricEntity.CHUNKED_RECORD_COUNT.getMetricEntity(),
Expand All @@ -146,7 +154,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.CHUNKED_RECORD_SUCCESS_COUNT,
Collections.singletonList(new Rate()),
baseDimensionsMap,
VeniceResponseStatusCategory.class);
VeniceResponseStatusCategory.class,
resources);

chunkedRecordFailCountMetric = MetricEntityStateOneEnum.create(
BasicConsumerMetricEntity.CHUNKED_RECORD_COUNT.getMetricEntity(),
Expand All @@ -155,7 +164,8 @@ public BasicConsumerStats(MetricsRepository metricsRepository, String consumerNa
BasicConsumerTehutiMetricName.CHUNKED_RECORD_FAIL_COUNT,
Collections.singletonList(new Rate()),
baseDimensionsMap,
VeniceResponseStatusCategory.class);
VeniceResponseStatusCategory.class,
resources);

/*
* Record default value for version swap metrics so the UP_DOWN_COUNTER in OTEL will emit a default 0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.stats.HelixMessageChannelStats;
import com.linkedin.venice.stats.metrics.CompositeCloseable;
import com.linkedin.venice.status.StatusMessageHandler;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.HelixUtils;
Expand Down Expand Up @@ -90,6 +91,8 @@ public class HelixParticipationService extends AbstractVeniceService
private VeniceOfflinePushMonitorAccessor veniceOfflinePushMonitorAccessor;
private BlobTransferManager<Void> blobTransferManager;
private final HeartbeatMonitoringService heartbeatMonitoringService;
/** Stats fields owned by this class; drained by {@link #stopInner()}. */
private final CompositeCloseable statsCloseables = new CompositeCloseable();

// This is ONLY for testing purpose.
public ThreadPoolExecutor getLeaderFollowerHelixStateTransitionThreadPool() {
Expand Down Expand Up @@ -193,20 +196,22 @@ public boolean startInner() {
config.getMaxLeaderFollowerStateTransitionThreadNumber(),
"Venice-L/F-state-transition");
// register stats that tracks the thread pool
ParticipantStateTransitionStats stateTransitionStats = new ParticipantStateTransitionStats(
metricsRepository,
leaderFollowerHelixStateTransitionThreadPool,
"Venice_L/F_ST_thread_pool");
ParticipantStateTransitionStats stateTransitionStats = statsCloseables.register(
new ParticipantStateTransitionStats(
metricsRepository,
leaderFollowerHelixStateTransitionThreadPool,
"Venice_L/F_ST_thread_pool"));

if (config.getLeaderFollowerThreadPoolStrategy()
.equals(LeaderFollowerPartitionStateModelFactory.LeaderFollowerThreadPoolStrategy.DUAL_POOL_STRATEGY)) {
ThreadPoolExecutor futureVersionThreadPool = initHelixStateTransitionThreadPool(
config.getMaxFutureVersionLeaderFollowerStateTransitionThreadNumber(),
"venice-L/F-state-transition-future-version");
ParticipantStateTransitionStats futureVersionStateTransitionStats = new ParticipantStateTransitionStats(
metricsRepository,
futureVersionThreadPool,
"Venice_L/F_ST_thread_pool_future_version");
ParticipantStateTransitionStats futureVersionStateTransitionStats = statsCloseables.register(
new ParticipantStateTransitionStats(
metricsRepository,
futureVersionThreadPool,
"Venice_L/F_ST_thread_pool_future_version"));
leaderFollowerParticipantModelFactory = new LeaderFollowerPartitionStateModelDualPoolFactory(
ingestionBackend,
veniceConfigLoader,
Expand Down Expand Up @@ -301,6 +306,7 @@ public void stopInner() throws IOException {
zkClient.close();
LOGGER.info("Closed ZkClient.");
}
statsCloseables.close();
LOGGER.info("Finished stopping HelixParticipation service.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.davinci.stats.ingestion.heartbeat.AggregatedHeartbeatLagEntry;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.stats.metrics.CompositeCloseable;
import com.linkedin.venice.throttle.VeniceAdaptiveThrottler;
import com.linkedin.venice.utils.DaemonThreadFactory;
import io.tehuti.Metric;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class AdaptiveThrottlerSignalService extends AbstractVeniceService {
private boolean nonCurrentLeaderMaxHeartbeatLagSignal = false;
private boolean nonCurrentFollowerMaxHeartbeatLagSignal = false;
private final AdaptiveThrottlingServiceStats adaptiveThrottlingServiceStats;
/** Stats fields owned by this class; drained by {@link #stopInner()}. */
private final CompositeCloseable statsCloseables = new CompositeCloseable();

public AdaptiveThrottlerSignalService(
VeniceServerConfig veniceServerConfig,
Expand All @@ -63,8 +66,8 @@ public AdaptiveThrottlerSignalService(
this.updateService = Executors.newSingleThreadScheduledExecutor(
new DaemonThreadFactory("AdaptiveThrottlerSignalService", veniceServerConfig.getLogContext()));
this.heartbeatMonitoringService = heartbeatMonitoringService;
this.adaptiveThrottlingServiceStats =
new AdaptiveThrottlingServiceStats(metricsRepository, veniceServerConfig.getClusterName());
this.adaptiveThrottlingServiceStats = statsCloseables
.register(new AdaptiveThrottlingServiceStats(metricsRepository, veniceServerConfig.getClusterName()));
}

public void registerThrottler(VeniceAdaptiveThrottler adaptiveIngestionThrottler) {
Expand Down Expand Up @@ -171,6 +174,7 @@ public boolean startInner() throws Exception {
@Override
public void stopInner() throws Exception {
updateService.shutdownNow();
statsCloseables.close();
}

List<VeniceAdaptiveThrottler> getThrottlerList() {
Expand Down
Loading