Merged
12 changes: 11 additions & 1 deletion crates/fluss/src/client/metadata.rs
@@ -17,7 +17,7 @@

use crate::cluster::{Cluster, ServerNode, ServerType};
use crate::error::Result;
use crate::metadata::{TableBucket, TablePath};
use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
use crate::proto::MetadataResponse;
use crate::rpc::message::UpdateMetadataRequest;
use crate::rpc::{RpcClient, ServerConnection};
@@ -71,6 +71,16 @@ impl Metadata {
*cluster_guard = Arc::new(updated_cluster);
}

pub fn invalidate_physical_table_meta(
&self,
physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
) {
let mut cluster_guard = self.cluster.write();
let updated_cluster =
cluster_guard.invalidate_physical_table_meta(physical_tables_to_invalid);
*cluster_guard = Arc::new(updated_cluster);
}

pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
let origin_cluster = self.cluster.read().clone();
let new_cluster =
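For context on the `Metadata` change above: the new method builds a pruned `Cluster` and swaps it in behind the lock. Below is a minimal, self-contained sketch of that copy-on-write swap, assuming stand-in types (a `Cluster` reduced to a single path set) and `std::sync::RwLock` in place of the crate's lock type; it illustrates the pattern, not the crate's actual implementation.

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

// Stand-ins for the crate's types, just enough to make the sketch compile.
#[derive(Clone, PartialEq, Eq, Hash)]
struct PhysicalTablePath(String);

#[derive(Clone)]
struct Cluster {
    cached_paths: HashSet<PhysicalTablePath>,
}

impl Cluster {
    // Mirrors the diff's shape: returns a *new* Cluster with the entries dropped.
    fn invalidate_physical_table_meta(&self, to_invalidate: &HashSet<PhysicalTablePath>) -> Self {
        Cluster {
            cached_paths: self
                .cached_paths
                .iter()
                .filter(|p| !to_invalidate.contains(p))
                .cloned()
                .collect(),
        }
    }
}

struct Metadata {
    cluster: RwLock<Arc<Cluster>>,
}

impl Metadata {
    fn invalidate_physical_table_meta(&self, to_invalidate: &HashSet<PhysicalTablePath>) {
        let mut guard = self.cluster.write().unwrap();
        // Readers holding the old Arc keep a consistent snapshot;
        // the swap is atomic under the write lock.
        let updated = guard.invalidate_physical_table_meta(to_invalidate);
        *guard = Arc::new(updated);
    }
}

fn main() {
    let metadata = Metadata {
        cluster: RwLock::new(Arc::new(Cluster {
            cached_paths: HashSet::from([PhysicalTablePath("db.tbl".into())]),
        })),
    };
    let stale = HashSet::from([PhysicalTablePath("db.tbl".into())]);
    metadata.invalidate_physical_table_meta(&stale);
    assert!(metadata.cluster.read().unwrap().cached_paths.is_empty());
}
```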
149 changes: 121 additions & 28 deletions crates/fluss/src/client/table/scanner.rs
@@ -36,7 +36,7 @@ use crate::client::table::remote_log::{
RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
};
use crate::error::{ApiError, Error, FlussError, Result};
use crate::metadata::{TableBucket, TableInfo, TablePath};
use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
use crate::rpc::{RpcClient, RpcError, message};
@@ -462,6 +462,16 @@ struct LogFetcher {
nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
}

struct FetchResponseContext {
metadata: Arc<Metadata>,
log_fetch_buffer: Arc<LogFetchBuffer>,
log_scanner_status: Arc<LogScannerStatus>,
read_context: ReadContext,
remote_read_context: ReadContext,
remote_log_downloader: Arc<RemoteLogDownloader>,
credentials_cache: Arc<CredentialsCache>,
}

impl LogFetcher {
pub fn new(
table_info: TableInfo,
@@ -518,7 +528,8 @@ impl LogFetcher {
| FlussError::LogStorageException
| FlussError::KvStorageException
| FlussError::StorageException
| FlussError::FencedLeaderEpochException => FetchErrorContext {
| FlussError::FencedLeaderEpochException
| FlussError::LeaderNotAvailableException => FetchErrorContext {
action: FetchErrorAction::Ignore,
log_level: FetchErrorLogLevel::Debug,
log_message: format!(
@@ -570,6 +581,17 @@
}
}

fn should_invalidate_table_meta(error: FlussError) -> bool {
matches!(
error,
FlussError::NotLeaderOrFollower
| FlussError::LeaderNotAvailableException
| FlussError::FencedLeaderEpochException
| FlussError::UnknownTableOrBucketException
| FlussError::InvalidCoordinatorException
)
}

async fn check_and_update_metadata(&self) -> Result<()> {
let need_update = self
.fetchable_buckets()
@@ -639,6 +661,15 @@ impl LogFetcher {
let creds_cache = self.credentials_cache.clone();
let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
let metadata = self.metadata.clone();
let response_context = FetchResponseContext {
metadata: metadata.clone(),
log_fetch_buffer,
log_scanner_status,
read_context,
remote_read_context,
remote_log_downloader,
credentials_cache: creds_cache,
};
// Spawn async task to handle the fetch request
// Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.
// This is acceptable because:
@@ -684,16 +715,7 @@ impl LogFetcher {
}
};

Self::handle_fetch_response(
fetch_response,
&log_fetch_buffer,
&log_scanner_status,
&read_context,
&remote_read_context,
&remote_log_downloader,
&creds_cache,
)
.await;
Self::handle_fetch_response(fetch_response, response_context).await;
});
}

@@ -712,13 +734,18 @@ impl LogFetcher {
/// Handle fetch response and add completed fetches to buffer
async fn handle_fetch_response(
fetch_response: crate::proto::FetchLogResponse,
log_fetch_buffer: &Arc<LogFetchBuffer>,
log_scanner_status: &Arc<LogScannerStatus>,
read_context: &ReadContext,
remote_read_context: &ReadContext,
remote_log_downloader: &Arc<RemoteLogDownloader>,
credentials_cache: &Arc<CredentialsCache>,
context: FetchResponseContext,
) {
let FetchResponseContext {
metadata,
log_fetch_buffer,
log_scanner_status,
read_context,
remote_read_context,
remote_log_downloader,
credentials_cache,
} = context;

for pb_fetch_log_resp in fetch_response.tables_resp {
let table_id = pb_fetch_log_resp.table_id;
let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
@@ -745,6 +772,20 @@ impl LogFetcher {
.into();

let error = FlussError::for_code(error_code);
if Self::should_invalidate_table_meta(error) {
// TODO: Consider triggering table meta invalidation from sender/lookup paths.
let table_id = table_bucket.table_id();
let cluster = metadata.get_cluster();
if let Some(table_path) = cluster.get_table_path_by_id(table_id) {
let physical_tables =
HashSet::from([PhysicalTablePath::of(table_path.clone())]);
metadata.invalidate_physical_table_meta(&physical_tables);
} else {
warn!(
"Table id {table_id} is missing from table_path_by_id while invalidating table metadata"
);
}
}
let error_context = Self::describe_fetch_error(
error,
&table_bucket,
@@ -1577,20 +1618,72 @@ mod tests {
}],
};

LogFetcher::handle_fetch_response(
response,
&fetcher.log_fetch_buffer,
&fetcher.log_scanner_status,
&fetcher.read_context,
&fetcher.remote_read_context,
&fetcher.remote_log_downloader,
&fetcher.credentials_cache,
)
.await;
let response_context = FetchResponseContext {
metadata: metadata.clone(),
log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
log_scanner_status: fetcher.log_scanner_status.clone(),
read_context: fetcher.read_context.clone(),
remote_read_context: fetcher.remote_read_context.clone(),
remote_log_downloader: fetcher.remote_log_downloader.clone(),
credentials_cache: fetcher.credentials_cache.clone(),
};

LogFetcher::handle_fetch_response(response, response_context).await;

let completed = fetcher.log_fetch_buffer.poll().expect("completed fetch");
let api_error = completed.api_error().expect("api error");
assert_eq!(api_error.code, FlussError::AuthorizationException.code());
Ok(())
}

#[tokio::test]
async fn handle_fetch_response_invalidates_table_meta() -> Result<()> {
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let table_info = build_table_info(table_path.clone(), 1, 1);
let cluster = build_cluster_arc(&table_path, 1, 1);
let metadata = Arc::new(Metadata::new_for_test(cluster.clone()));
let status = Arc::new(LogScannerStatus::new());
status.assign_scan_bucket(TableBucket::new(1, 0), 5);
let fetcher = LogFetcher::new(
table_info.clone(),
Arc::new(RpcClient::new()),
metadata.clone(),
status.clone(),
None,
)?;

let bucket = TableBucket::new(1, 0);
assert!(metadata.leader_for(&bucket).is_some());

let response = crate::proto::FetchLogResponse {
tables_resp: vec![crate::proto::PbFetchLogRespForTable {
table_id: 1,
buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
partition_id: None,
bucket_id: 0,
error_code: Some(FlussError::NotLeaderOrFollower.code()),
error_message: Some("not leader".to_string()),
high_watermark: None,
log_start_offset: None,
remote_log_fetch_info: None,
records: None,
}],
}],
};

let response_context = FetchResponseContext {
metadata: metadata.clone(),
log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
log_scanner_status: fetcher.log_scanner_status.clone(),
read_context: fetcher.read_context.clone(),
remote_read_context: fetcher.remote_read_context.clone(),
remote_log_downloader: fetcher.remote_log_downloader.clone(),
credentials_cache: fetcher.credentials_cache.clone(),
};

LogFetcher::handle_fetch_response(response, response_context).await;

assert!(metadata.leader_for(&bucket).is_none());
Ok(())
}
}
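A short note on the gating logic above: `should_invalidate_table_meta` separates placement-related errors (which make the cached bucket-to-leader mapping suspect) from transient storage errors (which say nothing about placement). Here is a self-contained sketch of that rule using a hypothetical subset of the crate's `FlussError` enum; the real enum carries codes and more variants.

```rust
// Hypothetical subset of FlussError, enough to demonstrate the classification.
#[derive(Clone, Copy, Debug)]
enum FlussError {
    NotLeaderOrFollower,
    LeaderNotAvailableException,
    FencedLeaderEpochException,
    UnknownTableOrBucketException,
    InvalidCoordinatorException,
    LogStorageException,
}

// Same shape as the diff's helper: only errors implying stale leader/table
// metadata trigger a cache invalidation.
fn should_invalidate_table_meta(error: FlussError) -> bool {
    matches!(
        error,
        FlussError::NotLeaderOrFollower
            | FlussError::LeaderNotAvailableException
            | FlussError::FencedLeaderEpochException
            | FlussError::UnknownTableOrBucketException
            | FlussError::InvalidCoordinatorException
    )
}

fn main() {
    // Leadership moved: the cached mapping is stale, so invalidate and re-fetch.
    assert!(should_invalidate_table_meta(FlussError::NotLeaderOrFollower));
    // A storage hiccup says nothing about placement; keep the cached metadata.
    assert!(!should_invalidate_table_meta(FlussError::LogStorageException));
}
```

After invalidation, the next `check_and_update_metadata` pass sees the bucket without a cached leader and refreshes metadata from the cluster, which is what the new `handle_fetch_response_invalidates_table_meta` test asserts.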
64 changes: 52 additions & 12 deletions crates/fluss/src/cluster/cluster.rs
@@ -18,7 +18,9 @@
use crate::BucketId;
use crate::cluster::{BucketLocation, ServerNode, ServerType};
use crate::error::Result;
use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, TablePath};
use crate::metadata::{
JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
};
use crate::proto::MetadataResponse;
use crate::rpc::{from_pb_server_node, from_pb_table_path};
use rand::random_range;
@@ -77,23 +79,33 @@ impl Cluster {
.filter_map(|id| self.table_path_by_id.get(id))
.collect();

let available_locations_by_path = self
.available_locations_by_path
.iter()
.filter(|&(path, _)| !table_paths.contains(path))
.map(|(path, locations)| (path.clone(), locations.clone()))
.collect();

let available_locations_by_bucket = self
.available_locations_by_bucket
.iter()
.filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
.map(|(bucket, location)| (bucket.clone(), location.clone()))
.collect();
let (available_locations_by_path, available_locations_by_bucket) =
self.filter_bucket_locations_by_path(&table_paths);

Cluster::new(
self.coordinator_server.clone(),
alive_tablet_servers_by_id,
available_locations_by_path,
available_locations_by_bucket,
self.table_id_by_path.clone(),
self.table_info_by_path.clone(),
)
}

pub fn invalidate_physical_table_meta(
&self,
physical_tables_to_invalid: &HashSet<PhysicalTablePath>,
) -> Self {
let table_paths: HashSet<&TablePath> = physical_tables_to_invalid
.iter()
.map(|path| path.get_table_path())
.collect();
let (available_locations_by_path, available_locations_by_bucket) =
self.filter_bucket_locations_by_path(&table_paths);

Cluster::new(
self.coordinator_server.clone(),
self.alive_tablet_servers_by_id.clone(),
available_locations_by_path,
available_locations_by_bucket,
self.table_id_by_path.clone(),
@@ -122,6 +134,30 @@ impl Cluster {
self.table_info_by_path = table_info_by_path;
}

fn filter_bucket_locations_by_path(
&self,
table_paths: &HashSet<&TablePath>,
) -> (
HashMap<TablePath, Vec<BucketLocation>>,
HashMap<TableBucket, BucketLocation>,
) {
let available_locations_by_path = self
.available_locations_by_path
.iter()
.filter(|&(path, _)| !table_paths.contains(path))
.map(|(path, locations)| (path.clone(), locations.clone()))
.collect();

let available_locations_by_bucket = self
.available_locations_by_bucket
.iter()
.filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
.map(|(bucket, location)| (bucket.clone(), location.clone()))
.collect();

(available_locations_by_path, available_locations_by_bucket)
}

pub fn from_metadata_response(
metadata_response: MetadataResponse,
origin_cluster: Option<&Cluster>,
@@ -242,6 +278,10 @@ impl Cluster {
&self.table_id_by_path
}

pub fn get_table_path_by_id(&self, table_id: i64) -> Option<&TablePath> {
self.table_path_by_id.get(&table_id)
}

pub fn get_available_buckets_for_table_path(
&self,
table_path: &TablePath,
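The extracted `filter_bucket_locations_by_path` helper exists so that both invalidation paths prune the per-path and per-bucket indexes with the same predicate. A self-contained sketch under stand-in types (`TablePath` as `String`, `TableBucket` as a `(table_id, bucket_id)` tuple — both assumptions for the sketch) demonstrating the consistency property:

```rust
use std::collections::{HashMap, HashSet};

// Minimal stand-ins; the real types carry more fields.
type TablePath = String;
type TableBucket = (i64, i32); // (table_id, bucket_id), an assumption here
#[derive(Clone)]
struct BucketLocation {
    table_path: TablePath,
}

// Mirrors the shared helper: prune both indexes with one path set so the
// per-path and per-bucket views can never disagree.
fn filter_bucket_locations_by_path(
    by_path: &HashMap<TablePath, Vec<BucketLocation>>,
    by_bucket: &HashMap<TableBucket, BucketLocation>,
    table_paths: &HashSet<&TablePath>,
) -> (
    HashMap<TablePath, Vec<BucketLocation>>,
    HashMap<TableBucket, BucketLocation>,
) {
    let filtered_by_path = by_path
        .iter()
        .filter(|&(path, _)| !table_paths.contains(path))
        .map(|(path, locations)| (path.clone(), locations.clone()))
        .collect();
    let filtered_by_bucket = by_bucket
        .iter()
        .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
        .map(|(bucket, location)| (bucket.clone(), location.clone()))
        .collect();
    (filtered_by_path, filtered_by_bucket)
}

fn main() {
    let stale: TablePath = "db.stale".to_string();
    let live: TablePath = "db.live".to_string();
    let by_path = HashMap::from([
        (stale.clone(), vec![BucketLocation { table_path: stale.clone() }]),
        (live.clone(), vec![BucketLocation { table_path: live.clone() }]),
    ]);
    let by_bucket = HashMap::from([
        ((1, 0), BucketLocation { table_path: stale.clone() }),
        ((2, 0), BucketLocation { table_path: live.clone() }),
    ]);
    let invalid: HashSet<&TablePath> = HashSet::from([&stale]);
    let (paths, buckets) = filter_bucket_locations_by_path(&by_path, &by_bucket, &invalid);
    // Both views drop the stale table together, so they stay consistent.
    assert!(!paths.contains_key(&stale) && paths.contains_key(&live));
    assert_eq!(buckets.len(), 1);
}
```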
2 changes: 1 addition & 1 deletion crates/fluss/src/metadata/table.rs
@@ -664,7 +664,7 @@ impl TablePath {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhysicalTablePath {
table_path: TablePath,
#[allow(dead_code)]
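One line of context for why this derive change matters: `HashSet<PhysicalTablePath>`, which both new invalidation APIs take, requires `Eq + Hash` on the element type; `Debug + Clone` alone is not enough. A minimal sketch with stand-in fields (modeling `partition_name` as an `Option` is an assumption based on the crate's partitioned-table model):

```rust
use std::collections::HashSet;

// Before this PR, PhysicalTablePath derived only Debug + Clone, so it could
// not be stored in a HashSet. The extra derives unlock set membership.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct PhysicalTablePath {
    table_path: String,              // stand-in for the crate's TablePath
    partition_name: Option<String>,  // assumption: None for non-partitioned tables
}

fn main() {
    let path = PhysicalTablePath {
        table_path: "db.tbl".to_string(),
        partition_name: None,
    };
    // HashSet::insert / contains require Eq + Hash on the element type.
    let to_invalidate: HashSet<PhysicalTablePath> = HashSet::from([path.clone()]);
    assert!(to_invalidate.contains(&path));
}
```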