|
52 | 52 | import org.apache.kafka.common.metrics.Measurable; |
53 | 53 | import org.apache.kafka.common.metrics.Metrics; |
54 | 54 | import org.apache.kafka.common.metrics.MetricsContext; |
| 55 | +import org.apache.kafka.common.metrics.MetricsReporter; |
55 | 56 | import org.apache.kafka.common.record.TimestampType; |
56 | 57 | import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; |
57 | 58 | import org.apache.kafka.common.serialization.Serdes; |
|
71 | 72 | import org.apache.kafka.streams.errors.TaskCorruptedException; |
72 | 73 | import org.apache.kafka.streams.errors.TaskMigratedException; |
73 | 74 | import org.apache.kafka.streams.errors.TopologyException; |
| 75 | +import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter; |
74 | 76 | import org.apache.kafka.streams.kstream.Consumed; |
75 | 77 | import org.apache.kafka.streams.kstream.Materialized; |
76 | 78 | import org.apache.kafka.streams.kstream.internals.ConsumedInternal; |
@@ -1449,6 +1451,7 @@ public void shouldNotReturnDataAfterTaskMigrated(final boolean processingThreads |
1449 | 1451 | HANDLER, |
1450 | 1452 | null, |
1451 | 1453 | Optional.empty(), |
| 1454 | + null, |
1452 | 1455 | null |
1453 | 1456 | ).updateThreadMetadata(adminClientId(CLIENT_ID)); |
1454 | 1457 |
|
@@ -2474,6 +2477,7 @@ public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolea |
2474 | 2477 | HANDLER, |
2475 | 2478 | null, |
2476 | 2479 | Optional.empty(), |
| 2480 | + null, |
2477 | 2481 | null |
2478 | 2482 | ) { |
2479 | 2483 | @Override |
@@ -2535,6 +2539,7 @@ public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHan |
2535 | 2539 | HANDLER, |
2536 | 2540 | null, |
2537 | 2541 | Optional.empty(), |
| 2542 | + null, |
2538 | 2543 | null |
2539 | 2544 | ) { |
2540 | 2545 | @Override |
@@ -2604,6 +2609,7 @@ public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final |
2604 | 2609 | HANDLER, |
2605 | 2610 | null, |
2606 | 2611 | Optional.empty(), |
| 2612 | + null, |
2607 | 2613 | null |
2608 | 2614 | ) { |
2609 | 2615 | @Override |
@@ -2670,6 +2676,7 @@ public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveT |
2670 | 2676 | HANDLER, |
2671 | 2677 | null, |
2672 | 2678 | Optional.empty(), |
| 2679 | + null, |
2673 | 2680 | null |
2674 | 2681 | ) { |
2675 | 2682 | @Override |
@@ -2733,6 +2740,7 @@ public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInac |
2733 | 2740 | HANDLER, |
2734 | 2741 | null, |
2735 | 2742 | Optional.empty(), |
| 2743 | + null, |
2736 | 2744 | null |
2737 | 2745 | ) { |
2738 | 2746 | @Override |
@@ -2965,6 +2973,7 @@ public void shouldConstructAdminMetrics(final boolean processingThreadsEnabled) |
2965 | 2973 | HANDLER, |
2966 | 2974 | null, |
2967 | 2975 | Optional.empty(), |
| 2976 | + null, |
2968 | 2977 | null |
2969 | 2978 | ); |
2970 | 2979 | final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>()); |
@@ -3024,6 +3033,7 @@ public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, fi |
3024 | 3033 | (e, b) -> { }, |
3025 | 3034 | null, |
3026 | 3035 | Optional.empty(), |
| 3036 | + null, |
3027 | 3037 | null |
3028 | 3038 | ) { |
3029 | 3039 | @Override |
@@ -3611,7 +3621,8 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreads() { |
3611 | 3621 | HANDLER, |
3612 | 3622 | null, |
3613 | 3623 | Optional.of(streamsRebalanceData), |
3614 | | - streamsMetadataState |
| 3624 | + streamsMetadataState, |
| 3625 | + null |
3615 | 3626 | ).updateThreadMetadata(adminClientId(CLIENT_ID)); |
3616 | 3627 |
|
3617 | 3628 | thread.setState(State.STARTING); |
@@ -3672,7 +3683,8 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic |
3672 | 3683 | HANDLER, |
3673 | 3684 | null, |
3674 | 3685 | Optional.of(streamsRebalanceData), |
3675 | | - streamsMetadataState |
| 3686 | + streamsMetadataState, |
| 3687 | + null |
3676 | 3688 | ).updateThreadMetadata(adminClientId(CLIENT_ID)); |
3677 | 3689 |
|
3678 | 3690 | thread.setState(State.STARTING); |
@@ -3742,7 +3754,8 @@ public void testStreamsProtocolIncorrectlyPartitionedTopics() { |
3742 | 3754 | HANDLER, |
3743 | 3755 | null, |
3744 | 3756 | Optional.of(streamsRebalanceData), |
3745 | | - streamsMetadataState |
| 3757 | + streamsMetadataState, |
| 3758 | + null |
3746 | 3759 | ).updateThreadMetadata(adminClientId(CLIENT_ID)); |
3747 | 3760 |
|
3748 | 3761 | thread.setState(State.STARTING); |
@@ -3803,7 +3816,8 @@ public void testStreamsProtocolRunOnceWithProcessingThreads() { |
3803 | 3816 | HANDLER, |
3804 | 3817 | null, |
3805 | 3818 | Optional.of(streamsRebalanceData), |
3806 | | - streamsMetadataState |
| 3819 | + streamsMetadataState, |
| 3820 | + null |
3807 | 3821 | ).updateThreadMetadata(adminClientId(CLIENT_ID)); |
3808 | 3822 |
|
3809 | 3823 | thread.setState(State.STARTING); |
@@ -3864,7 +3878,8 @@ public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic() |
3864 | 3878 | HANDLER, |
3865 | 3879 | null, |
3866 | 3880 | Optional.of(streamsRebalanceData), |
3867 | | - streamsMetadataState |
| 3881 | + streamsMetadataState, |
| 3882 | + null |
3868 | 3883 | ).updateThreadMetadata(adminClientId(CLIENT_ID)); |
3869 | 3884 |
|
3870 | 3885 | thread.setState(State.STARTING); |
@@ -3934,7 +3949,8 @@ public void testStreamsProtocolMissingSourceTopicRecovery() { |
3934 | 3949 | HANDLER, |
3935 | 3950 | null, |
3936 | 3951 | Optional.of(streamsRebalanceData), |
3937 | | - streamsMetadataState |
| 3952 | + streamsMetadataState, |
| 3953 | + null |
3938 | 3954 | ).updateThreadMetadata(adminClientId(CLIENT_ID)); |
3939 | 3955 |
|
3940 | 3956 | thread.setState(State.STARTING); |
@@ -3998,6 +4014,34 @@ t2p1, new PartitionInfo(t2p1.topic(), t2p1.partition(), null, new Node[0], new N |
3998 | 4014 | ); |
3999 | 4015 | } |
4000 | 4016 |
|
| 4017 | + @Test |
| 4018 | + public void shouldRemoveMetricsDelegatingReporterOnShutdown() throws InterruptedException { |
| 4019 | + thread = createStreamThread(CLIENT_ID, false); |
| 4020 | + |
| 4021 | + final List<MetricsReporter> reportersAfterCreate = thread.streamsMetrics().metricsRegistry().reporters(); |
| 4022 | + assertThat( |
| 4023 | + reportersAfterCreate.stream() |
| 4024 | + .filter(r -> r instanceof StreamsThreadMetricsDelegatingReporter) |
| 4025 | + .count(), |
| 4026 | + equalTo(1L) |
| 4027 | + ); |
| 4028 | + |
| 4029 | + thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP); |
| 4030 | + TestUtils.waitForCondition( |
| 4031 | + () -> thread.state() == StreamThread.State.DEAD, |
| 4032 | + 10 * 1000, |
| 4033 | + "Thread never shut down." |
| 4034 | + ); |
| 4035 | + |
| 4036 | + final List<MetricsReporter> reportersAfterShutdown = thread.streamsMetrics().metricsRegistry().reporters(); |
| 4037 | + assertThat( |
| 4038 | + reportersAfterShutdown.stream() |
| 4039 | + .filter(r -> r instanceof StreamsThreadMetricsDelegatingReporter) |
| 4040 | + .count(), |
| 4041 | + equalTo(0L) |
| 4042 | + ); |
| 4043 | + } |
| 4044 | + |
4001 | 4045 | private StreamThread setUpThread(final Properties streamsConfigProps) { |
4002 | 4046 | final StreamsConfig config = new StreamsConfig(streamsConfigProps); |
4003 | 4047 | final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class); |
@@ -4030,6 +4074,7 @@ private StreamThread setUpThread(final Properties streamsConfigProps) { |
4030 | 4074 | null, |
4031 | 4075 | null, |
4032 | 4076 | Optional.empty(), |
| 4077 | + null, |
4033 | 4078 | null |
4034 | 4079 | ); |
4035 | 4080 | } |
@@ -4131,6 +4176,7 @@ private StreamThread buildStreamThread(final Consumer<byte[], byte[]> consumer, |
4131 | 4176 | HANDLER, |
4132 | 4177 | null, |
4133 | 4178 | Optional.empty(), |
| 4179 | + null, |
4134 | 4180 | null |
4135 | 4181 | ); |
4136 | 4182 | } |
|
0 commit comments