Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ boolean isConsumed() {
return isConsumed;
}

public int getSizeInBytes() {
return sizeInBytes;
}

boolean isInitialized() {
return initialized;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ public LogFetchCollector(
* @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
*/
public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logFetchBuffer) {
public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
int recordsRemaining = maxPollRecords;
long totalBytesRead = 0;

try {
while (recordsRemaining > 0) {
Expand Down Expand Up @@ -135,6 +136,11 @@ public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logF

recordsRemaining -= records.size();
}

// Only count bytes when the fetch is fully consumed
if (nextInLineFetch.isConsumed()) {
totalBytesRead += nextInLineFetch.getSizeInBytes();
}
}
}
} catch (FetchException e) {
Expand All @@ -143,7 +149,7 @@ public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logF
}
}

return fetched;
return new ScanRecords(fetched, totalBytesRead);
}

private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.metrics.ScannerMetricGroup;
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.cluster.BucketLocation;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
Expand Down Expand Up @@ -161,7 +160,7 @@ public boolean hasAvailableFetches() {
return !logFetchBuffer.isEmpty();
}

public Map<TableBucket, List<ScanRecord>> collectFetch() {
public ScanRecords collectFetch() {
return logFetchCollector.collectFetch(logFetchBuffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.metrics.ScannerMetricGroup;
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.WakeupException;
import org.apache.fluss.metadata.SchemaGetter;
Expand All @@ -41,8 +40,6 @@
import java.time.Duration;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -141,25 +138,25 @@ public ScanRecords poll(Duration timeout) {
long timeoutNanos = timeout.toNanos();
long startNanos = System.nanoTime();
do {
Map<TableBucket, List<ScanRecord>> fetchRecords = pollForFetches();
if (fetchRecords.isEmpty()) {
ScanRecords scanRecords = pollForFetches();
if (scanRecords.isEmpty()) {
try {
if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
// logFetcher waits for the timeout and no data in buffer,
// so we return empty
return new ScanRecords(fetchRecords);
return scanRecords;
}
} catch (WakeupException e) {
// wakeup() is called, we need to return empty
return new ScanRecords(fetchRecords);
return scanRecords;
}
} else {
// before returning the fetched records, we can send off the next round of
// fetches and avoid block waiting for their responses to enable pipelining
// while the user is handling the fetched records.
logFetcher.sendFetches();

return new ScanRecords(fetchRecords);
return scanRecords;
}
} while (System.nanoTime() - startNanos < timeoutNanos);

Expand Down Expand Up @@ -247,10 +244,10 @@ public void wakeup() {
logFetcher.wakeup();
}

private Map<TableBucket, List<ScanRecord>> pollForFetches() {
Map<TableBucket, List<ScanRecord>> fetchedRecords = logFetcher.collectFetch();
if (!fetchedRecords.isEmpty()) {
return fetchedRecords;
private ScanRecords pollForFetches() {
ScanRecords scanRecords = logFetcher.collectFetch();
if (!scanRecords.isEmpty()) {
return scanRecords;
}

// send any new fetches (won't resend pending fetches).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,27 @@
*/
@PublicEvolving
public class ScanRecords implements Iterable<ScanRecord> {
public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap());
public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap(), 0);

private final Map<TableBucket, List<ScanRecord>> records;
private final long totalBytesRead;

public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
this(records, 0);
}

public ScanRecords(Map<TableBucket, List<ScanRecord>> records, long totalBytesRead) {
this.records = records;
this.totalBytesRead = totalBytesRead;
}

/**
* Get the total bytes read from the Fluss log in this batch.
*
* @return the total bytes read
*/
public long getTotalBytesRead() {
return totalBytesRead;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.metadata.TestingMetadataUpdater;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecordReadContext;
Expand All @@ -31,7 +30,6 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.fluss.record.TestData.DATA1;
Expand Down Expand Up @@ -97,10 +95,9 @@ void testNormal() throws Exception {
assertThat(completedFetch.isInitialized()).isFalse();

// Fetch the data and validate that we get all the records we want back.
Map<TableBucket, List<ScanRecord>> bucketAndRecords =
logFetchCollector.collectFetch(logFetchBuffer);
assertThat(bucketAndRecords.size()).isEqualTo(1);
assertThat(bucketAndRecords.get(tb)).size().isEqualTo(10);
ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
assertThat(bucketAndRecords.buckets().size()).isEqualTo(1);
assertThat(bucketAndRecords.records(tb).size()).isEqualTo(10);

// When we collected the data from the buffer, this will cause the completed fetch to get
// initialized.
Expand All @@ -122,7 +119,7 @@ void testNormal() throws Exception {

// Now attempt to collect more records from the fetch buffer.
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
assertThat(bucketAndRecords.size()).isEqualTo(0);
assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
}

@Test
Expand All @@ -147,14 +144,48 @@ void testCollectAfterUnassign() throws Exception {
// unassign bucket 2
logScannerStatus.unassignScanBuckets(Collections.singletonList(tb2));

Map<TableBucket, List<ScanRecord>> bucketAndRecords =
logFetchCollector.collectFetch(logFetchBuffer);
ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
// should only contain records for bucket 1
assertThat(bucketAndRecords.keySet()).containsExactly(tb1);
assertThat(bucketAndRecords.buckets()).containsExactly(tb1);

// collect again, should be empty
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
assertThat(bucketAndRecords.size()).isEqualTo(0);
assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
}

@Test
void testTotalBytesRead() throws Exception {
TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 1L, 1);
TableBucket tb2 = new TableBucket(DATA1_TABLE_ID, 1L, 2);
Map<TableBucket, Long> scanBuckets = new HashMap<>();
scanBuckets.put(tb1, 0L);
scanBuckets.put(tb2, 0L);
logScannerStatus.assignScanBuckets(scanBuckets);

CompletedFetch completedFetch1 =
makeCompletedFetch(
tb1,
new FetchLogResultForBucket(tb1, genMemoryLogRecordsByObject(DATA1), 10L),
0L);
CompletedFetch completedFetch2 =
makeCompletedFetch(
tb2,
new FetchLogResultForBucket(tb2, genMemoryLogRecordsByObject(DATA1), 10L),
0L);

logFetchBuffer.add(completedFetch1);
logFetchBuffer.add(completedFetch2);

ScanRecords scanRecords = logFetchCollector.collectFetch(logFetchBuffer);

// Both fetches should be fully consumed
assertThat(completedFetch1.isConsumed()).isTrue();
assertThat(completedFetch2.isConsumed()).isTrue();

// totalBytesRead should be the sum of both completed fetches
long expectedBytes = completedFetch1.getSizeInBytes() + completedFetch2.getSizeInBytes();
assertThat(scanRecords.getTotalBytesRead()).isEqualTo(expectedBytes);
assertThat(scanRecords.getTotalBytesRead()).isGreaterThan(0);
}

private DefaultCompletedFetch makeCompletedFetch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(logFetcher.hasAvailableFetches()).isTrue();
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
assertThat(records.size()).isEqualTo(1);
List<ScanRecord> scanRecords = records.get(tb0);
ScanRecords records = logFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(1);
List<ScanRecord> scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
.isEqualTo(expectedRows);

Expand Down Expand Up @@ -193,9 +193,9 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
records = newSchemaLogFetcher.collectFetch();
assertThat(records.size()).isEqualTo(1);
assertThat(records.get(tb0)).hasSize(20);
scanRecords = records.get(tb0);
assertThat(records.buckets().size()).isEqualTo(1);
assertThat(records.records(tb0)).hasSize(20);
scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
.isEqualTo(expectedRows);
newSchemaLogFetcher.close();
Expand Down Expand Up @@ -226,10 +226,10 @@ void testFetch() throws Exception {
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
});

Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
assertThat(records.size()).isEqualTo(2);
assertThat(records.get(tb0).size()).isEqualTo(10);
assertThat(records.get(tb1).size()).isEqualTo(10);
ScanRecords records = logFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(2);
assertThat(records.records(tb0).size()).isEqualTo(10);
assertThat(records.records(tb1).size()).isEqualTo(10);

// after collect fetch, the fetcher is empty.
assertThat(logFetcher.hasAvailableFetches()).isFalse();
Expand Down Expand Up @@ -297,9 +297,9 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
assertThat(logFetcher.hasAvailableFetches()).isTrue();
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1);
});
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
assertThat(records.size()).isEqualTo(1);
assertThat(records.get(tb0).size()).isEqualTo(10);
ScanRecords records = logFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(1);
assertThat(records.records(tb0).size()).isEqualTo(10);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,12 @@ public class MetricNames {
public static final String NETTY_NUM_ALLOCATIONS_PER_SECONDS = "numAllocationsPerSecond";
public static final String NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS =
"numHugeAllocationsPerSecond";

// --------------------------------------------------------------------------------------------
// metrics for tiering service
// --------------------------------------------------------------------------------------------

// for lake tiering metrics - operator level
public static final String TIERING_SERVICE_READ_BYTES = "readBytes";
public static final String TIERING_SERVICE_READ_BYTES_RATE = "readBytesPerSecond";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.client.Connection;
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
import org.apache.fluss.lake.writer.LakeTieringFactory;
Expand Down Expand Up @@ -73,17 +74,31 @@ public TieringSourceReader(
Duration pollTimeout) {
super(
elementsQueue,
new TieringSourceFetcherManager<>(
elementsQueue,
() -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout),
context.getConfiguration(),
(ignore) -> {}),
createFetcherManager(
elementsQueue, context, connection, lakeTieringFactory, pollTimeout),
new TableBucketWriteResultEmitter<>(),
context.getConfiguration(),
context);
this.connection = connection;
}

private static <WriteResult> TieringSourceFetcherManager<WriteResult> createFetcherManager(
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
elementsQueue,
SourceReaderContext context,
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
Duration pollTimeout) {
TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup());
return new TieringSourceFetcherManager<>(
elementsQueue,
() ->
new TieringSplitReader<>(
connection, lakeTieringFactory, pollTimeout, tieringMetrics),
context.getConfiguration(),
(ignore) -> {});
}

@Override
public void start() {
// we request a split only if we did not get splits during the checkpoint restore
Expand Down
Loading