4 changes: 4 additions & 0 deletions crates/fluss/src/client/connection.rs
@@ -59,6 +59,10 @@ impl FlussConnection
         self.network_connects.clone()
     }
 
+    pub fn config(&self) -> &Config {
+        &self.args
+    }
+
     pub async fn get_admin(&self) -> Result<FlussAdmin> {
         FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
     }
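The new `config()` accessor exposes the connection's stored `Config` by reference. A minimal usage sketch; the calling context and `Config`'s `Debug` impl are assumptions for illustration, not part of this diff:

```rust
// Hedged sketch: only `config()` itself comes from this change.
fn log_settings(conn: &FlussConnection) {
    let cfg: &Config = conn.config();
    // Callers can now read connection settings back without cloning them.
    println!("client config: {:?}", cfg);
}
```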
198 changes: 189 additions & 9 deletions crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -18,15 +18,22 @@
 use arrow::array::RecordBatch;
 use parking_lot::Mutex;
 
+use crate::client::table::remote_log::{
+    PrefetchPermit, RemoteLogDownloadFuture, RemoteLogFile, RemoteLogSegment,
+};
 use crate::error::{ApiError, Error, Result};
 use crate::metadata::TableBucket;
 use crate::record::{
     LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord,
 };
-use std::collections::{HashMap, VecDeque};
-use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::time::Duration;
+use std::{
+    collections::{HashMap, VecDeque},
+    sync::{
+        Arc,
+        atomic::{AtomicBool, Ordering},
+    },
+    time::{Duration, Instant},
+};
 use tokio::sync::Notify;
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -106,7 +113,7 @@ impl LogFetchBuffer
     /// Wait for the buffer to become non-empty, with timeout.
     /// Returns true if data became available, false if timeout.
     pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
-        let deadline = std::time::Instant::now() + timeout;
+        let deadline = Instant::now() + timeout;
 
         loop {
             // Check if buffer is not empty
@@ -122,7 +129,7 @@
             }
 
             // Check if timeout
-            let now = std::time::Instant::now();
+            let now = Instant::now();
             if now >= deadline {
                 return Ok(false);
             }
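The loop above combines a `Notify` wakeup with a wall-clock deadline. A standalone sketch of the same pattern, assuming tokio's timer utilities rather than anything from this diff:

```rust
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::{Instant, timeout_at};

// Deadline-bounded wait on a Notify: re-check the condition after every
// wakeup, since a notification alone doesn't prove the condition still
// holds by the time this task runs.
async fn await_condition(notify: &Notify, timeout: Duration, ready: impl Fn() -> bool) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if ready() {
            return true;
        }
        // Err(_) means the deadline elapsed before a notification arrived.
        if timeout_at(deadline, notify.notified()).await.is_err() {
            return ready();
        }
    }
}
```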
@@ -325,6 +332,7 @@ impl PendingFetch for CompletedPendingFetch
 }
 
 /// Default implementation of CompletedFetch for in-memory log records
+/// Used for local fetches from the tablet server
 pub struct DefaultCompletedFetch {
     table_bucket: TableBucket,
     api_error: Option<ApiError>,
@@ -441,7 +449,8 @@ impl DefaultCompletedFetch
                 if record.offset() >= self.next_fetch_offset {
                     return Ok(Some(record));
                 }
-            } else if let Some(batch) = self.log_record_batch.next() {
+            } else if let Some(batch_result) = self.log_record_batch.next() {
+                let batch = batch_result?;
                 self.current_record_iterator = Some(batch.records(&self.read_context)?);
                 self.current_record_batch = Some(batch);
             } else {
@@ -470,11 +479,12 @@
     /// Get the next batch directly without row iteration
     fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
         loop {
-            let Some(log_batch) = self.log_record_batch.next() else {
+            let Some(log_batch_result) = self.log_record_batch.next() else {
                 self.drain();
                 return Ok(None);
             };
 
+            let log_batch = log_batch_result?;
             let mut record_batch = log_batch.record_batch(&self.read_context)?;
 
             // Skip empty batches
@@ -644,6 +654,177 @@ impl CompletedFetch for DefaultCompletedFetch
     }
 }
 
+/// Completed fetch for remote log segments.
+/// Matches Java's RemoteCompletedFetch design: a separate type for remote vs. local fetches.
+/// Holds the RAII permit until consumed (the data itself lives in `inner`).
+pub struct RemoteCompletedFetch {
+    inner: DefaultCompletedFetch,
+    permit: Option<PrefetchPermit>,
+}
+
+impl RemoteCompletedFetch {
+    pub fn new(inner: DefaultCompletedFetch, permit: PrefetchPermit) -> Self {
+        Self {
+            inner,
+            permit: Some(permit),
+        }
+    }
+}
+
+impl CompletedFetch for RemoteCompletedFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        self.inner.table_bucket()
+    }
+
+    fn api_error(&self) -> Option<&ApiError> {
+        self.inner.api_error()
+    }
+
+    fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+        self.inner.fetch_error_context()
+    }
+
+    fn take_error(&mut self) -> Option<Error> {
+        self.inner.take_error()
+    }
+
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
+        self.inner.fetch_records(max_records)
+    }
+
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+        self.inner.fetch_batches(max_batches)
+    }
+
+    fn is_consumed(&self) -> bool {
+        self.inner.is_consumed()
+    }
+
+    fn records_read(&self) -> usize {
+        self.inner.records_read()
+    }
+
+    fn drain(&mut self) {
+        self.inner.drain();
+        // Release the permit immediately (don't wait for the struct to drop).
+        // Critical: this lets prefetching continue even if a Box<dyn CompletedFetch> is kept around.
+        self.permit.take(); // drops the permit here, triggering the recycle notification
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.inner.size_in_bytes()
+    }
+
+    fn high_watermark(&self) -> i64 {
+        self.inner.high_watermark()
+    }
+
+    fn is_initialized(&self) -> bool {
+        self.inner.is_initialized()
+    }
+
+    fn set_initialized(&mut self) {
+        self.inner.set_initialized()
+    }
+
+    fn next_fetch_offset(&self) -> i64 {
+        self.inner.next_fetch_offset()
+    }
+}
+// The permit is released explicitly in drain(), or automatically when the struct drops.
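The permit's whole job is to free a prefetch slot the moment the data is consumed. A sketch of the RAII shape implied by the comments in `drain()`, assuming tokio's `Semaphore` and `Notify`; the real `PrefetchPermit` lives in `remote_log` and may differ:

```rust
use std::sync::Arc;
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};

// Hypothetical permit: dropping it returns the slot to the semaphore and
// nudges the prefetch coordinator to start the next download.
struct Permit {
    _slot: OwnedSemaphorePermit, // freed when the Permit drops
    recycled: Arc<Notify>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        // `_slot` is released right after this body runs; the notification
        // tells a parked coordinator task to re-check for a free slot.
        self.recycled.notify_one();
    }
}

async fn acquire_permit(slots: Arc<Semaphore>, recycled: Arc<Notify>) -> Permit {
    let slot = slots.acquire_owned().await.expect("semaphore closed");
    Permit { _slot: slot, recycled }
}
```

Taking the `Option<PrefetchPermit>` in `drain()` achieves the same release without waiting for the whole `RemoteCompletedFetch` to be dropped.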

+/// Pending fetch that waits for a remote log file to be downloaded.
+pub struct RemotePendingFetch {
+    segment: RemoteLogSegment,
+    download_future: RemoteLogDownloadFuture,
+    pos_in_log_segment: i32,
+    fetch_offset: i64,
+    high_watermark: i64,
+    read_context: ReadContext,
+}
+
+impl RemotePendingFetch {
+    pub fn new(
+        segment: RemoteLogSegment,
+        download_future: RemoteLogDownloadFuture,
+        pos_in_log_segment: i32,
+        fetch_offset: i64,
+        high_watermark: i64,
+        read_context: ReadContext,
+    ) -> Self {
+        Self {
+            segment,
+            download_future,
+            pos_in_log_segment,
+            fetch_offset,
+            high_watermark,
+            read_context,
+        }
+    }
+}
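`is_completed()` below only polls a flag; the blocking work happens in a download task elsewhere. One plausible shape for such a handle, shown purely as an assumption (the real `RemoteLogDownloadFuture` is defined in `remote_log` and may be built differently):

```rust
use std::sync::{
    Arc, Mutex,
    atomic::{AtomicBool, Ordering},
};

// Hypothetical download handle: the downloader task stores the result and
// flips `done`; the fetch buffer polls `is_done()` without blocking.
struct DownloadHandle<T> {
    done: Arc<AtomicBool>,
    result: Arc<Mutex<Option<T>>>,
}

impl<T> DownloadHandle<T> {
    fn is_done(&self) -> bool {
        self.done.load(Ordering::Acquire)
    }

    // Analogous to take_remote_log_file(): hand the result out exactly once.
    fn take(&self) -> Option<T> {
        self.result.lock().expect("lock poisoned").take()
    }
}
```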

+impl PendingFetch for RemotePendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.segment.table_bucket
+    }
+
+    fn is_completed(&self) -> bool {
+        self.download_future.is_done()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        // Take the RemoteLogFile and destructure it
+        let remote_log_file = self.download_future.take_remote_log_file()?;
+        let RemoteLogFile {
+            file_path,
+            file_size: _,
+            permit,
+        } = remote_log_file;
+
+        // Open the file for streaming (no memory allocation for the entire file)
+        let file = std::fs::File::open(&file_path)?;
+        let file_size = file.metadata()?.len() as usize;
+
+        // Create a file-backed LogRecordsBatches with cleanup (streaming!).
+        // Data is read batch-by-batch on demand, not all at once; the
+        // FileSource deletes the file when dropped (after the file is closed).
+        let log_record_batch = LogRecordsBatches::from_file_with_cleanup(
+            file,
+            self.pos_in_log_segment as usize,
+            file_path.clone(),
+        )?;
+
+        // Calculate the size based on the position offset
+        let size_in_bytes = if self.pos_in_log_segment > 0 {
+            let pos = self.pos_in_log_segment as usize;
+            if pos >= file_size {
+                return Err(Error::UnexpectedError {
+                    message: format!("Position {} exceeds file size {}", pos, file_size),
+                    source: None,
+                });
+            }
+            file_size - pos
+        } else {
+            file_size
+        };
+
+        // Create the DefaultCompletedFetch
+        let inner_fetch = DefaultCompletedFetch::new(
+            self.segment.table_bucket.clone(),
+            log_record_batch,
+            size_in_bytes,
+            self.read_context,
+            self.fetch_offset,
+            self.high_watermark,
+        );
+
+        // Wrap in RemoteCompletedFetch so the fetch holds the permit: the permit manages the
+        // prefetch slot (releasing the semaphore and notifying the coordinator) when dropped;
+        // file deletion is handled by the FileCleanupGuard in the source from from_file_with_cleanup.
+        Ok(Box::new(RemoteCompletedFetch::new(inner_fetch, permit)))
+    }
+}
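The comments above lean on drop-order guarantees: the permit frees the prefetch slot, while a `FileCleanupGuard` inside the file-backed source removes the temp file. A sketch of that guard pattern (the name comes from the comment; the actual implementation may differ):

```rust
use std::fs;
use std::path::PathBuf;

// Drop-based cleanup: whoever owns the guard last deletes the downloaded
// segment file, so readers never race against deletion.
struct FileCleanupGuard {
    path: PathBuf,
}

impl Drop for FileCleanupGuard {
    fn drop(&mut self) {
        // Best-effort removal; ignore errors (e.g. the file is already gone).
        let _ = fs::remove_file(&self.path);
    }
}
```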

 #[cfg(test)]
 mod tests {
     use super::*;
@@ -655,7 +836,6 @@ mod tests {
     use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema};
     use crate::row::GenericRow;
     use std::sync::Arc;
-    use std::time::Duration;
 
     fn test_read_context() -> ReadContext {
         let row_type = RowType::new(vec![DataField::new(
3 changes: 3 additions & 0 deletions crates/fluss/src/client/table/mod.rs
@@ -36,6 +36,9 @@ mod writer;
 use crate::client::table::upsert::TableUpsert;
 pub use append::{AppendWriter, TableAppend};
 pub use lookup::{LookupResult, Lookuper, TableLookup};
+pub use remote_log::{
+    DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
+};
 pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
 pub use writer::{TableWriter, UpsertWriter};
 
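Re-exporting the two defaults lets downstream code reference the remote-log scanner settings without reaching into the private `remote_log` module. Illustrative only; the crate path is inferred from the file layout and the constants' types are assumed:

```rust
// Hedged sketch: only the re-export itself is in this diff.
use fluss::client::table::{
    DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
};

fn remote_log_defaults() -> (usize, usize) {
    (
        DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS,
        DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
    )
}
```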