From 5bf6e2967965df8a604080eb4fd6aba27caa0c38 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?=
Date: Wed, 21 Jan 2026 15:28:47 +0800
Subject: [PATCH 1/5] chore: invalidate leader info when necessary

---
 crates/fluss/src/client/metadata.rs      |  6 ++
 crates/fluss/src/client/table/scanner.rs | 70 +++++++++++++++++++++++-
 crates/fluss/src/cluster/cluster.rs      | 25 +++++++++
 3 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 0e6f9651..05e1f01b 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -71,6 +71,12 @@ impl Metadata {
         *cluster_guard = Arc::new(updated_cluster);
     }

+    pub fn invalidate_bucket_leader(&self, table_bucket: &TableBucket) {
+        let mut cluster_guard = self.cluster.write();
+        let updated_cluster = cluster_guard.invalidate_bucket_leader(table_bucket);
+        *cluster_guard = Arc::new(updated_cluster);
+    }
+
     pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
         let origin_cluster = self.cluster.read().clone();
         let new_cluster =
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index e9b2ce10..418cda61 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -518,7 +518,8 @@ impl LogFetcher {
             | FlussError::LogStorageException
             | FlussError::KvStorageException
             | FlussError::StorageException
-            | FlussError::FencedLeaderEpochException => FetchErrorContext {
+            | FlussError::FencedLeaderEpochException
+            | FlussError::LeaderNotAvailableException => FetchErrorContext {
                 action: FetchErrorAction::Ignore,
                 log_level: FetchErrorLogLevel::Debug,
                 log_message: format!(
@@ -570,6 +571,16 @@ impl LogFetcher {
         }
     }

+    fn should_invalidate_bucket_leader(error: FlussError) -> bool {
+        matches!(
+            error,
+            FlussError::NotLeaderOrFollower
+                | FlussError::LeaderNotAvailableException
+                | FlussError::FencedLeaderEpochException
+                | FlussError::UnknownTableOrBucketException
+        )
+    }
+
     async fn check_and_update_metadata(&self) -> Result<()> {
         let need_update = self
             .fetchable_buckets()
@@ -686,6 +697,7 @@ impl LogFetcher {

                 Self::handle_fetch_response(
                     fetch_response,
+                    &metadata,
                     &log_fetch_buffer,
                     &log_scanner_status,
                     &read_context,
@@ -712,6 +724,7 @@ impl LogFetcher {
     /// Handle fetch response and add completed fetches to buffer
     async fn handle_fetch_response(
         fetch_response: crate::proto::FetchLogResponse,
+        metadata: &Arc<Metadata>,
         log_fetch_buffer: &Arc,
         log_scanner_status: &Arc<LogScannerStatus>,
         read_context: &ReadContext,
@@ -745,6 +758,9 @@ impl LogFetcher {
                     .into();

                let error = FlussError::for_code(error_code);
+                if Self::should_invalidate_bucket_leader(error) {
+                    metadata.invalidate_bucket_leader(&table_bucket);
+                }
                let error_context = Self::describe_fetch_error(
                    error,
                    &table_bucket,
@@ -1579,6 +1595,7 @@ mod tests {

         LogFetcher::handle_fetch_response(
             response,
+            &metadata,
             &fetcher.log_fetch_buffer,
             &fetcher.log_scanner_status,
             &fetcher.read_context,
@@ -1593,4 +1610,55 @@ mod tests {
         assert_eq!(api_error.code, FlussError::AuthorizationException.code());
         Ok(())
     }
+
+    #[tokio::test]
+    async fn handle_fetch_response_invalidates_bucket_leader() -> Result<()> {
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = build_table_info(table_path.clone(), 1, 1);
+        let cluster = build_cluster_arc(&table_path, 1, 1);
+        let metadata = Arc::new(Metadata::new_for_test(cluster.clone()));
+        let status = Arc::new(LogScannerStatus::new());
+        status.assign_scan_bucket(TableBucket::new(1, 0), 5);
+        let fetcher = LogFetcher::new(
+            table_info.clone(),
+            Arc::new(RpcClient::new()),
+            metadata.clone(),
+            status.clone(),
+            None,
+        )?;
+
+        let bucket = TableBucket::new(1, 0);
+        assert!(metadata.leader_for(&bucket).is_some());
+
+        let response = crate::proto::FetchLogResponse {
+            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
+                table_id: 1,
+                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
+                    partition_id: None,
+                    bucket_id: 0,
+                    error_code: Some(FlussError::NotLeaderOrFollower.code()),
+                    error_message: Some("not leader".to_string()),
+                    high_watermark: None,
+                    log_start_offset: None,
+                    remote_log_fetch_info: None,
+                    records: None,
+                }],
+            }],
+        };
+
+        LogFetcher::handle_fetch_response(
+            response,
+            &metadata,
+            &fetcher.log_fetch_buffer,
+            &fetcher.log_scanner_status,
+            &fetcher.read_context,
+            &fetcher.remote_read_context,
+            &fetcher.remote_log_downloader,
+            &fetcher.credentials_cache,
+        )
+        .await;
+
+        assert!(metadata.leader_for(&bucket).is_none());
+        Ok(())
+    }
 }
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index f14d055f..4f6d661e 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -101,6 +101,31 @@ impl Cluster {
         )
     }

+    pub fn invalidate_bucket_leader(&self, table_bucket: &TableBucket) -> Self {
+        let mut available_locations_by_bucket = self.available_locations_by_bucket.clone();
+        available_locations_by_bucket.remove(table_bucket);
+
+        let mut available_locations_by_path = self.available_locations_by_path.clone();
+        if let Some(table_path) = self.table_path_by_id.get(&table_bucket.table_id()) {
+            if let Some(locations) = available_locations_by_path.get_mut(table_path) {
+                locations.retain(|location| location.table_bucket() != table_bucket);
+            }
+        } else {
+            for locations in available_locations_by_path.values_mut() {
+                locations.retain(|location| location.table_bucket() != table_bucket);
+            }
+        }
+
+        Cluster::new(
+            self.coordinator_server.clone(),
+            self.alive_tablet_servers_by_id.clone(),
+            available_locations_by_path,
+            available_locations_by_bucket,
+            self.table_id_by_path.clone(),
+            self.table_info_by_path.clone(),
+        )
+    }
+
     pub fn update(&mut self, cluster: Cluster) {
         let Cluster {
             coordinator_server,

From 3e387142f411f4190a69685910cd9bf05d57a811 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?=
Date: Fri, 23 Jan 2026 10:43:50 +0800
Subject: [PATCH 2/5] address clippy issues

---
 crates/fluss/src/client/table/scanner.rs | 93 ++++++++++++++----------
 1 file changed, 53 insertions(+), 40 deletions(-)

diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 418cda61..b3cc64b1 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -462,6 +462,16 @@ struct LogFetcher {
     nodes_with_pending_fetch_requests: Arc>>,
 }

+struct FetchResponseContext {
+    metadata: Arc<Metadata>,
+    log_fetch_buffer: Arc,
+    log_scanner_status: Arc<LogScannerStatus>,
+    read_context: ReadContext,
+    remote_read_context: ReadContext,
+    remote_log_downloader: Arc<RemoteLogDownloader>,
+    credentials_cache: Arc,
+}
+
 impl LogFetcher {
     pub fn new(
         table_info: TableInfo,
@@ -650,6 +660,15 @@ impl LogFetcher {
         let creds_cache = self.credentials_cache.clone();
         let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
         let metadata = self.metadata.clone();
+        let response_context = FetchResponseContext {
+            metadata: metadata.clone(),
+            log_fetch_buffer,
+            log_scanner_status,
read_context, + remote_read_context, + remote_log_downloader, + credentials_cache: creds_cache, + }; // Spawn async task to handle the fetch request // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped. // This is acceptable because: @@ -695,17 +714,7 @@ impl LogFetcher { } }; - Self::handle_fetch_response( - fetch_response, - &metadata, - &log_fetch_buffer, - &log_scanner_status, - &read_context, - &remote_read_context, - &remote_log_downloader, - &creds_cache, - ) - .await; + Self::handle_fetch_response(fetch_response, response_context).await; }); } @@ -724,14 +733,18 @@ impl LogFetcher { /// Handle fetch response and add completed fetches to buffer async fn handle_fetch_response( fetch_response: crate::proto::FetchLogResponse, - metadata: &Arc, - log_fetch_buffer: &Arc, - log_scanner_status: &Arc, - read_context: &ReadContext, - remote_read_context: &ReadContext, - remote_log_downloader: &Arc, - credentials_cache: &Arc, + context: FetchResponseContext, ) { + let FetchResponseContext { + metadata, + log_fetch_buffer, + log_scanner_status, + read_context, + remote_read_context, + remote_log_downloader, + credentials_cache, + } = context; + for pb_fetch_log_resp in fetch_response.tables_resp { let table_id = pb_fetch_log_resp.table_id; let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; @@ -1593,17 +1606,17 @@ mod tests { }], }; - LogFetcher::handle_fetch_response( - response, - &metadata, - &fetcher.log_fetch_buffer, - &fetcher.log_scanner_status, - &fetcher.read_context, - &fetcher.remote_read_context, - &fetcher.remote_log_downloader, - &fetcher.credentials_cache, - ) - .await; + let response_context = FetchResponseContext { + metadata: metadata.clone(), + log_fetch_buffer: fetcher.log_fetch_buffer.clone(), + log_scanner_status: fetcher.log_scanner_status.clone(), + read_context: fetcher.read_context.clone(), + remote_read_context: fetcher.remote_read_context.clone(), + remote_log_downloader: fetcher.remote_log_downloader.clone(), + credentials_cache: fetcher.credentials_cache.clone(), + }; + + LogFetcher::handle_fetch_response(response, response_context).await; let completed = fetcher.log_fetch_buffer.poll().expect("completed fetch"); let api_error = completed.api_error().expect("api error"); @@ -1646,17 +1659,17 @@ mod tests { }], }; - LogFetcher::handle_fetch_response( - response, - &metadata, - &fetcher.log_fetch_buffer, - &fetcher.log_scanner_status, - &fetcher.read_context, - &fetcher.remote_read_context, - &fetcher.remote_log_downloader, - &fetcher.credentials_cache, - ) - .await; + let response_context = FetchResponseContext { + metadata: metadata.clone(), + log_fetch_buffer: fetcher.log_fetch_buffer.clone(), + log_scanner_status: fetcher.log_scanner_status.clone(), + read_context: fetcher.read_context.clone(), + remote_read_context: fetcher.remote_read_context.clone(), + remote_log_downloader: fetcher.remote_log_downloader.clone(), + credentials_cache: fetcher.credentials_cache.clone(), + }; + + LogFetcher::handle_fetch_response(response, response_context).await; assert!(metadata.leader_for(&bucket).is_none()); Ok(()) From 2eddf3047f2f78225f0446cb9ea57804b6e6f9a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Fri, 23 Jan 2026 10:59:03 +0800 Subject: [PATCH 3/5] address comments --- crates/fluss/src/client/table/scanner.rs | 2 + crates/fluss/src/cluster/cluster.rs | 67 +++++++++++++++++++----- crates/fluss/src/metadata/table.rs | 2 +- 3 files changed, 57 insertions(+), 14 deletions(-) diff --git 
a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index b3cc64b1..3b7f1a29 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -588,6 +588,7 @@ impl LogFetcher { | FlussError::LeaderNotAvailableException | FlussError::FencedLeaderEpochException | FlussError::UnknownTableOrBucketException + | FlussError::InvalidCoordinatorException ) } @@ -772,6 +773,7 @@ impl LogFetcher { let error = FlussError::for_code(error_code); if Self::should_invalidate_bucket_leader(error) { + // TODO: Consider triggering leader invalidation from sender/lookup paths. metadata.invalidate_bucket_leader(&table_bucket); } let error_context = Self::describe_fetch_error( diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs index 4f6d661e..13ef1fcd 100644 --- a/crates/fluss/src/cluster/cluster.rs +++ b/crates/fluss/src/cluster/cluster.rs @@ -18,9 +18,12 @@ use crate::BucketId; use crate::cluster::{BucketLocation, ServerNode, ServerType}; use crate::error::Result; -use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, TablePath}; +use crate::metadata::{ + JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath, +}; use crate::proto::MetadataResponse; use crate::rpc::{from_pb_server_node, from_pb_table_path}; +use log::warn; use rand::random_range; use std::collections::{HashMap, HashSet}; @@ -77,23 +80,33 @@ impl Cluster { .filter_map(|id| self.table_path_by_id.get(id)) .collect(); - let available_locations_by_path = self - .available_locations_by_path - .iter() - .filter(|&(path, _)| !table_paths.contains(path)) - .map(|(path, locations)| (path.clone(), locations.clone())) - .collect(); + let (available_locations_by_path, available_locations_by_bucket) = + self.filter_bucket_locations_by_path(&table_paths); - let available_locations_by_bucket = self - .available_locations_by_bucket + Cluster::new( + self.coordinator_server.clone(), + alive_tablet_servers_by_id, + available_locations_by_path, + available_locations_by_bucket, + self.table_id_by_path.clone(), + self.table_info_by_path.clone(), + ) + } + + pub fn invalidate_physical_table_bucket_meta( + &self, + physical_tables_to_invalid: &HashSet, + ) -> Self { + let table_paths: HashSet<&TablePath> = physical_tables_to_invalid .iter() - .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path)) - .map(|(bucket, location)| (bucket.clone(), location.clone())) + .map(|path| path.get_table_path()) .collect(); + let (available_locations_by_path, available_locations_by_bucket) = + self.filter_bucket_locations_by_path(&table_paths); Cluster::new( self.coordinator_server.clone(), - alive_tablet_servers_by_id, + self.alive_tablet_servers_by_id.clone(), available_locations_by_path, available_locations_by_bucket, self.table_id_by_path.clone(), @@ -111,6 +124,10 @@ impl Cluster { locations.retain(|location| location.table_bucket() != table_bucket); } } else { + warn!( + "Table id {} is missing from table_path_by_id while invalidating bucket leader", + table_bucket.table_id() + ); for locations in available_locations_by_path.values_mut() { locations.retain(|location| location.table_bucket() != table_bucket); } @@ -144,7 +161,31 @@ impl Cluster { self.available_locations_by_bucket = available_locations_by_bucket; self.table_id_by_path = table_id_by_path; self.table_path_by_id = table_path_by_id; - self.table_info_by_path = table_info_by_path; + self.table_info_by_path = table_info_by_path; + } + + fn 
filter_bucket_locations_by_path(
+        &self,
+        table_paths: &HashSet<&TablePath>,
+    ) -> (
+        HashMap<TablePath, Vec<BucketLocation>>,
+        HashMap<TableBucket, BucketLocation>,
+    ) {
+        let available_locations_by_path = self
+            .available_locations_by_path
+            .iter()
+            .filter(|&(path, _)| !table_paths.contains(path))
+            .map(|(path, locations)| (path.clone(), locations.clone()))
+            .collect();
+
+        let available_locations_by_bucket = self
+            .available_locations_by_bucket
+            .iter()
+            .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
+            .map(|(bucket, location)| (bucket.clone(), location.clone()))
+            .collect();
+
+        (available_locations_by_path, available_locations_by_bucket)
+    }

     pub fn from_metadata_response(
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index da85b0c2..ec2516e0 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -664,7 +664,7 @@ impl TablePath {
     }
 }

-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct PhysicalTablePath {
     table_path: TablePath,
     #[allow(dead_code)]

From 7a19a60a3723c769c6cd74e4913aaa551e4e03ca Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?=
Date: Fri, 23 Jan 2026 11:07:35 +0800
Subject: [PATCH 4/5] fix fmt issue

---
 crates/fluss/src/cluster/cluster.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index 13ef1fcd..45dfe94e 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -161,7 +161,7 @@ impl Cluster {
         self.available_locations_by_bucket = available_locations_by_bucket;
         self.table_id_by_path = table_id_by_path;
         self.table_path_by_id = table_path_by_id;
-        self.table_info_by_path = table_info_by_path; 
+        self.table_info_by_path = table_info_by_path;
     }

     fn filter_bucket_locations_by_path(

From a66df0f955000c390459d326e05100d6506099fe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?=
Date: Fri, 23 Jan 2026 14:14:20 +0800
Subject: [PATCH 5/5] address comments

---
 crates/fluss/src/client/metadata.rs      | 10 +++++--
 crates/fluss/src/client/table/scanner.rs | 22 +++++++++++----
 crates/fluss/src/cluster/cluster.rs      | 36 ++++--------------------
 3 files changed, 28 insertions(+), 40 deletions(-)

diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 05e1f01b..3c6730b5 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -17,7 +17,7 @@
 use crate::cluster::{Cluster, ServerNode, ServerType};
 use crate::error::Result;
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
 use crate::proto::MetadataResponse;
 use crate::rpc::message::UpdateMetadataRequest;
 use crate::rpc::{RpcClient, ServerConnection};
@@ -71,9 +71,13 @@ impl Metadata {
         *cluster_guard = Arc::new(updated_cluster);
     }

-    pub fn invalidate_bucket_leader(&self, table_bucket: &TableBucket) {
+    pub fn invalidate_physical_table_meta(
+        &self,
+        physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
+    ) {
         let mut cluster_guard = self.cluster.write();
-        let updated_cluster = cluster_guard.invalidate_bucket_leader(table_bucket);
+        let updated_cluster =
+            cluster_guard.invalidate_physical_table_meta(physical_tables_to_invalid);
         *cluster_guard = Arc::new(updated_cluster);
     }

diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 3b7f1a29..626ae486 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ 
b/crates/fluss/src/client/table/scanner.rs @@ -36,7 +36,7 @@ use crate::client::table::remote_log::{ RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch, }; use crate::error::{ApiError, Error, FlussError, Result}; -use crate::metadata::{TableBucket, TableInfo, TablePath}; +use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath}; use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable}; use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema}; use crate::rpc::{RpcClient, RpcError, message}; @@ -581,7 +581,7 @@ impl LogFetcher { } } - fn should_invalidate_bucket_leader(error: FlussError) -> bool { + fn should_invalidate_table_meta(error: FlussError) -> bool { matches!( error, FlussError::NotLeaderOrFollower @@ -772,9 +772,19 @@ impl LogFetcher { .into(); let error = FlussError::for_code(error_code); - if Self::should_invalidate_bucket_leader(error) { - // TODO: Consider triggering leader invalidation from sender/lookup paths. - metadata.invalidate_bucket_leader(&table_bucket); + if Self::should_invalidate_table_meta(error) { + // TODO: Consider triggering table meta invalidation from sender/lookup paths. + let table_id = table_bucket.table_id(); + let cluster = metadata.get_cluster(); + if let Some(table_path) = cluster.get_table_path_by_id(table_id) { + let physical_tables = + HashSet::from([PhysicalTablePath::of(table_path.clone())]); + metadata.invalidate_physical_table_meta(&physical_tables); + } else { + warn!( + "Table id {table_id} is missing from table_path_by_id while invalidating table metadata" + ); + } } let error_context = Self::describe_fetch_error( error, @@ -1627,7 +1637,7 @@ mod tests { } #[tokio::test] - async fn handle_fetch_response_invalidates_bucket_leader() -> Result<()> { + async fn handle_fetch_response_invalidates_table_meta() -> Result<()> { let table_path = TablePath::new("db".to_string(), "tbl".to_string()); let table_info = build_table_info(table_path.clone(), 1, 1); let cluster = build_cluster_arc(&table_path, 1, 1); diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs index 45dfe94e..2484026a 100644 --- a/crates/fluss/src/cluster/cluster.rs +++ b/crates/fluss/src/cluster/cluster.rs @@ -23,7 +23,6 @@ use crate::metadata::{ }; use crate::proto::MetadataResponse; use crate::rpc::{from_pb_server_node, from_pb_table_path}; -use log::warn; use rand::random_range; use std::collections::{HashMap, HashSet}; @@ -93,7 +92,7 @@ impl Cluster { ) } - pub fn invalidate_physical_table_bucket_meta( + pub fn invalidate_physical_table_meta( &self, physical_tables_to_invalid: &HashSet, ) -> Self { @@ -114,35 +113,6 @@ impl Cluster { ) } - pub fn invalidate_bucket_leader(&self, table_bucket: &TableBucket) -> Self { - let mut available_locations_by_bucket = self.available_locations_by_bucket.clone(); - available_locations_by_bucket.remove(table_bucket); - - let mut available_locations_by_path = self.available_locations_by_path.clone(); - if let Some(table_path) = self.table_path_by_id.get(&table_bucket.table_id()) { - if let Some(locations) = available_locations_by_path.get_mut(table_path) { - locations.retain(|location| location.table_bucket() != table_bucket); - } - } else { - warn!( - "Table id {} is missing from table_path_by_id while invalidating bucket leader", - table_bucket.table_id() - ); - for locations in available_locations_by_path.values_mut() { - locations.retain(|location| location.table_bucket() != table_bucket); - } - } - - 
Cluster::new( - self.coordinator_server.clone(), - self.alive_tablet_servers_by_id.clone(), - available_locations_by_path, - available_locations_by_bucket, - self.table_id_by_path.clone(), - self.table_info_by_path.clone(), - ) - } - pub fn update(&mut self, cluster: Cluster) { let Cluster { coordinator_server, @@ -308,6 +278,10 @@ impl Cluster { &self.table_id_by_path } + pub fn get_table_path_by_id(&self, table_id: i64) -> Option<&TablePath> { + self.table_path_by_id.get(&table_id) + } + pub fn get_available_buckets_for_table_path( &self, table_path: &TablePath,