From 92bcc6647cffa5ae742c3605e919f17dcdd37b70 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sat, 28 Mar 2026 07:38:37 +0000 Subject: [PATCH] [rpc] Client zero-copy lazy parse ByteBuf to avoid deep memory copy (#1184) --- .../apache/fluss/client/FlussConnection.java | 2 +- .../scanner/log/DefaultCompletedFetch.java | 26 ++- .../client/table/scanner/log/LogFetcher.java | 20 ++- .../client/metadata/MetadataUpdaterTest.java | 2 +- .../metadata/TestingClientSchemaGetter.java | 2 +- .../metadata/TestingMetadataUpdater.java | 2 +- .../acl/FlussAuthorizationITCase.java | 4 +- ...aultCompletedFetchBufferLifecycleTest.java | 165 ++++++++++++++++++ .../log/DefaultCompletedFetchTest.java | 6 +- .../table/scanner/log/LogFetchBufferTest.java | 3 +- .../scanner/log/LogFetchCollectorTest.java | 2 +- .../client/write/RecordAccumulatorTest.java | 3 +- .../FlussTableLakeSnapshotCommitter.java | 3 +- .../enumerator/TieringSourceEnumerator.java | 2 +- .../java/org/apache/fluss/rpc/RpcClient.java | 7 +- .../fluss/rpc/netty/client/NettyClient.java | 13 +- .../rpc/netty/client/NettyClientHandler.java | 36 +--- .../rpc/netty/client/ServerConnection.java | 11 +- .../fluss/rpc/netty/NettyMetricsTest.java | 2 +- .../authenticate/AuthenticationTest.java | 22 +-- .../SaslAuthenticationITCase.java | 2 +- .../rpc/netty/client/NettyClientTest.java | 4 +- .../netty/client/ServerConnectionTest.java | 20 +-- .../fluss/rpc/protocol/MessageCodecTest.java | 2 +- .../server/coordinator/CoordinatorServer.java | 2 +- .../fluss/server/tablet/TabletServer.java | 2 +- .../apache/fluss/server/ServerITCaseBase.java | 3 +- .../CoordinatorChannelManagerTest.java | 3 +- .../coordinator/TableManagerITCase.java | 3 +- .../TestCoordinatorChannelManager.java | 2 +- .../statemachine/ReplicaStateMachineTest.java | 3 +- .../TableBucketStateMachineTest.java | 3 +- .../fluss/server/replica/ReplicaTestBase.java | 2 +- .../fetcher/ReplicaFetcherThreadTest.java | 2 +- .../testutils/FlussClusterExtension.java | 3 +- .../server/utils/RpcGatewayManagerTest.java | 2 +- 36 files changed, 271 insertions(+), 120 deletions(-) create mode 100644 fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java index 0990756ce5..d98e45869e 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java @@ -81,7 +81,7 @@ public final class FlussConnection implements Connection { String clientId = conf.getString(ConfigOptions.CLIENT_ID); this.metricRegistry = metricRegistry; this.clientMetricGroup = new ClientMetricGroup(metricRegistry, clientId); - this.rpcClient = RpcClient.create(conf, clientMetricGroup, false); + this.rpcClient = RpcClient.create(conf, clientMetricGroup); // TODO this maybe remove after we introduce client metadata. this.metadataUpdater = new MetadataUpdater(conf, rpcClient); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java index 95078bf36e..abd6e4ca85 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java @@ -22,6 +22,9 @@ import org.apache.fluss.record.LogRecordReadContext; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; import org.apache.fluss.rpc.messages.FetchLogRequest; +import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf; + +import javax.annotation.Nullable; /** * {@link DefaultCompletedFetch} is a {@link CompletedFetch} that represents a completed fetch that @@ -30,13 +33,21 @@ @Internal class DefaultCompletedFetch extends CompletedFetch { + /** + * The parsed ByteBuf backing the lazily-parsed records data. This reference is retained to keep + * the underlying network buffer alive while the records are being consumed. Released in {@link + * #drain()}. + */ + @Nullable private ByteBuf parsedByteBuf; + public DefaultCompletedFetch( TableBucket tableBucket, FetchLogResultForBucket fetchLogResultForBucket, LogRecordReadContext readContext, LogScannerStatus logScannerStatus, boolean isCheckCrc, - Long fetchOffset) { + Long fetchOffset, + @Nullable ByteBuf parsedByteBuf) { super( tableBucket, fetchLogResultForBucket.getError(), @@ -47,5 +58,18 @@ public DefaultCompletedFetch( logScannerStatus, isCheckCrc, fetchOffset); + this.parsedByteBuf = parsedByteBuf; + } + + @Override + void drain() { + try { + super.drain(); + } finally { + if (parsedByteBuf != null) { + parsedByteBuf.release(); + parsedByteBuf = null; + } + } } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index b5a03a1caf..323aab7851 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -52,6 +52,7 @@ import org.apache.fluss.rpc.messages.PbFetchLogRespForTable; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.Errors; +import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.fluss.utils.IOUtils; import org.apache.fluss.utils.Projection; @@ -340,6 +341,11 @@ private void handleFetchLogException( /** Implements the core logic for a successful fetch log response. */ private synchronized void handleFetchLogResponse( int destination, long requestStartTime, FetchLogResponse fetchLogResponse) { + // Capture the parsed ByteBuf for buffer lifecycle management. The response may + // have been lazily parsed from the network buffer. Each DefaultCompletedFetch + // that references the buffer's records data must retain it. We release the base + // reference in the finally block. + ByteBuf parsedByteBuf = fetchLogResponse.getParsedByteBuf(); try { if (isClosed) { return; @@ -387,6 +393,11 @@ private synchronized void handleFetchLogResponse( LogRecords logRecords = fetchResultForBucket.recordsOrEmpty(); if (!MemoryLogRecords.EMPTY.equals(logRecords) || fetchResultForBucket.getErrorCode() != Errors.NONE.code()) { + // Retain the parsed buffer so it stays alive while + // this CompletedFetch's records are being consumed. + if (parsedByteBuf != null) { + parsedByteBuf.retain(); + } // In oder to not signal notEmptyCondition, add completed // fetch to buffer until log records is not empty. DefaultCompletedFetch completedFetch = @@ -398,7 +409,8 @@ private synchronized void handleFetchLogResponse( // skipping CRC check if projection push downed as // the data is pruned isCheckCrcs, - fetchOffset); + fetchOffset, + parsedByteBuf); logFetchBuffer.add(completedFetch); } } @@ -406,6 +418,12 @@ private synchronized void handleFetchLogResponse( } } } finally { + // Release the base reference from the network buffer. Any CompletedFetch + // objects created above hold their own retained references, keeping the + // buffer alive until they are drained. + if (parsedByteBuf != null) { + parsedByteBuf.release(); + } LOG.debug("Removing pending request for node: {}", destination); nodesWithPendingFetchRequests.remove(destination); } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java index 6717cfe5ae..1cda13b930 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java @@ -50,7 +50,7 @@ public class MetadataUpdaterTest { void testInitializeClusterWithRetries() throws Exception { Configuration configuration = new Configuration(); RpcClient rpcClient = - RpcClient.create(configuration, TestingClientMetricGroup.newInstance(), false); + RpcClient.create(configuration, TestingClientMetricGroup.newInstance()); // retry lower than max retry count. AdminReadOnlyGateway gateway = new TestingAdminReadOnlyGateway(2); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java index ffb2c54dc2..0550173831 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java @@ -38,7 +38,7 @@ public TestingClientSchemaGetter( tablePath, latestSchemaInfo, new FlussAdmin( - RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false), + RpcClient.create(conf, TestingClientMetricGroup.newInstance()), metadataUpdater)); } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java index 2063951357..6c67d297e6 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java @@ -70,7 +70,7 @@ public TestingMetadataUpdater( Map customGateways, Configuration conf) { super( - RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false), + RpcClient.create(conf, TestingClientMetricGroup.newInstance()), conf, Cluster.empty()); initializeCluster(coordinatorServer, tabletServers, tableInfos); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 8c301c89a2..e606052efd 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -512,7 +512,7 @@ void testGetMetaInfo() throws Exception { Collections.singleton(DATA1_TABLE_PATH_PK), null, null); try (RpcClient rpcClient = - RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) { + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { AdminGateway guestGateway = GatewayClientProxy.createGatewayProxy( () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), @@ -836,7 +836,7 @@ void testControlledShutdown() throws Exception { new ControlledShutdownRequest().setTabletServerId(-1).setTabletServerEpoch(-1); try (RpcClient rpcClient = - RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) { + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { CoordinatorGateway guestGateway = GatewayClientProxy.createGatewayProxy( () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java new file mode 100644 index 0000000000..090bc932fa --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchBufferLifecycleTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.LogRecordReadContext; +import org.apache.fluss.rpc.entity.FetchLogResultForBucket; +import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.fluss.record.TestData.DATA1; +import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; +import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID; +import static org.apache.fluss.record.TestData.TEST_SCHEMA_GETTER; +import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DefaultCompletedFetch} zero-copy buffer lifecycle (issue #1184). */ +class DefaultCompletedFetchBufferLifecycleTest { + + private final TableBucket tb0 = new TableBucket(1, 0); + private final TableBucket tb1 = new TableBucket(1, 1); + private LogScannerStatus logScannerStatus; + private LogRecordReadContext readContext; + + @BeforeEach + void setup() { + Map scanBuckets = new HashMap<>(); + scanBuckets.put(tb0, 0L); + scanBuckets.put(tb1, 0L); + logScannerStatus = new LogScannerStatus(); + logScannerStatus.assignScanBuckets(scanBuckets); + readContext = + LogRecordReadContext.createArrowReadContext( + DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, TEST_SCHEMA_GETTER); + } + + @AfterEach + void afterEach() { + if (readContext != null) { + readContext.close(); + readContext = null; + } + } + + @Test + void testBufferReleasedOnDrain() throws Exception { + ByteBuf buf = Unpooled.buffer(64); + buf.writeBytes(new byte[64]); + + buf.retain(); + DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf); + buf.release(); // base reference + assertThat(buf.refCnt()).isEqualTo(1); + + fetch.drain(); + assertThat(buf.refCnt()).isEqualTo(0); + } + + @Test + void testMultipleCompletedFetchShareBuffer() throws Exception { + ByteBuf buf = Unpooled.buffer(64); + buf.writeBytes(new byte[64]); + + buf.retain(); + DefaultCompletedFetch fetch1 = makeCompletedFetch(tb0, buf); + buf.retain(); + DefaultCompletedFetch fetch2 = makeCompletedFetch(tb1, buf); + buf.release(); // base reference + assertThat(buf.refCnt()).isEqualTo(2); + + fetch1.drain(); + assertThat(buf.refCnt()).isEqualTo(1); + + fetch2.drain(); + assertThat(buf.refCnt()).isEqualTo(0); + } + + @Test + void testDrainIsIdempotent() throws Exception { + ByteBuf buf = Unpooled.buffer(64); + buf.writeBytes(new byte[64]); + buf.retain(); + + DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf); + buf.release(); + + fetch.drain(); + assertThat(buf.refCnt()).isEqualTo(0); + fetch.drain(); // no-op, no exception + } + + @Test + void testNullBufferHandledGracefully() throws Exception { + DefaultCompletedFetch fetch = makeCompletedFetch(tb0, null); + fetch.drain(); + } + + @Test + void testLogFetchBufferCloseReleasesBuffer() throws Exception { + ByteBuf buf = Unpooled.buffer(64); + buf.writeBytes(new byte[64]); + + buf.retain(); + DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf); + buf.release(); + + try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) { + logFetchBuffer.add(fetch); + assertThat(buf.refCnt()).isEqualTo(1); + } + assertThat(buf.refCnt()).isEqualTo(0); + } + + @Test + void testRetainAllUnsubscribeReleasesBuffer() throws Exception { + ByteBuf buf = Unpooled.buffer(64); + buf.writeBytes(new byte[64]); + + buf.retain(); + DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf); + buf.release(); + + try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) { + logFetchBuffer.add(fetch); + logFetchBuffer.retainAll(Collections.singleton(tb1)); + assertThat(buf.refCnt()).isEqualTo(0); + } + } + + private DefaultCompletedFetch makeCompletedFetch(TableBucket tableBucket, ByteBuf parsedByteBuf) + throws Exception { + return new DefaultCompletedFetch( + tableBucket, + new FetchLogResultForBucket(tableBucket, genMemoryLogRecordsByObject(DATA1), 10L), + readContext, + logScannerStatus, + true, + 0L, + parsedByteBuf); + } +} diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java index c4624617df..427f59d4d7 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java @@ -345,7 +345,8 @@ void testComplexTypeFetch() throws Exception { tableInfo.getSchemaId(), tableInfo.getSchema())), logScannerStatus, true, - fetchOffset); + fetchOffset, + null); List scanRecords = defaultCompletedFetch.fetchRecords(3); // close the read context to release arrow root resource, // this is important to test complex types @@ -398,7 +399,8 @@ private DefaultCompletedFetch makeCompletedFetch( new TestingSchemaGetter(tableInfo.getSchemaId(), tableInfo.getSchema())), logScannerStatus, true, - offset); + offset, + null); } private static Collection typeAndMagic() { diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java index 095131b1e5..abfd0f1d19 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java @@ -264,7 +264,8 @@ private DefaultCompletedFetch makeCompletedFetch(TableBucket tableBucket) throws readContext, logScannerStatus, true, - 0L); + 0L, + null); } private PendingFetch makePendingFetch(TableBucket tableBucket) throws Exception { diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java index 99a108e2f3..6b4f143f2f 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java @@ -160,6 +160,6 @@ void testCollectAfterUnassign() throws Exception { private DefaultCompletedFetch makeCompletedFetch( TableBucket tableBucket, FetchLogResultForBucket resultForBucket, long offset) { return new DefaultCompletedFetch( - tableBucket, resultForBucket, readContext, logScannerStatus, true, offset); + tableBucket, resultForBucket, readContext, logScannerStatus, true, offset, null); } } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index 773c12a445..a4c31d51ba 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -634,8 +634,7 @@ private RecordAccumulator createTestRecordAccumulator( conf.getInt(ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET), GatewayClientProxy.createGatewayProxy( () -> cluster.getRandomTabletServer(), - RpcClient.create( - conf, TestingClientMetricGroup.newInstance(), false), + RpcClient.create(conf, TestingClientMetricGroup.newInstance()), TabletServerGateway.class), null), TestingWriterMetricGroup.newInstance(), diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java index 30c3a4bf31..76474a5b26 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java @@ -83,8 +83,7 @@ public void open() { String clientId = flussConf.getString(ConfigOptions.CLIENT_ID); MetricRegistry metricRegistry = MetricRegistry.create(flussConf, null); // don't care about metrics, but pass a ClientMetricGroup to make compiler happy - rpcClient = - RpcClient.create(flussConf, new ClientMetricGroup(metricRegistry, clientId), false); + rpcClient = RpcClient.create(flussConf, new ClientMetricGroup(metricRegistry, clientId)); MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, rpcClient); this.coordinatorGateway = GatewayClientProxy.createGatewayProxy( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 337222f4e3..e81362fa84 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -147,7 +147,7 @@ public void start() { FlinkMetricRegistry metricRegistry = new FlinkMetricRegistry(enumeratorMetricGroup); ClientMetricGroup clientMetricGroup = new ClientMetricGroup(metricRegistry, "LakeTieringService"); - this.rpcClient = RpcClient.create(flussConf, clientMetricGroup, false); + this.rpcClient = RpcClient.create(flussConf, clientMetricGroup); MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, rpcClient); this.coordinatorGateway = GatewayClientProxy.createGatewayProxy( diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java index 804058efbf..ce4e6fd2db 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/RpcClient.java @@ -37,13 +37,10 @@ public interface RpcClient extends AutoCloseable { * * @param conf The configuration to use. * @param clientMetricGroup The client metric group - * @param isInnerClient Whether it is an inner client used for communicate from server to - * server. * @return The RPC client. */ - static RpcClient create( - Configuration conf, ClientMetricGroup clientMetricGroup, boolean isInnerClient) { - return new NettyClient(conf, clientMetricGroup, isInnerClient); + static RpcClient create(Configuration conf, ClientMetricGroup clientMetricGroup) { + return new NettyClient(conf, clientMetricGroup); } /** diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java index f567b8584d..ebeab71705 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java @@ -76,16 +76,9 @@ public final class NettyClient implements RpcClient { private final Supplier authenticatorSupplier; - /** - * Whether the NettyClient is used as inner network client (Communicating between Fluss's - * servers). - */ - private final boolean isInnerClient; - private volatile boolean isClosed = false; - public NettyClient( - Configuration conf, ClientMetricGroup clientMetricGroup, boolean isInnerClient) { + public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) { this.connections = MapUtils.newConcurrentHashMap(); // build bootstrap @@ -106,7 +99,6 @@ public NettyClient( .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ClientChannelInitializer(connectionMaxIdle)); - this.isInnerClient = isInnerClient; this.clientMetricGroup = clientMetricGroup; this.authenticatorSupplier = AuthenticationFactory.loadClientAuthenticatorSupplier(conf); NettyMetrics.registerNettyMetrics(clientMetricGroup, pooledAllocator); @@ -197,8 +189,7 @@ private ServerConnection getOrCreateConnection(ServerNode node) { node, clientMetricGroup, authenticatorSupplier.get(), - (con, ignore) -> connections.remove(serverId, con), - isInnerClient); + (con, ignore) -> connections.remove(serverId, con)); }); } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java index a2c03028d1..1eef39f05a 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClientHandler.java @@ -20,12 +20,10 @@ import org.apache.fluss.exception.CorruptMessageException; import org.apache.fluss.rpc.messages.ApiMessage; import org.apache.fluss.rpc.messages.ErrorResponse; -import org.apache.fluss.rpc.messages.FetchLogResponse; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.ApiMethod; import org.apache.fluss.rpc.protocol.ResponseType; import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleState; @@ -49,15 +47,8 @@ public final class NettyClientHandler extends ChannelInboundHandlerAdapter { private final ClientHandlerCallback callback; - /** - * Whether the NettyClientHandler is used as inner network client (Communicating between Fluss's - * servers). - */ - private final boolean isInnerClient; - - public NettyClientHandler(ClientHandlerCallback callback, boolean isInnerClient) { + public NettyClientHandler(ClientHandlerCallback callback) { this.callback = callback; - this.isInnerClient = isInnerClient; } @Override @@ -90,26 +81,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } ApiMessage response = apiMethod.getResponseConstructor().get(); if (response.isLazilyParsed()) { - if (isInnerClient && response instanceof FetchLogResponse) { - // For the FetchLogResponse returned by the FetchLogRequest sent by the - // follower's TabletServer, we needn't perform an unHeap-to-heap memory - // copy to preserve zero-copy capabilities. This requires users to manually - // call ApiMessage#getParsedByteBuf().release() to release the ByteBuf after - // processing the response. - // TODO for the FetchLogResponse returned by the FetchLogRequest sent by the - // Fluss client, We also aim to avoid this memory copy operation, traced by - // https://github.com/apache/fluss/issues/1184 - response.parseFrom(buffer, messageSize); - } else { - // copy the buffer into a heap buffer, this can avoid the network buffer - // being released before the bytes fields of the response are lazily parsed. - ByteBuf copiedBuffer = Unpooled.buffer(messageSize, messageSize); - copiedBuffer.writeBytes(buffer, messageSize); - // response parsed from the copied buffer can be safely cached in user - // queues. - response.parseFrom(copiedBuffer, messageSize); - buffer.release(); - } + // Parse lazily from the original buffer without copying. The + // consumer is responsible for releasing the buffer via + // ApiMessage#getParsedByteBuf().release() after the response + // has been fully consumed. + response.parseFrom(buffer, messageSize); } else { response.parseFrom(buffer, messageSize); // eagerly release the buffer to make the buffer recycle faster diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java index a3fd7afbd5..7d4af53337 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java @@ -106,8 +106,7 @@ final class ServerConnection { ServerNode node, ClientMetricGroup clientMetricGroup, ClientAuthenticator authenticator, - BiConsumer closeCallback, - boolean isInnerClient) { + BiConsumer closeCallback) { this.node = node; this.state = ConnectionState.CONNECTING; this.connectionMetrics = clientMetricGroup.createConnectionMetricGroup(node.uid()); @@ -119,7 +118,7 @@ final class ServerConnection { // callback is not registered when connection established. bootstrap .connect(node.host(), node.port()) - .addListener(future -> establishConnection((ChannelFuture) future, isInnerClient)); + .addListener(future -> establishConnection((ChannelFuture) future)); } public ServerNode getServerNode() { @@ -250,7 +249,7 @@ public void onFailure(Throwable cause) { // ------------------------------------------------------------------------------------------ - private void establishConnection(ChannelFuture future, boolean isInnerClient) { + private void establishConnection(ChannelFuture future) { synchronized (lock) { if (future.isSuccess()) { if (state.isDisconnected()) { @@ -262,9 +261,7 @@ private void establishConnection(ChannelFuture future, boolean isInnerClient) { LOG.debug("Established connection to server {}.", node); channel = future.channel(); channel.pipeline() - .addLast( - "handler", - new NettyClientHandler(new ResponseCallback(), isInnerClient)); + .addLast("handler", new NettyClientHandler(new ResponseCallback())); // start checking api versions switchState(ConnectionState.CHECKING_API_VERSIONS); // TODO: set correct client software name and version, used for metrics in server diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java index b56c476cd1..7b6c264405 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/NettyMetricsTest.java @@ -73,7 +73,7 @@ public void setup() throws Exception { } ClientMetricGroup clientMetricGroup = TestingClientMetricGroup.newInstance(); clientGroup = clientMetricGroup.addGroup(NettyMetrics.NETTY_METRIC_GROUP); - nettyClient = new NettyClient(conf, clientMetricGroup, false); + nettyClient = new NettyClient(conf, clientMetricGroup); } @AfterEach diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java index 8ad62d94fc..e96fa283ab 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java @@ -73,7 +73,7 @@ void testNormalAuthenticate() throws Exception { clientConfig.setString("client.security.sasl.username", "root"); clientConfig.setString("client.security.sasl.password", "password"); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { verifyGetTableNamesList(nettyClient, usernamePasswordServerNode); } } @@ -85,14 +85,14 @@ void testMutualAuthenticate() throws Exception { // test normal mutual auth try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { verifyGetTableNamesList(nettyClient, mutualAuthServerNode); } // test invalid challenge from server clientConfig.setString("client.security.mutual.error-type", "SERVER_ERROR_CHALLENGE"); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode)) .hasRootCauseExactlyInstanceOf(AuthenticationException.class) .rootCause() @@ -102,7 +102,7 @@ void testMutualAuthenticate() throws Exception { // test invalid token from client clientConfig.setString("client.security.mutual.error-type", "CLIENT_ERROR_SECOND_TOKEN"); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode)) .rootCause() .hasMessageContaining("Invalid token value"); @@ -115,7 +115,7 @@ void testNoChallengeBeforeClientComplete() throws Exception { clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual"); clientConfig.setString("client.security.mutual.error-type", "SERVER_NO_CHALLENGE"); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode)) .hasRootCauseExactlyInstanceOf(IllegalStateException.class) @@ -130,7 +130,7 @@ void testRetirableAuthenticateException() throws Exception { clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual"); clientConfig.setString("client.security.mutual.error-type", "RETRIABLE_EXCEPTION"); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { verifyGetTableNamesList(nettyClient, mutualAuthServerNode); } } @@ -139,7 +139,7 @@ void testRetirableAuthenticateException() throws Exception { void testClientLackAuthenticateProtocol() throws Exception { Configuration clientConfig = new Configuration(); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { assertThatThrownBy( () -> verifyGetTableNamesList(nettyClient, usernamePasswordServerNode)) .cause() @@ -154,7 +154,7 @@ void testAuthenticateProtocolNotMatch() throws Exception { Configuration clientConfig = new Configuration(); clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual"); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { assertThatThrownBy( () -> verifyGetTableNamesList(nettyClient, usernamePasswordServerNode)) .cause() @@ -172,7 +172,7 @@ void testWrongPassword() throws Exception { clientConfig.setString("client.security.sasl.username", "root"); clientConfig.setString("client.security.sasl.password", "password2"); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { assertThatThrownBy( () -> verifyGetTableNamesList(nettyClient, usernamePasswordServerNode)) .cause() @@ -190,12 +190,12 @@ void testMultiClientsWithSameProtocol() throws Exception { clientConfig.setString("client.security.sasl.password", "password"); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { verifyGetTableNamesList(nettyClient, usernamePasswordServerNode); // client2 with wrong password after client1 successes to authenticate. clientConfig.setString("client.security.sasl.password", "password2"); try (NettyClient nettyClient2 = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { assertThatThrownBy( () -> verifyGetTableNamesList( diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java index a96a826aa2..92e837e18b 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java @@ -206,7 +206,7 @@ private void testAuthentication(Configuration clientConfig, Configuration server new ServerNode( 1, "localhost", availablePort1.getPort(), ServerType.TABLET_SERVER); try (NettyClient nettyClient = - new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) { ListTablesRequest request = new ListTablesRequest().setDatabaseName("test-database"); ListTablesResponse listTablesResponse = diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java index 4c0ba7b148..329f5d2f28 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java @@ -70,7 +70,7 @@ public void setup() throws Exception { conf = new Configuration(); // 3 worker threads is enough for this test conf.setInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS, 3); - nettyClient = new NettyClient(conf, TestingClientMetricGroup.newInstance(), false); + nettyClient = new NettyClient(conf, TestingClientMetricGroup.newInstance()); buildNettyServer(1); } @@ -219,7 +219,7 @@ void testMultipleEndpoint() throws Exception { .get(); assertThat(nettyClient.connections().size()).isEqualTo(1); try (NettyClient client = - new NettyClient(conf, TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(conf, TestingClientMetricGroup.newInstance())) { client.sendRequest( new ServerNode( 2, diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java index 41e4bef2c2..8ea4529283 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java @@ -126,8 +126,7 @@ void testConnectionClose() { serverNode, TestingClientMetricGroup.newInstance(), clientAuthenticator, - (con, ignore) -> {}, - false); + (con, ignore) -> {}); ConnectionState connectionState = connection.getConnectionState(); assertThat(connectionState).isEqualTo(ConnectionState.CONNECTING); @@ -157,20 +156,10 @@ void testConnectionMetrics() throws ExecutionException, InterruptedException { ClientMetricGroup client = new ClientMetricGroup(metricRegistry, "client"); ServerConnection connection = new ServerConnection( - bootstrap, - serverNode, - client, - clientAuthenticator, - (con, ignore) -> {}, - false); + bootstrap, serverNode, client, clientAuthenticator, (con, ignore) -> {}); ServerConnection connection2 = new ServerConnection( - bootstrap, - serverNode2, - client, - clientAuthenticator, - (con, ignore) -> {}, - false); + bootstrap, serverNode2, client, clientAuthenticator, (con, ignore) -> {}); LookupRequest request = new LookupRequest().setTableId(1); PbLookupReqForBucket pbLookupReqForBucket = request.addBucketsReq(); pbLookupReqForBucket.setBucketId(1); @@ -226,8 +215,7 @@ public ChannelFuture connect(String host, int port) { wrongServerTypeNode, TestingClientMetricGroup.newInstance(), clientAuthenticator, - (con, ignore) -> {}, - false); + (con, ignore) -> {}); // Pending request will be rejected with InvalidServerTypeException which is // InvalidRequestException. diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java index 29c9f14895..eb5d492cb4 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/MessageCodecTest.java @@ -62,7 +62,7 @@ class MessageCodecTest { @BeforeEach void beforeEach() { this.responseReceiver = new ResponseReceiver(); - this.clientHandler = new NettyClientHandler(responseReceiver, false); + this.clientHandler = new NettyClientHandler(responseReceiver); this.requestChannel = new RequestChannel(100); MetricGroup metricGroup = NOPMetricsGroup.newInstance(); this.serverHandler = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 1f9f3306e5..e2c54c268f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -255,7 +255,7 @@ protected void startServices() throws Exception { zkClient, this::registerCoordinatorLeader, this); this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME); - this.rpcClient = RpcClient.create(conf, clientMetricGroup, true); + this.rpcClient = RpcClient.create(conf, clientMetricGroup); this.coordinatorChannelManager = new CoordinatorChannelManager(rpcClient); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 65ab96324f..26f45d23c9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -240,7 +240,7 @@ protected void startServices() throws Exception { // to fetch log. this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME + "-" + serverId); - this.rpcClient = RpcClient.create(conf, clientMetricGroup, true); + this.rpcClient = RpcClient.create(conf, clientMetricGroup); this.coordinatorGateway = GatewayClientProxy.createGatewayProxy( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java b/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java index 254d1a4acf..4055613ce1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/ServerITCaseBase.java @@ -126,8 +126,7 @@ private void waitUntilServerStartup(TestProcessBuilder.TestProcess process) { private void testConnectionToServer() throws Exception { try (NettyClient client = - new NettyClient( - new Configuration(), TestingClientMetricGroup.newInstance(), false)) { + new NettyClient(new Configuration(), TestingClientMetricGroup.newInstance())) { RpcGateway gateway = GatewayClientProxy.createGatewayProxy( this::getServerNode, client, getRpcGatewayClass()); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java index bd4467c0a2..9d8eda15bf 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java @@ -54,8 +54,7 @@ void testCoordinatorChannelManager() throws Exception { Configuration configuration = new Configuration(); CoordinatorChannelManager coordinatorChannelManager = new CoordinatorChannelManager( - RpcClient.create( - configuration, TestingClientMetricGroup.newInstance(), false)); + RpcClient.create(configuration, TestingClientMetricGroup.newInstance())); List tabletServersNode = FLUSS_CLUSTER_EXTENSION.getTabletServerNodes(); // test start up using server 0 diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java index 4233f7c940..9f6c130640 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java @@ -544,8 +544,7 @@ void testMetadata(boolean isCoordinatorServer) throws Exception { configuration, new ClientMetricGroup( MetricRegistry.create(configuration, null), - "fluss-cluster-extension"), - false)) { + "fluss-cluster-extension"))) { ServerNode serverNode = FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode(CLIENT_LISTENER); AdminGateway adminGatewayForClient = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java index 3ed99d6c9d..a25c4f4cee 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorChannelManager.java @@ -36,7 +36,7 @@ public TestCoordinatorChannelManager() { } public TestCoordinatorChannelManager(Map gateways) { - super(RpcClient.create(new Configuration(), TestingClientMetricGroup.newInstance(), false)); + super(RpcClient.create(new Configuration(), TestingClientMetricGroup.newInstance())); this.gateways = gateways; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java index 70578b6144..c7c7591d5a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java @@ -294,8 +294,7 @@ private ReplicaStateMachine createReplicaStateMachine(CoordinatorContext coordin new CoordinatorChannelManager( RpcClient.create( new Configuration(), - TestingClientMetricGroup.newInstance(), - false)), + TestingClientMetricGroup.newInstance())), (event) -> { // do nothing }, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index b029c75e93..bbac45c6b3 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -272,8 +272,7 @@ void testStateChangeToOnline() throws Exception { new CoordinatorChannelManager( RpcClient.create( new Configuration(), - TestingClientMetricGroup.newInstance(), - false)), + TestingClientMetricGroup.newInstance())), coordinatorContext, autoPartitionManager, lakeTableTieringManager, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java index 4b1474f641..f829ca8924 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java @@ -210,7 +210,7 @@ public void setup() throws Exception { new LakeCatalogDynamicLoader(new Configuration(), null, true))); initMetadataCache(serverMetadataCache); - rpcClient = RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false); + rpcClient = RpcClient.create(conf, TestingClientMetricGroup.newInstance()); snapshotReporter = new TestingCompletedKvSnapshotCommitter(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java index 0501d0374b..760b4dd17c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java @@ -442,7 +442,7 @@ private ReplicaManager createReplicaManager(int serverId) throws Exception { null, conf, new LakeCatalogDynamicLoader(conf, null, true))), - RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false), + RpcClient.create(conf, TestingClientMetricGroup.newInstance()), TestingMetricGroups.TABLET_SERVER_METRICS, manualClock, ioExecutor); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index baabd58cb1..bffa3bbec2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -219,8 +219,7 @@ public void start() throws Exception { RpcClient.create( conf, new ClientMetricGroup( - MetricRegistry.create(conf, null), "fluss-cluster-extension"), - false); + MetricRegistry.create(conf, null), "fluss-cluster-extension")); startCoordinatorServer(); startTabletServers(); // wait coordinator knows all tablet servers to make cluster diff --git a/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java index f059358a32..8e07921a7c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/utils/RpcGatewayManagerTest.java @@ -36,7 +36,7 @@ void testRpcGatewayManage() throws Exception { RpcGatewayManager gatewayRpcGatewayManager = new RpcGatewayManager<>( new NettyClient( - new Configuration(), TestingClientMetricGroup.newInstance(), false), + new Configuration(), TestingClientMetricGroup.newInstance()), TabletServerGateway.class); ServerNode serverNode1 =