Skip to content

Commit 87a52cc

Browse files
committed
fixed transaction scan
1 parent 35460cf commit 87a52cc

4 files changed

Lines changed: 104 additions & 55 deletions

File tree

kv/src/core/db.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,17 @@ impl Db {
367367

368368
DbIterator::new(memtable, immutables, sstables, start_bound, end_bound)
369369
}
370+
371+
pub fn scan_seq(&self, start: Option<&[u8]>, end: Option<&[u8]>, seq: u64) -> DbIterator {
372+
let memtable = Arc::clone(&self.memtable.load());
373+
let immutables = (**self.immutable_memtables.load()).clone();
374+
let sstables = (**self.sstables.load()).clone();
375+
376+
let start_bound = start.map(|s| s.to_vec());
377+
let end_bound = end.map(|e| e.to_vec());
378+
379+
DbIterator::new_with_seq(memtable, immutables, sstables, start_bound, end_bound, Some(seq))
380+
}
370381
}
371382

372383
// custom memory drop implementation to join all the threads (i.e. the compaction and flush queue

kv/src/core/iterator.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ pub struct DbIterator {
7676
last_key: Option<Vec<u8>>,
7777
start_bound: Option<Vec<u8>>,
7878
end_bound: Option<Vec<u8>>,
79+
max_seq: Option<u64>, // for snapshot isolation, we should only see the values with seq less
80+
// than this
7981
}
8082

8183
impl DbIterator {
@@ -85,6 +87,24 @@ impl DbIterator {
8587
sstables: Vec<SSTReader>,
8688
start_bound: Option<Vec<u8>>,
8789
end_bound: Option<Vec<u8>>,
90+
) -> Self {
91+
Self::new_with_seq(
92+
memtable,
93+
immutable_memtable,
94+
sstables,
95+
start_bound,
96+
end_bound,
97+
None,
98+
)
99+
}
100+
101+
pub fn new_with_seq(
102+
memtable: Arc<Memtable>,
103+
immutable_memtable: Vec<Arc<Memtable>>,
104+
sstables: Vec<SSTReader>,
105+
start_bound: Option<Vec<u8>>,
106+
end_bound: Option<Vec<u8>>,
107+
max_seq: Option<u64>,
88108
) -> Self {
89109
let mut sources = Vec::new();
90110
let mut heap = BinaryHeap::new();
@@ -131,7 +151,9 @@ impl DbIterator {
131151

132152
// preload one entry from each source
133153
for i in 0..sources.len() {
134-
if let Some(entry) = Self::advance_source(&mut sources, i, &start_bound, &end_bound) {
154+
if let Some(entry) =
155+
Self::advance_source(&mut sources, i, &start_bound, &end_bound, &max_seq)
156+
{
135157
heap.push(entry);
136158
}
137159
}
@@ -142,6 +164,7 @@ impl DbIterator {
142164
last_key: None,
143165
start_bound,
144166
end_bound,
167+
max_seq,
145168
}
146169
}
147170

@@ -150,6 +173,7 @@ impl DbIterator {
150173
source_idx: usize,
151174
start_bound: &Option<Vec<u8>>,
152175
end_bound: &Option<Vec<u8>>,
176+
max_seq: &Option<u64>,
153177
) -> Option<IterEntry> {
154178
loop {
155179
let (key, value, seq, priority) = match &mut sources[source_idx] {
@@ -172,6 +196,14 @@ impl DbIterator {
172196
},
173197
};
174198

199+
// kkip entries with sequence numbers >= max_seq, for snapshot isolation
200+
// we use >= to match the strict inequality in get_seq (seq < snapshot_seq)
201+
if let Some(max) = max_seq {
202+
if seq >= *max {
203+
continue;
204+
}
205+
}
206+
175207
if let Some(start) = start_bound {
176208
if &key < start {
177209
continue;
@@ -215,6 +247,7 @@ impl Iterator for DbIterator {
215247
idx,
216248
&self.start_bound,
217249
&self.end_bound,
250+
&self.max_seq,
218251
) {
219252
self.heap.push(next_entry);
220253
}

kv/src/transaction/mod.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ impl<'a> Transaction<'a> {
6060
}
6161

6262
pub fn scan(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> TransactionIterator {
63-
// Get underlying DB iterator
64-
let db_iter = self.db.scan(start, end);
63+
// Get underlying DB iterator with snapshot isolation at transaction's sequence
64+
let db_iter = self.db.scan_seq(start, end, self.seq);
6565

6666
// Collect transaction buffer entries within the range
6767
let mut txn_entries = Vec::new();
@@ -88,6 +88,7 @@ impl<'a> Transaction<'a> {
8888
txn_entries,
8989
txn_pos: 0,
9090
last_key: None,
91+
peeked_db_entry: None,
9192
}
9293
}
9394
}
@@ -97,58 +98,64 @@ pub struct TransactionIterator {
9798
txn_entries: Vec<(Vec<u8>, Vec<u8>)>,
9899
txn_pos: usize,
99100
last_key: Option<Vec<u8>>,
101+
peeked_db_entry: Option<(Vec<u8>, Vec<u8>)>, // Store peeked DB entry
100102
}
101103

102104
impl Iterator for TransactionIterator {
103105
type Item = (Vec<u8>, Vec<u8>);
104106

105107
fn next(&mut self) -> Option<Self::Item> {
106108
loop {
107-
// Peek at both sources
109+
// peek at both sources first
108110
let txn_entry = if self.txn_pos < self.txn_entries.len() {
109111
Some(&self.txn_entries[self.txn_pos])
110112
} else {
111113
None
112114
};
113115

114-
let db_entry = self.db_iter.next();
116+
// get or peek DB entry
117+
if self.peeked_db_entry.is_none() {
118+
self.peeked_db_entry = self.db_iter.next();
119+
}
115120

116-
// Determine which entry to return
117-
let (key, val) = match (txn_entry, db_entry) {
118-
(Some((tk, tv)), Some((dk, dv))) => {
121+
// determine which entry to be returned
122+
let (key, val) = match (txn_entry, &self.peeked_db_entry) {
123+
(Some((tk, tv)), Some((dk, _))) => {
119124
// Both have entries, pick the smaller key
120125
// Transaction entries take precedence on equal keys
121-
if tk <= &dk {
126+
if tk <= dk {
122127
self.txn_pos += 1;
123128
(tk.clone(), tv.clone())
124129
} else {
125-
(dk, dv)
130+
let entry = self.peeked_db_entry.take().unwrap();
131+
entry
126132
}
127133
}
128134
(Some((tk, tv)), None) => {
129-
// Only transaction has entry
135+
// only transaction has entry
130136
self.txn_pos += 1;
131137
(tk.clone(), tv.clone())
132138
}
133-
(None, Some((dk, dv))) => {
134-
// Only DB has entry
135-
(dk, dv)
139+
(None, Some(_)) => {
140+
// only DB has entry
141+
let entry = self.peeked_db_entry.take().unwrap();
142+
entry
136143
}
137144
(None, None) => {
138-
// Both exhausted
145+
// none has a entry
139146
return None;
140147
}
141148
};
142149

143-
// Skip duplicates
150+
// skip duplicates
144151
if let Some(ref last) = self.last_key {
145152
if &key == last {
146153
continue;
147154
}
148155
}
149156
self.last_key = Some(key.clone());
150157

151-
// Skip tombstones (empty values)
158+
// skip tombstones, i.e. empty values or deleted values
152159
if val.is_empty() {
153160
continue;
154161
}

kv/tests/transaction_test.rs

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use keylite_kv::core::Db;
2-
use std::fs;
2+
use std::{fs, string};
33

44
#[test]
55
fn test_basic_transaction_commit() {
@@ -98,29 +98,30 @@ fn test_transaction_scan_all() {
9898
let _ = fs::remove_dir_all(test_dir);
9999

100100
let db = Db::open(test_dir).unwrap();
101-
102-
// Put some data in DB
101+
103102
db.put(b"a1", b"db_a1").unwrap();
104103
db.put(b"b2", b"db_b2").unwrap();
105104
db.put(b"c3", b"db_c3").unwrap();
106-
107-
// Create transaction with additional data
105+
108106
let mut txn = db.begin();
109107
txn.put(b"a2", b"txn_a2");
110108
txn.put(b"b1", b"txn_b1");
111109
txn.put(b"d1", b"txn_d1");
112-
113-
// Scan all
110+
114111
let results: Vec<_> = txn.scan(None, None).collect();
115-
112+
for res in results.iter() {
113+
println!("{:?}", String::from_utf8(res.1.clone()));
114+
}
115+
println!("{:?}", results);
116+
116117
assert_eq!(results.len(), 6);
117118
assert_eq!(results[0], (b"a1".to_vec(), b"db_a1".to_vec()));
118119
assert_eq!(results[1], (b"a2".to_vec(), b"txn_a2".to_vec()));
119120
assert_eq!(results[2], (b"b1".to_vec(), b"txn_b1".to_vec()));
120121
assert_eq!(results[3], (b"b2".to_vec(), b"db_b2".to_vec()));
121122
assert_eq!(results[4], (b"c3".to_vec(), b"db_c3".to_vec()));
122123
assert_eq!(results[5], (b"d1".to_vec(), b"txn_d1".to_vec()));
123-
124+
124125
let _ = fs::remove_dir_all(test_dir);
125126
}
126127

@@ -130,27 +131,25 @@ fn test_transaction_scan_with_range() {
130131
let _ = fs::remove_dir_all(test_dir);
131132

132133
let db = Db::open(test_dir).unwrap();
133-
134-
// Put some data in DB
134+
135135
db.put(b"a1", b"db_a1").unwrap();
136136
db.put(b"b2", b"db_b2").unwrap();
137137
db.put(b"c3", b"db_c3").unwrap();
138138
db.put(b"d4", b"db_d4").unwrap();
139-
140-
// Create transaction with additional data
139+
141140
let mut txn = db.begin();
142141
txn.put(b"b1", b"txn_b1");
143142
txn.put(b"c1", b"txn_c1");
144-
145-
// Scan with range [b, d)
143+
146144
let results: Vec<_> = txn.scan(Some(b"b"), Some(b"d")).collect();
147-
145+
println!("{:?}", results);
146+
148147
assert_eq!(results.len(), 4);
149148
assert_eq!(results[0], (b"b1".to_vec(), b"txn_b1".to_vec()));
150149
assert_eq!(results[1], (b"b2".to_vec(), b"db_b2".to_vec()));
151150
assert_eq!(results[2], (b"c1".to_vec(), b"txn_c1".to_vec()));
152151
assert_eq!(results[3], (b"c3".to_vec(), b"db_c3".to_vec()));
153-
152+
154153
let _ = fs::remove_dir_all(test_dir);
155154
}
156155

@@ -160,24 +159,24 @@ fn test_transaction_scan_with_overwrites() {
160159
let _ = fs::remove_dir_all(test_dir);
161160

162161
let db = Db::open(test_dir).unwrap();
163-
162+
164163
// Put some data in DB
165164
db.put(b"key1", b"db_value1").unwrap();
166165
db.put(b"key2", b"db_value2").unwrap();
167166
db.put(b"key3", b"db_value3").unwrap();
168-
167+
169168
// Create transaction that overwrites some keys
170169
let mut txn = db.begin();
171170
txn.put(b"key2", b"txn_value2");
172-
171+
173172
// Scan all - should see transaction value for key2
174173
let results: Vec<_> = txn.scan(None, None).collect();
175-
174+
176175
assert_eq!(results.len(), 3);
177176
assert_eq!(results[0], (b"key1".to_vec(), b"db_value1".to_vec()));
178177
assert_eq!(results[1], (b"key2".to_vec(), b"txn_value2".to_vec()));
179178
assert_eq!(results[2], (b"key3".to_vec(), b"db_value3".to_vec()));
180-
179+
181180
let _ = fs::remove_dir_all(test_dir);
182181
}
183182

@@ -187,23 +186,23 @@ fn test_transaction_scan_with_deletes() {
187186
let _ = fs::remove_dir_all(test_dir);
188187

189188
let db = Db::open(test_dir).unwrap();
190-
189+
191190
// Put some data in DB
192191
db.put(b"key1", b"db_value1").unwrap();
193192
db.put(b"key2", b"db_value2").unwrap();
194193
db.put(b"key3", b"db_value3").unwrap();
195-
194+
196195
// Create transaction that deletes a key
197196
let mut txn = db.begin();
198197
txn.del(b"key2");
199-
198+
200199
// Scan all - should not see key2
201200
let results: Vec<_> = txn.scan(None, None).collect();
202-
201+
203202
assert_eq!(results.len(), 2);
204203
assert_eq!(results[0], (b"key1".to_vec(), b"db_value1".to_vec()));
205204
assert_eq!(results[1], (b"key3".to_vec(), b"db_value3".to_vec()));
206-
205+
207206
let _ = fs::remove_dir_all(test_dir);
208207
}
209208

@@ -213,21 +212,21 @@ fn test_transaction_scan_start_only() {
213212
let _ = fs::remove_dir_all(test_dir);
214213

215214
let db = Db::open(test_dir).unwrap();
216-
215+
217216
db.put(b"a", b"val_a").unwrap();
218217
db.put(b"b", b"val_b").unwrap();
219218
db.put(b"c", b"val_c").unwrap();
220219
db.put(b"d", b"val_d").unwrap();
221-
220+
222221
let txn = db.begin();
223-
222+
224223
// Scan from 'c' onwards
225224
let results: Vec<_> = txn.scan(Some(b"c"), None).collect();
226-
225+
227226
assert_eq!(results.len(), 2);
228227
assert_eq!(results[0], (b"c".to_vec(), b"val_c".to_vec()));
229228
assert_eq!(results[1], (b"d".to_vec(), b"val_d".to_vec()));
230-
229+
231230
let _ = fs::remove_dir_all(test_dir);
232231
}
233232

@@ -237,21 +236,20 @@ fn test_transaction_scan_end_only() {
237236
let _ = fs::remove_dir_all(test_dir);
238237

239238
let db = Db::open(test_dir).unwrap();
240-
239+
241240
db.put(b"a", b"val_a").unwrap();
242241
db.put(b"b", b"val_b").unwrap();
243242
db.put(b"c", b"val_c").unwrap();
244243
db.put(b"d", b"val_d").unwrap();
245-
244+
246245
let txn = db.begin();
247-
246+
248247
// Scan up to 'c' (exclusive)
249248
let results: Vec<_> = txn.scan(None, Some(b"c")).collect();
250-
249+
251250
assert_eq!(results.len(), 2);
252251
assert_eq!(results[0], (b"a".to_vec(), b"val_a".to_vec()));
253252
assert_eq!(results[1], (b"b".to_vec(), b"val_b".to_vec()));
254-
253+
255254
let _ = fs::remove_dir_all(test_dir);
256255
}
257-

0 commit comments

Comments
 (0)