Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ public boolean isStartingRunningOrPartitionAssigned() {
private final WindowedSum punctuateLatencyWindowedSum = new WindowedSum();
private final WindowedSum runOnceLatencyWindowedSum = new WindowedSum();
private final MetricConfig metricsConfig;
private final StreamsThreadMetricsDelegatingReporter metricsReporter;

private boolean latencyWindowsInitialized = false;

Expand Down Expand Up @@ -514,8 +515,8 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
taskManager.setMainConsumer(mainConsumerSetup.mainConsumer);
referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer;

final StreamsThreadMetricsDelegatingReporter reporter = new StreamsThreadMetricsDelegatingReporter(mainConsumerSetup.mainConsumer, threadId, Optional.of(stateUpdaterId));
streamsMetrics.metricsRegistry().addReporter(reporter);
final StreamsThreadMetricsDelegatingReporter metricsReporter = new StreamsThreadMetricsDelegatingReporter(mainConsumerSetup.mainConsumer, threadId, Optional.of(stateUpdaterId));
streamsMetrics.metricsRegistry().addReporter(metricsReporter);

final StreamThread streamThread = new StreamThread(
time,
Expand All @@ -539,7 +540,8 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
streamsUncaughtExceptionHandler,
cache::resize,
mainConsumerSetup.streamsRebalanceData,
streamsMetadataState
streamsMetadataState,
metricsReporter
);

return streamThread.updateThreadMetadata(adminClientId(clientId));
Expand Down Expand Up @@ -786,7 +788,8 @@ public StreamThread(final Time time,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler,
final java.util.function.Consumer<Long> cacheResizer,
final Optional<StreamsRebalanceData> streamsRebalanceData,
final StreamsMetadataState streamsMetadataState
final StreamsMetadataState streamsMetadataState,
final StreamsThreadMetricsDelegatingReporter metricsReporter
) {
super(threadId);
this.stateLock = new Object();
Expand All @@ -809,6 +812,7 @@ public StreamThread(final Time time,
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;
this.metricsConfig = streamsMetrics.metricsRegistry().config();
this.metricsReporter = metricsReporter;

// The following sensors are created here but their references are not stored in this object, since within
// this object they are not recorded. The sensors are created here so that the stream threads starts with all
Expand Down Expand Up @@ -1905,6 +1909,7 @@ private void completeShutdown(final boolean cleanRun) {
}
streamsMetrics.removeAllThreadLevelSensors(getName());
streamsMetrics.removeAllThreadLevelMetrics(getName());
streamsMetrics.metricsRegistry().removeReporter(metricsReporter);

setState(State.DEAD);

Expand Down Expand Up @@ -2102,6 +2107,10 @@ Optional<StreamsRebalanceData> streamsRebalanceData() {
return streamsRebalanceData;
}

// Package-private accessor exposing this thread's StreamsMetricsImpl.
// Used by tests to reach the underlying metrics registry, e.g. to verify
// that the StreamsThreadMetricsDelegatingReporter is registered on thread
// creation and removed again on shutdown.
StreamsMetricsImpl streamsMetrics() {
return streamsMetrics;
}

/**
* Initialize both WindowedSum instances at exactly the same timestamp so
* their windows are aligned from the very beginning.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -71,6 +72,7 @@
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
Expand Down Expand Up @@ -1449,6 +1451,7 @@ public void shouldNotReturnDataAfterTaskMigrated(final boolean processingThreads
HANDLER,
null,
Optional.empty(),
null,
null
).updateThreadMetadata(adminClientId(CLIENT_ID));

Expand Down Expand Up @@ -2474,6 +2477,7 @@ public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolea
HANDLER,
null,
Optional.empty(),
null,
null
) {
@Override
Expand Down Expand Up @@ -2535,6 +2539,7 @@ public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHan
HANDLER,
null,
Optional.empty(),
null,
null
) {
@Override
Expand Down Expand Up @@ -2604,6 +2609,7 @@ public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final
HANDLER,
null,
Optional.empty(),
null,
null
) {
@Override
Expand Down Expand Up @@ -2670,6 +2676,7 @@ public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveT
HANDLER,
null,
Optional.empty(),
null,
null
) {
@Override
Expand Down Expand Up @@ -2733,6 +2740,7 @@ public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInac
HANDLER,
null,
Optional.empty(),
null,
null
) {
@Override
Expand Down Expand Up @@ -2965,6 +2973,7 @@ public void shouldConstructAdminMetrics(final boolean processingThreadsEnabled)
HANDLER,
null,
Optional.empty(),
null,
null
);
final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
Expand Down Expand Up @@ -3024,6 +3033,7 @@ public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, fi
(e, b) -> { },
null,
Optional.empty(),
null,
null
) {
@Override
Expand Down Expand Up @@ -3611,7 +3621,8 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreads() {
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
streamsMetadataState,
null
).updateThreadMetadata(adminClientId(CLIENT_ID));

thread.setState(State.STARTING);
Expand Down Expand Up @@ -3672,7 +3683,8 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
streamsMetadataState,
null
).updateThreadMetadata(adminClientId(CLIENT_ID));

thread.setState(State.STARTING);
Expand Down Expand Up @@ -3742,7 +3754,8 @@ public void testStreamsProtocolIncorrectlyPartitionedTopics() {
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
streamsMetadataState,
null
).updateThreadMetadata(adminClientId(CLIENT_ID));

thread.setState(State.STARTING);
Expand Down Expand Up @@ -3803,7 +3816,8 @@ public void testStreamsProtocolRunOnceWithProcessingThreads() {
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
streamsMetadataState,
null
).updateThreadMetadata(adminClientId(CLIENT_ID));

thread.setState(State.STARTING);
Expand Down Expand Up @@ -3864,7 +3878,8 @@ public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic()
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
streamsMetadataState,
null
).updateThreadMetadata(adminClientId(CLIENT_ID));

thread.setState(State.STARTING);
Expand Down Expand Up @@ -3934,7 +3949,8 @@ public void testStreamsProtocolMissingSourceTopicRecovery() {
HANDLER,
null,
Optional.of(streamsRebalanceData),
streamsMetadataState
streamsMetadataState,
null
).updateThreadMetadata(adminClientId(CLIENT_ID));

thread.setState(State.STARTING);
Expand Down Expand Up @@ -3998,6 +4014,34 @@ t2p1, new PartitionInfo(t2p1.topic(), t2p1.partition(), null, new Node[0], new N
);
}

@Test
public void shouldRemoveMetricsDelegatingReporterOnShutdown() throws InterruptedException {
thread = createStreamThread(CLIENT_ID, false);

// Creating the stream thread registers exactly one delegating reporter.
final long reportersBeforeShutdown = thread.streamsMetrics()
.metricsRegistry()
.reporters()
.stream()
.filter(StreamsThreadMetricsDelegatingReporter.class::isInstance)
.count();
assertThat(reportersBeforeShutdown, equalTo(1L));

thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
"Thread never shut down."
);

// Completing shutdown must deregister the delegating reporter again.
final long reportersAfterShutdown = thread.streamsMetrics()
.metricsRegistry()
.reporters()
.stream()
.filter(StreamsThreadMetricsDelegatingReporter.class::isInstance)
.count();
assertThat(reportersAfterShutdown, equalTo(0L));
}

private StreamThread setUpThread(final Properties streamsConfigProps) {
final StreamsConfig config = new StreamsConfig(streamsConfigProps);
final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
Expand Down Expand Up @@ -4030,6 +4074,7 @@ private StreamThread setUpThread(final Properties streamsConfigProps) {
null,
null,
Optional.empty(),
null,
null
);
}
Expand Down Expand Up @@ -4131,6 +4176,7 @@ private StreamThread buildStreamThread(final Consumer<byte[], byte[]> consumer,
HANDLER,
null,
Optional.empty(),
null,
null
);
}
Expand Down
Loading