diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index b5a03a1caf..b4a0566f0c 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -30,6 +30,8 @@ import org.apache.fluss.exception.InvalidMetadataException; import org.apache.fluss.exception.LeaderNotAvailableException; import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.SchemaGetter; @@ -227,18 +229,99 @@ private void checkAndUpdateMetadata(List tableBuckets) { } else if (needUpdate) { metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); } + + if (!tableBuckets.isEmpty()) { + checkTableId(tableBuckets); + } } catch (Exception e) { + // If exception occurs, we suspect table might be gone or recreated. + // We force update table metadata. + try { + metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); + if (!tableBuckets.isEmpty()) { + checkTableId(tableBuckets); + } + } catch (Exception ex) { + // If checkTableId threw exception, rethrow it. + if (ex instanceof TableNotExistException) { + throw (TableNotExistException) ex; + } + + // If updateTableOrPartitionMetadata failed, check if it's because table missing. + if (isTableNotExistException(ex)) { + // Table is truly gone. + throw new TableNotExistException("Table " + tablePath + " does not exist.", ex); + } + } + if (e instanceof PartitionNotExistException) { // ignore this exception, this is probably happen because the partition is deleted. // The fetcher can also work fine. The caller like flink can remove the partition // from fetch list when receive exception. LOG.warn("Receive PartitionNotExistException when update metadata, ignore it", e); } else { + if (isTableNotExistException(e)) { + throw new TableNotExistException("Table " + tablePath + " does not exist.", e); + } throw e; } } } + private void checkTableId(List tableBuckets) { + long tableId = tableBuckets.get(0).getTableId(); + // Check if the table still exists and matches the tableId + Optional currentTableId = metadataUpdater.getCluster().getTableId(tablePath); + if (!currentTableId.isPresent()) { + throw new TableNotExistException("Table " + tablePath + " does not exist."); + } + long metadataTableId = currentTableId.get(); + if (metadataTableId != tableId) { + // If table is recreated, we try to auto-heal for non-partitioned table. + // For partitioned table, we can't easily map old partition ID to new partition ID, + // so we throw exception. + if (!isPartitioned) { + LOG.warn( + "Table {} was recreated. Updating table ID from {} to {} and resuming fetch.", + tablePath, + tableId, + metadataTableId); + Map newBuckets = new HashMap<>(); + List oldBuckets = new ArrayList<>(); + + for (TableBucket oldBucket : tableBuckets) { + if (oldBucket.getTableId() == tableId) { + Long offset = logScannerStatus.getBucketOffset(oldBucket); + if (offset != null) { + TableBucket newBucket = + new TableBucket( + metadataTableId, + oldBucket.getPartitionId(), + oldBucket.getBucket()); + newBuckets.put(newBucket, offset); + oldBuckets.add(oldBucket); + } + } + } + + logScannerStatus.unassignScanBuckets(oldBuckets); + logScannerStatus.assignScanBuckets(newBuckets); + } else { + throw new TableNotExistException("Table " + tablePath + " has been recreated."); + } + } + } + + private boolean isTableNotExistException(Throwable t) { + while (t != null) { + if (t instanceof TableNotExistException || t instanceof UnknownTableOrBucketException) { + return true; + } + t = t.getCause(); + } + return false; + } + @VisibleForTesting void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) { TableOrPartitions tableOrPartitionsInFetchRequest = diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java index 2990054999..9bf99d2bdd 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java @@ -130,10 +130,33 @@ public static Cluster sendMetadataRequestAndRebuildCluster( // the origin cluster. newTablePathToTableId = new HashMap<>(originCluster.getTableIdByPath()); + if (tablePaths != null) { + tablePaths.forEach(newTablePathToTableId::remove); + } + newBucketLocations = new HashMap<>(originCluster.getBucketLocationsByPath()); + if (tablePaths != null) { + newBucketLocations + .keySet() + .removeIf( + ptp -> tablePaths.contains(ptp.getTablePath())); + } + if (tablePartitions != null) { + tablePartitions.forEach(newBucketLocations::remove); + } + newPartitionIdByPath = new HashMap<>(originCluster.getPartitionIdByPath()); + if (tablePaths != null) { + newPartitionIdByPath + .keySet() + .removeIf( + ptp -> tablePaths.contains(ptp.getTablePath())); + } + if (tablePartitions != null) { + tablePartitions.forEach(newPartitionIdByPath::remove); + } newTablePathToTableId.putAll(newTableMetadata.tablePathToTableId); newBucketLocations.putAll(newTableMetadata.bucketLocations); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java index cfe215fb0b..e624c29d4d 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java @@ -23,6 +23,8 @@ import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.exception.FetchException; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -467,4 +469,55 @@ void testSubscribeOutOfRangeLog() throws Exception { } } } + + @Test + void testDropTableWhileScanning() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_drop_table_while_scanning"); + createTable(tablePath, DATA1_TABLE_DESCRIPTOR, false); + + // append a batch of data. + int recordSize = 100; + try (Table table = conn.getTable(tablePath)) { + AppendWriter appendWriter = table.newAppend().createWriter(); + for (int i = 0; i < recordSize; i++) { + GenericRow row = row(i, "a"); + appendWriter.append(row); + } + appendWriter.flush(); + + LogScanner logScanner = createLogScanner(table); + subscribeFromBeginning(logScanner, table); + + // Poll some data + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + assertThat(scanRecords.isEmpty()).isFalse(); + + // Drop the table + admin.dropTable(tablePath, false).get(); + + // Poll again, should eventually fail + assertThatThrownBy( + () -> { + while (true) { + logScanner.poll(Duration.ofSeconds(1)); + } + }) + .satisfies( + e -> { + boolean isTableMissing = + e instanceof TableNotExistException + || (e instanceof FlussRuntimeException + && e.getMessage() + .contains( + "Failed to update metadata")) + || (e.getCause() != null + && e.getCause() + instanceof TableNotExistException); + assertThat(isTableMissing) + .as( + "Expected TableNotExistException or FlussRuntimeException with metadata update failure") + .isTrue(); + }); + } + } }