From 5c3992cce6748fb5307f63e52a7946198eb2ee80 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Mon, 30 Mar 2026 17:16:39 +0800 Subject: [PATCH 1/2] [lake/tiering] add bytes read tracking for log fetch operations --- .../table/scanner/log/CompletedFetch.java | 4 ++ .../table/scanner/log/LogFetchCollector.java | 10 ++- .../client/table/scanner/log/LogFetcher.java | 3 +- .../table/scanner/log/LogScannerImpl.java | 21 +++--- .../client/table/scanner/log/ScanRecords.java | 17 ++++- .../scanner/log/LogFetchCollectorTest.java | 53 ++++++++++++--- .../table/scanner/log/LogFetcherITCase.java | 26 +++---- .../table/scanner/log/ScanRecordsTest.java | 14 ++++ .../org/apache/fluss/metrics/MetricNames.java | 8 +++ .../tiering/source/TieringSourceReader.java | 25 +++++-- .../tiering/source/TieringSplitReader.java | 23 ++++++- .../source/metrics/TieringMetrics.java | 67 +++++++++++++++++++ .../tiering/source/TieringMetricsTest.java | 53 +++++++++++++++ .../source/TieringSplitReaderTest.java | 10 ++- fluss-test-coverage/pom.xml | 1 + .../observability/monitor-metrics.md | 32 ++++++++- 16 files changed, 316 insertions(+), 51 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java create mode 100644 fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringMetricsTest.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java index 8f29f3ef34..db75675a31 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java @@ -109,6 +109,10 @@ boolean isConsumed() { return isConsumed; } + public int getSizeInBytes() { + return sizeInBytes; + } + boolean isInitialized() { return initialized; } diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java index 34a3ec86e6..b02f0b7804 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java @@ -83,9 +83,10 @@ public LogFetchCollector( * @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and * the defaultResetPolicy is NONE */ - public Map> collectFetch(final LogFetchBuffer logFetchBuffer) { + public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { Map> fetched = new HashMap<>(); int recordsRemaining = maxPollRecords; + long totalBytesRead = 0; try { while (recordsRemaining > 0) { @@ -135,6 +136,11 @@ public Map> collectFetch(final LogFetchBuffer logF recordsRemaining -= records.size(); } + + // Only count bytes when the fetch is fully consumed + if (nextInLineFetch.isConsumed()) { + totalBytesRead += nextInLineFetch.getSizeInBytes(); + } } } } catch (FetchException e) { @@ -143,7 +149,7 @@ public Map> collectFetch(final LogFetchBuffer logF } } - return fetched; + return new ScanRecords(fetched, totalBytesRead); } private List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index b5a03a1caf..c8f87984b0 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -22,7 +22,6 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.table.scanner.RemoteFileDownloader; -import 
org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.cluster.BucketLocation; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -161,7 +160,7 @@ public boolean hasAvailableFetches() { return !logFetchBuffer.isEmpty(); } - public Map> collectFetch() { + public ScanRecords collectFetch() { return logFetchCollector.collectFetch(logFetchBuffer); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java index 33181bd7ae..9c24dbeb77 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java @@ -21,7 +21,6 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.table.scanner.RemoteFileDownloader; -import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.WakeupException; import org.apache.fluss.metadata.SchemaGetter; @@ -41,8 +40,6 @@ import java.time.Duration; import java.util.Collections; import java.util.ConcurrentModificationException; -import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -141,17 +138,17 @@ public ScanRecords poll(Duration timeout) { long timeoutNanos = timeout.toNanos(); long startNanos = System.nanoTime(); do { - Map> fetchRecords = pollForFetches(); - if (fetchRecords.isEmpty()) { + ScanRecords scanRecords = pollForFetches(); + if (scanRecords.isEmpty()) { try { if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) { // logFetcher waits for the timeout and no data in buffer, // so we return empty - return new ScanRecords(fetchRecords); + return 
scanRecords; } } catch (WakeupException e) { // wakeup() is called, we need to return empty - return new ScanRecords(fetchRecords); + return scanRecords; } } else { // before returning the fetched records, we can send off the next round of @@ -159,7 +156,7 @@ public ScanRecords poll(Duration timeout) { // while the user is handling the fetched records. logFetcher.sendFetches(); - return new ScanRecords(fetchRecords); + return scanRecords; } } while (System.nanoTime() - startNanos < timeoutNanos); @@ -247,10 +244,10 @@ public void wakeup() { logFetcher.wakeup(); } - private Map> pollForFetches() { - Map> fetchedRecords = logFetcher.collectFetch(); - if (!fetchedRecords.isEmpty()) { - return fetchedRecords; + private ScanRecords pollForFetches() { + ScanRecords scanRecords = logFetcher.collectFetch(); + if (!scanRecords.isEmpty()) { + return scanRecords; } // send any new fetches (won't resend pending fetches). diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java index 9d58c22b49..c9a7271daa 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java @@ -37,12 +37,27 @@ */ @PublicEvolving public class ScanRecords implements Iterable { - public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap()); + public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap(), 0); private final Map> records; + private final long totalBytesRead; public ScanRecords(Map> records) { + this(records, 0); + } + + public ScanRecords(Map> records, long totalBytesRead) { this.records = records; + this.totalBytesRead = totalBytesRead; + } + + /** + * Get the total bytes read from the Fluss log in this batch. 
+ * + * @return the total bytes read + */ + public long getTotalBytesRead() { + return totalBytesRead; } /** diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java index 99a108e2f3..0edb5ea971 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java @@ -19,7 +19,6 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.metadata.TestingMetadataUpdater; -import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.config.Configuration; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.record.LogRecordReadContext; @@ -31,7 +30,6 @@ import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.apache.fluss.record.TestData.DATA1; @@ -97,10 +95,9 @@ void testNormal() throws Exception { assertThat(completedFetch.isInitialized()).isFalse(); // Fetch the data and validate that we get all the records we want back. - Map> bucketAndRecords = - logFetchCollector.collectFetch(logFetchBuffer); - assertThat(bucketAndRecords.size()).isEqualTo(1); - assertThat(bucketAndRecords.get(tb)).size().isEqualTo(10); + ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); + assertThat(bucketAndRecords.buckets().size()).isEqualTo(1); + assertThat(bucketAndRecords.records(tb).size()).isEqualTo(10); // When we collected the data from the buffer, this will cause the completed fetch to get // initialized. @@ -122,7 +119,7 @@ void testNormal() throws Exception { // Now attempt to collect more records from the fetch buffer. 
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); - assertThat(bucketAndRecords.size()).isEqualTo(0); + assertThat(bucketAndRecords.buckets().size()).isEqualTo(0); } @Test @@ -147,14 +144,48 @@ void testCollectAfterUnassign() throws Exception { // unassign bucket 2 logScannerStatus.unassignScanBuckets(Collections.singletonList(tb2)); - Map> bucketAndRecords = - logFetchCollector.collectFetch(logFetchBuffer); + ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); // should only contain records for bucket 1 - assertThat(bucketAndRecords.keySet()).containsExactly(tb1); + assertThat(bucketAndRecords.buckets()).containsExactly(tb1); // collect again, should be empty bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer); - assertThat(bucketAndRecords.size()).isEqualTo(0); + assertThat(bucketAndRecords.buckets().size()).isEqualTo(0); + } + + @Test + void testTotalBytesRead() throws Exception { + TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 1L, 1); + TableBucket tb2 = new TableBucket(DATA1_TABLE_ID, 1L, 2); + Map scanBuckets = new HashMap<>(); + scanBuckets.put(tb1, 0L); + scanBuckets.put(tb2, 0L); + logScannerStatus.assignScanBuckets(scanBuckets); + + CompletedFetch completedFetch1 = + makeCompletedFetch( + tb1, + new FetchLogResultForBucket(tb1, genMemoryLogRecordsByObject(DATA1), 10L), + 0L); + CompletedFetch completedFetch2 = + makeCompletedFetch( + tb2, + new FetchLogResultForBucket(tb2, genMemoryLogRecordsByObject(DATA1), 10L), + 0L); + + logFetchBuffer.add(completedFetch1); + logFetchBuffer.add(completedFetch2); + + ScanRecords scanRecords = logFetchCollector.collectFetch(logFetchBuffer); + + // Both fetches should be fully consumed + assertThat(completedFetch1.isConsumed()).isTrue(); + assertThat(completedFetch2.isConsumed()).isTrue(); + + // totalBytesRead should be the sum of both completed fetches + long expectedBytes = completedFetch1.getSizeInBytes() + completedFetch2.getSizeInBytes(); + 
assertThat(scanRecords.getTotalBytesRead()).isEqualTo(expectedBytes); + assertThat(scanRecords.getTotalBytesRead()).isGreaterThan(0); } private DefaultCompletedFetch makeCompletedFetch( diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java index 73c9b40fb6..dca64fb542 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java @@ -147,9 +147,9 @@ void testFetchWithSchemaChange() throws Exception { assertThat(logFetcher.hasAvailableFetches()).isTrue(); assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2); }); - Map> records = logFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(1); - List scanRecords = records.get(tb0); + ScanRecords records = logFetcher.collectFetch(); + assertThat(records.buckets().size()).isEqualTo(1); + List scanRecords = records.records(tb0); assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList())) .isEqualTo(expectedRows); @@ -193,9 +193,9 @@ void testFetchWithSchemaChange() throws Exception { assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2); }); records = newSchemaLogFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(1); - assertThat(records.get(tb0)).hasSize(20); - scanRecords = records.get(tb0); + assertThat(records.buckets().size()).isEqualTo(1); + assertThat(records.records(tb0)).hasSize(20); + scanRecords = records.records(tb0); assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList())) .isEqualTo(expectedRows); newSchemaLogFetcher.close(); @@ -226,10 +226,10 @@ void testFetch() throws Exception { assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2); }); - Map> records = logFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(2); 
- assertThat(records.get(tb0).size()).isEqualTo(10); - assertThat(records.get(tb1).size()).isEqualTo(10); + ScanRecords records = logFetcher.collectFetch(); + assertThat(records.buckets().size()).isEqualTo(2); + assertThat(records.records(tb0).size()).isEqualTo(10); + assertThat(records.records(tb1).size()).isEqualTo(10); // after collect fetch, the fetcher is empty. assertThat(logFetcher.hasAvailableFetches()).isFalse(); @@ -297,9 +297,9 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception { assertThat(logFetcher.hasAvailableFetches()).isTrue(); assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1); }); - Map> records = logFetcher.collectFetch(); - assertThat(records.size()).isEqualTo(1); - assertThat(records.get(tb0).size()).isEqualTo(10); + ScanRecords records = logFetcher.collectFetch(); + assertThat(records.buckets().size()).isEqualTo(1); + assertThat(records.records(tb0).size()).isEqualTo(10); } @Test diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java index db4a326676..ab841ccd5a 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java @@ -57,4 +57,18 @@ void iterator() { } assertThat(c).isEqualTo(4); } + + @Test + void testTotalBytesRead() { + Map> records = new LinkedHashMap<>(); + long tableId = 0; + ScanRecord record1 = new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a")); + records.put(new TableBucket(tableId, 0), Arrays.asList(record1)); + + // New constructor carries totalBytesRead + assertThat(new ScanRecords(records, 1024L).getTotalBytesRead()).isEqualTo(1024L); + + // Old constructor defaults to 0 for backward compatibility + assertThat(new ScanRecords(records).getTotalBytesRead()).isEqualTo(0L); + } } diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index b11c3b92fc..bf6440decd 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -280,4 +280,12 @@ public class MetricNames { public static final String NETTY_NUM_ALLOCATIONS_PER_SECONDS = "numAllocationsPerSecond"; public static final String NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS = "numHugeAllocationsPerSecond"; + + // -------------------------------------------------------------------------------------------- + // metrics for tiering service + // -------------------------------------------------------------------------------------------- + + // for lake tiering metrics - operator level + public static final String TIERING_SERVICE_READ_BYTES = "readBytes"; + public static final String TIERING_SERVICE_READ_BYTES_RATE = "readBytesPerSecond"; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index eae584efed..6f0fc43b95 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; @@ -73,17 +74,31 @@ 
public TieringSourceReader( Duration pollTimeout) { super( elementsQueue, - new TieringSourceFetcherManager<>( - elementsQueue, - () -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout), - context.getConfiguration(), - (ignore) -> {}), + createFetcherManager( + elementsQueue, context, connection, lakeTieringFactory, pollTimeout), new TableBucketWriteResultEmitter<>(), context.getConfiguration(), context); this.connection = connection; } + private static TieringSourceFetcherManager createFetcherManager( + FutureCompletingBlockingQueue>> + elementsQueue, + SourceReaderContext context, + Connection connection, + LakeTieringFactory lakeTieringFactory, + Duration pollTimeout) { + TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup()); + return new TieringSourceFetcherManager<>( + elementsQueue, + () -> + new TieringSplitReader<>( + connection, lakeTieringFactory, pollTimeout, tieringMetrics), + context.getConfiguration(), + (ignore) -> {}); + } + @Override public void start() { // we request a split only if we did not get splits during the checkpoint restore diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 1e530d69a3..992ea8e729 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -25,6 +25,7 @@ import org.apache.fluss.client.table.scanner.log.ScanRecords; import org.apache.fluss.flink.source.reader.BoundedSplitReader; import org.apache.fluss.flink.source.reader.RecordAndPos; +import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import 
org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; @@ -33,6 +34,8 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.InternalRow; import org.apache.fluss.utils.CloseableIterator; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -106,16 +109,21 @@ public class TieringSplitReader private final Set currentEmptySplits; + private final TieringMetrics tieringMetrics; + public TieringSplitReader( - Connection connection, LakeTieringFactory lakeTieringFactory) { - this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT); + Connection connection, + LakeTieringFactory lakeTieringFactory, + TieringMetrics tieringMetrics) { + this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT, tieringMetrics); } @VisibleForTesting protected TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory, - Duration pollTimeout) { + Duration pollTimeout, + TieringMetrics tieringMetrics) { this.lakeTieringFactory = lakeTieringFactory; // owned by TieringSourceReader this.connection = connection; @@ -129,6 +137,7 @@ protected TieringSplitReader( this.currentPendingSnapshotSplits = new ArrayDeque<>(); this.reachTieringMaxDurationTables = new HashSet<>(); this.pollTimeout = pollTimeout; + this.tieringMetrics = tieringMetrics; } @Override @@ -348,6 +357,9 @@ private RecordsWithSplitIds> forLogRecords( Map> writeResults = new HashMap<>(); Map finishedSplitIds = new HashMap<>(); LOG.info("for log records to tier table {}.", currentTableId); + + // Report bytes read from Fluss log records + tieringMetrics.recordBytesRead(scanRecords.getTotalBytesRead()); for (TableBucket bucket : scanRecords.buckets()) { LOG.info("tiering table bucket {}.", bucket); List bucketScanRecords = scanRecords.records(bucket); @@ -502,10 
+514,15 @@ private TableBucketWriteResultWithSplitIds forSnapshotSplitRecords( LakeWriter lakeWriter = getOrCreateLakeWriter( bucket, checkNotNull(currentSnapshotSplit).getPartitionName()); + long bytesRead = 0; while (recordIterator.hasNext()) { ScanRecord scanRecord = recordIterator.next().record(); lakeWriter.write(scanRecord); + InternalRow row = scanRecord.getRow(); + // Snapshot path always produces BinaryRow (CompactedRow/IndexedRow). + bytesRead += ((BinaryRow) row).getSizeInBytes(); } + tieringMetrics.recordBytesRead(bytesRead); recordIterator.close(); return emptyTableBucketWriteResultWithSplitIds(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java new file mode 100644 index 0000000000..3e2beb3d8a --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.tiering.source.metrics; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metrics.MetricNames; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; + +/** + * A collection class for handling metrics in Tiering source reader. + * + *

<p>All metrics are registered under group "fluss.tieringService", which is a child group of + * {@link org.apache.flink.metrics.groups.SourceReaderMetricGroup}. + * + *
<p>The following metrics are available: + * + *
<ul> + *
<li>{@code fluss.tieringService.readBytes} - Counter: cumulative bytes read from Fluss records + * since the job started (actual Fluss records size). + *
<li>{@code fluss.tieringService.readBytesPerSecond} - Meter: bytes-per-second rate derived from + * the counter using a 60-second sliding window. + *
</ul>
+ */ +@Internal +public class TieringMetrics { + + // Metric group names + public static final String FLUSS_METRIC_GROUP = "fluss"; + public static final String TIERING_SERVICE_GROUP = "tieringService"; + + private final Counter readBytesCounter; + + public TieringMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) { + MetricGroup tieringServiceGroup = + sourceReaderMetricGroup + .addGroup(FLUSS_METRIC_GROUP) + .addGroup(TIERING_SERVICE_GROUP); + + this.readBytesCounter = tieringServiceGroup.counter(MetricNames.TIERING_SERVICE_READ_BYTES); + tieringServiceGroup.meter( + MetricNames.TIERING_SERVICE_READ_BYTES_RATE, new MeterView(readBytesCounter)); + } + + /** Records bytes read from Fluss records. Called per batch or record processing. */ + public void recordBytesRead(long bytes) { + readBytesCounter.inc(bytes); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringMetricsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringMetricsTest.java new file mode 100644 index 0000000000..8be1b22150 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringMetricsTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.source; + +import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; +import org.apache.fluss.metrics.MetricNames; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link TieringMetrics}. */ +class TieringMetricsTest { + + @Test + void testRecordBytesRead() { + MetricListener metricListener = new MetricListener(); + TieringMetrics tieringMetrics = + new TieringMetrics( + InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup())); + + tieringMetrics.recordBytesRead(100); + tieringMetrics.recordBytesRead(200); + + Optional counter = + metricListener.getCounter( + TieringMetrics.FLUSS_METRIC_GROUP, + TieringMetrics.TIERING_SERVICE_GROUP, + MetricNames.TIERING_SERVICE_READ_BYTES); + assertThat(counter).isPresent(); + assertThat(counter.get().getCount()).isEqualTo(300); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java index fdf7a84771..447ada7a7a 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java @@ -26,6 +26,7 @@ import org.apache.fluss.client.write.HashBucketAssigner; import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; import org.apache.fluss.flink.tiering.TestingWriteResult; +import 
org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; @@ -38,6 +39,8 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -277,7 +280,12 @@ void testTieringMixTables() throws Exception { } private TieringSplitReader createTieringReader(Connection connection) { - return new TieringSplitReader<>(connection, new TestingLakeTieringFactory()); + final TieringMetrics tieringMetrics = + new TieringMetrics( + InternalSourceReaderMetricGroup.mock( + new MetricListener().getMetricGroup())); + return new TieringSplitReader<>( + connection, new TestingLakeTieringFactory(), tieringMetrics); } private void verifyTieringRows( diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 399f64583e..e131686f89 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -445,6 +445,7 @@ org.apache.fluss.lake.lance.* org.apache.fluss.lake.iceberg.* + org.apache.fluss.lake.writer.LakeWriter org.apache.fluss.row.encode.iceberg.* org.apache.fluss.bucketing.IcebergBucketingFunction diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index ade4be9c61..97240938b5 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -1125,6 +1125,36 @@ How to Use Flink Metrics, you can see [Flink Metrics](https://nightlies.apache.o Table The 
output records per second. Meter - + + + + +### Tiering Service Metrics + +These metrics are exposed by the Flink-based tiering source reader when running the Lake Tiering Service. +All metrics are registered under the `fluss.tieringService` metric group, which is a child of the Flink `SourceReaderMetricGroup`. + + + + + + + + + + + + + + + + + + + + + + +
<tr><th>Metrics Name</th><th>Level</th><th>Description</th><th>Type</th></tr>
<tr><td>readBytes</td><td>Flink Source Operator</td><td>The cumulative bytes read from Fluss records since the tiering job started. For the snapshot path, this measures per-record BinaryRow size; for the log path, this measures batch-level byte size from log fetches.</td><td>Counter</td></tr>
<tr><td>readBytesPerSecond</td><td>Flink Source Operator</td><td>The read throughput rate in bytes per second, derived from the readBytes counter using a 60-second sliding window.</td><td>Meter</td></tr>
\ No newline at end of file From c039ae00f38be6170520e56df20046d98c59086c Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Mon, 30 Mar 2026 17:28:20 +0800 Subject: [PATCH 2/2] remove LakeWriter exclusion and update ScanRecords test - Removed LakeWriter from test coverage exclusion list in pom.xml - Deleted testTotalBytesRead method from ScanRecordsTest class - Cleaned up redundant test cases for total bytes read functionality - Updated test suite to align with current code coverage requirements --- .../client/table/scanner/log/ScanRecordsTest.java | 14 -------------- fluss-test-coverage/pom.xml | 1 - 2 files changed, 15 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java index ab841ccd5a..db4a326676 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java @@ -57,18 +57,4 @@ void iterator() { } assertThat(c).isEqualTo(4); } - - @Test - void testTotalBytesRead() { - Map> records = new LinkedHashMap<>(); - long tableId = 0; - ScanRecord record1 = new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a")); - records.put(new TableBucket(tableId, 0), Arrays.asList(record1)); - - // New constructor carries totalBytesRead - assertThat(new ScanRecords(records, 1024L).getTotalBytesRead()).isEqualTo(1024L); - - // Old constructor defaults to 0 for backward compatibility - assertThat(new ScanRecords(records).getTotalBytesRead()).isEqualTo(0L); - } } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index e131686f89..399f64583e 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -445,7 +445,6 @@ org.apache.fluss.lake.lance.* org.apache.fluss.lake.iceberg.* - org.apache.fluss.lake.writer.LakeWriter org.apache.fluss.row.encode.iceberg.* 
org.apache.fluss.bucketing.IcebergBucketingFunction