Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions vortex-file/src/read/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
Expand Down Expand Up @@ -209,7 +210,10 @@ impl State {
let mut current_end = requests[0].offset + requests[0].length as u64;
let align = *self.coalesced_buffer_alignment as u64;

let mut keys_to_remove = Vec::new();
// Track requests that we've already decided to remove (or that were cancelled) so that
// we don't repeatedly process them during range scans.
let mut keys_to_remove: Vec<(u64, RequestId)> = Vec::new();
let mut ids_to_remove: HashSet<RequestId> = HashSet::new();
let mut found_new_requests = true;

// Keep expanding the window while we can find new requests within constraints
Expand All @@ -225,8 +229,8 @@ impl State {
.requests_by_offset
.range((scan_start, RequestId::MIN)..=(scan_end, RequestId::MAX))
{
// Skip if we've already marked this request for removal
if keys_to_remove.iter().any(|&(_, id)| id == req_id) {
// Skip if we've already marked this request for removal.
if ids_to_remove.contains(&req_id) {
continue;
}

Expand All @@ -236,13 +240,15 @@ impl State {
.or_else(|| self.requests.get(&req_id))
.vortex_expect("Missing request in requests_by_offset");

// Skip any cancelled requests
// Skip any cancelled requests.
if req.callback.is_closed() {
keys_to_remove.push((req_offset, req_id));
if ids_to_remove.insert(req_id) {
keys_to_remove.push((req_offset, req_id));
}
continue;
}

// Check if this request is within coalescing distance of our current range
// Check if this request is within coalescing distance of our current range.
let req_end = req_offset + req.length as u64;
if (req_offset <= current_end + window.distance && req_end >= current_start)
|| (req_end + window.distance >= current_start && req_offset <= current_end)
Expand All @@ -267,7 +273,9 @@ impl State {
.vortex_expect("Missing request in requests_by_offset");

requests.push(req);
keys_to_remove.push((req_offset, req_id));
if ids_to_remove.insert(req_id) {
keys_to_remove.push((req_offset, req_id));
}
found_new_requests = true;
}
}
Expand Down