Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.fluss.exception.InvalidMetadataException;
import org.apache.fluss.exception.LeaderNotAvailableException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.SchemaGetter;
Expand Down Expand Up @@ -227,18 +229,99 @@ private void checkAndUpdateMetadata(List<TableBucket> tableBuckets) {
} else if (needUpdate) {
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
}

if (!tableBuckets.isEmpty()) {
checkTableId(tableBuckets);
}
} catch (Exception e) {
// If exception occurs, we suspect table might be gone or recreated.
// We force update table metadata.
try {
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
if (!tableBuckets.isEmpty()) {
checkTableId(tableBuckets);
}
} catch (Exception ex) {
// If checkTableId threw exception, rethrow it.
if (ex instanceof TableNotExistException) {
throw (TableNotExistException) ex;
}

// If updateTableOrPartitionMetadata failed, check if it's because table missing.
if (isTableNotExistException(ex)) {
// Table is truly gone.
throw new TableNotExistException("Table " + tablePath + " does not exist.", ex);
}
}

if (e instanceof PartitionNotExistException) {
// Ignore this exception: it most likely happened because the partition was deleted.
// The fetcher can still work fine. A caller such as Flink can remove the partition
// from its fetch list when it receives the exception.
LOG.warn("Receive PartitionNotExistException when update metadata, ignore it", e);
} else {
if (isTableNotExistException(e)) {
throw new TableNotExistException("Table " + tablePath + " does not exist.", e);
}
throw e;
}
}
}

/**
 * Verifies that the table referenced by the given buckets still exists under {@code tablePath}
 * with the same table ID.
 *
 * <p>The expected table ID is taken from the first bucket in the list. If the cluster metadata
 * reports a different ID, the table is treated as recreated: for a non-partitioned table the
 * buckets that currently have a tracked offset are re-registered in {@code logScannerStatus}
 * under the new table ID with their existing offsets; for a partitioned table an exception is
 * thrown.
 *
 * @param tableBuckets the buckets being fetched; must be non-empty because the first element
 *     is read unconditionally
 * @throws TableNotExistException if the cluster metadata has no table ID for the path, or if
 *     a partitioned table was recreated
 */
private void checkTableId(List<TableBucket> tableBuckets) {
    long expectedTableId = tableBuckets.get(0).getTableId();

    // Resolve the table ID currently registered for this path; absence means the table is gone.
    long actualTableId =
            metadataUpdater
                    .getCluster()
                    .getTableId(tablePath)
                    .orElseThrow(
                            () ->
                                    new TableNotExistException(
                                            "Table " + tablePath + " does not exist."));

    if (actualTableId == expectedTableId) {
        // Same table as before, nothing to reconcile.
        return;
    }

    // The table was recreated. Old partition IDs cannot be mapped onto the new table's
    // partitions, so only non-partitioned tables are healed automatically.
    if (isPartitioned) {
        throw new TableNotExistException("Table " + tablePath + " has been recreated.");
    }

    LOG.warn(
            "Table {} was recreated. Updating table ID from {} to {} and resuming fetch.",
            tablePath,
            expectedTableId,
            actualTableId);

    Map<TableBucket, Long> reboundOffsets = new HashMap<>();
    List<TableBucket> staleBuckets = new ArrayList<>();
    for (TableBucket candidate : tableBuckets) {
        if (candidate.getTableId() != expectedTableId) {
            continue;
        }
        Long trackedOffset = logScannerStatus.getBucketOffset(candidate);
        if (trackedOffset == null) {
            // No offset tracked for this bucket, so there is nothing to carry over.
            continue;
        }
        TableBucket rebound =
                new TableBucket(
                        actualTableId, candidate.getPartitionId(), candidate.getBucket());
        reboundOffsets.put(rebound, trackedOffset);
        staleBuckets.add(candidate);
    }

    // Swap the old bucket registrations for the ones keyed by the new table ID.
    logScannerStatus.unassignScanBuckets(staleBuckets);
    logScannerStatus.assignScanBuckets(reboundOffsets);
}

/**
 * Reports whether the given throwable, or any throwable in its cause chain, signals that the
 * table is missing.
 *
 * @param t the throwable to inspect; {@code null} yields {@code false}
 * @return {@code true} if {@code t} or one of its causes is a {@link TableNotExistException}
 *     or an {@link UnknownTableOrBucketException}
 */
private boolean isTableNotExistException(Throwable t) {
    for (Throwable current = t; current != null; current = current.getCause()) {
        boolean tableMissing =
                current instanceof TableNotExistException
                        || current instanceof UnknownTableOrBucketException;
        if (tableMissing) {
            return true;
        }
    }
    return false;
}

@VisibleForTesting
void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) {
TableOrPartitions tableOrPartitionsInFetchRequest =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,33 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
// the origin cluster.
newTablePathToTableId =
new HashMap<>(originCluster.getTableIdByPath());
if (tablePaths != null) {
tablePaths.forEach(newTablePathToTableId::remove);
}

newBucketLocations =
new HashMap<>(originCluster.getBucketLocationsByPath());
if (tablePaths != null) {
newBucketLocations
.keySet()
.removeIf(
ptp -> tablePaths.contains(ptp.getTablePath()));
}
if (tablePartitions != null) {
tablePartitions.forEach(newBucketLocations::remove);
}

newPartitionIdByPath =
new HashMap<>(originCluster.getPartitionIdByPath());
if (tablePaths != null) {
newPartitionIdByPath
.keySet()
.removeIf(
ptp -> tablePaths.contains(ptp.getTablePath()));
}
if (tablePartitions != null) {
tablePartitions.forEach(newPartitionIdByPath::remove);
}

newTablePathToTableId.putAll(newTableMetadata.tablePathToTableId);
newBucketLocations.putAll(newTableMetadata.bucketLocations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.fluss.client.table.writer.AppendWriter;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.exception.FetchException;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
Expand Down Expand Up @@ -467,4 +469,55 @@ void testSubscribeOutOfRangeLog() throws Exception {
}
}
}

/**
 * Verifies that a log scanner which is actively polling a table fails once that table is
 * dropped, rather than continuing to poll silently.
 *
 * <p>The test appends a batch of rows, polls once to confirm data is readable, drops the
 * table, and then keeps polling until the scanner throws. The thrown error must indicate that
 * the table is missing (directly, via its immediate cause, or as a metadata update failure).
 */
@Test
void testDropTableWhileScanning() throws Exception {
    TablePath tablePath = TablePath.of("test_db_1", "test_drop_table_while_scanning");
    createTable(tablePath, DATA1_TABLE_DESCRIPTOR, false);

    // append a batch of data.
    int recordSize = 100;
    try (Table table = conn.getTable(tablePath)) {
        AppendWriter appendWriter = table.newAppend().createWriter();
        for (int i = 0; i < recordSize; i++) {
            GenericRow row = row(i, "a");
            appendWriter.append(row);
        }
        appendWriter.flush();

        LogScanner logScanner = createLogScanner(table);
        subscribeFromBeginning(logScanner, table);

        // Poll some data
        ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
        assertThat(scanRecords.isEmpty()).isFalse();

        // Drop the table
        admin.dropTable(tablePath, false).get();

        // Poll again, should eventually fail. The loop is bounded by a deadline so that a
        // regression (scanner never noticing the drop) fails the assertion below instead of
        // hanging the whole test run.
        long deadlineNanos = System.nanoTime() + Duration.ofMinutes(1).toNanos();
        assertThatThrownBy(
                        () -> {
                            // Subtraction-based comparison stays correct if nanoTime wraps.
                            while (System.nanoTime() - deadlineNanos < 0) {
                                logScanner.poll(Duration.ofSeconds(1));
                            }
                        })
                .satisfies(
                        e -> {
                            // getMessage() may be null; guard it to avoid an NPE that would
                            // mask the real assertion failure.
                            String message = e.getMessage();
                            boolean isMetadataUpdateFailure =
                                    e instanceof FlussRuntimeException
                                            && message != null
                                            && message.contains("Failed to update metadata");
                            boolean isTableMissing =
                                    e instanceof TableNotExistException
                                            || isMetadataUpdateFailure
                                            || e.getCause() instanceof TableNotExistException;
                            assertThat(isTableMissing)
                                    .as(
                                            "Expected TableNotExistException or FlussRuntimeException with metadata update failure")
                                    .isTrue();
                        });
    }
}
}
Loading