Skip to content

Commit 0008bc9

Browse files
committed
feat: add continue_write() for concurrent worker cursor sharing
- Add continue_write() method to TrackerIterBuilder that creates WriteIter without resetting the write cursor (unlike write() which resets to 0) - Make reset_write_cursor() public on PrefixTracker for manual cursor reset - Add multi-threaded tests for continue_write() verifying no duplicate claims - Update README.md with continue_write() documentation and examples - Update INTEGRATION_GUIDE.md with Use Case 8 for concurrent claim workers - Update API reference with new methods
1 parent f762471 commit 0008bc9

4 files changed

Lines changed: 243 additions & 4 deletions

File tree

INTEGRATION_GUIDE.md

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,59 @@ let removed = tracker.removed_count_since(&pre_workload);
598598
println!("Workload complete: +{} -{}", added, removed);
599599
```
600600

601-
### Use Case 8: Iterate Only Reference Set Members
601+
### Use Case 8: Concurrent Claim Workers with Shared Cursor
602+
603+
**Scenario**: Multiple workers atomically claim unique IDs without coordination, using `continue_write()`.
604+
605+
When workers independently create iterators via `write()`, each call resets the cursor to 0, causing duplicate claims. Use `continue_write()` to share cursor state across workers:
606+
607+
```rust
608+
use std::sync::Arc;
609+
use std::thread;
610+
611+
let tracker = Arc::new(tracker);
612+
let num_workers = 8;
613+
let claims_per_worker = 1000;
614+
615+
// Reset cursor once before spawning workers
616+
tracker.reset_write_cursor();
617+
618+
let handles: Vec<_> = (0..num_workers)
619+
.map(|_| {
620+
let t = tracker.clone();
621+
thread::spawn(move || {
622+
let mut claimed = Vec::with_capacity(claims_per_worker);
623+
624+
// continue_write() does NOT reset cursor - shares state across workers
625+
let mut iter = t.iter().continue_write();
626+
627+
for _ in 0..claims_per_worker {
628+
if let Some((id, _)) = iter.next() {
629+
// ID is atomically claimed and set in bitmap
630+
valkey.hset(format!("vec:{}", id), "embedding", random_vector());
631+
claimed.push(id);
632+
}
633+
}
634+
claimed
635+
})
636+
})
637+
.collect();
638+
639+
// All claimed IDs are unique - no duplicates across workers
640+
let all_claimed: Vec<u64> = handles.into_iter()
641+
.flat_map(|h| h.join().unwrap())
642+
.collect();
643+
644+
println!("Claimed {} unique IDs", all_claimed.len());
645+
```
646+
647+
**Key difference:**
648+
| Method | Cursor Behavior | Use Case |
649+
|--------|-----------------|----------|
650+
| `write()` | Resets cursor to 0 | Single iterator, fresh start |
651+
| `continue_write()` | Keeps current position | Multiple workers sharing cursor |
652+
653+
### Use Case 9: Iterate Only Reference Set Members
602654

603655
**Scenario**: Run queries only on vectors that exist in both tracker and reference set.
604656

@@ -670,6 +722,9 @@ impl PrefixTracker {
670722
pub fn remove_range(&self, start: u64, end: u64) -> u64;
671723
pub fn clear(&self);
672724

725+
// === Cursor Control ===
726+
pub fn reset_write_cursor(&self); // Reset to 0 before spawning workers
727+
673728
// === Snapshot & Diff ===
674729
pub fn snapshot(&self) -> BitmapSnapshot;
675730
pub fn added_since(&self, snapshot: &BitmapSnapshot) -> Vec<u64>;
@@ -723,7 +778,13 @@ impl<'a> TrackerIterBuilder<'a> {
723778
// === Terminal Operations ===
724779
pub fn sequential(self) -> SequentialIter<'a>;
725780
pub fn random(self) -> RandomIter<'a>;
726-
pub fn claim(self) -> ClaimIter<'a>;
781+
782+
/// Build write iterator (resets cursor to 0).
783+
pub fn write(self) -> WriteIter<'a>;
784+
785+
/// Build write iterator (continues from current cursor position).
786+
/// Use when multiple workers share cursor state.
787+
pub fn continue_write(self) -> WriteIter<'a>;
727788
}
728789
```
729790

README.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,49 @@ while let Some((id, _)) = writer.next() {
290290
// All IDs in `claimed` are now set and unique
291291
```
292292

293+
#### Concurrent Workers with `continue_write()`
294+
295+
When multiple workers need to share cursor state across independently-created iterators,
296+
use `continue_write()` instead of `write()`:
297+
298+
```rust
299+
use std::sync::Arc;
300+
use std::thread;
301+
302+
let tracker = Arc::new(tracker);
303+
304+
// Reset cursor once before spawning workers
305+
tracker.reset_write_cursor();
306+
307+
let handles: Vec<_> = (0..8)
308+
.map(|_| {
309+
let t = tracker.clone();
310+
thread::spawn(move || {
311+
// continue_write() does NOT reset cursor - shares state across workers
312+
let mut iter = t.iter().continue_write();
313+
let mut claimed = Vec::new();
314+
315+
for _ in 0..100 {
316+
if let Some((id, _)) = iter.next() {
317+
claimed.push(id);
318+
}
319+
}
320+
claimed
321+
})
322+
})
323+
.collect();
324+
325+
// All claimed IDs are unique - no duplicates across workers
326+
let all_claimed: Vec<u64> = handles.into_iter()
327+
.flat_map(|h| h.join().unwrap())
328+
.collect();
329+
```
330+
331+
| Method | Cursor Behavior | Use Case |
332+
|--------|-----------------|----------|
333+
| `write()` | Resets cursor to 0 | Single iterator, fresh start |
334+
| `continue_write()` | Keeps current position | Multiple workers sharing cursor |
335+
293336
### Delete Iterator
294337

295338
Exclusive iterator that clears IDs:

src/iterators.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,10 @@ impl<'a> TrackerIterBuilder<'a> {
296296
///
297297
/// Multiple `WriteIter`s can run concurrently. Each call to `next()`
298298
/// atomically claims a unique ID.
299+
///
300+
/// **Note:** This resets the write cursor to 0 before iteration begins.
301+
/// For concurrent scenarios where multiple iterators should share cursor
302+
/// position, use [`continue_write()`](Self::continue_write) instead.
299303
pub fn write(self) -> WriteIter<'a> {
300304
// Reset cursor at start of iteration
301305
self.tracker.reset_write_cursor();
@@ -310,6 +314,40 @@ impl<'a> TrackerIterBuilder<'a> {
310314
}
311315
}
312316

317+
/// Build a write iterator that continues from the current cursor position.
318+
///
319+
/// Unlike [`write()`](Self::write), this does **not** reset the write cursor.
320+
/// Use this when multiple workers need to share cursor state across
321+
/// independently-created iterators.
322+
///
323+
/// # Example: Concurrent workers
324+
///
325+
/// ```
326+
/// use keyspace_tracker::PrefixTracker;
327+
///
328+
/// let tracker = PrefixTracker::new(100);
329+
/// tracker.reset_write_cursor(); // Reset once at the start
330+
///
331+
/// // Multiple workers can create iterators without resetting cursor
332+
/// let mut iter1 = tracker.iter().continue_write();
333+
/// let mut iter2 = tracker.iter().continue_write();
334+
///
335+
/// // Each claim_next_id() atomically advances the shared cursor
336+
/// let id1 = iter1.claim_next_id();
337+
/// let id2 = iter2.claim_next_id();
338+
/// assert_ne!(id1, id2); // Different IDs guaranteed
339+
/// ```
340+
pub fn continue_write(self) -> WriteIter<'a> {
341+
let max_id = self.range.id_max.min(self.tracker.effective_max_id());
342+
343+
WriteIter {
344+
tracker: self.tracker,
345+
range: self.range,
346+
filter: self.filter,
347+
max_id,
348+
}
349+
}
350+
313351
/// Build an exclusive delete iterator.
314352
///
315353
/// Only one `DeleteIter` can exist per tracker at a time.
@@ -1710,4 +1748,87 @@ mod tests {
17101748
let total: u64 = handles.into_iter().map(|h: thread::JoinHandle<u64>| h.join().unwrap()).sum();
17111749
assert_eq!(total, 10000);
17121750
}
1751+
1752+
#[test]
1753+
fn test_continue_write_multithreaded() {
1754+
use std::collections::HashSet;
1755+
use std::sync::Arc;
1756+
use std::thread;
1757+
1758+
let tracker = Arc::new(PrefixTracker::new(
1759+
TrackerConfig::simple("vec:").with_max_id(10000),
1760+
));
1761+
1762+
// Reset cursor once before spawning workers
1763+
tracker.reset_write_cursor();
1764+
1765+
let num_workers = 8;
1766+
let claims_per_worker = 100;
1767+
1768+
let handles: Vec<_> = (0..num_workers)
1769+
.map(|_| {
1770+
let t = tracker.clone();
1771+
thread::spawn(move || {
1772+
let mut claimed = Vec::with_capacity(claims_per_worker);
1773+
let mut iter = t.iter().continue_write();
1774+
1775+
for _ in 0..claims_per_worker {
1776+
if let Some((id, _sub_id)) = iter.next() {
1777+
claimed.push(id);
1778+
}
1779+
}
1780+
claimed
1781+
})
1782+
})
1783+
.collect();
1784+
1785+
// Collect all claimed IDs
1786+
let all_claimed: Vec<u64> = handles
1787+
.into_iter()
1788+
.flat_map(|h| h.join().unwrap())
1789+
.collect();
1790+
1791+
// Verify no duplicates - each ID should be claimed exactly once
1792+
let unique: HashSet<_> = all_claimed.iter().collect();
1793+
assert_eq!(
1794+
unique.len(),
1795+
all_claimed.len(),
1796+
"Duplicate IDs claimed! Got {} claims but only {} unique",
1797+
all_claimed.len(),
1798+
unique.len()
1799+
);
1800+
1801+
// Should have claimed exactly num_workers * claims_per_worker IDs
1802+
assert_eq!(all_claimed.len(), num_workers * claims_per_worker);
1803+
}
1804+
1805+
#[test]
1806+
fn test_continue_write_vs_write_cursor_behavior() {
1807+
let tracker = PrefixTracker::new(TrackerConfig::simple("vec:").with_max_id(100));
1808+
1809+
// First write() resets cursor to 0
1810+
let mut iter1 = tracker.iter().write();
1811+
let (id1, _) = iter1.next().unwrap();
1812+
assert_eq!(id1, 0);
1813+
1814+
// Second write() also resets cursor to 0 - gets same ID!
1815+
let mut iter2 = tracker.iter().write();
1816+
let (id2, _) = iter2.next().unwrap();
1817+
assert_eq!(id2, 0, "write() should reset cursor");
1818+
1819+
// Now use continue_write() - manually reset first
1820+
tracker.reset_write_cursor();
1821+
let mut iter3 = tracker.iter().continue_write();
1822+
let (id3, _) = iter3.next().unwrap();
1823+
assert_eq!(id3, 0);
1824+
1825+
// continue_write() does NOT reset - continues from cursor
1826+
let mut iter4 = tracker.iter().continue_write();
1827+
let (id4, _) = iter4.next().unwrap();
1828+
assert_eq!(id4, 1, "continue_write() should NOT reset cursor");
1829+
1830+
let mut iter5 = tracker.iter().continue_write();
1831+
let (id5, _) = iter5.next().unwrap();
1832+
assert_eq!(id5, 2, "continue_write() should continue advancing");
1833+
}
17131834
}

src/tracker.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,8 +564,22 @@ impl PrefixTracker {
564564
&self.write_cursor
565565
}
566566

567-
/// Reset write cursor (call before starting iteration).
568-
pub(crate) fn reset_write_cursor(&self) {
567+
/// Reset the write cursor to position 0.
568+
///
569+
/// Call this once before spawning concurrent workers that use
570+
/// [`continue_write()`](crate::iterators::TrackerIterBuilder::continue_write).
571+
///
572+
/// # Example
573+
///
574+
/// ```
575+
/// use keyspace_tracker::PrefixTracker;
576+
///
577+
/// let tracker = PrefixTracker::new(100);
578+
/// tracker.reset_write_cursor(); // Reset once
579+
///
580+
/// // Spawn workers that call tracker.iter().continue_write()
581+
/// ```
582+
pub fn reset_write_cursor(&self) {
569583
self.write_cursor.store(0, Ordering::Release);
570584
}
571585

0 commit comments

Comments
 (0)