Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ mod tests {
use crate::env::ObfuscatedFileSystem;
use crate::file_pipe_log::FileNameExt;
use crate::pipe_log::Version;
use crate::test_util::{generate_entries, PanicGuard};
use crate::test_util::{catch_unwind_silent, generate_entries, PanicGuard};
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
Expand Down Expand Up @@ -704,6 +704,27 @@ mod tests {
.unwrap();
}

#[test]
fn test_batch_with_save_point() {
let dir = tempfile::Builder::new()
.prefix("test_batch_with_save_point")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
..Default::default()
};
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
let mut batch = LogBatch::default();
batch.add_command(1, Command::Clean);
batch.set_save_point();
engine.write(&mut batch, false).unwrap();
assert!(catch_unwind_silent(|| batch.rollback_to_save_point()).is_err());
assert!(catch_unwind_silent(|| batch.pop_save_point()).is_err());
}

#[test]
fn test_get_entry() {
let normal_batch_size = 10;
Expand Down
147 changes: 135 additions & 12 deletions src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

const DEFAULT_LOG_ITEM_BATCH_CAP: usize = 64;
const MAX_LOG_BATCH_BUFFER_CAP: usize = 8 * 1024 * 1024;
// 2GiB, The maximum content length accepted by lz4 compression.
const MAX_LOG_ENTRIES_SIZE_PER_BATCH: usize = i32::MAX as usize;
// LZ4_MAX_INPUT_SIZE = 0x7E000000
const MAX_LOG_ENTRIES_SIZE_PER_BATCH: usize = 0x7E000000;

/// `MessageExt` trait allows for probing log index from a specific type of
/// protobuf messages.
Expand Down Expand Up @@ -347,6 +347,7 @@
items: Vec<LogItem>,
item_size: usize,
entries_size: u32,
save_points: Vec<(usize, usize, u32)>,
}

impl Default for LogItemBatch {
Expand All @@ -361,6 +362,7 @@
items: Vec::with_capacity(cap),
item_size: 0,
entries_size: 0,
save_points: Vec::new(),
}
}

Expand All @@ -376,6 +378,7 @@
pub fn drain(&mut self) -> LogItemDrain {
self.item_size = 0;
self.entries_size = 0;
self.save_points.clear();
self.items.drain(..)
}

Expand All @@ -396,9 +399,39 @@
rhs.item_size = 0;
self.entries_size += rhs.entries_size;
rhs.entries_size = 0;
let (a_delta, b_delta, c_delta) = self.save();
for (ref mut a, ref mut b, ref mut c) in &mut rhs.save_points {
*a += a_delta;
*b += b_delta;
*c += c_delta;
}

Check warning on line 407 in src/log_batch.rs

View check run for this annotation

Codecov / codecov/patch

src/log_batch.rs#L404-L407

Added lines #L404 - L407 were not covered by tests
self.save_points.append(&mut rhs.save_points);
self.items.append(&mut rhs.items);
}

#[inline]
fn save(&self) -> (usize, usize, u32) {
(self.items.len(), self.item_size, self.entries_size)
}

#[inline]
pub fn set_save_point(&mut self) {
self.save_points.push(self.save());
}

#[inline]
pub fn pop_save_point(&mut self) {
self.save_points.pop().unwrap();
}

#[inline]
pub fn rollback_to_save_point(&mut self) {
let (a, b, c) = self.save_points.pop().unwrap();
self.items.truncate(a);
self.item_size = b;
self.entries_size = c;
}

pub(crate) fn finish_populate(&mut self, compression_type: CompressionType) {
for item in self.items.iter_mut() {
if let LogItemContent::EntryIndexes(entry_indexes) = &mut item.content {
Expand Down Expand Up @@ -578,6 +611,7 @@
item_batch: LogItemBatch,
buf_state: BufState,
buf: Vec<u8>,
save_points: Vec<usize>,
}

impl Default for LogBatch {
Expand All @@ -596,17 +630,19 @@
item_batch: LogItemBatch::with_capacity(cap),
buf_state: BufState::Open,
buf,
save_points: Vec::new(),
}
}

/// Moves all log items of `rhs` into `Self`, leaving `rhs` empty.
pub fn merge(&mut self, rhs: &mut Self) -> Result<()> {
debug_assert!(self.buf_state == BufState::Open && rhs.buf_state == BufState::Open);
assert!(self.buf_state == BufState::Open && rhs.buf_state == BufState::Open);
let max_entries_size = (|| {
fail::fail_point!("log_batch::1kb_entries_size_per_batch", |_| 1024);
MAX_LOG_ENTRIES_SIZE_PER_BATCH
})();
if !rhs.buf.is_empty() {
let old_buf_len = self.buf.len();
if rhs.buf.len() > LOG_BATCH_HEADER_LEN {
if rhs.buf.len() + self.buf.len() > max_entries_size + LOG_BATCH_HEADER_LEN * 2 {
return Err(Error::Full);
}
Expand All @@ -617,18 +653,47 @@
rhs.buf.truncate(LOG_BATCH_HEADER_LEN);
}
self.item_batch.merge(&mut rhs.item_batch);
for s in &rhs.save_points {
assert!(*s >= LOG_BATCH_HEADER_LEN);
self.save_points
.push(*s - LOG_BATCH_HEADER_LEN + old_buf_len);

Check warning on line 659 in src/log_batch.rs

View check run for this annotation

Codecov / codecov/patch

src/log_batch.rs#L657-L659

Added lines #L657 - L659 were not covered by tests
}
self.buf_state = BufState::Open;
rhs.buf_state = BufState::Open;
Ok(())
}

/// Creates a save point of current status.
#[inline]
pub fn set_save_point(&mut self) {
assert!(self.buf_state == BufState::Open);
self.item_batch.set_save_point();
self.save_points.push(self.buf.len());
}

/// Panic if there's no save point to pop.
#[inline]
pub fn pop_save_point(&mut self) {
assert!(self.buf_state == BufState::Open);
self.item_batch.pop_save_point();
self.save_points.pop();
}

/// Panic if there's no save point to rollback to.
#[inline]
pub fn rollback_to_save_point(&mut self) {
assert!(self.buf_state == BufState::Open);
self.item_batch.rollback_to_save_point();
self.buf.truncate(self.save_points.pop().unwrap());
}

/// Adds some protobuf log entries into the log batch.
pub fn add_entries<M: MessageExt>(
&mut self,
region_id: u64,
entries: &[M::Entry],
) -> Result<()> {
debug_assert!(self.buf_state == BufState::Open);
assert!(self.buf_state == BufState::Open);
if entries.is_empty() {
return Ok(());
}
Expand Down Expand Up @@ -668,8 +733,8 @@
mut entry_indexes: Vec<EntryIndex>,
entries: Vec<Vec<u8>>,
) -> Result<()> {
debug_assert!(entry_indexes.len() == entries.len());
debug_assert!(self.buf_state == BufState::Open);
assert!(entry_indexes.len() == entries.len());
assert!(self.buf_state == BufState::Open);
if entry_indexes.is_empty() {
return Ok(());
}
Expand Down Expand Up @@ -727,7 +792,7 @@
/// compression type to each entry index.
pub(crate) fn finish_populate(&mut self, compression_threshold: usize) -> Result<usize> {
let _t = StopWatch::new(perf_context!(log_populating_duration));
debug_assert!(self.buf_state == BufState::Open);
assert!(self.buf_state == BufState::Open);
if self.is_empty() {
self.buf_state = BufState::Encoded(self.buf.len(), 0);
return Ok(0);
Expand Down Expand Up @@ -805,13 +870,13 @@
///
/// Internally sets the file locations of each log entry indexes.
pub(crate) fn finish_write(&mut self, mut handle: FileBlockHandle) {
debug_assert!(matches!(self.buf_state, BufState::Sealed(_, _)));
assert!(matches!(self.buf_state, BufState::Sealed(_, _)));
if !self.is_empty() {
// adjust log batch handle to log entries handle.
handle.offset += LOG_BATCH_HEADER_LEN as u64;
match self.buf_state {
BufState::Sealed(_, entries_len) => {
debug_assert!(LOG_BATCH_HEADER_LEN + entries_len < handle.len as usize);
assert!(LOG_BATCH_HEADER_LEN + entries_len < handle.len as usize);
handle.len = entries_len;
}
_ => unreachable!(),
Expand All @@ -822,11 +887,12 @@

/// Consumes log items into an iterator.
pub(crate) fn drain(&mut self) -> LogItemDrain {
debug_assert!(!matches!(self.buf_state, BufState::Incomplete));
assert!(!matches!(self.buf_state, BufState::Incomplete));

self.buf.shrink_to(MAX_LOG_BATCH_BUFFER_CAP);
self.buf.truncate(LOG_BATCH_HEADER_LEN);
self.buf_state = BufState::Open;
self.save_points.clear();
self.item_batch.drain()
}

Expand Down Expand Up @@ -936,7 +1002,9 @@
mod tests {
use super::*;
use crate::pipe_log::{LogQueue, Version};
use crate::test_util::{catch_unwind_silent, generate_entries, generate_entry_indexes_opt};
use crate::test_util::{
catch_unwind_silent, generate_entries, generate_entry_indexes_opt, PanicGuard,
};
use protobuf::parse_from_bytes;
use raft::eraftpb::Entry;
use strum::IntoEnumIterator;
Expand Down Expand Up @@ -1371,6 +1439,61 @@
assert!(batch.is_empty());
}

#[test]
fn test_save_point() {
let ops = [
|b: &mut LogBatch| {
b.add_entries::<Entry>(1, &generate_entries(1, 11, None))
.unwrap()
},
|b: &mut LogBatch| {
b.add_entries::<Entry>(7, &generate_entries(1, 11, Some(&vec![b'x'; 1024])))
.unwrap()
},
|b: &mut LogBatch| b.add_command(17, Command::Clean),
|b: &mut LogBatch| b.put(27, b"key27".to_vec(), b"value27".to_vec()),
|b: &mut LogBatch| b.delete(37, b"key37".to_vec()),
|b: &mut LogBatch| b.add_command(47, Command::Compact { index: 777 }),
|b: &mut LogBatch| {
b.add_entries::<Entry>(57, &generate_entries(1, 51, None))
.unwrap()
},
];
for start in 0..ops.len() {
for stripe in 1..=5 {
for num in ops.len()..ops.len() * 5 {
for repeat in 1..=5 {
let _guard = PanicGuard::with_prompt(format!(
"case: [{}, {}, {}, {}]",
start, stripe, num, repeat
));
let mut to_verify = Vec::new();
let mut batch = LogBatch::default();
let mut op_idx = start;
let mut total_op = 0;
for _ in 0..num {
for _ in 0..repeat {
ops[op_idx % ops.len()](&mut batch);
to_verify.push(batch.clone());
batch.set_save_point();
total_op += 1;
if total_op % 5 == 0 {
to_verify.pop().unwrap();
batch.pop_save_point();
}
}
op_idx += stripe;
}
while let Some(b) = to_verify.pop() {
batch.rollback_to_save_point();
assert_eq!(batch, b);
}
}
}
}
}
}

#[cfg(feature = "nightly")]
#[bench]
fn bench_log_batch_add_entry_and_encode(b: &mut test::Bencher) {
Expand Down