Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public final class FlussConnection implements Connection {
String clientId = conf.getString(ConfigOptions.CLIENT_ID);
this.metricRegistry = metricRegistry;
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, clientId);
this.rpcClient = RpcClient.create(conf, clientMetricGroup, false);
this.rpcClient = RpcClient.create(conf, clientMetricGroup);

// TODO this maybe remove after we introduce client metadata.
this.metadataUpdater = new MetadataUpdater(conf, rpcClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
import org.apache.fluss.rpc.messages.FetchLogRequest;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;

import javax.annotation.Nullable;

/**
* {@link DefaultCompletedFetch} is a {@link CompletedFetch} that represents a completed fetch that
Expand All @@ -30,13 +33,21 @@
@Internal
class DefaultCompletedFetch extends CompletedFetch {

/**
* The parsed ByteBuf backing the lazily-parsed records data. This reference is retained to keep
* the underlying network buffer alive while the records are being consumed. Released in {@link
* #drain()}.
*/
@Nullable private ByteBuf parsedByteBuf;

public DefaultCompletedFetch(
TableBucket tableBucket,
FetchLogResultForBucket fetchLogResultForBucket,
LogRecordReadContext readContext,
LogScannerStatus logScannerStatus,
boolean isCheckCrc,
Long fetchOffset) {
Long fetchOffset,
@Nullable ByteBuf parsedByteBuf) {
super(
tableBucket,
fetchLogResultForBucket.getError(),
Expand All @@ -47,5 +58,18 @@ public DefaultCompletedFetch(
logScannerStatus,
isCheckCrc,
fetchOffset);
this.parsedByteBuf = parsedByteBuf;
}

    /**
     * Drains the completed fetch and then releases the retained reference to the parsed
     * network {@link ByteBuf}.
     *
     * <p>The release is performed in a {@code finally} block so it happens even if
     * {@code super.drain()} throws. Nulling out {@link #parsedByteBuf} afterwards makes
     * repeated {@code drain()} calls a safe no-op (the buffer is released exactly once
     * per retained reference held by this object).
     */
    @Override
    void drain() {
        try {
            super.drain();
        } finally {
            if (parsedByteBuf != null) {
                parsedByteBuf.release();
                parsedByteBuf = null;
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.Projection;

Expand Down Expand Up @@ -340,6 +341,11 @@ private void handleFetchLogException(
/** Implements the core logic for a successful fetch log response. */
private synchronized void handleFetchLogResponse(
int destination, long requestStartTime, FetchLogResponse fetchLogResponse) {
// Capture the parsed ByteBuf for buffer lifecycle management. The response may
// have been lazily parsed from the network buffer. Each DefaultCompletedFetch
// that references the buffer's records data must retain it. We release the base
// reference in the finally block.
ByteBuf parsedByteBuf = fetchLogResponse.getParsedByteBuf();
try {
if (isClosed) {
return;
Expand Down Expand Up @@ -387,6 +393,11 @@ private synchronized void handleFetchLogResponse(
LogRecords logRecords = fetchResultForBucket.recordsOrEmpty();
if (!MemoryLogRecords.EMPTY.equals(logRecords)
|| fetchResultForBucket.getErrorCode() != Errors.NONE.code()) {
// Retain the parsed buffer so it stays alive while
// this CompletedFetch's records are being consumed.
if (parsedByteBuf != null) {
parsedByteBuf.retain();
}
// In order not to signal notEmptyCondition, only add the completed
// fetch to the buffer when the log records are not empty.
DefaultCompletedFetch completedFetch =
Expand All @@ -398,14 +409,21 @@ private synchronized void handleFetchLogResponse(
// skipping CRC check if projection push downed as
// the data is pruned
isCheckCrcs,
fetchOffset);
fetchOffset,
parsedByteBuf);
logFetchBuffer.add(completedFetch);
}
}
}
}
}
} finally {
// Release the base reference from the network buffer. Any CompletedFetch
// objects created above hold their own retained references, keeping the
// buffer alive until they are drained.
if (parsedByteBuf != null) {
parsedByteBuf.release();
}
LOG.debug("Removing pending request for node: {}", destination);
nodesWithPendingFetchRequests.remove(destination);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class MetadataUpdaterTest {
void testInitializeClusterWithRetries() throws Exception {
Configuration configuration = new Configuration();
RpcClient rpcClient =
RpcClient.create(configuration, TestingClientMetricGroup.newInstance(), false);
RpcClient.create(configuration, TestingClientMetricGroup.newInstance());

// retry lower than max retry count.
AdminReadOnlyGateway gateway = new TestingAdminReadOnlyGateway(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public TestingClientSchemaGetter(
tablePath,
latestSchemaInfo,
new FlussAdmin(
RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false),
RpcClient.create(conf, TestingClientMetricGroup.newInstance()),
metadataUpdater));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public TestingMetadataUpdater(
Map<Integer, TestTabletServerGateway> customGateways,
Configuration conf) {
super(
RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false),
RpcClient.create(conf, TestingClientMetricGroup.newInstance()),
conf,
Cluster.empty());
initializeCluster(coordinatorServer, tabletServers, tableInfos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ void testGetMetaInfo() throws Exception {
Collections.singleton(DATA1_TABLE_PATH_PK), null, null);

try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {
AdminGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
Expand Down Expand Up @@ -836,7 +836,7 @@ void testControlledShutdown() throws Exception {
new ControlledShutdownRequest().setTabletServerId(-1).setTabletServerEpoch(-1);

try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {
CoordinatorGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.table.scanner.log;

import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static org.apache.fluss.record.TestData.TEST_SCHEMA_GETTER;
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link DefaultCompletedFetch} zero-copy buffer lifecycle (issue #1184). */
class DefaultCompletedFetchBufferLifecycleTest {

private final TableBucket tb0 = new TableBucket(1, 0);
private final TableBucket tb1 = new TableBucket(1, 1);
private LogScannerStatus logScannerStatus;
private LogRecordReadContext readContext;

@BeforeEach
void setup() {
Map<TableBucket, Long> scanBuckets = new HashMap<>();
scanBuckets.put(tb0, 0L);
scanBuckets.put(tb1, 0L);
logScannerStatus = new LogScannerStatus();
logScannerStatus.assignScanBuckets(scanBuckets);
readContext =
LogRecordReadContext.createArrowReadContext(
DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, TEST_SCHEMA_GETTER);
}

@AfterEach
void afterEach() {
if (readContext != null) {
readContext.close();
readContext = null;
}
}

@Test
void testBufferReleasedOnDrain() throws Exception {
ByteBuf buf = Unpooled.buffer(64);
buf.writeBytes(new byte[64]);

buf.retain();
DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
buf.release(); // base reference
assertThat(buf.refCnt()).isEqualTo(1);

fetch.drain();
assertThat(buf.refCnt()).isEqualTo(0);
}

@Test
void testMultipleCompletedFetchShareBuffer() throws Exception {
ByteBuf buf = Unpooled.buffer(64);
buf.writeBytes(new byte[64]);

buf.retain();
DefaultCompletedFetch fetch1 = makeCompletedFetch(tb0, buf);
buf.retain();
DefaultCompletedFetch fetch2 = makeCompletedFetch(tb1, buf);
buf.release(); // base reference
assertThat(buf.refCnt()).isEqualTo(2);

fetch1.drain();
assertThat(buf.refCnt()).isEqualTo(1);

fetch2.drain();
assertThat(buf.refCnt()).isEqualTo(0);
}

@Test
void testDrainIsIdempotent() throws Exception {
ByteBuf buf = Unpooled.buffer(64);
buf.writeBytes(new byte[64]);
buf.retain();

DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
buf.release();

fetch.drain();
assertThat(buf.refCnt()).isEqualTo(0);
fetch.drain(); // no-op, no exception
}

@Test
void testNullBufferHandledGracefully() throws Exception {
DefaultCompletedFetch fetch = makeCompletedFetch(tb0, null);
fetch.drain();
}

@Test
void testLogFetchBufferCloseReleasesBuffer() throws Exception {
ByteBuf buf = Unpooled.buffer(64);
buf.writeBytes(new byte[64]);

buf.retain();
DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
buf.release();

try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) {
logFetchBuffer.add(fetch);
assertThat(buf.refCnt()).isEqualTo(1);
}
assertThat(buf.refCnt()).isEqualTo(0);
}

@Test
void testRetainAllUnsubscribeReleasesBuffer() throws Exception {
ByteBuf buf = Unpooled.buffer(64);
buf.writeBytes(new byte[64]);

buf.retain();
DefaultCompletedFetch fetch = makeCompletedFetch(tb0, buf);
buf.release();

try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) {
logFetchBuffer.add(fetch);
logFetchBuffer.retainAll(Collections.singleton(tb1));
assertThat(buf.refCnt()).isEqualTo(0);
}
}

private DefaultCompletedFetch makeCompletedFetch(TableBucket tableBucket, ByteBuf parsedByteBuf)
throws Exception {
return new DefaultCompletedFetch(
tableBucket,
new FetchLogResultForBucket(tableBucket, genMemoryLogRecordsByObject(DATA1), 10L),
readContext,
logScannerStatus,
true,
0L,
parsedByteBuf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ void testComplexTypeFetch() throws Exception {
tableInfo.getSchemaId(), tableInfo.getSchema())),
logScannerStatus,
true,
fetchOffset);
fetchOffset,
null);
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(3);
// close the read context to release arrow root resource,
// this is important to test complex types
Expand Down Expand Up @@ -398,7 +399,8 @@ private DefaultCompletedFetch makeCompletedFetch(
new TestingSchemaGetter(tableInfo.getSchemaId(), tableInfo.getSchema())),
logScannerStatus,
true,
offset);
offset,
null);
}

private static Collection<Arguments> typeAndMagic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ private DefaultCompletedFetch makeCompletedFetch(TableBucket tableBucket) throws
readContext,
logScannerStatus,
true,
0L);
0L,
null);
}

private PendingFetch makePendingFetch(TableBucket tableBucket) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,6 @@ void testCollectAfterUnassign() throws Exception {
private DefaultCompletedFetch makeCompletedFetch(
TableBucket tableBucket, FetchLogResultForBucket resultForBucket, long offset) {
return new DefaultCompletedFetch(
tableBucket, resultForBucket, readContext, logScannerStatus, true, offset);
tableBucket, resultForBucket, readContext, logScannerStatus, true, offset, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,7 @@ private RecordAccumulator createTestRecordAccumulator(
conf.getInt(ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET),
GatewayClientProxy.createGatewayProxy(
() -> cluster.getRandomTabletServer(),
RpcClient.create(
conf, TestingClientMetricGroup.newInstance(), false),
RpcClient.create(conf, TestingClientMetricGroup.newInstance()),
TabletServerGateway.class),
null),
TestingWriterMetricGroup.newInstance(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public void open() {
String clientId = flussConf.getString(ConfigOptions.CLIENT_ID);
MetricRegistry metricRegistry = MetricRegistry.create(flussConf, null);
// don't care about metrics, but pass a ClientMetricGroup to make compiler happy
rpcClient =
RpcClient.create(flussConf, new ClientMetricGroup(metricRegistry, clientId), false);
rpcClient = RpcClient.create(flussConf, new ClientMetricGroup(metricRegistry, clientId));
MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, rpcClient);
this.coordinatorGateway =
GatewayClientProxy.createGatewayProxy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void start() {
FlinkMetricRegistry metricRegistry = new FlinkMetricRegistry(enumeratorMetricGroup);
ClientMetricGroup clientMetricGroup =
new ClientMetricGroup(metricRegistry, "LakeTieringService");
this.rpcClient = RpcClient.create(flussConf, clientMetricGroup, false);
this.rpcClient = RpcClient.create(flussConf, clientMetricGroup);
MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, rpcClient);
this.coordinatorGateway =
GatewayClientProxy.createGatewayProxy(
Expand Down
Loading