From 599bd5cc7b3615310905a5c30ad74b5d356b4f9e Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 21 Jan 2026 00:11:02 +0000 Subject: [PATCH 1/6] [TASK-140] Priority Queue for downloading remote segments --- crates/fluss/src/client/connection.rs | 4 + .../src/client/table/log_fetch_buffer.rs | 174 +++ crates/fluss/src/client/table/mod.rs | 3 + crates/fluss/src/client/table/remote_log.rs | 1102 +++++++++++++++-- crates/fluss/src/client/table/scanner.rs | 25 +- crates/fluss/src/config.rs | 12 + crates/fluss/src/proto/fluss_api.proto | 1 + crates/fluss/src/record/arrow.rs | 442 ++++++- crates/fluss/src/util/mod.rs | 9 +- 9 files changed, 1628 insertions(+), 144 deletions(-) diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 595daf55..0e41bbe7 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -59,6 +59,10 @@ impl FlussConnection { self.network_connects.clone() } + pub fn config(&self) -> &Config { + &self.args + } + pub async fn get_admin(&self) -> Result { FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await } diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index ca0a2532..c56f0e59 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -325,6 +325,7 @@ impl PendingFetch for CompletedPendingFetch { } /// Default implementation of CompletedFetch for in-memory log records +/// Used for local fetches from tablet server pub struct DefaultCompletedFetch { table_bucket: TableBucket, api_error: Option, @@ -644,6 +645,179 @@ impl CompletedFetch for DefaultCompletedFetch { } } +/// Completed fetch for remote log segments +/// Matches Java's RemoteCompletedFetch design - separate class for remote vs local +/// Holds RAII permit until consumed (data is in inner) +pub struct RemoteCompletedFetch { + inner: DefaultCompletedFetch, + permit: Option, +} + +impl RemoteCompletedFetch { + pub fn new( + inner: DefaultCompletedFetch, + permit: crate::client::table::remote_log::PrefetchPermit, + ) -> Self { + Self { + inner, + permit: Some(permit), + } + } +} + +impl CompletedFetch for RemoteCompletedFetch { + fn table_bucket(&self) -> &TableBucket { + self.inner.table_bucket() + } + + fn api_error(&self) -> Option<&ApiError> { + self.inner.api_error() + } + + fn fetch_error_context(&self) -> Option<&FetchErrorContext> { + self.inner.fetch_error_context() + } + + fn take_error(&mut self) -> Option { + self.inner.take_error() + } + + fn fetch_records(&mut self, max_records: usize) -> Result> { + self.inner.fetch_records(max_records) + } + + fn fetch_batches(&mut self, max_batches: usize) -> Result> { + self.inner.fetch_batches(max_batches) + } + + fn is_consumed(&self) -> bool { + self.inner.is_consumed() + } + + fn records_read(&self) -> usize { + self.inner.records_read() + } + + fn drain(&mut self) { + self.inner.drain(); + // Release permit immediately (don't wait for struct drop) + // Critical: allows prefetch to continue even if Box kept around + self.permit.take(); // drops permit here, triggers recycle notification + } + + fn size_in_bytes(&self) -> usize { + self.inner.size_in_bytes() + } + + fn high_watermark(&self) -> i64 { + self.inner.high_watermark() + } + + fn is_initialized(&self) -> bool { + self.inner.is_initialized() + } + + fn set_initialized(&mut self) { + self.inner.set_initialized() + } + + fn next_fetch_offset(&self) -> i64 { + 
self.inner.next_fetch_offset() + } +} +// Permit released explicitly in drain() or automatically when struct drops + +/// Pending fetch that waits for remote log file to be downloaded +pub struct RemotePendingFetch { + segment: crate::client::table::remote_log::RemoteLogSegment, + download_future: crate::client::table::remote_log::RemoteLogDownloadFuture, + pos_in_log_segment: i32, + fetch_offset: i64, + high_watermark: i64, + read_context: ReadContext, +} + +impl RemotePendingFetch { + pub fn new( + segment: crate::client::table::remote_log::RemoteLogSegment, + download_future: crate::client::table::remote_log::RemoteLogDownloadFuture, + pos_in_log_segment: i32, + fetch_offset: i64, + high_watermark: i64, + read_context: ReadContext, + ) -> Self { + Self { + segment, + download_future, + pos_in_log_segment, + fetch_offset, + high_watermark, + read_context, + } + } +} + +impl PendingFetch for RemotePendingFetch { + fn table_bucket(&self) -> &TableBucket { + &self.segment.table_bucket + } + + fn is_completed(&self) -> bool { + self.download_future.is_done() + } + + fn to_completed_fetch(self: Box) -> Result> { + // Take the RemoteLogFile and destructure + let remote_log_file = self.download_future.take_remote_log_file()?; + let crate::client::table::remote_log::RemoteLogFile { + file_path, + file_size: _, + permit, + } = remote_log_file; + + // Open file for streaming (no memory allocation for entire file) + let file = std::fs::File::open(&file_path)?; + let file_size = file.metadata()?.len() as usize; + + // Create file-backed LogRecordsBatches with cleanup (streaming!) + // Data will be read batch-by-batch on-demand, not all at once + // FileSource will delete the file when dropped (after file is closed) + let log_record_batch = LogRecordsBatches::from_file_with_cleanup( + file, + self.pos_in_log_segment as usize, + file_path.clone(), + )?; + + // Calculate size based on position offset + let size_in_bytes = if self.pos_in_log_segment > 0 { + let pos = self.pos_in_log_segment as usize; + if pos >= file_size { + return Err(Error::UnexpectedError { + message: format!("Position {} exceeds file size {}", pos, file_size), + source: None, + }); + } + file_size - pos + } else { + file_size + }; + + // Create DefaultCompletedFetch + let inner_fetch = DefaultCompletedFetch::new( + self.segment.table_bucket.clone(), + log_record_batch, + size_in_bytes, + self.read_context, + self.fetch_offset, + self.high_watermark, + ); + + // Wrap it with RemoteCompletedFetch to hold the permit + // Permit will delete the file when dropped + Ok(Box::new(RemoteCompletedFetch::new(inner_fetch, permit))) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 2bfa0541..2dc56d52 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -36,6 +36,9 @@ mod writer; use crate::client::table::upsert::TableUpsert; pub use append::{AppendWriter, TableAppend}; pub use lookup::{LookupResult, Lookuper, TableLookup}; +pub use remote_log::{ + DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM, +}; pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan}; pub use writer::{TableWriter, UpsertWriter}; diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 01425157..7be4b07f 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -14,21 +14,75 @@ // See the 
License for the specific language governing permissions and // limitations under the License. -use crate::client::table::log_fetch_buffer::{CompletedFetch, DefaultCompletedFetch, PendingFetch}; use crate::error::{Error, Result}; use crate::io::{FileIO, Storage}; use crate::metadata::TableBucket; use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; -use crate::record::{LogRecordsBatches, ReadContext}; -use crate::util::delete_file; use parking_lot::{Mutex, RwLock}; -use std::collections::HashMap; +use std::cmp::Reverse; +use std::collections::{BinaryHeap, HashMap}; +use std::future::Future; use std::io; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::sync::Arc; use tempfile::TempDir; use tokio::io::AsyncWriteExt; -use tokio::sync::oneshot; +use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; +use tokio::task::JoinSet; + +/// Default maximum number of remote log segments to prefetch +/// Matches Java's CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM (default: 4) +pub const DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM: usize = 4; + +/// Default maximum concurrent remote log downloads +/// Matches Java's REMOTE_FILE_DOWNLOAD_THREAD_NUM (default: 3) +pub const DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS: usize = 3; + +/// Initial retry backoff delay (milliseconds) +/// Prevents hot-spin retry loops on persistent failures +const RETRY_BACKOFF_BASE_MS: u64 = 100; + +/// Maximum retry backoff delay (milliseconds) +/// Caps exponential backoff to avoid excessive delays +const RETRY_BACKOFF_MAX_MS: u64 = 5_000; + +/// Maximum number of retries before giving up +/// After this many retries, the download will fail permanently +const MAX_RETRY_COUNT: u32 = 10; + +/// Calculate exponential backoff delay with jitter for retries +fn calculate_backoff_delay(retry_count: u32) -> tokio::time::Duration { + use rand::Rng; + + // Exponential backoff: base * 2^retry_count + let exponential_ms = RETRY_BACKOFF_BASE_MS.saturating_mul(1 << retry_count.min(10)); // Cap exponent to prevent overflow + + // Cap at maximum + let capped_ms = exponential_ms.min(RETRY_BACKOFF_MAX_MS); + + // Add jitter (±25% randomness) to avoid thundering herd + let mut rng = rand::rng(); + let jitter = rng.random_range(0.75..=1.25); + let final_ms = ((capped_ms as f64) * jitter) as u64; + + tokio::time::Duration::from_millis(final_ms) +} + +/// Result of a fetch operation containing file path and size +#[derive(Debug)] +pub struct FetchResult { + pub file_path: PathBuf, + pub file_size: usize, +} + +/// Trait for fetching remote log segments (allows dependency injection for testing) +pub trait RemoteLogFetcher: Send + Sync { + fn fetch( + &self, + request: &RemoteLogDownloadRequest, + ) -> Pin> + Send>>; +} /// Represents a remote log segment that needs to be downloaded #[derive(Debug, Clone)] @@ -40,6 +94,7 @@ pub struct RemoteLogSegment { #[allow(dead_code)] pub size_in_bytes: i32, pub table_bucket: TableBucket, + pub max_timestamp: i64, } impl RemoteLogSegment { @@ -50,6 +105,9 @@ impl RemoteLogSegment { end_offset: segment.remote_log_end_offset, size_in_bytes: segment.segment_size_in_bytes, table_bucket, + // Match Java's behavior: use -1 for missing timestamp + // (Java: CommonRpcMessageUtils.java:171-174) + max_timestamp: segment.max_timestamp.unwrap_or(-1), } } @@ -88,17 +146,473 @@ impl RemoteLogFetchInfo { } } +/// RAII guard for prefetch permit that notifies coordinator on drop +/// +/// NOTE: File deletion is now handled by FileSource::drop(), not here. 
+/// This ensures the file is closed before deletion +#[derive(Debug)] +pub struct PrefetchPermit { + permit: Option, + recycle_notify: Arc, +} + +impl PrefetchPermit { + fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc) -> Self { + Self { + permit: Some(permit), + recycle_notify, + } + } +} + +impl Drop for PrefetchPermit { + fn drop(&mut self) { + // Release capacity (critical: permit must be dropped before notify) + let _ = self.permit.take(); // drops permit here + + // Then wake coordinator so it can acquire the now-available permit + self.recycle_notify.notify_one(); + } +} + +/// Downloaded remote log file with prefetch permit +/// File remains on disk for memory efficiency - permit cleanup deletes it +#[derive(Debug)] +pub struct RemoteLogFile { + /// Path to the downloaded file on local disk + pub file_path: PathBuf, + /// Size of the file in bytes + /// Currently unused but kept for potential future use (logging, metrics, etc.) + #[allow(dead_code)] + pub file_size: usize, + /// RAII permit that will delete the file when dropped + pub permit: PrefetchPermit, +} + +/// Represents a request to download a remote log segment with priority ordering +#[derive(Debug)] +pub struct RemoteLogDownloadRequest { + segment: RemoteLogSegment, + remote_log_tablet_dir: String, + result_sender: oneshot::Sender>, + retry_count: u32, + next_retry_at: Option, +} + +impl RemoteLogDownloadRequest { + /// Get the segment (used by test fetcher implementations) + #[cfg(test)] + pub fn segment(&self) -> &RemoteLogSegment { + &self.segment + } +} + +// Total ordering for priority queue (Rust requirement: cmp==Equal implies Eq) +// Primary: Java semantics (timestamp cross-bucket, offset within-bucket) +// Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then segment_id +impl Ord for RemoteLogDownloadRequest { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + if self.segment.table_bucket == other.segment.table_bucket { + // Same bucket: order by start_offset (ascending - earlier segments first) + self.segment + .start_offset + .cmp(&other.segment.start_offset) + .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id)) + } else { + // Different buckets: order by max_timestamp (ascending - older segments first) + // Then by table_bucket fields for true total ordering + self.segment + .max_timestamp + .cmp(&other.segment.max_timestamp) + .then_with(|| { + self.segment + .table_bucket + .table_id() + .cmp(&other.segment.table_bucket.table_id()) + }) + .then_with(|| { + self.segment + .table_bucket + .partition_id() + .cmp(&other.segment.table_bucket.partition_id()) + }) + .then_with(|| { + self.segment + .table_bucket + .bucket_id() + .cmp(&other.segment.table_bucket.bucket_id()) + }) + .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id)) + } + } +} + +impl PartialOrd for RemoteLogDownloadRequest { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for RemoteLogDownloadRequest { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == std::cmp::Ordering::Equal + } +} + +impl Eq for RemoteLogDownloadRequest {} + +/// Result of a download task +enum DownloadResult { + /// Successful download - deliver result to future + Success { + result: RemoteLogFile, + result_sender: oneshot::Sender>, + }, + /// Download failed - re-queue request for retry (Java pattern) + FailedRetry { request: RemoteLogDownloadRequest }, + /// Download failed permanently after max retries - fail the future + FailedPermanently { 
+ error: Error, + result_sender: oneshot::Sender>, + }, + /// Cancelled - don't deliver, don't re-queue + Cancelled, +} + +/// Production implementation of RemoteLogFetcher that downloads from actual storage +struct ProductionFetcher { + remote_fs_props: Arc>>, + local_log_dir: Arc, +} + +impl RemoteLogFetcher for ProductionFetcher { + fn fetch( + &self, + request: &RemoteLogDownloadRequest, + ) -> Pin> + Send>> { + let remote_fs_props = self.remote_fs_props.clone(); + let local_log_dir = self.local_log_dir.clone(); + + // Clone data needed for async operation to avoid lifetime issues + let segment = request.segment.clone(); + let remote_log_tablet_dir = request.remote_log_tablet_dir.to_string(); + + Box::pin(async move { + let local_file_name = segment.local_file_name(); + let local_file_path = local_log_dir.path().join(&local_file_name); + + // Build remote path + let offset_prefix = format!("{:020}", segment.start_offset); + let remote_path = format!( + "{}/{}/{}.log", + remote_log_tablet_dir, segment.segment_id, offset_prefix + ); + + let remote_fs_props_map = remote_fs_props.read().clone(); + + // Download file to disk (streaming, no memory spike) + let file_path = RemoteLogDownloader::download_file( + &remote_log_tablet_dir, + &remote_path, + &local_file_path, + &remote_fs_props_map, + ) + .await?; + + // Get file size + let metadata = tokio::fs::metadata(&file_path).await?; + let file_size = metadata.len() as usize; + + // Return file path - file stays on disk until PrefetchPermit is dropped + Ok(FetchResult { + file_path, + file_size, + }) + }) + } +} + +/// Coordinator that owns all download state and orchestrates downloads +struct DownloadCoordinator { + download_queue: BinaryHeap>, + active_downloads: JoinSet, + in_flight: usize, + prefetch_semaphore: Arc, + max_concurrent_downloads: usize, + recycle_notify: Arc, + fetcher: Arc, +} + +impl DownloadCoordinator { + /// Check if we should wait for recycle notification + /// Only wait if we're blocked on permits AND have pending work + fn should_wait_for_recycle(&self) -> bool { + !self.download_queue.is_empty() + && self.in_flight < self.max_concurrent_downloads + && self.prefetch_semaphore.available_permits() == 0 + } + + /// Find the earliest retry deadline among pending requests + fn next_retry_deadline(&self) -> Option { + self.download_queue + .iter() + .filter_map(|Reverse(req)| req.next_retry_at) + .min() + } +} + +impl DownloadCoordinator { + /// Try to start as many downloads as possible (event-driven drain) + fn drain(&mut self) { + // Collect deferred requests (backoff not ready) to push back later + let mut deferred = Vec::new(); + // Scan entire queue once to find ready requests (prevents head-of-line blocking) + // Bound to reasonable max to avoid excessive work if queue is huge + let max_scan = self.download_queue.len().min(100); + let mut scanned = 0; + + while !self.download_queue.is_empty() + && self.in_flight < self.max_concurrent_downloads + && scanned < max_scan + { + // Try acquire prefetch permit (non-blocking) + let permit = match self.prefetch_semaphore.clone().try_acquire_owned() { + Ok(p) => p, + Err(_) => break, // No permits available + }; + + // Pop highest priority request + let Some(Reverse(request)) = self.download_queue.pop() else { + drop(permit); + break; + }; + + scanned += 1; + + // Retry backoff check: defer if retry time hasn't arrived yet + if let Some(next_retry_at) = request.next_retry_at { + let now = tokio::time::Instant::now(); + if next_retry_at > now { + // Not ready for retry yet - 
defer and continue looking for ready requests + drop(permit); + deferred.push(request); + continue; // Don't block - keep looking for ready requests + } + } + + // Cancellation check: skip if sender closed + if request.result_sender.is_closed() { + drop(permit); + continue; // Try next request + } + + // Clone data for the spawned task + let fetcher = self.fetcher.clone(); + let recycle_notify = self.recycle_notify.clone(); + + // Spawn download task + self.active_downloads.spawn(async move { + spawn_download_task(request, permit, fetcher, recycle_notify).await + }); + self.in_flight += 1; + } + + // Push deferred requests back to queue (maintains priority order) + if !deferred.is_empty() { + for req in deferred { + self.download_queue.push(Reverse(req)); + } + } + } +} + +/// Spawn a download task that attempts download once +/// Matches Java's RemoteLogDownloader.java +/// +/// Benefits over infinite in-place retry: +/// - Failed downloads don't block prefetch slots +/// - Other segments can make progress while one is failing +/// - Natural retry through coordinator re-picking from queue +async fn spawn_download_task( + request: RemoteLogDownloadRequest, + permit: tokio::sync::OwnedSemaphorePermit, + fetcher: Arc, + recycle_notify: Arc, +) -> DownloadResult { + // Check if receiver still alive (early cancellation check) + if request.result_sender.is_closed() { + drop(permit); + return DownloadResult::Cancelled; + } + + // Try download ONCE + let download_result = fetcher.fetch(&request).await; + + match download_result { + Ok(fetch_result) => { + // Success - permit will be released on drop (FileSource handles file deletion) + DownloadResult::Success { + result: RemoteLogFile { + file_path: fetch_result.file_path, + file_size: fetch_result.file_size, + permit: PrefetchPermit::new(permit, recycle_notify.clone()), + }, + result_sender: request.result_sender, + } + } + Err(e) if request.result_sender.is_closed() => { + // Receiver dropped (cancelled) - release permit, don't re-queue + drop(permit); + DownloadResult::Cancelled + } + Err(e) => { + // Download failed - check if we should retry or give up + let retry_count = request.retry_count + 1; + + if retry_count > MAX_RETRY_COUNT { + // Too many retries - give up and fail the future + log::error!( + "Failed to download remote log segment {} after {} retries: {}. Giving up.", + request.segment.segment_id, + retry_count, + e + ); + drop(permit); // Release immediately + + DownloadResult::FailedPermanently { + error: Error::UnexpectedError { + message: format!( + "Failed to download remote log segment after {} retries: {}", + retry_count, e + ), + source: Some(Box::new(e)), + }, + result_sender: request.result_sender, + } + } else { + // Retry with exponential backoff + let backoff_delay = calculate_backoff_delay(retry_count); + let next_retry_at = tokio::time::Instant::now() + backoff_delay; + + log::warn!( + "Failed to download remote log segment {}: {}. Retry {}/{} after {:?}", + request.segment.segment_id, + e, + retry_count, + MAX_RETRY_COUNT, + backoff_delay + ); + drop(permit); // Release immediately - critical! 
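+                // (Releasing the permit before re-queueing frees the prefetch slot,
+                // so other segments can download while this one waits out its backoff.
+                // For example, retry 3 backs off 100ms * 2^3 = 800ms, jittered by
+                // +/-25% to roughly 600-1000ms and capped at RETRY_BACKOFF_MAX_MS.)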
+ + // Update retry state + let mut retry_request = request; + retry_request.retry_count = retry_count; + retry_request.next_retry_at = Some(next_retry_at); + + // Re-queue request to same priority queue + // Future stays with request, NOT completed - will complete on successful retry + DownloadResult::FailedRetry { + request: retry_request, + } + } + } + } +} + +/// Coordinator event loop - owns all download state and reacts to events +async fn coordinator_loop( + mut coordinator: DownloadCoordinator, + mut request_receiver: mpsc::UnboundedReceiver, +) { + loop { + // Drain once at start of iteration to process ready work + coordinator.drain(); + + // Calculate sleep duration until next retry (if any deferred requests) + let next_retry_sleep = coordinator.next_retry_deadline().map(|deadline| { + let now = tokio::time::Instant::now(); + if deadline > now { + deadline - now + } else { + tokio::time::Duration::from_millis(0) // Ready now + } + }); + + tokio::select! { + // Event 1: NewRequest + Some(request) = request_receiver.recv() => { + coordinator.download_queue.push(Reverse(request)); + // Immediately try to start this download + continue; + } + + // Event 2: DownloadFinished + Some(result) = coordinator.active_downloads.join_next() => { + coordinator.in_flight -= 1; + + match result { + Ok(DownloadResult::Success { result, result_sender }) => { + // Success - deliver result to future + if !result_sender.is_closed() { + let _ = result_sender.send(Ok(result)); + } + // Permit held in RemoteLogFile until consumed + } + Ok(DownloadResult::FailedRetry { request }) => { + // Re-queue immediately (don't block coordinator with sleep) + // The retry time will be checked in drain() before processing + // (Java line 177: segmentsToFetch.add(request)) + // Permit already released (Java line 174) + coordinator.download_queue.push(Reverse(request)); + } + Ok(DownloadResult::FailedPermanently { error, result_sender }) => { + // Permanent failure - deliver error to future + if !result_sender.is_closed() { + let _ = result_sender.send(Err(error)); + } + // Permit already released + } + Ok(DownloadResult::Cancelled) => { + // Cancelled - permit already released, nothing to do + } + Err(e) => { + log::error!("Download task panicked: {:?}", e); + // Permit already released via RAII + } + } + // Immediately try to start another download + continue; + } + + // Event 3: Recycled (only wait when blocked on permits with pending work) + _ = coordinator.recycle_notify.notified(), + if coordinator.should_wait_for_recycle() => { + // Wake up to try draining + continue; + } + + // Event 4: Retry timer - wake up when next retry is ready + _ = tokio::time::sleep(next_retry_sleep.unwrap_or(tokio::time::Duration::from_secs(3600))), + if next_retry_sleep.is_some() => { + // Wake up to retry deferred requests + continue; + } + + else => break, // All channels closed AND no work pending + } + } +} + type CompletionCallback = Box; /// Future for a remote log download request pub struct RemoteLogDownloadFuture { - result: Arc>>>>, + result: Arc>>>, completion_callbacks: Arc>>, - // todo: add recycleCallback } impl RemoteLogDownloadFuture { - pub fn new(receiver: oneshot::Receiver>>) -> Self { + pub fn new(receiver: oneshot::Receiver>) -> Self { let result = Arc::new(Mutex::new(None)); let result_clone = Arc::clone(&result); let completion_callbacks: Arc>> = @@ -172,19 +686,23 @@ impl RemoteLogDownloadFuture { self.result.lock().is_some() } - /// Get the downloaded file path (synchronous, only works after is_done() returns 
true) - pub fn get_remote_log_bytes(&self) -> Result> { - // todo: handle download fail - let guard = self.result.lock(); - match guard.as_ref() { - Some(Ok(path)) => Ok(path.clone()), - Some(Err(e)) => Err(Error::IoUnexpectedError { - message: format!("Fail to get remote log bytes: {e}"), - source: io::Error::other(format!("{e:?}")), - }), + /// Take the RemoteLogFile (including the permit) from this future + /// This should only be called when the download is complete + /// This is the correct way to consume the download - it transfers permit ownership + pub fn take_remote_log_file(&self) -> Result { + let mut guard = self.result.lock(); + match guard.take() { + Some(Ok(remote_log_file)) => Ok(remote_log_file), + Some(Err(e)) => { + let error_msg = format!("{e}"); + Err(Error::IoUnexpectedError { + message: format!("Fail to get remote log file: {error_msg}"), + source: io::Error::other(error_msg), + }) + } None => Err(Error::IoUnexpectedError { - message: "Get remote log bytes not completed yet".to_string(), - source: io::Error::other("Get remote log bytes not completed yet"), + message: "Remote log file already taken or not ready".to_string(), + source: io::Error::other("Remote log file already taken or not ready"), }), } } @@ -192,20 +710,95 @@ impl RemoteLogDownloadFuture { /// Downloader for remote log segment files pub struct RemoteLogDownloader { - local_log_dir: TempDir, - remote_fs_props: RwLock>, + request_sender: Option>, + remote_fs_props: Option>>>, + /// Handle to the coordinator task. Used for graceful shutdown via shutdown() method. + #[allow(dead_code)] + coordinator_handle: Option>, } impl RemoteLogDownloader { - pub fn new(local_log_dir: TempDir) -> Result { + pub fn new( + local_log_dir: TempDir, + max_prefetch_segments: usize, + max_concurrent_downloads: usize, + ) -> Result { + let remote_fs_props = Arc::new(RwLock::new(HashMap::new())); + let fetcher = Arc::new(ProductionFetcher { + remote_fs_props: remote_fs_props.clone(), + local_log_dir: Arc::new(local_log_dir), + }); + + let (request_sender, request_receiver) = mpsc::unbounded_channel(); + + let coordinator = DownloadCoordinator { + download_queue: BinaryHeap::new(), + active_downloads: JoinSet::new(), + in_flight: 0, + prefetch_semaphore: Arc::new(Semaphore::new(max_prefetch_segments)), + max_concurrent_downloads, + recycle_notify: Arc::new(Notify::new()), + fetcher, + }; + + let coordinator_handle = tokio::spawn(coordinator_loop(coordinator, request_receiver)); + Ok(Self { - local_log_dir, - remote_fs_props: RwLock::new(HashMap::new()), + request_sender: Some(request_sender), + remote_fs_props: Some(remote_fs_props), + coordinator_handle: Some(coordinator_handle), }) } + /// Create a RemoteLogDownloader with a custom fetcher (for testing) + /// The remote_fs_props will be None since custom fetchers typically don't need S3 credentials + #[cfg(test)] + pub fn new_with_fetcher( + fetcher: Arc, + max_prefetch_segments: usize, + max_concurrent_downloads: usize, + ) -> Result { + let (request_sender, request_receiver) = mpsc::unbounded_channel(); + + let coordinator = DownloadCoordinator { + download_queue: BinaryHeap::new(), + active_downloads: JoinSet::new(), + in_flight: 0, + prefetch_semaphore: Arc::new(Semaphore::new(max_prefetch_segments)), + max_concurrent_downloads, + recycle_notify: Arc::new(Notify::new()), + fetcher, + }; + + let coordinator_handle = tokio::spawn(coordinator_loop(coordinator, request_receiver)); + + Ok(Self { + request_sender: Some(request_sender), + remote_fs_props: None, + 
coordinator_handle: Some(coordinator_handle), + }) + } + + /// Gracefully shutdown the downloader + /// Closes the request channel and waits for coordinator to finish pending work + /// + /// Note: This consumes self to prevent use-after-shutdown + #[allow(dead_code)] + pub async fn shutdown(mut self) { + // Drop the request_sender to close the channel + drop(self.request_sender.take()); + + // Wait for coordinator to finish gracefully + // Coordinator will exit when: recv() returns None && queue empty && joinset empty + if let Some(handle) = self.coordinator_handle.take() { + let _ = handle.await; + } + } + pub fn set_remote_fs_props(&self, props: HashMap) { - *self.remote_fs_props.write() = props; + if let Some(ref remote_fs_props) = self.remote_fs_props { + *remote_fs_props.write() = props; + } } /// Request to fetch a remote log segment to local. This method is non-blocking. @@ -214,49 +807,53 @@ impl RemoteLogDownloader { remote_log_tablet_dir: &str, segment: &RemoteLogSegment, ) -> RemoteLogDownloadFuture { - let (sender, receiver) = oneshot::channel(); - let local_file_name = segment.local_file_name(); - let local_file_path = self.local_log_dir.path().join(&local_file_name); - let remote_path = self.build_remote_path(remote_log_tablet_dir, segment); - let remote_log_tablet_dir = remote_log_tablet_dir.to_string(); - let remote_fs_props = self.remote_fs_props.read().clone(); - // Spawn async download & read task - tokio::spawn(async move { - let result = async { - let file_path = Self::download_file( - &remote_log_tablet_dir, - &remote_path, - &local_file_path, - &remote_fs_props, - ) - .await?; - let bytes = tokio::fs::read(&file_path).await?; - - // Delete the downloaded local file to free disk (async, but we'll do it in background) - let file_path_clone = file_path.clone(); - tokio::spawn(async move { - let _ = delete_file(file_path_clone).await; - }); - - Ok(bytes) + let (result_sender, result_receiver) = oneshot::channel(); + + let request = RemoteLogDownloadRequest { + segment: segment.clone(), + remote_log_tablet_dir: remote_log_tablet_dir.to_string(), + result_sender, + retry_count: 0, + next_retry_at: None, + }; + + // Send to coordinator (non-blocking) + if let Some(ref sender) = self.request_sender { + if sender.send(request).is_err() { + // Coordinator is gone - immediately fail the future + let (error_sender, error_receiver) = oneshot::channel(); + let _ = error_sender.send(Err(Error::UnexpectedError { + message: "RemoteLogDownloader coordinator has shut down".to_string(), + source: None, + })); + return RemoteLogDownloadFuture::new(error_receiver); } - .await; + } - let _ = sender.send(result); - }); - RemoteLogDownloadFuture::new(receiver) + RemoteLogDownloadFuture::new(result_receiver) } +} - /// Build the remote path for a log segment - fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String { - // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log - let offset_prefix = format!("{:020}", segment.start_offset); - format!( - "{}/{}/{}.log", - remote_log_tablet_dir, segment.segment_id, offset_prefix - ) +impl Drop for RemoteLogDownloader { + fn drop(&mut self) { + // Drop the request sender to signal coordinator shutdown + // This causes request_receiver.recv() to return None, allowing the + // coordinator to exit gracefully after processing pending work. + drop(self.request_sender.take()); + + // Note: We cannot await in Drop (sync context), so we can't wait for + // the coordinator to finish. 
Pending futures will fail when they detect + // the coordinator has exited (via closed channel). + // + // For graceful shutdown with waiting, use `shutdown().await` instead. + + // We don't abort the coordinator handle anymore - let it finish naturally. + // The JoinHandle will be dropped here, which detaches the task. + // The coordinator will exit on its own when it sees the channel closed. } +} +impl RemoteLogDownloader { /// Download a file from remote storage to local using streaming read/write async fn download_file( remote_log_tablet_dir: &str, @@ -347,70 +944,349 @@ impl RemoteLogDownloader { } } -/// Pending fetch that waits for remote log file to be downloaded -pub struct RemotePendingFetch { - segment: RemoteLogSegment, - download_future: RemoteLogDownloadFuture, - pos_in_log_segment: i32, - fetch_offset: i64, - high_watermark: i64, - read_context: ReadContext, -} +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; -impl RemotePendingFetch { - pub fn new( - segment: RemoteLogSegment, - download_future: RemoteLogDownloadFuture, - pos_in_log_segment: i32, - fetch_offset: i64, - high_watermark: i64, - read_context: ReadContext, - ) -> Self { - Self { + /// Helper function to create a TableBucket for testing + fn create_table_bucket(table_id: i64, bucket_id: i32) -> TableBucket { + TableBucket::new(table_id, bucket_id) + } + + /// Simplified fake fetcher for testing + struct FakeFetcher { + completion_gate: Arc, + in_flight: Arc, + max_seen_in_flight: Arc, + fail_count: Arc>, + auto_complete: bool, + } + + impl FakeFetcher { + fn new(fail_count: usize, auto_complete: bool) -> Self { + Self { + completion_gate: Arc::new(Notify::new()), + in_flight: Arc::new(AtomicUsize::new(0)), + max_seen_in_flight: Arc::new(AtomicUsize::new(0)), + fail_count: Arc::new(Mutex::new(fail_count)), + auto_complete, + } + } + + fn max_seen_in_flight(&self) -> usize { + self.max_seen_in_flight.load(Ordering::SeqCst) + } + + fn in_flight(&self) -> usize { + self.in_flight.load(Ordering::SeqCst) + } + + fn release_one(&self) { + self.completion_gate.notify_one(); + } + + fn release_all(&self) { + self.completion_gate.notify_waiters(); + } + } + + impl RemoteLogFetcher for FakeFetcher { + fn fetch( + &self, + request: &RemoteLogDownloadRequest, + ) -> Pin> + Send>> { + let gate = self.completion_gate.clone(); + let in_flight = self.in_flight.clone(); + let max_seen = self.max_seen_in_flight.clone(); + let fail_count = self.fail_count.clone(); + let segment_id = request.segment().segment_id.clone(); + let auto_complete = self.auto_complete; + + Box::pin(async move { + // Track in-flight + let current = in_flight.fetch_add(1, Ordering::SeqCst) + 1; + max_seen.fetch_max(current, Ordering::SeqCst); + + // Wait for gate (or auto-complete) + if !auto_complete { + gate.notified().await; + } else { + tokio::task::yield_now().await; + } + + // Check if should fail + let should_fail = { + let mut count = fail_count.lock(); + if *count > 0 { + *count -= 1; + true + } else { + false + } + }; + + in_flight.fetch_sub(1, Ordering::SeqCst); + + if should_fail { + Err(Error::UnexpectedError { + message: format!("Fake fetch failed for {}", segment_id), + source: None, + }) + } else { + let fake_data = vec![1, 2, 3, 4]; + let temp_dir = std::env::temp_dir(); + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let file_path = + temp_dir.join(format!("fake_segment_{}_{}.log", segment_id, timestamp)); + 
tokio::fs::write(&file_path, &fake_data).await?; + + Ok(FetchResult { + file_path, + file_size: fake_data.len(), + }) + } + }) + } + } + + /// Helper function to create a RemoteLogSegment for testing + fn create_segment( + segment_id: &str, + start_offset: i64, + max_timestamp: i64, + table_bucket: TableBucket, + ) -> RemoteLogSegment { + RemoteLogSegment { + segment_id: segment_id.to_string(), + start_offset, + end_offset: start_offset + 1000, + size_in_bytes: 1024, + table_bucket, + max_timestamp, + } + } + + /// Helper function to create a RemoteLogDownloadRequest for testing + fn create_request(segment: RemoteLogSegment) -> RemoteLogDownloadRequest { + let (result_sender, _) = oneshot::channel(); + RemoteLogDownloadRequest { + remote_log_tablet_dir: "test_dir".to_string(), segment, - download_future, - pos_in_log_segment, - fetch_offset, - high_watermark, - read_context, + result_sender, + retry_count: 0, + next_retry_at: None, } } -} -impl PendingFetch for RemotePendingFetch { - fn table_bucket(&self) -> &TableBucket { - &self.segment.table_bucket + #[test] + fn test_priority_ordering_matching_java_test_case() { + // Test priority ordering: timestamp across buckets, offset within bucket + // Does NOT test tie-breakers (segment_id) - those are implementation details + + let bucket1 = create_table_bucket(1, 0); + let bucket2 = create_table_bucket(1, 1); + let bucket3 = create_table_bucket(1, 2); + let bucket4 = create_table_bucket(1, 3); + + // Create segments with distinct timestamps/offsets (no ties) + let seg_negative = create_segment("seg_neg", 0, -1, bucket1.clone()); + let seg_zero = create_segment("seg_zero", 0, 0, bucket2.clone()); + let seg_1000 = create_segment("seg_1000", 0, 1000, bucket3.clone()); + let seg_2000 = create_segment("seg_2000", 0, 2000, bucket4.clone()); + let seg_same_bucket_100 = create_segment("seg_sb_100", 100, 5000, bucket1.clone()); + let seg_same_bucket_50 = create_segment("seg_sb_50", 50, 5000, bucket1.clone()); + + let mut heap = BinaryHeap::new(); + heap.push(Reverse(create_request(seg_2000))); + heap.push(Reverse(create_request(seg_same_bucket_100))); + heap.push(Reverse(create_request(seg_1000))); + heap.push(Reverse(create_request(seg_zero))); + heap.push(Reverse(create_request(seg_negative))); + heap.push(Reverse(create_request(seg_same_bucket_50))); + + // Verify ordering by timestamp/offset, not segment_id + let first = heap.pop().unwrap().0; + assert_eq!(first.segment.max_timestamp, -1, "Lowest timestamp first"); + + let second = heap.pop().unwrap().0; + assert_eq!(second.segment.max_timestamp, 0); + + let third = heap.pop().unwrap().0; + assert_eq!(third.segment.max_timestamp, 1000); + + let fourth = heap.pop().unwrap().0; + assert_eq!(fourth.segment.max_timestamp, 2000); + + // Last two are same bucket (ts=5000), ordered by offset + let fifth = heap.pop().unwrap().0; + assert_eq!(fifth.segment.max_timestamp, 5000); + assert_eq!( + fifth.segment.start_offset, 50, + "Lower offset first within bucket" + ); + + let sixth = heap.pop().unwrap().0; + assert_eq!(sixth.segment.max_timestamp, 5000); + assert_eq!(sixth.segment.start_offset, 100); } - fn is_completed(&self) -> bool { - self.download_future.is_done() + #[tokio::test] + async fn test_concurrency_and_priority() { + // Test concurrency limiting and priority-based scheduling together + let fake_fetcher = Arc::new(FakeFetcher::new(0, false)); // Manual control + + let downloader = RemoteLogDownloader::new_with_fetcher( + fake_fetcher.clone(), + 10, // High prefetch limit + 2, // Max concurrent 
downloads = 2 + ) + .unwrap(); + + let bucket = create_table_bucket(1, 0); + + // Request 4 segments with same priority (to isolate concurrency limiting from priority) + let segs: Vec<_> = (0..4) + .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone())) + .collect(); + + let _futures: Vec<_> = segs + .iter() + .map(|seg| downloader.request_remote_log("dir", seg)) + .collect(); + + // Wait for exactly 2 to start + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + fake_fetcher.in_flight(), + 2, + "Concurrency limit: exactly 2 should be in-flight" + ); + + // Release one + fake_fetcher.release_one(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Max should never exceed 2 + assert_eq!( + fake_fetcher.max_seen_in_flight(), + 2, + "Max concurrent should not exceed 2" + ); + + // Release all + fake_fetcher.release_all(); } - fn to_completed_fetch(self: Box) -> Result> { - // Get the file path (this should only be called when is_completed() returns true) - let mut data = self.download_future.get_remote_log_bytes()?; + #[tokio::test] + async fn test_prefetch_limit() { + // Test that prefetch semaphore limits outstanding downloads + let fake_fetcher = Arc::new(FakeFetcher::new(0, true)); // Auto-complete - // Slice the data if needed - let data = if self.pos_in_log_segment > 0 { - data.split_off(self.pos_in_log_segment as usize) - } else { - data - }; + let downloader = RemoteLogDownloader::new_with_fetcher( + fake_fetcher, + 2, // Max prefetch = 2 + 10, // High concurrent limit + ) + .unwrap(); + + let bucket = create_table_bucket(1, 0); + + // Request 4 downloads + let segs: Vec<_> = (0..4) + .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone())) + .collect(); - let size_in_bytes = data.len(); + let mut futures: Vec<_> = segs + .iter() + .map(|seg| downloader.request_remote_log("dir", seg)) + .collect(); - let log_record_batch = LogRecordsBatches::new(data); + // Wait for first 2 to complete + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + if futures.iter().filter(|f| f.is_done()).count() >= 2 { + break; + } + if tokio::time::Instant::now() > deadline { + panic!("Timeout waiting for first 2 downloads"); + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } - // Create DefaultCompletedFetch from the data - let completed_fetch = DefaultCompletedFetch::new( - self.segment.table_bucket, - log_record_batch, - size_in_bytes, - self.read_context, - self.fetch_offset, - self.high_watermark, + // Verify 3rd and 4th are blocked (prefetch limit) + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + futures.iter().filter(|f| f.is_done()).count(), + 2, + "Prefetch limit: only 2 should complete" ); - Ok(Box::new(completed_fetch)) + // Drop first 2 (releases permits) + let f4 = futures.pop().unwrap(); + let f3 = futures.pop().unwrap(); + drop(futures); + + // 3rd and 4th should now complete + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + if f3.is_done() && f4.is_done() { + break; + } + if tokio::time::Instant::now() > deadline { + panic!("Timeout after permit release"); + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + } + + #[tokio::test] + async fn test_retry_and_cancellation() { + // Test retry with exponential backoff + let fake_fetcher = Arc::new(FakeFetcher::new(2, true)); // Fail twice, succeed third time + + let downloader = + 
RemoteLogDownloader::new_with_fetcher(fake_fetcher.clone(), 10, 1).unwrap(); + + let bucket = create_table_bucket(1, 0); + let seg = create_segment("seg1", 0, 1000, bucket); + + let future = downloader.request_remote_log("dir", &seg); + + // Should succeed after retries + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5); + loop { + if future.is_done() { + break; + } + if tokio::time::Instant::now() > deadline { + panic!("Timeout waiting for retry to succeed"); + } + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + + assert!(future.is_done(), "Should succeed after retries"); + + // Test cancellation + let seg2 = create_segment("seg2", 100, 1000, create_table_bucket(1, 0)); + let fake_fetcher2 = Arc::new(FakeFetcher::new(100, true)); // Fail forever + let downloader2 = + RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10, 1).unwrap(); + + let future2 = downloader2.request_remote_log("dir", &seg2); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Drop to cancel + drop(future2); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + assert_eq!( + fake_fetcher2.in_flight(), + 0, + "Cancellation should release resources" + ); } } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index e9b2ce10..8ace20dc 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -30,11 +30,9 @@ use crate::client::credentials::CredentialsCache; use crate::client::metadata::Metadata; use crate::client::table::log_fetch_buffer::{ CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel, - LogFetchBuffer, -}; -use crate::client::table::remote_log::{ - RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch, + LogFetchBuffer, RemotePendingFetch, }; +use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo}; use crate::error::{ApiError, Error, FlussError, Result}; use crate::metadata::{TableBucket, TableInfo, TablePath}; use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable}; @@ -223,6 +221,7 @@ impl<'a> TableScan<'a> { &self.table_info, self.metadata.clone(), self.conn.get_connections(), + self.conn.config(), self.projected_fields, )?; Ok(LogScanner { @@ -235,6 +234,7 @@ impl<'a> TableScan<'a> { &self.table_info, self.metadata.clone(), self.conn.get_connections(), + self.conn.config(), self.projected_fields, )?; Ok(RecordBatchLogScanner { @@ -273,6 +273,7 @@ impl LogScannerInner { table_info: &TableInfo, metadata: Arc, connections: Arc, + config: &crate::config::Config, projected_fields: Option>, ) -> Result { let log_scanner_status = Arc::new(LogScannerStatus::new()); @@ -286,6 +287,7 @@ impl LogScannerInner { connections.clone(), metadata.clone(), log_scanner_status.clone(), + config, projected_fields, )?, }) @@ -468,6 +470,7 @@ impl LogFetcher { conns: Arc, metadata: Arc, log_scanner_status: Arc, + config: &crate::config::Config, projected_fields: Option>, ) -> Result { let full_arrow_schema = to_arrow_schema(table_info.get_row_type()); @@ -487,7 +490,11 @@ impl LogFetcher { log_scanner_status, read_context, remote_read_context, - remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?), + remote_log_downloader: Arc::new(RemoteLogDownloader::new( + tmp_dir, + config.scanner_remote_log_prefetch_num, + config.scanner_remote_log_download_threads, + )?), credentials_cache: 
Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())), log_fetch_buffer, nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())), @@ -1469,6 +1476,7 @@ mod tests { Arc::new(RpcClient::new()), metadata, status.clone(), + &crate::config::Config::default(), None, )?; @@ -1488,8 +1496,8 @@ mod tests { Ok(()) } - #[test] - fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> { + #[tokio::test] + async fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> { let table_path = TablePath::new("db".to_string(), "tbl".to_string()); let table_info = build_table_info(table_path.clone(), 1, 1); let cluster = build_cluster_arc(&table_path, 1, 1); @@ -1500,6 +1508,7 @@ mod tests { Arc::new(RpcClient::new()), metadata, status, + &crate::config::Config::default(), None, )?; @@ -1535,6 +1544,7 @@ mod tests { Arc::new(RpcClient::new()), metadata, status, + &crate::config::Config::default(), None, )?; @@ -1558,6 +1568,7 @@ mod tests { Arc::new(RpcClient::new()), metadata.clone(), status.clone(), + &crate::config::Config::default(), None, )?; diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 92f600e6..705e241d 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -36,6 +36,16 @@ pub struct Config { #[arg(long, default_value_t = 2 * 1024 * 1024)] pub writer_batch_size: i32, + + /// Maximum number of remote log segments to prefetch + /// Default: 4 (matching Java CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM) + #[arg(long, default_value_t = 4)] + pub scanner_remote_log_prefetch_num: usize, + + /// Maximum concurrent remote log downloads + /// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM) + #[arg(long, default_value_t = 3)] + pub scanner_remote_log_download_threads: usize, } impl Default for Config { @@ -46,6 +56,8 @@ impl Default for Config { writer_acks: String::from("all"), writer_retries: i32::MAX, writer_batch_size: 2 * 1024 * 1024, + scanner_remote_log_prefetch_num: 4, + scanner_remote_log_download_threads: 3, } } } diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto index eaee94c3..65eddce7 100644 --- a/crates/fluss/src/proto/fluss_api.proto +++ b/crates/fluss/src/proto/fluss_api.proto @@ -306,6 +306,7 @@ message PbRemoteLogSegment { required int64 remote_log_start_offset = 2; required int64 remote_log_end_offset = 3; required int32 segment_size_in_bytes = 4; + optional int64 max_timestamp = 5; } message PbListOffsetsRespForBucket { diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 3c94b720..53fcc84b 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -44,7 +44,7 @@ use bytes::Bytes; use crc32c::crc32c; use parking_lot::Mutex; use std::{ - io::{Cursor, Write}, + io::{Cursor, Seek, SeekFrom, Write}, sync::Arc, }; @@ -80,6 +80,11 @@ pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET; pub const ARROW_CHANGETYPE_OFFSET: usize = RECORD_BATCH_HEADER_SIZE; pub const LOG_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH; +/// Maximum batch size matches Java's Integer.MAX_VALUE limit. +/// Java uses int type for batch size, so max value is 2^31 - 1 = 2,147,483,647 bytes (~2GB). +/// This is the implicit limit in FileLogRecords.java and other Java components. +pub const MAX_BATCH_SIZE: usize = i32::MAX as usize; // 2,147,483,647 bytes (~2GB) + /// const for record /// The "magic" values. 
#[derive(Debug, Clone, Copy)] @@ -87,6 +92,49 @@ pub enum LogMagicValue { V0 = 0, } +/// Safely convert batch size from i32 to usize with validation. +/// +/// Validates that: +/// - batch_size_bytes is non-negative +/// - batch_size_bytes + LOG_OVERHEAD doesn't overflow +/// - Result is within reasonable bounds +fn validate_batch_size(batch_size_bytes: i32) -> Result { + // Check for negative size (corrupted data) + if batch_size_bytes < 0 { + return Err(Error::UnexpectedError { + message: format!("Invalid negative batch size: {}", batch_size_bytes), + source: None, + }); + } + + let batch_size_u = batch_size_bytes as usize; + + // Check for overflow when adding LOG_OVERHEAD + let total_size = + batch_size_u + .checked_add(LOG_OVERHEAD) + .ok_or_else(|| Error::UnexpectedError { + message: format!( + "Batch size {} + LOG_OVERHEAD {} would overflow", + batch_size_u, LOG_OVERHEAD + ), + source: None, + })?; + + // Sanity check: reject unreasonably large batches + if total_size > MAX_BATCH_SIZE { + return Err(Error::UnexpectedError { + message: format!( + "Batch size {} exceeds maximum allowed size {}", + total_size, MAX_BATCH_SIZE + ), + source: None, + }); + } + + Ok(total_size) +} + // NOTE: Rust layout/offsets currently match Java only for V0. // TODO: Add V1 layout/offsets to keep parity with Java's V1 format. pub const CURRENT_LOG_MAGIC_VALUE: u8 = LogMagicValue::V0 as u8; @@ -373,17 +421,253 @@ pub trait ToArrow { fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>; } -pub struct LogRecordsBatches { +/// Abstract source of log record data. +/// Allows streaming from files or in-memory buffers. +pub trait LogRecordsSource: Send + Sync { + /// Read batch header at given position. + /// Returns (base_offset, batch_size) tuple. + fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)>; + + /// Read full batch data at given position with given size. + /// Returns Bytes that can be zero-copy sliced. + fn read_batch_data(&self, pos: usize, size: usize) -> Result; + + /// Total size of the source in bytes. + fn total_size(&self) -> usize; +} + +/// In-memory implementation of LogRecordsSource. +/// Used for local tablet server fetches (existing path). +pub struct MemorySource { data: Bytes, +} + +impl MemorySource { + pub fn new(data: Vec) -> Self { + Self { + data: Bytes::from(data), + } + } +} + +impl LogRecordsSource for MemorySource { + fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> { + if pos + LOG_OVERHEAD > self.data.len() { + return Err(Error::UnexpectedError { + message: format!( + "Position {} + LOG_OVERHEAD {} exceeds data size {}", + pos, + LOG_OVERHEAD, + self.data.len() + ), + source: None, + }); + } + + let base_offset = LittleEndian::read_i64(&self.data[pos + BASE_OFFSET_OFFSET..]); + let batch_size_bytes = LittleEndian::read_i32(&self.data[pos + LENGTH_OFFSET..]); + + // Validate batch size to prevent integer overflow and corruption + let batch_size = validate_batch_size(batch_size_bytes)?; + + Ok((base_offset, batch_size)) + } + + fn read_batch_data(&self, pos: usize, size: usize) -> Result { + if pos + size > self.data.len() { + return Err(Error::UnexpectedError { + message: format!( + "Read beyond data size: {} + {} > {}", + pos, + size, + self.data.len() + ), + source: None, + }); + } + // Zero-copy slice (Bytes is Arc-based) + Ok(self.data.slice(pos..pos + size)) + } + + fn total_size(&self) -> usize { + self.data.len() + } +} + +/// RAII guard that deletes a file when dropped. 
+/// Used to ensure file deletion happens AFTER the file handle is closed. +struct FileCleanupGuard { + file_path: std::path::PathBuf, +} + +impl Drop for FileCleanupGuard { + fn drop(&mut self) { + // File handle is already closed (this guard drops after the file field) + if let Err(e) = std::fs::remove_file(&self.file_path) { + log::warn!( + "Failed to delete remote log file {}: {}", + self.file_path.display(), + e + ); + } else { + log::debug!("Deleted remote log file: {}", self.file_path.display()); + } + } +} + +/// File-backed implementation of LogRecordsSource. +/// Used for remote log segments downloaded to local disk. +/// Streams data on-demand instead of loading entire file into memory. +/// +/// Uses Mutex with seek + read_exact for cross-platform compatibility. +/// Access pattern is sequential iteration (single consumer), so mutex overhead is negligible. +pub struct FileSource { + file: Mutex, + file_size: usize, + base_offset: usize, + _cleanup: Option, // Drops AFTER file (field order matters!) +} + +impl FileSource { + /// Create a new FileSource without automatic cleanup. + pub fn new(file: std::fs::File, base_offset: usize) -> Result { + let file_size = file.metadata()?.len() as usize; + Ok(Self { + file: Mutex::new(file), + file_size, + base_offset, + _cleanup: None, + }) + } + + /// Create a new FileSource that will delete the file when dropped. + /// This is used for remote log files that need cleanup. + pub fn new_with_cleanup( + file: std::fs::File, + base_offset: usize, + file_path: std::path::PathBuf, + ) -> Result { + let file_size = file.metadata()?.len() as usize; + Ok(Self { + file: Mutex::new(file), + file_size, + base_offset, + _cleanup: Some(FileCleanupGuard { file_path }), + }) + } + + /// Read data at a specific position using seek + read_exact. + /// This is cross-platform and adequate for sequential access patterns. 
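+    /// (A positional read such as `std::os::unix::fs::FileExt::read_exact_at` could
+    /// skip the seek and the mutex, but it is Unix-only; seek + read_exact keeps the
+    /// reader portable, and contention is negligible for a single sequential consumer.)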
+ fn read_at(&self, pos: u64, buf: &mut [u8]) -> Result<()> { + use std::io::Read; + let mut file = self.file.lock(); + file.seek(SeekFrom::Start(pos))?; + file.read_exact(buf)?; + Ok(()) + } +} + +impl LogRecordsSource for FileSource { + fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> { + let actual_pos = self.base_offset + pos; + if actual_pos + LOG_OVERHEAD > self.file_size { + return Err(Error::UnexpectedError { + message: format!( + "Position {} exceeds file size {}", + actual_pos, self.file_size + ), + source: None, + }); + } + + // Read only the header to extract base_offset and batch_size + let mut header_buf = vec![0u8; LOG_OVERHEAD]; + self.read_at(actual_pos as u64, &mut header_buf)?; + + let base_offset = LittleEndian::read_i64(&header_buf[BASE_OFFSET_OFFSET..]); + let batch_size_bytes = LittleEndian::read_i32(&header_buf[LENGTH_OFFSET..]); + + // Validate batch size to prevent integer overflow and corruption + let batch_size = validate_batch_size(batch_size_bytes)?; + + Ok((base_offset, batch_size)) + } + + fn read_batch_data(&self, pos: usize, size: usize) -> Result { + let actual_pos = self.base_offset + pos; + if actual_pos + size > self.file_size { + return Err(Error::UnexpectedError { + message: format!( + "Read beyond file size: {} + {} > {}", + actual_pos, size, self.file_size + ), + source: None, + }); + } + + // Read the full batch data + let mut batch_buf = vec![0u8; size]; + self.read_at(actual_pos as u64, &mut batch_buf)?; + + Ok(Bytes::from(batch_buf)) + } + + fn total_size(&self) -> usize { + self.file_size - self.base_offset + } +} + +pub struct LogRecordsBatches { + source: Box, current_pos: usize, remaining_bytes: usize, } impl LogRecordsBatches { + /// Create from in-memory Vec (existing path - backward compatible). pub fn new(data: Vec) -> Self { - let remaining_bytes: usize = data.len(); + let remaining_bytes = data.len(); Self { - data: Bytes::from(data), + source: Box::new(MemorySource::new(data)), + current_pos: 0, + remaining_bytes, + } + } + + /// Create from file. + /// Enables streaming without loading entire file into memory. + pub fn from_file(file: std::fs::File, base_offset: usize) -> Result { + let source = FileSource::new(file, base_offset)?; + let remaining_bytes = source.total_size(); + Ok(Self { + source: Box::new(source), + current_pos: 0, + remaining_bytes, + }) + } + + /// Create from file with automatic cleanup. + /// The file will be deleted when the LogRecordsBatches is dropped. + /// This ensures file is closed before deletion. + pub fn from_file_with_cleanup( + file: std::fs::File, + base_offset: usize, + file_path: std::path::PathBuf, + ) -> Result { + let source = FileSource::new_with_cleanup(file, base_offset, file_path)?; + let remaining_bytes = source.total_size(); + Ok(Self { + source: Box::new(source), + current_pos: 0, + remaining_bytes, + }) + } + + /// Create from any source (for testing). + pub fn from_source(source: Box) -> Self { + let remaining_bytes = source.total_size(); + Self { + source, current_pos: 0, remaining_bytes, } @@ -394,13 +678,24 @@ impl LogRecordsBatches { return None; } - let batch_size_bytes = - LittleEndian::read_i32(self.data.get(self.current_pos + LENGTH_OFFSET..).unwrap()); - let batch_size = batch_size_bytes as usize + LOG_OVERHEAD; - if batch_size > self.remaining_bytes { - return None; + // Read only header to get size (efficient!) 
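+        // (read_batch_header only touches the first LOG_OVERHEAD bytes - the i64 base
+        // offset and the i32 batch length - and the size it returns already includes
+        // LOG_OVERHEAD, so current_pos/remaining_bytes can advance by it directly.)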
+ match self.source.read_batch_header(self.current_pos) { + Ok((_base_offset, batch_size)) => { + if batch_size > self.remaining_bytes { + None + } else { + Some(batch_size) + } + } + Err(e) => { + log::debug!( + "Failed to read batch header at pos {}: {}", + self.current_pos, + e + ); + None + } } - Some(batch_size) } } @@ -410,14 +705,24 @@ impl Iterator for LogRecordsBatches { fn next(&mut self) -> Option { match self.next_batch_size() { Some(batch_size) => { - let start = self.current_pos; - let end = start + batch_size; - // Since LogRecordsBatches owns the Vec, the slice is valid - // as long as the mutable reference exists, which is 'a - let record_batch = LogRecordBatch::new(self.data.slice(start..end)); - self.current_pos += batch_size; - self.remaining_bytes -= batch_size; - Some(record_batch) + // Read full batch data on-demand + match self.source.read_batch_data(self.current_pos, batch_size) { + Ok(data) => { + let record_batch = LogRecordBatch::new(data); + self.current_pos += batch_size; + self.remaining_bytes -= batch_size; + Some(record_batch) + } + Err(e) => { + log::error!( + "Failed to read batch data at pos {} size {}: {}", + self.current_pos, + batch_size, + e + ); + None + } + } } None => None, } @@ -1249,4 +1554,105 @@ mod tests { } out } + + // Tests for file-backed streaming + + #[test] + fn test_file_source_streaming() -> Result<()> { + use std::io::Write; + use tempfile::NamedTempFile; + + // Test 1: Basic file reads work + let test_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let mut tmp_file = NamedTempFile::new()?; + tmp_file.write_all(&test_data)?; + tmp_file.flush()?; + + let file = std::fs::File::open(tmp_file.path())?; + let source = FileSource::new(file, 0)?; + + // Read full data + let data = source.read_batch_data(0, 10)?; + assert_eq!(data.to_vec(), test_data); + + // Read partial data + let partial = source.read_batch_data(2, 5)?; + assert_eq!(partial.to_vec(), vec![3, 4, 5, 6, 7]); + + // Test 2: base_offset works (critical for remote logs with pos_in_log_segment) + let prefix = vec![0xFF; 100]; + let actual_data = vec![1, 2, 3, 4, 5]; + let mut tmp_file2 = NamedTempFile::new()?; + tmp_file2.write_all(&prefix)?; + tmp_file2.write_all(&actual_data)?; + tmp_file2.flush()?; + + let file2 = std::fs::File::open(tmp_file2.path())?; + let source2 = FileSource::new(file2, 100)?; // Skip first 100 bytes + + assert_eq!(source2.total_size(), 5); // Only counts data after offset + let data2 = source2.read_batch_data(0, 5)?; + assert_eq!(data2.to_vec(), actual_data); + + Ok(()) + } + + #[test] + fn test_log_records_batches_from_file() -> Result<()> { + use crate::client::WriteRecord; + use crate::compression::{ + ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }; + use crate::metadata::TablePath; + use crate::row::GenericRow; + use std::io::Write; + use tempfile::NamedTempFile; + + // Integration test: Real log record batch streamed from file + let row_type = DataTypes::row(vec![ + DataField::new("id".to_string(), DataTypes::int(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + ]); + let table_path = Arc::new(TablePath::new("db".to_string(), "tbl".to_string())); + + let mut builder = MemoryLogRecordsArrowBuilder::new( + 1, + &row_type, + false, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + ); + + let mut row = GenericRow::new(); + row.set_field(0, 1_i32); + row.set_field(1, "alice"); + let record = 
WriteRecord::for_append(table_path.clone(), 1, row); + builder.append(&record)?; + + let mut row2 = GenericRow::new(); + row2.set_field(0, 2_i32); + row2.set_field(1, "bob"); + let record2 = WriteRecord::for_append(table_path, 2, row2); + builder.append(&record2)?; + + let data = builder.build()?; + + // Write to file + let mut tmp_file = NamedTempFile::new()?; + tmp_file.write_all(&data)?; + tmp_file.flush()?; + + // Create file-backed LogRecordsBatches (should stream, not load all into memory) + let file = std::fs::File::open(tmp_file.path())?; + let mut batches = LogRecordsBatches::from_file(file, 0)?; + + // Iterate through batches (should work just like in-memory) + let batch = batches.next().expect("Should have at least one batch"); + assert!(batch.size_in_bytes() > 0); + assert_eq!(batch.record_count(), 2); + + Ok(()) + } } diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs index 30424e5d..156ef049 100644 --- a/crates/fluss/src/util/mod.rs +++ b/crates/fluss/src/util/mod.rs @@ -22,7 +22,6 @@ use crate::metadata::TableBucket; use linked_hash_map::LinkedHashMap; use std::collections::{HashMap, HashSet}; use std::hash::Hash; -use std::path::PathBuf; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -33,11 +32,9 @@ pub fn current_time_ms() -> i64 { .as_millis() as i64 } -pub async fn delete_file(file_path: PathBuf) { - tokio::fs::remove_file(&file_path) - .await - .unwrap_or_else(|err| log::warn!("Could not delete file: {file_path:?}, error: {err:?}")); -} +// Removed: delete_file() is no longer used. +// File cleanup is now handled via RAII with FileCleanupGuard in arrow.rs +// which uses Rust's drop order to ensure files are closed before deletion. pub struct FairBucketStatusMap { map: LinkedHashMap>, From bec7bd2925f52f8e4fd4aa67fb6250f38498e59e Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 21 Jan 2026 02:31:11 +0000 Subject: [PATCH 2/6] rebase and fix failing test --- crates/fluss/src/record/arrow.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 53fcc84b..5fbdfdc2 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -1360,7 +1360,7 @@ pub struct MyVec(pub StreamReader); #[cfg(test)] mod tests { use super::*; - use crate::metadata::{DataField, DataTypes}; + use crate::metadata::{DataField, DataTypes, RowType}; #[test] fn test_to_array_type() { @@ -1609,7 +1609,7 @@ mod tests { use tempfile::NamedTempFile; // Integration test: Real log record batch streamed from file - let row_type = DataTypes::row(vec![ + let row_type = RowType::new(vec![ DataField::new("id".to_string(), DataTypes::int(), None), DataField::new("name".to_string(), DataTypes::string(), None), ]); From 669b5e6aa65c72b29024e73359f793e13715e79e Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 21 Jan 2026 10:10:05 +0000 Subject: [PATCH 3/6] address feedback --- .../src/client/table/log_fetch_buffer.rs | 9 ++- crates/fluss/src/client/table/remote_log.rs | 4 +- crates/fluss/src/record/arrow.rs | 65 +++++++++++-------- 3 files changed, 46 insertions(+), 32 deletions(-) diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index c56f0e59..12540e46 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -442,7 +442,8 @@ impl DefaultCompletedFetch { if record.offset() >= self.next_fetch_offset { return Ok(Some(record)); } 
- } else if let Some(batch) = self.log_record_batch.next() { + } else if let Some(batch_result) = self.log_record_batch.next() { + let batch = batch_result?; self.current_record_iterator = Some(batch.records(&self.read_context)?); self.current_record_batch = Some(batch); } else { @@ -471,11 +472,12 @@ impl DefaultCompletedFetch { /// Get the next batch directly without row iteration fn next_fetched_batch(&mut self) -> Result> { loop { - let Some(log_batch) = self.log_record_batch.next() else { + let Some(log_batch_result) = self.log_record_batch.next() else { self.drain(); return Ok(None); }; + let log_batch = log_batch_result?; let mut record_batch = log_batch.record_batch(&self.read_context)?; // Skip empty batches @@ -813,7 +815,8 @@ impl PendingFetch for RemotePendingFetch { ); // Wrap it with RemoteCompletedFetch to hold the permit - // Permit will delete the file when dropped + // Permit manages the prefetch slot (releases semaphore and notifies coordinator) when dropped; + // file deletion is handled by FileCleanupGuard in the file-backed source created via from_file_with_cleanup Ok(Box::new(RemoteCompletedFetch::new(inner_fetch, permit))) } } diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 7be4b07f..8b0e30e5 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -176,7 +176,7 @@ impl Drop for PrefetchPermit { } /// Downloaded remote log file with prefetch permit -/// File remains on disk for memory efficiency - permit cleanup deletes it +/// File remains on disk for memory efficiency; file deletion is handled by FileCleanupGuard in FileSource #[derive(Debug)] pub struct RemoteLogFile { /// Path to the downloaded file on local disk @@ -185,7 +185,7 @@ pub struct RemoteLogFile { /// Currently unused but kept for potential future use (logging, metrics, etc.) #[allow(dead_code)] pub file_size: usize, - /// RAII permit that will delete the file when dropped + /// RAII permit that releases prefetch semaphore slot and notifies coordinator when dropped pub permit: PrefetchPermit, } diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 5fbdfdc2..a012c151 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -532,6 +532,18 @@ impl FileSource { /// Create a new FileSource without automatic cleanup. pub fn new(file: std::fs::File, base_offset: usize) -> Result { let file_size = file.metadata()?.len() as usize; + + // Validate base_offset to prevent underflow in total_size() + if base_offset > file_size { + return Err(Error::UnexpectedError { + message: format!( + "base_offset ({}) exceeds file_size ({})", + base_offset, file_size + ), + source: None, + }); + } + Ok(Self { file: Mutex::new(file), file_size, @@ -548,6 +560,18 @@ impl FileSource { file_path: std::path::PathBuf, ) -> Result { let file_size = file.metadata()?.len() as usize; + + // Validate base_offset to prevent underflow in total_size() + if base_offset > file_size { + return Err(Error::UnexpectedError { + message: format!( + "base_offset ({}) exceeds file_size ({})", + base_offset, file_size + ), + source: None, + }); + } + Ok(Self { file: Mutex::new(file), file_size, @@ -673,58 +697,45 @@ impl LogRecordsBatches { } } - pub fn next_batch_size(&self) -> Option { + /// Try to get the size of the next batch. 
+ fn next_batch_size(&self) -> Result> { if self.remaining_bytes < LOG_OVERHEAD { - return None; + return Ok(None); } - // Read only header to get size (efficient!) + // Read only header to get size match self.source.read_batch_header(self.current_pos) { Ok((_base_offset, batch_size)) => { if batch_size > self.remaining_bytes { - None + Ok(None) } else { - Some(batch_size) + Ok(Some(batch_size)) } } - Err(e) => { - log::debug!( - "Failed to read batch header at pos {}: {}", - self.current_pos, - e - ); - None - } + Err(e) => Err(e), } } } impl Iterator for LogRecordsBatches { - type Item = LogRecordBatch; + type Item = Result; fn next(&mut self) -> Option { match self.next_batch_size() { - Some(batch_size) => { + Ok(Some(batch_size)) => { // Read full batch data on-demand match self.source.read_batch_data(self.current_pos, batch_size) { Ok(data) => { let record_batch = LogRecordBatch::new(data); self.current_pos += batch_size; self.remaining_bytes -= batch_size; - Some(record_batch) - } - Err(e) => { - log::error!( - "Failed to read batch data at pos {} size {}: {}", - self.current_pos, - batch_size, - e - ); - None + Some(Ok(record_batch)) } + Err(e) => Some(Err(e)), } } - None => None, + Ok(None) => None, + Err(e) => Some(Err(e)), } } } @@ -1649,7 +1660,7 @@ mod tests { let mut batches = LogRecordsBatches::from_file(file, 0)?; // Iterate through batches (should work just like in-memory) - let batch = batches.next().expect("Should have at least one batch"); + let batch = batches.next().expect("Should have at least one batch")?; assert!(batch.size_in_bytes() > 0); assert_eq!(batch.record_count(), 2); From fc75fbc31f7c7db2f2e1bd9d61e81a4333abcec4 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 21 Jan 2026 16:52:05 +0000 Subject: [PATCH 4/6] fix imports --- .../fluss/src/client/table/log_fetch_buffer.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index 12540e46..43acddec 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -18,6 +18,9 @@ use arrow::array::RecordBatch; use parking_lot::Mutex; +use crate::client::table::remote_log::{ + PrefetchPermit, RemoteLogDownloadFuture, RemoteLogFile, RemoteLogSegment, +}; use crate::error::{ApiError, Error, Result}; use crate::metadata::TableBucket; use crate::record::{ @@ -652,13 +655,13 @@ impl CompletedFetch for DefaultCompletedFetch { /// Holds RAII permit until consumed (data is in inner) pub struct RemoteCompletedFetch { inner: DefaultCompletedFetch, - permit: Option, + permit: Option, } impl RemoteCompletedFetch { pub fn new( inner: DefaultCompletedFetch, - permit: crate::client::table::remote_log::PrefetchPermit, + permit: PrefetchPermit, ) -> Self { Self { inner, @@ -731,8 +734,8 @@ impl CompletedFetch for RemoteCompletedFetch { /// Pending fetch that waits for remote log file to be downloaded pub struct RemotePendingFetch { - segment: crate::client::table::remote_log::RemoteLogSegment, - download_future: crate::client::table::remote_log::RemoteLogDownloadFuture, + segment: RemoteLogSegment, + download_future: RemoteLogDownloadFuture, pos_in_log_segment: i32, fetch_offset: i64, high_watermark: i64, @@ -741,8 +744,8 @@ pub struct RemotePendingFetch { impl RemotePendingFetch { pub fn new( - segment: crate::client::table::remote_log::RemoteLogSegment, - download_future: crate::client::table::remote_log::RemoteLogDownloadFuture, + 
segment: RemoteLogSegment, + download_future: RemoteLogDownloadFuture, pos_in_log_segment: i32, fetch_offset: i64, high_watermark: i64, @@ -771,7 +774,7 @@ impl PendingFetch for RemotePendingFetch { fn to_completed_fetch(self: Box) -> Result> { // Take the RemoteLogFile and destructure let remote_log_file = self.download_future.take_remote_log_file()?; - let crate::client::table::remote_log::RemoteLogFile { + let RemoteLogFile { file_path, file_size: _, permit, From de1e2dd0e9b033c57e68a05678f31f32bd1ea5ff Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 21 Jan 2026 23:34:39 +0000 Subject: [PATCH 5/6] imports and enum dispatch --- .../src/client/table/log_fetch_buffer.rs | 22 ++-- crates/fluss/src/client/table/remote_log.rs | 58 +++++----- crates/fluss/src/client/table/scanner.rs | 18 +-- crates/fluss/src/record/arrow.rs | 104 +++++++++--------- 4 files changed, 107 insertions(+), 95 deletions(-) diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index 43acddec..2ed24e82 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -26,10 +26,14 @@ use crate::metadata::TableBucket; use crate::record::{ LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord, }; -use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; +use std::{ + collections::{HashMap, VecDeque}, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + time::{Duration, Instant}, +}; use tokio::sync::Notify; #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -109,7 +113,7 @@ impl LogFetchBuffer { /// Wait for the buffer to become non-empty, with timeout. /// Returns true if data became available, false if timeout. 
pub async fn await_not_empty(&self, timeout: Duration) -> Result { - let deadline = std::time::Instant::now() + timeout; + let deadline = Instant::now() + timeout; loop { // Check if buffer is not empty @@ -125,7 +129,7 @@ impl LogFetchBuffer { } // Check if timeout - let now = std::time::Instant::now(); + let now = Instant::now(); if now >= deadline { return Ok(false); } @@ -659,10 +663,7 @@ pub struct RemoteCompletedFetch { } impl RemoteCompletedFetch { - pub fn new( - inner: DefaultCompletedFetch, - permit: PrefetchPermit, - ) -> Self { + pub fn new(inner: DefaultCompletedFetch, permit: PrefetchPermit) -> Self { Self { inner, permit: Some(permit), @@ -835,7 +836,6 @@ mod tests { use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema}; use crate::row::GenericRow; use std::sync::Arc; - use std::time::Duration; fn test_read_context() -> ReadContext { let row_type = RowType::new(vec![DataField::new( diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 8b0e30e5..61356fa5 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -19,13 +19,17 @@ use crate::io::{FileIO, Storage}; use crate::metadata::TableBucket; use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; use parking_lot::{Mutex, RwLock}; -use std::cmp::Reverse; -use std::collections::{BinaryHeap, HashMap}; -use std::future::Future; -use std::io; -use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::sync::Arc; +use std::{ + cmp::{Ordering, Reverse, min}, + collections::{BinaryHeap, HashMap}, + env, + future::Future, + io, mem, + path::{Path, PathBuf}, + pin::Pin, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use tempfile::TempDir; use tokio::io::AsyncWriteExt; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; @@ -211,7 +215,7 @@ impl RemoteLogDownloadRequest { // Primary: Java semantics (timestamp cross-bucket, offset within-bucket) // Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then segment_id impl Ord for RemoteLogDownloadRequest { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { + fn cmp(&self, other: &Self) -> Ordering { if self.segment.table_bucket == other.segment.table_bucket { // Same bucket: order by start_offset (ascending - earlier segments first) self.segment @@ -248,14 +252,14 @@ impl Ord for RemoteLogDownloadRequest { } impl PartialOrd for RemoteLogDownloadRequest { - fn partial_cmp(&self, other: &Self) -> Option { + fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl PartialEq for RemoteLogDownloadRequest { fn eq(&self, other: &Self) -> bool { - self.cmp(other) == std::cmp::Ordering::Equal + self.cmp(other) == Ordering::Equal } } @@ -637,7 +641,7 @@ impl RemoteLogDownloadFuture { // This also ensures that any callbacks registered after this point will be called immediately let callbacks: Vec = { let mut callbacks_guard = callbacks_clone.lock(); - std::mem::take(&mut *callbacks_guard) + mem::take(&mut *callbacks_guard) }; for callback in callbacks { callback(); @@ -890,7 +894,7 @@ impl RemoteLogDownloader { let (op, relative_path) = storage.create(remote_path)?; // Timeout for remote storage operations (30 seconds) - const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30); // Get file metadata to know the size with timeout let meta = op.stat(relative_path).await?; @@ -907,7 +911,7 @@ impl 
RemoteLogDownloader { let total_chunks = file_size.div_ceil(CHUNK_SIZE); while offset < file_size { - let end = std::cmp::min(offset + CHUNK_SIZE, file_size); + let end = min(offset + CHUNK_SIZE, file_size); let range = offset..end; chunk_count += 1; @@ -1035,9 +1039,9 @@ mod tests { }) } else { let fake_data = vec![1, 2, 3, 4]; - let temp_dir = std::env::temp_dir(); - let timestamp = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) + let temp_dir = env::temp_dir(); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) .unwrap() .as_nanos(); let file_path = @@ -1159,7 +1163,7 @@ mod tests { .collect(); // Wait for exactly 2 to start - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(50)).await; assert_eq!( fake_fetcher.in_flight(), 2, @@ -1168,7 +1172,7 @@ mod tests { // Release one fake_fetcher.release_one(); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(50)).await; // Max should never exceed 2 assert_eq!( @@ -1206,7 +1210,7 @@ mod tests { .collect(); // Wait for first 2 to complete - let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + let deadline = tokio::time::Instant::now() + Duration::from_secs(2); loop { if futures.iter().filter(|f| f.is_done()).count() >= 2 { break; @@ -1214,11 +1218,11 @@ mod tests { if tokio::time::Instant::now() > deadline { panic!("Timeout waiting for first 2 downloads"); } - tokio::time::sleep(std::time::Duration::from_millis(10)).await; + tokio::time::sleep(Duration::from_millis(10)).await; } // Verify 3rd and 4th are blocked (prefetch limit) - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(50)).await; assert_eq!( futures.iter().filter(|f| f.is_done()).count(), 2, @@ -1231,7 +1235,7 @@ mod tests { drop(futures); // 3rd and 4th should now complete - let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + let deadline = tokio::time::Instant::now() + Duration::from_secs(2); loop { if f3.is_done() && f4.is_done() { break; @@ -1239,7 +1243,7 @@ mod tests { if tokio::time::Instant::now() > deadline { panic!("Timeout after permit release"); } - tokio::time::sleep(std::time::Duration::from_millis(10)).await; + tokio::time::sleep(Duration::from_millis(10)).await; } } @@ -1257,7 +1261,7 @@ mod tests { let future = downloader.request_remote_log("dir", &seg); // Should succeed after retries - let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5); + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); loop { if future.is_done() { break; @@ -1265,7 +1269,7 @@ mod tests { if tokio::time::Instant::now() > deadline { panic!("Timeout waiting for retry to succeed"); } - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(50)).await; } assert!(future.is_done(), "Should succeed after retries"); @@ -1277,11 +1281,11 @@ mod tests { RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10, 1).unwrap(); let future2 = downloader2.request_remote_log("dir", &seg2); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(50)).await; // Drop to cancel drop(future2); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(50)).await; assert_eq!( fake_fetcher2.in_flight(), diff --git 
a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 8ace20dc..534d8b0f 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -19,10 +19,12 @@ use arrow::array::RecordBatch; use arrow_schema::SchemaRef; use log::{debug, warn}; use parking_lot::{Mutex, RwLock}; -use std::collections::{HashMap, HashSet}; -use std::slice::from_ref; -use std::sync::Arc; -use std::time::Duration; +use std::{ + collections::{HashMap, HashSet}, + slice::from_ref, + sync::Arc, + time::{Duration, Instant}, +}; use tempfile::TempDir; use crate::client::connection::FlussConnection; @@ -294,7 +296,7 @@ impl LogScannerInner { } async fn poll_records(&self, timeout: Duration) -> Result { - let start = std::time::Instant::now(); + let start = Instant::now(); let deadline = start + timeout; loop { @@ -309,7 +311,7 @@ impl LogScannerInner { } // No data available, check if we should wait - let now = std::time::Instant::now(); + let now = Instant::now(); if now >= deadline { // Timeout reached, return empty result return Ok(ScanRecords::new(HashMap::new())); @@ -378,7 +380,7 @@ impl LogScannerInner { } async fn poll_batches(&self, timeout: Duration) -> Result> { - let start = std::time::Instant::now(); + let start = Instant::now(); let deadline = start + timeout; loop { @@ -389,7 +391,7 @@ impl LogScannerInner { return Ok(batches); } - let now = std::time::Instant::now(); + let now = Instant::now(); if now >= deadline { return Ok(Vec::new()); } diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index a012c151..40e6c2d9 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -44,7 +44,10 @@ use bytes::Bytes; use crc32c::crc32c; use parking_lot::Mutex; use std::{ - io::{Cursor, Seek, SeekFrom, Write}, + collections::HashMap, + fs::File, + io::{Cursor, Read, Seek, SeekFrom, Write}, + path::PathBuf, sync::Arc, }; @@ -421,36 +424,19 @@ pub trait ToArrow { fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>; } -/// Abstract source of log record data. -/// Allows streaming from files or in-memory buffers. -pub trait LogRecordsSource: Send + Sync { - /// Read batch header at given position. - /// Returns (base_offset, batch_size) tuple. - fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)>; - - /// Read full batch data at given position with given size. - /// Returns Bytes that can be zero-copy sliced. - fn read_batch_data(&self, pos: usize, size: usize) -> Result; - - /// Total size of the source in bytes. - fn total_size(&self) -> usize; -} - -/// In-memory implementation of LogRecordsSource. +/// In-memory log record source. /// Used for local tablet server fetches (existing path). -pub struct MemorySource { +struct MemorySource { data: Bytes, } impl MemorySource { - pub fn new(data: Vec) -> Self { + fn new(data: Vec) -> Self { Self { data: Bytes::from(data), } } -} -impl LogRecordsSource for MemorySource { fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> { if pos + LOG_OVERHEAD > self.data.len() { return Err(Error::UnexpectedError { @@ -497,7 +483,7 @@ impl LogRecordsSource for MemorySource { /// RAII guard that deletes a file when dropped. /// Used to ensure file deletion happens AFTER the file handle is closed. struct FileCleanupGuard { - file_path: std::path::PathBuf, + file_path: PathBuf, } impl Drop for FileCleanupGuard { @@ -515,14 +501,14 @@ impl Drop for FileCleanupGuard { } } -/// File-backed implementation of LogRecordsSource. 
+/// File-backed log record source. /// Used for remote log segments downloaded to local disk. /// Streams data on-demand instead of loading entire file into memory. /// /// Uses Mutex with seek + read_exact for cross-platform compatibility. /// Access pattern is sequential iteration (single consumer), so mutex overhead is negligible. -pub struct FileSource { - file: Mutex, +struct FileSource { + file: Mutex, file_size: usize, base_offset: usize, _cleanup: Option, // Drops AFTER file (field order matters!) @@ -530,7 +516,7 @@ pub struct FileSource { impl FileSource { /// Create a new FileSource without automatic cleanup. - pub fn new(file: std::fs::File, base_offset: usize) -> Result { + pub fn new(file: File, base_offset: usize) -> Result { let file_size = file.metadata()?.len() as usize; // Validate base_offset to prevent underflow in total_size() @@ -554,11 +540,7 @@ impl FileSource { /// Create a new FileSource that will delete the file when dropped. /// This is used for remote log files that need cleanup. - pub fn new_with_cleanup( - file: std::fs::File, - base_offset: usize, - file_path: std::path::PathBuf, - ) -> Result { + fn new_with_cleanup(file: File, base_offset: usize, file_path: PathBuf) -> Result { let file_size = file.metadata()?.len() as usize; // Validate base_offset to prevent underflow in total_size() @@ -583,15 +565,12 @@ impl FileSource { /// Read data at a specific position using seek + read_exact. /// This is cross-platform and adequate for sequential access patterns. fn read_at(&self, pos: u64, buf: &mut [u8]) -> Result<()> { - use std::io::Read; let mut file = self.file.lock(); file.seek(SeekFrom::Start(pos))?; file.read_exact(buf)?; Ok(()) } -} -impl LogRecordsSource for FileSource { fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> { let actual_pos = self.base_offset + pos; if actual_pos + LOG_OVERHEAD > self.file_size { @@ -641,8 +620,37 @@ impl LogRecordsSource for FileSource { } } +/// Enum for different log record sources. +pub enum LogRecordsSource { + Memory(MemorySource), + File(FileSource), +} + +impl LogRecordsSource { + fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> { + match self { + Self::Memory(s) => s.read_batch_header(pos), + Self::File(s) => s.read_batch_header(pos), + } + } + + fn read_batch_data(&self, pos: usize, size: usize) -> Result { + match self { + Self::Memory(s) => s.read_batch_data(pos, size), + Self::File(s) => s.read_batch_data(pos, size), + } + } + + fn total_size(&self) -> usize { + match self { + Self::Memory(s) => s.total_size(), + Self::File(s) => s.total_size(), + } + } +} + pub struct LogRecordsBatches { - source: Box, + source: LogRecordsSource, current_pos: usize, remaining_bytes: usize, } @@ -652,7 +660,7 @@ impl LogRecordsBatches { pub fn new(data: Vec) -> Self { let remaining_bytes = data.len(); Self { - source: Box::new(MemorySource::new(data)), + source: LogRecordsSource::Memory(MemorySource::new(data)), current_pos: 0, remaining_bytes, } @@ -660,11 +668,11 @@ impl LogRecordsBatches { /// Create from file. /// Enables streaming without loading entire file into memory. 
- pub fn from_file(file: std::fs::File, base_offset: usize) -> Result { + pub fn from_file(file: File, base_offset: usize) -> Result { let source = FileSource::new(file, base_offset)?; let remaining_bytes = source.total_size(); Ok(Self { - source: Box::new(source), + source: LogRecordsSource::File(source), current_pos: 0, remaining_bytes, }) @@ -674,21 +682,21 @@ impl LogRecordsBatches { /// The file will be deleted when the LogRecordsBatches is dropped. /// This ensures file is closed before deletion. pub fn from_file_with_cleanup( - file: std::fs::File, + file: File, base_offset: usize, - file_path: std::path::PathBuf, + file_path: PathBuf, ) -> Result { let source = FileSource::new_with_cleanup(file, base_offset, file_path)?; let remaining_bytes = source.total_size(); Ok(Self { - source: Box::new(source), + source: LogRecordsSource::File(source), current_pos: 0, remaining_bytes, }) } /// Create from any source (for testing). - pub fn from_source(source: Box) -> Self { + pub fn from_source(source: LogRecordsSource) -> Self { let remaining_bytes = source.total_size(); Self { source, @@ -1226,7 +1234,7 @@ impl ReadContext { &body_buffer, batch_metadata, resolve_schema, - &std::collections::HashMap::new(), + &HashMap::new(), None, &version, )?; @@ -1266,7 +1274,7 @@ impl ReadContext { &body_buffer, batch_metadata, self.full_schema.clone(), - &std::collections::HashMap::new(), + &HashMap::new(), None, &version, )?; @@ -1570,7 +1578,6 @@ mod tests { #[test] fn test_file_source_streaming() -> Result<()> { - use std::io::Write; use tempfile::NamedTempFile; // Test 1: Basic file reads work @@ -1579,7 +1586,7 @@ mod tests { tmp_file.write_all(&test_data)?; tmp_file.flush()?; - let file = std::fs::File::open(tmp_file.path())?; + let file = File::open(tmp_file.path())?; let source = FileSource::new(file, 0)?; // Read full data @@ -1598,7 +1605,7 @@ mod tests { tmp_file2.write_all(&actual_data)?; tmp_file2.flush()?; - let file2 = std::fs::File::open(tmp_file2.path())?; + let file2 = File::open(tmp_file2.path())?; let source2 = FileSource::new(file2, 100)?; // Skip first 100 bytes assert_eq!(source2.total_size(), 5); // Only counts data after offset @@ -1616,7 +1623,6 @@ mod tests { }; use crate::metadata::TablePath; use crate::row::GenericRow; - use std::io::Write; use tempfile::NamedTempFile; // Integration test: Real log record batch streamed from file @@ -1656,7 +1662,7 @@ mod tests { tmp_file.flush()?; // Create file-backed LogRecordsBatches (should stream, not load all into memory) - let file = std::fs::File::open(tmp_file.path())?; + let file = File::open(tmp_file.path())?; let mut batches = LogRecordsBatches::from_file(file, 0)?; // Iterate through batches (should work just like in-memory) From 4a166c3250d728672d75339b8ea3cf2f2499774a Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Thu, 22 Jan 2026 02:18:00 +0000 Subject: [PATCH 6/6] fix CI/CD --- crates/fluss/src/client/table/remote_log.rs | 9 +++++++-- crates/fluss/src/record/arrow.rs | 17 ++++------------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 61356fa5..fb97c4ba 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -22,13 +22,18 @@ use parking_lot::{Mutex, RwLock}; use std::{ cmp::{Ordering, Reverse, min}, collections::{BinaryHeap, HashMap}, - env, future::Future, io, mem, path::{Path, PathBuf}, pin::Pin, sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, + 
time::Duration,
+};
+
+#[cfg(test)]
+use std::{
+    env,
+    time::{SystemTime, UNIX_EPOCH},
 };
 use tempfile::TempDir;
 use tokio::io::AsyncWriteExt;
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 40e6c2d9..a86e146d 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -621,7 +621,7 @@ impl FileSource {
 }
 
 /// Enum for different log record sources.
-pub enum LogRecordsSource {
+enum LogRecordsSource {
     Memory(MemorySource),
     File(FileSource),
 }
@@ -658,9 +658,10 @@ pub struct LogRecordsBatches {
 impl LogRecordsBatches {
     /// Create from in-memory Vec (existing path - backward compatible).
     pub fn new(data: Vec<u8>) -> Self {
-        let remaining_bytes = data.len();
+        let source = LogRecordsSource::Memory(MemorySource::new(data));
+        let remaining_bytes = source.total_size();
         Self {
-            source: LogRecordsSource::Memory(MemorySource::new(data)),
+            source,
             current_pos: 0,
             remaining_bytes,
         }
     }
@@ -695,16 +696,6 @@ impl LogRecordsBatches {
         })
     }
 
-    /// Create from any source (for testing).
-    pub fn from_source(source: LogRecordsSource) -> Self {
-        let remaining_bytes = source.total_size();
-        Self {
-            source,
-            current_pos: 0,
-            remaining_bytes,
-        }
-    }
-
     /// Try to get the size of the next batch.
     fn next_batch_size(&self) -> Result<Option<usize>> {
         if self.remaining_bytes < LOG_OVERHEAD {