Skip to content

Commit de1e2dd

Browse files
committed
imports and enum dispatch
1 parent fc75fbc commit de1e2dd

4 files changed

Lines changed: 107 additions & 95 deletions

File tree

crates/fluss/src/client/table/log_fetch_buffer.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,14 @@ use crate::metadata::TableBucket;
2626
use crate::record::{
2727
LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord,
2828
};
29-
use std::collections::{HashMap, VecDeque};
30-
use std::sync::Arc;
31-
use std::sync::atomic::{AtomicBool, Ordering};
32-
use std::time::Duration;
29+
use std::{
30+
collections::{HashMap, VecDeque},
31+
sync::{
32+
Arc,
33+
atomic::{AtomicBool, Ordering},
34+
},
35+
time::{Duration, Instant},
36+
};
3337
use tokio::sync::Notify;
3438

3539
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -109,7 +113,7 @@ impl LogFetchBuffer {
109113
/// Wait for the buffer to become non-empty, with timeout.
110114
/// Returns true if data became available, false if timeout.
111115
pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
112-
let deadline = std::time::Instant::now() + timeout;
116+
let deadline = Instant::now() + timeout;
113117

114118
loop {
115119
// Check if buffer is not empty
@@ -125,7 +129,7 @@ impl LogFetchBuffer {
125129
}
126130

127131
// Check if timeout
128-
let now = std::time::Instant::now();
132+
let now = Instant::now();
129133
if now >= deadline {
130134
return Ok(false);
131135
}
@@ -659,10 +663,7 @@ pub struct RemoteCompletedFetch {
659663
}
660664

661665
impl RemoteCompletedFetch {
662-
pub fn new(
663-
inner: DefaultCompletedFetch,
664-
permit: PrefetchPermit,
665-
) -> Self {
666+
pub fn new(inner: DefaultCompletedFetch, permit: PrefetchPermit) -> Self {
666667
Self {
667668
inner,
668669
permit: Some(permit),
@@ -835,7 +836,6 @@ mod tests {
835836
use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema};
836837
use crate::row::GenericRow;
837838
use std::sync::Arc;
838-
use std::time::Duration;
839839

840840
fn test_read_context() -> ReadContext {
841841
let row_type = RowType::new(vec![DataField::new(

crates/fluss/src/client/table/remote_log.rs

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@ use crate::io::{FileIO, Storage};
1919
use crate::metadata::TableBucket;
2020
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
2121
use parking_lot::{Mutex, RwLock};
22-
use std::cmp::Reverse;
23-
use std::collections::{BinaryHeap, HashMap};
24-
use std::future::Future;
25-
use std::io;
26-
use std::path::{Path, PathBuf};
27-
use std::pin::Pin;
28-
use std::sync::Arc;
22+
use std::{
23+
cmp::{Ordering, Reverse, min},
24+
collections::{BinaryHeap, HashMap},
25+
env,
26+
future::Future,
27+
io, mem,
28+
path::{Path, PathBuf},
29+
pin::Pin,
30+
sync::Arc,
31+
time::{Duration, SystemTime, UNIX_EPOCH},
32+
};
2933
use tempfile::TempDir;
3034
use tokio::io::AsyncWriteExt;
3135
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
@@ -211,7 +215,7 @@ impl RemoteLogDownloadRequest {
211215
// Primary: Java semantics (timestamp cross-bucket, offset within-bucket)
212216
// Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then segment_id
213217
impl Ord for RemoteLogDownloadRequest {
214-
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
218+
fn cmp(&self, other: &Self) -> Ordering {
215219
if self.segment.table_bucket == other.segment.table_bucket {
216220
// Same bucket: order by start_offset (ascending - earlier segments first)
217221
self.segment
@@ -248,14 +252,14 @@ impl Ord for RemoteLogDownloadRequest {
248252
}
249253

250254
impl PartialOrd for RemoteLogDownloadRequest {
251-
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
255+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
252256
Some(self.cmp(other))
253257
}
254258
}
255259

256260
impl PartialEq for RemoteLogDownloadRequest {
257261
fn eq(&self, other: &Self) -> bool {
258-
self.cmp(other) == std::cmp::Ordering::Equal
262+
self.cmp(other) == Ordering::Equal
259263
}
260264
}
261265

@@ -637,7 +641,7 @@ impl RemoteLogDownloadFuture {
637641
// This also ensures that any callbacks registered after this point will be called immediately
638642
let callbacks: Vec<CompletionCallback> = {
639643
let mut callbacks_guard = callbacks_clone.lock();
640-
std::mem::take(&mut *callbacks_guard)
644+
mem::take(&mut *callbacks_guard)
641645
};
642646
for callback in callbacks {
643647
callback();
@@ -890,7 +894,7 @@ impl RemoteLogDownloader {
890894
let (op, relative_path) = storage.create(remote_path)?;
891895

892896
// Timeout for remote storage operations (30 seconds)
893-
const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
897+
const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
894898

895899
// Get file metadata to know the size with timeout
896900
let meta = op.stat(relative_path).await?;
@@ -907,7 +911,7 @@ impl RemoteLogDownloader {
907911
let total_chunks = file_size.div_ceil(CHUNK_SIZE);
908912

909913
while offset < file_size {
910-
let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
914+
let end = min(offset + CHUNK_SIZE, file_size);
911915
let range = offset..end;
912916
chunk_count += 1;
913917

@@ -1035,9 +1039,9 @@ mod tests {
10351039
})
10361040
} else {
10371041
let fake_data = vec![1, 2, 3, 4];
1038-
let temp_dir = std::env::temp_dir();
1039-
let timestamp = std::time::SystemTime::now()
1040-
.duration_since(std::time::UNIX_EPOCH)
1042+
let temp_dir = env::temp_dir();
1043+
let timestamp = SystemTime::now()
1044+
.duration_since(UNIX_EPOCH)
10411045
.unwrap()
10421046
.as_nanos();
10431047
let file_path =
@@ -1159,7 +1163,7 @@ mod tests {
11591163
.collect();
11601164

11611165
// Wait for exactly 2 to start
1162-
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1166+
tokio::time::sleep(Duration::from_millis(50)).await;
11631167
assert_eq!(
11641168
fake_fetcher.in_flight(),
11651169
2,
@@ -1168,7 +1172,7 @@ mod tests {
11681172

11691173
// Release one
11701174
fake_fetcher.release_one();
1171-
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1175+
tokio::time::sleep(Duration::from_millis(50)).await;
11721176

11731177
// Max should never exceed 2
11741178
assert_eq!(
@@ -1206,19 +1210,19 @@ mod tests {
12061210
.collect();
12071211

12081212
// Wait for first 2 to complete
1209-
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
1213+
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
12101214
loop {
12111215
if futures.iter().filter(|f| f.is_done()).count() >= 2 {
12121216
break;
12131217
}
12141218
if tokio::time::Instant::now() > deadline {
12151219
panic!("Timeout waiting for first 2 downloads");
12161220
}
1217-
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1221+
tokio::time::sleep(Duration::from_millis(10)).await;
12181222
}
12191223

12201224
// Verify 3rd and 4th are blocked (prefetch limit)
1221-
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1225+
tokio::time::sleep(Duration::from_millis(50)).await;
12221226
assert_eq!(
12231227
futures.iter().filter(|f| f.is_done()).count(),
12241228
2,
@@ -1231,15 +1235,15 @@ mod tests {
12311235
drop(futures);
12321236

12331237
// 3rd and 4th should now complete
1234-
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
1238+
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
12351239
loop {
12361240
if f3.is_done() && f4.is_done() {
12371241
break;
12381242
}
12391243
if tokio::time::Instant::now() > deadline {
12401244
panic!("Timeout after permit release");
12411245
}
1242-
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1246+
tokio::time::sleep(Duration::from_millis(10)).await;
12431247
}
12441248
}
12451249

@@ -1257,15 +1261,15 @@ mod tests {
12571261
let future = downloader.request_remote_log("dir", &seg);
12581262

12591263
// Should succeed after retries
1260-
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1264+
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
12611265
loop {
12621266
if future.is_done() {
12631267
break;
12641268
}
12651269
if tokio::time::Instant::now() > deadline {
12661270
panic!("Timeout waiting for retry to succeed");
12671271
}
1268-
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1272+
tokio::time::sleep(Duration::from_millis(50)).await;
12691273
}
12701274

12711275
assert!(future.is_done(), "Should succeed after retries");
@@ -1277,11 +1281,11 @@ mod tests {
12771281
RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10, 1).unwrap();
12781282

12791283
let future2 = downloader2.request_remote_log("dir", &seg2);
1280-
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1284+
tokio::time::sleep(Duration::from_millis(50)).await;
12811285

12821286
// Drop to cancel
12831287
drop(future2);
1284-
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1288+
tokio::time::sleep(Duration::from_millis(50)).await;
12851289

12861290
assert_eq!(
12871291
fake_fetcher2.in_flight(),

crates/fluss/src/client/table/scanner.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ use arrow::array::RecordBatch;
1919
use arrow_schema::SchemaRef;
2020
use log::{debug, warn};
2121
use parking_lot::{Mutex, RwLock};
22-
use std::collections::{HashMap, HashSet};
23-
use std::slice::from_ref;
24-
use std::sync::Arc;
25-
use std::time::Duration;
22+
use std::{
23+
collections::{HashMap, HashSet},
24+
slice::from_ref,
25+
sync::Arc,
26+
time::{Duration, Instant},
27+
};
2628
use tempfile::TempDir;
2729

2830
use crate::client::connection::FlussConnection;
@@ -294,7 +296,7 @@ impl LogScannerInner {
294296
}
295297

296298
async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
297-
let start = std::time::Instant::now();
299+
let start = Instant::now();
298300
let deadline = start + timeout;
299301

300302
loop {
@@ -309,7 +311,7 @@ impl LogScannerInner {
309311
}
310312

311313
// No data available, check if we should wait
312-
let now = std::time::Instant::now();
314+
let now = Instant::now();
313315
if now >= deadline {
314316
// Timeout reached, return empty result
315317
return Ok(ScanRecords::new(HashMap::new()));
@@ -378,7 +380,7 @@ impl LogScannerInner {
378380
}
379381

380382
async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
381-
let start = std::time::Instant::now();
383+
let start = Instant::now();
382384
let deadline = start + timeout;
383385

384386
loop {
@@ -389,7 +391,7 @@ impl LogScannerInner {
389391
return Ok(batches);
390392
}
391393

392-
let now = std::time::Instant::now();
394+
let now = Instant::now();
393395
if now >= deadline {
394396
return Ok(Vec::new());
395397
}

0 commit comments

Comments
 (0)