diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 0e6f9651..3c6730b5 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -17,7 +17,7 @@
 
 use crate::cluster::{Cluster, ServerNode, ServerType};
 use crate::error::Result;
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
 use crate::proto::MetadataResponse;
 use crate::rpc::message::UpdateMetadataRequest;
 use crate::rpc::{RpcClient, ServerConnection};
@@ -71,6 +71,16 @@ impl Metadata {
         *cluster_guard = Arc::new(updated_cluster);
     }
 
+    pub fn invalidate_physical_table_meta(
+        &self,
+        physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
+    ) {
+        let mut cluster_guard = self.cluster.write();
+        let updated_cluster =
+            cluster_guard.invalidate_physical_table_meta(physical_tables_to_invalid);
+        *cluster_guard = Arc::new(updated_cluster);
+    }
+
     pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
         let origin_cluster = self.cluster.read().clone();
         let new_cluster =
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index e9b2ce10..626ae486 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -36,7 +36,7 @@ use crate::client::table::remote_log::{
     RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
 };
 use crate::error::{ApiError, Error, FlussError, Result};
-use crate::metadata::{TableBucket, TableInfo, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
 use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
 use crate::rpc::{RpcClient, RpcError, message};
@@ -462,6 +462,16 @@ struct LogFetcher {
     nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
 }
 
+struct FetchResponseContext {
+    metadata: Arc<Metadata>,
+    log_fetch_buffer: Arc<LogFetchBuffer>,
+    log_scanner_status: Arc<LogScannerStatus>,
+    read_context: ReadContext,
+    remote_read_context: ReadContext,
+    remote_log_downloader: Arc<RemoteLogDownloader>,
+    credentials_cache: Arc<CredentialsCache>,
+}
+
 impl LogFetcher {
     pub fn new(
         table_info: TableInfo,
@@ -518,7 +528,8 @@
             | FlussError::LogStorageException
             | FlussError::KvStorageException
             | FlussError::StorageException
-            | FlussError::FencedLeaderEpochException => FetchErrorContext {
+            | FlussError::FencedLeaderEpochException
+            | FlussError::LeaderNotAvailableException => FetchErrorContext {
                 action: FetchErrorAction::Ignore,
                 log_level: FetchErrorLogLevel::Debug,
                 log_message: format!(
@@ -570,6 +581,17 @@
         }
     }
 
+    fn should_invalidate_table_meta(error: FlussError) -> bool {
+        matches!(
+            error,
+            FlussError::NotLeaderOrFollower
+                | FlussError::LeaderNotAvailableException
+                | FlussError::FencedLeaderEpochException
+                | FlussError::UnknownTableOrBucketException
+                | FlussError::InvalidCoordinatorException
+        )
+    }
+
     async fn check_and_update_metadata(&self) -> Result<()> {
         let need_update = self
             .fetchable_buckets()
@@ -639,6 +661,15 @@
         let creds_cache = self.credentials_cache.clone();
         let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
         let metadata = self.metadata.clone();
+        let response_context = FetchResponseContext {
+            metadata: metadata.clone(),
+            log_fetch_buffer,
+            log_scanner_status,
+            read_context,
+            remote_read_context,
+            remote_log_downloader,
+            credentials_cache: creds_cache,
+        };
         // Spawn async task to handle the fetch request
         // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.
         // This is acceptable because:
@@ -684,16 +715,7 @@
                 }
             };
 
-            Self::handle_fetch_response(
-                fetch_response,
-                &log_fetch_buffer,
-                &log_scanner_status,
-                &read_context,
-                &remote_read_context,
-                &remote_log_downloader,
-                &creds_cache,
-            )
-            .await;
+            Self::handle_fetch_response(fetch_response, response_context).await;
         });
     }
 
@@ -712,13 +734,18 @@
     /// Handle fetch response and add completed fetches to buffer
    async fn handle_fetch_response(
         fetch_response: crate::proto::FetchLogResponse,
-        log_fetch_buffer: &Arc<LogFetchBuffer>,
-        log_scanner_status: &Arc<LogScannerStatus>,
-        read_context: &ReadContext,
-        remote_read_context: &ReadContext,
-        remote_log_downloader: &Arc<RemoteLogDownloader>,
-        credentials_cache: &Arc<CredentialsCache>,
+        context: FetchResponseContext,
     ) {
+        let FetchResponseContext {
+            metadata,
+            log_fetch_buffer,
+            log_scanner_status,
+            read_context,
+            remote_read_context,
+            remote_log_downloader,
+            credentials_cache,
+        } = context;
+
         for pb_fetch_log_resp in fetch_response.tables_resp {
             let table_id = pb_fetch_log_resp.table_id;
             let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
@@ -745,6 +772,20 @@
                     .into();
                 let error = FlussError::for_code(error_code);
+                if Self::should_invalidate_table_meta(error) {
+                    // TODO: Consider triggering table meta invalidation from sender/lookup paths.
+                    let table_id = table_bucket.table_id();
+                    let cluster = metadata.get_cluster();
+                    if let Some(table_path) = cluster.get_table_path_by_id(table_id) {
+                        let physical_tables =
+                            HashSet::from([PhysicalTablePath::of(table_path.clone())]);
+                        metadata.invalidate_physical_table_meta(&physical_tables);
+                    } else {
+                        warn!(
+                            "Table id {table_id} is missing from table_path_by_id while invalidating table metadata"
+                        );
+                    }
+                }
 
                 let error_context = Self::describe_fetch_error(
                     error,
                     &table_bucket,
@@ -1577,20 +1618,72 @@ mod tests {
             }],
         };
 
-        LogFetcher::handle_fetch_response(
-            response,
-            &fetcher.log_fetch_buffer,
-            &fetcher.log_scanner_status,
-            &fetcher.read_context,
-            &fetcher.remote_read_context,
-            &fetcher.remote_log_downloader,
-            &fetcher.credentials_cache,
-        )
-        .await;
+        let response_context = FetchResponseContext {
+            metadata: metadata.clone(),
+            log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
+            log_scanner_status: fetcher.log_scanner_status.clone(),
+            read_context: fetcher.read_context.clone(),
+            remote_read_context: fetcher.remote_read_context.clone(),
+            remote_log_downloader: fetcher.remote_log_downloader.clone(),
+            credentials_cache: fetcher.credentials_cache.clone(),
+        };
+
+        LogFetcher::handle_fetch_response(response, response_context).await;
 
         let completed = fetcher.log_fetch_buffer.poll().expect("completed fetch");
         let api_error = completed.api_error().expect("api error");
         assert_eq!(api_error.code, FlussError::AuthorizationException.code());
         Ok(())
     }
+
+    #[tokio::test]
+    async fn handle_fetch_response_invalidates_table_meta() -> Result<()> {
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = build_table_info(table_path.clone(), 1, 1);
+        let cluster = build_cluster_arc(&table_path, 1, 1);
+        let metadata = Arc::new(Metadata::new_for_test(cluster.clone()));
+        let status = Arc::new(LogScannerStatus::new());
+        status.assign_scan_bucket(TableBucket::new(1, 0), 5);
+        let fetcher = LogFetcher::new(
+            table_info.clone(),
+            Arc::new(RpcClient::new()),
+            metadata.clone(),
+            status.clone(),
+            None,
+        )?;
+
+        let bucket = TableBucket::new(1, 0);
+        assert!(metadata.leader_for(&bucket).is_some());
+
+        let response = crate::proto::FetchLogResponse {
+            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
+                table_id: 1,
+                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
+                    partition_id: None,
+                    bucket_id: 0,
+                    error_code: Some(FlussError::NotLeaderOrFollower.code()),
+                    error_message: Some("not leader".to_string()),
+                    high_watermark: None,
+                    log_start_offset: None,
+                    remote_log_fetch_info: None,
+                    records: None,
+                }],
+            }],
+        };
+
+        let response_context = FetchResponseContext {
+            metadata: metadata.clone(),
+            log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
+            log_scanner_status: fetcher.log_scanner_status.clone(),
+            read_context: fetcher.read_context.clone(),
+            remote_read_context: fetcher.remote_read_context.clone(),
+            remote_log_downloader: fetcher.remote_log_downloader.clone(),
+            credentials_cache: fetcher.credentials_cache.clone(),
+        };
+
+        LogFetcher::handle_fetch_response(response, response_context).await;
+
+        assert!(metadata.leader_for(&bucket).is_none());
+        Ok(())
+    }
 }
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index f14d055f..2484026a 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -18,7 +18,9 @@
 use crate::BucketId;
 use crate::cluster::{BucketLocation, ServerNode, ServerType};
 use crate::error::Result;
-use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, TablePath};
+use crate::metadata::{
+    JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
+};
 use crate::proto::MetadataResponse;
 use crate::rpc::{from_pb_server_node, from_pb_table_path};
 use rand::random_range;
@@ -77,23 +79,33 @@
             .filter_map(|id| self.table_path_by_id.get(id))
             .collect();
 
-        let available_locations_by_path = self
-            .available_locations_by_path
-            .iter()
-            .filter(|&(path, _)| !table_paths.contains(path))
-            .map(|(path, locations)| (path.clone(), locations.clone()))
-            .collect();
+        let (available_locations_by_path, available_locations_by_bucket) =
+            self.filter_bucket_locations_by_path(&table_paths);
 
-        let available_locations_by_bucket = self
-            .available_locations_by_bucket
+        Cluster::new(
+            self.coordinator_server.clone(),
+            alive_tablet_servers_by_id,
+            available_locations_by_path,
+            available_locations_by_bucket,
+            self.table_id_by_path.clone(),
+            self.table_info_by_path.clone(),
+        )
+    }
+
+    pub fn invalidate_physical_table_meta(
+        &self,
+        physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
+    ) -> Self {
+        let table_paths: HashSet<&TablePath> = physical_tables_to_invalid
             .iter()
-            .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
-            .map(|(bucket, location)| (bucket.clone(), location.clone()))
+            .map(|path| path.get_table_path())
             .collect();
+        let (available_locations_by_path, available_locations_by_bucket) =
+            self.filter_bucket_locations_by_path(&table_paths);
 
         Cluster::new(
             self.coordinator_server.clone(),
-            alive_tablet_servers_by_id,
+            self.alive_tablet_servers_by_id.clone(),
             available_locations_by_path,
             available_locations_by_bucket,
             self.table_id_by_path.clone(),
@@ -122,6 +134,30 @@
         self.table_info_by_path = table_info_by_path;
     }
 
+    fn filter_bucket_locations_by_path(
+        &self,
+        table_paths: &HashSet<&TablePath>,
+    ) -> (
+        HashMap<TablePath, Vec<BucketLocation>>,
+        HashMap<TableBucket, BucketLocation>,
+    ) {
+        let available_locations_by_path = self
+            .available_locations_by_path
+            .iter()
+            .filter(|&(path, _)| !table_paths.contains(path))
+            .map(|(path, locations)| (path.clone(), locations.clone()))
+            .collect();
+
+        let available_locations_by_bucket = self
+            .available_locations_by_bucket
+            .iter()
+            .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
+            .map(|(bucket, location)| (bucket.clone(), location.clone()))
+            .collect();
+
+        (available_locations_by_path, available_locations_by_bucket)
+    }
+
     pub fn from_metadata_response(
         metadata_response: MetadataResponse,
         origin_cluster: Option<&Cluster>,
@@ -242,6 +278,10 @@
         &self.table_id_by_path
     }
 
+    pub fn get_table_path_by_id(&self, table_id: i64) -> Option<&TablePath> {
+        self.table_path_by_id.get(&table_id)
+    }
+
     pub fn get_available_buckets_for_table_path(
         &self,
         table_path: &TablePath,
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index da85b0c2..ec2516e0 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -664,7 +664,7 @@ impl TablePath {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct PhysicalTablePath {
     table_path: TablePath,
     #[allow(dead_code)]
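
Note on the pattern in this patch: `Metadata::invalidate_physical_table_meta` rebuilds the `Cluster` snapshot with the stale locations filtered out and swaps the `Arc` under the write lock, so in-flight readers keep the snapshot they already cloned while new readers miss the invalidated entries and trigger a metadata refresh. The sketch below is only an illustration of that copy-on-write swap with simplified, hypothetical stand-in types (`TablePath` as a plain string key, `std::sync::RwLock` in place of whatever lock the crate uses); it is not the crate's actual API.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the crate's `TablePath`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct TablePath(String);

/// Hypothetical stand-in for `Cluster`: an immutable snapshot of cached locations.
struct Cluster {
    leaders_by_path: HashMap<TablePath, i32>, // table path -> leader node id
}

impl Cluster {
    /// Build a new snapshot with the given tables' cached locations dropped,
    /// mirroring the filter-and-rebuild style of `invalidate_physical_table_meta`.
    fn invalidate(&self, paths: &HashSet<TablePath>) -> Cluster {
        Cluster {
            leaders_by_path: self
                .leaders_by_path
                .iter()
                .filter(|&(path, _)| !paths.contains(path))
                .map(|(path, leader)| (path.clone(), *leader))
                .collect(),
        }
    }
}

struct Metadata {
    // Readers clone the inner Arc and keep using their snapshot;
    // writers build a new Cluster and swap the Arc in place.
    cluster: RwLock<Arc<Cluster>>,
}

impl Metadata {
    fn invalidate(&self, paths: &HashSet<TablePath>) {
        let mut guard = self.cluster.write().unwrap();
        let updated = guard.invalidate(paths);
        *guard = Arc::new(updated);
    }
}

fn main() {
    let path = TablePath("db.tbl".into());
    let meta = Metadata {
        cluster: RwLock::new(Arc::new(Cluster {
            leaders_by_path: HashMap::from([(path.clone(), 1)]),
        })),
    };

    // A reader that grabbed a snapshot before invalidation is unaffected...
    let old_snapshot = Arc::clone(&meta.cluster.read().unwrap());
    meta.invalidate(&HashSet::from([path.clone()]));
    assert!(old_snapshot.leaders_by_path.contains_key(&path));

    // ...while new readers see the invalidated view and must refetch metadata.
    assert!(!meta.cluster.read().unwrap().leaders_by_path.contains_key(&path));
}
```

Cloning the location maps on every invalidation is cheap relative to how rarely fetch errors occur, and it keeps readers from blocking: they hold the lock only long enough to clone the `Arc`.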