From 307a0f50515728574838a148440c44b721720a8c Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Wed, 14 Jan 2026 16:09:29 +0000 Subject: [PATCH 1/3] [FLUSS-2301] Fail Log Scanner when table is dropped during scan --- .../client/table/scanner/log/LogFetcher.java | 26 +++++++++ .../fluss/client/utils/MetadataUtils.java | 23 ++++++++ .../table/scanner/log/LogScannerITCase.java | 53 +++++++++++++++++++ 3 files changed, 102 insertions(+) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index b5a03a1caf..5f08c7a1a6 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -30,6 +30,8 @@ import org.apache.fluss.exception.InvalidMetadataException; import org.apache.fluss.exception.LeaderNotAvailableException; import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.SchemaGetter; @@ -86,6 +88,7 @@ public class LogFetcher implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LogFetcher.class); private final TablePath tablePath; + private final long tableId; private final boolean isPartitioned; private final LogRecordReadContext readContext; // TODO this context can be merge with readContext. Introduce it only because log remote read @@ -122,6 +125,7 @@ public LogFetcher( RemoteFileDownloader remoteFileDownloader, SchemaGetter schemaGetter) { this.tablePath = tableInfo.getTablePath(); + this.tableId = tableInfo.getTableId(); this.isPartitioned = tableInfo.isPartitioned(); this.readContext = LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter); @@ -227,6 +231,15 @@ private void checkAndUpdateMetadata(List tableBuckets) { } else if (needUpdate) { metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); } + + // Check if the table still exists and matches the tableId + Optional currentTableId = metadataUpdater.getCluster().getTableId(tablePath); + if (!currentTableId.isPresent()) { + throw new TableNotExistException("Table " + tablePath + " does not exist."); + } + if (currentTableId.get() != tableId) { + throw new TableNotExistException("Table " + tablePath + " has been recreated."); + } } catch (Exception e) { if (e instanceof PartitionNotExistException) { // ignore this exception, this is probably happen because the partition is deleted. @@ -234,11 +247,24 @@ private void checkAndUpdateMetadata(List tableBuckets) { // from fetch list when receive exception. LOG.warn("Receive PartitionNotExistException when update metadata, ignore it", e); } else { + if (isTableNotExistException(e)) { + throw new TableNotExistException("Table " + tablePath + " does not exist.", e); + } throw e; } } } + private boolean isTableNotExistException(Throwable t) { + while (t != null) { + if (t instanceof TableNotExistException || t instanceof UnknownTableOrBucketException) { + return true; + } + t = t.getCause(); + } + return false; + } + @VisibleForTesting void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) { TableOrPartitions tableOrPartitionsInFetchRequest = diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java index 2990054999..9bf99d2bdd 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java @@ -130,10 +130,33 @@ public static Cluster sendMetadataRequestAndRebuildCluster( // the origin cluster. newTablePathToTableId = new HashMap<>(originCluster.getTableIdByPath()); + if (tablePaths != null) { + tablePaths.forEach(newTablePathToTableId::remove); + } + newBucketLocations = new HashMap<>(originCluster.getBucketLocationsByPath()); + if (tablePaths != null) { + newBucketLocations + .keySet() + .removeIf( + ptp -> tablePaths.contains(ptp.getTablePath())); + } + if (tablePartitions != null) { + tablePartitions.forEach(newBucketLocations::remove); + } + newPartitionIdByPath = new HashMap<>(originCluster.getPartitionIdByPath()); + if (tablePaths != null) { + newPartitionIdByPath + .keySet() + .removeIf( + ptp -> tablePaths.contains(ptp.getTablePath())); + } + if (tablePartitions != null) { + tablePartitions.forEach(newPartitionIdByPath::remove); + } newTablePathToTableId.putAll(newTableMetadata.tablePathToTableId); newBucketLocations.putAll(newTableMetadata.bucketLocations); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java index cfe215fb0b..e624c29d4d 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java @@ -23,6 +23,8 @@ import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.exception.FetchException; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -467,4 +469,55 @@ void testSubscribeOutOfRangeLog() throws Exception { } } } + + @Test + void testDropTableWhileScanning() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_drop_table_while_scanning"); + createTable(tablePath, DATA1_TABLE_DESCRIPTOR, false); + + // append a batch of data. + int recordSize = 100; + try (Table table = conn.getTable(tablePath)) { + AppendWriter appendWriter = table.newAppend().createWriter(); + for (int i = 0; i < recordSize; i++) { + GenericRow row = row(i, "a"); + appendWriter.append(row); + } + appendWriter.flush(); + + LogScanner logScanner = createLogScanner(table); + subscribeFromBeginning(logScanner, table); + + // Poll some data + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + assertThat(scanRecords.isEmpty()).isFalse(); + + // Drop the table + admin.dropTable(tablePath, false).get(); + + // Poll again, should eventually fail + assertThatThrownBy( + () -> { + while (true) { + logScanner.poll(Duration.ofSeconds(1)); + } + }) + .satisfies( + e -> { + boolean isTableMissing = + e instanceof TableNotExistException + || (e instanceof FlussRuntimeException + && e.getMessage() + .contains( + "Failed to update metadata")) + || (e.getCause() != null + && e.getCause() + instanceof TableNotExistException); + assertThat(isTableMissing) + .as( + "Expected TableNotExistException or FlussRuntimeException with metadata update failure") + .isTrue(); + }); + } + } } From fe47487bfd616605aaac6e03e4f1a88ce33ff233 Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Wed, 14 Jan 2026 19:03:02 +0000 Subject: [PATCH 2/3] Fix LogScanner infinite loop and support table recreation detection --- .../client/table/scanner/log/LogFetcher.java | 42 +++++++++++++++---- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index 5f08c7a1a6..6c2105b8e7 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -88,7 +88,6 @@ public class LogFetcher implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LogFetcher.class); private final TablePath tablePath; - private final long tableId; private final boolean isPartitioned; private final LogRecordReadContext readContext; // TODO this context can be merge with readContext. Introduce it only because log remote read @@ -125,7 +124,6 @@ public LogFetcher( RemoteFileDownloader remoteFileDownloader, SchemaGetter schemaGetter) { this.tablePath = tableInfo.getTablePath(); - this.tableId = tableInfo.getTableId(); this.isPartitioned = tableInfo.isPartitioned(); this.readContext = LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter); @@ -232,15 +230,30 @@ private void checkAndUpdateMetadata(List tableBuckets) { metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); } - // Check if the table still exists and matches the tableId - Optional currentTableId = metadataUpdater.getCluster().getTableId(tablePath); - if (!currentTableId.isPresent()) { - throw new TableNotExistException("Table " + tablePath + " does not exist."); - } - if (currentTableId.get() != tableId) { - throw new TableNotExistException("Table " + tablePath + " has been recreated."); + if (!tableBuckets.isEmpty()) { + checkTableId(tableBuckets.get(0).getTableId()); } } catch (Exception e) { + // If exception occurs, we suspect table might be gone or recreated. + // We force update table metadata. + try { + metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); + if (!tableBuckets.isEmpty()) { + checkTableId(tableBuckets.get(0).getTableId()); + } + } catch (Exception ex) { + // If checkTableId threw exception, rethrow it. + if (ex instanceof TableNotExistException) { + throw (TableNotExistException) ex; + } + + // If updateTableOrPartitionMetadata failed, check if it's because table missing. + if (isTableNotExistException(ex)) { + // Table is truly gone. + throw new TableNotExistException("Table " + tablePath + " does not exist.", ex); + } + } + if (e instanceof PartitionNotExistException) { // ignore this exception, this is probably happen because the partition is deleted. // The fetcher can also work fine. The caller like flink can remove the partition @@ -255,6 +268,17 @@ private void checkAndUpdateMetadata(List tableBuckets) { } } + private void checkTableId(long tableId) { + // Check if the table still exists and matches the tableId + Optional currentTableId = metadataUpdater.getCluster().getTableId(tablePath); + if (!currentTableId.isPresent()) { + throw new TableNotExistException("Table " + tablePath + " does not exist."); + } + if (currentTableId.get() != tableId) { + throw new TableNotExistException("Table " + tablePath + " has been recreated."); + } + } + private boolean isTableNotExistException(Throwable t) { while (t != null) { if (t instanceof TableNotExistException || t instanceof UnknownTableOrBucketException) { From 62a94078c90c0de68e25bddace801c19b480a19f Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Thu, 15 Jan 2026 04:42:59 +0000 Subject: [PATCH 3/3] Fix LogScanner infinite retry loop with support for resilient table recreation --- .../client/table/scanner/log/LogFetcher.java | 43 ++++++++++++++++--- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index 6c2105b8e7..b4a0566f0c 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -231,7 +231,7 @@ private void checkAndUpdateMetadata(List tableBuckets) { } if (!tableBuckets.isEmpty()) { - checkTableId(tableBuckets.get(0).getTableId()); + checkTableId(tableBuckets); } } catch (Exception e) { // If exception occurs, we suspect table might be gone or recreated. @@ -239,7 +239,7 @@ private void checkAndUpdateMetadata(List tableBuckets) { try { metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); if (!tableBuckets.isEmpty()) { - checkTableId(tableBuckets.get(0).getTableId()); + checkTableId(tableBuckets); } } catch (Exception ex) { // If checkTableId threw exception, rethrow it. @@ -268,14 +268,47 @@ private void checkAndUpdateMetadata(List tableBuckets) { } } - private void checkTableId(long tableId) { + private void checkTableId(List tableBuckets) { + long tableId = tableBuckets.get(0).getTableId(); // Check if the table still exists and matches the tableId Optional currentTableId = metadataUpdater.getCluster().getTableId(tablePath); if (!currentTableId.isPresent()) { throw new TableNotExistException("Table " + tablePath + " does not exist."); } - if (currentTableId.get() != tableId) { - throw new TableNotExistException("Table " + tablePath + " has been recreated."); + long metadataTableId = currentTableId.get(); + if (metadataTableId != tableId) { + // If table is recreated, we try to auto-heal for non-partitioned table. + // For partitioned table, we can't easily map old partition ID to new partition ID, + // so we throw exception. + if (!isPartitioned) { + LOG.warn( + "Table {} was recreated. Updating table ID from {} to {} and resuming fetch.", + tablePath, + tableId, + metadataTableId); + Map newBuckets = new HashMap<>(); + List oldBuckets = new ArrayList<>(); + + for (TableBucket oldBucket : tableBuckets) { + if (oldBucket.getTableId() == tableId) { + Long offset = logScannerStatus.getBucketOffset(oldBucket); + if (offset != null) { + TableBucket newBucket = + new TableBucket( + metadataTableId, + oldBucket.getPartitionId(), + oldBucket.getBucket()); + newBuckets.put(newBucket, offset); + oldBuckets.add(oldBucket); + } + } + } + + logScannerStatus.unassignScanBuckets(oldBuckets); + logScannerStatus.assignScanBuckets(newBuckets); + } else { + throw new TableNotExistException("Table " + tablePath + " has been recreated."); + } } }