diff --git a/aerospike-core/src/batch/mod.rs b/aerospike-core/src/batch/mod.rs index 402825fc..23377fcb 100644 --- a/aerospike-core/src/batch/mod.rs +++ b/aerospike-core/src/batch/mod.rs @@ -43,7 +43,7 @@ pub(crate) struct BatchRecordIndex { } /// Policy for a single batch read operation. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct BatchReadPolicy { /// read_touch_ttl determines how record TTL (time to live) is affected on reads. When enabled, the server can /// efficiently operate as a read-based LRU cache where the least recently used records are expired. @@ -76,7 +76,7 @@ impl Default for BatchReadPolicy { } /// Policy for a single batch write operation. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct BatchWritePolicy { /// RecordExistsAction qualifies how to handle writes where the record already exists. pub record_exists_action: RecordExistsAction, @@ -128,7 +128,7 @@ impl Default for BatchWritePolicy { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] /// Policy for a single batch delete operation. pub struct BatchDeletePolicy { /// GenerationPolicy qualifies how to handle record writes based on record generation. @@ -173,7 +173,7 @@ impl Default for BatchDeletePolicy { } /// Policy for a single batch udf operation. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct BatchUDFPolicy { /// Desired consistency guarantee when committing a transaction on the server. The default /// (CommitAll) indicates that the server should wait for master and all replica commits to @@ -211,7 +211,7 @@ impl Default for BatchUDFPolicy { /// Represents a batch operation. /// Do not directly create the batch operations. Use the helper methods instead. -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum BatchOperation<'a> { #[doc(hidden)] Read { diff --git a/aerospike-core/src/commands/batch_attr.rs b/aerospike-core/src/commands/batch_attr.rs index 3fb4a31d..91a06f14 100644 --- a/aerospike-core/src/commands/batch_attr.rs +++ b/aerospike-core/src/commands/batch_attr.rs @@ -82,8 +82,11 @@ impl BatchAttr { self.send_key = false; } - pub(crate) fn set_batch_read(&mut self, rp: &BatchReadPolicy) { - self.filter_expression = rp.filter_expression.clone(); + pub(crate) fn set_batch_read(&mut self, rp: &BatchReadPolicy, parent: &BatchPolicy) { + self.filter_expression = rp + .filter_expression + .clone() + .or(parent.filter_expression.clone()); self.read_attr = buffer::INFO1_READ; // if rp.ReadModeAP == ReadModeAPAll { @@ -135,8 +138,11 @@ impl BatchAttr { } } - pub(crate) fn set_batch_write(&mut self, wp: &BatchWritePolicy) { - self.filter_expression = wp.filter_expression.clone(); + pub(crate) fn set_batch_write(&mut self, wp: &BatchWritePolicy, parent: &BatchPolicy) { + self.filter_expression = wp + .filter_expression + .clone() + .or(parent.filter_expression.clone()); self.read_attr = 0; self.write_attr = buffer::INFO2_WRITE | buffer::INFO2_RESPOND_ALL_OPS; self.info_attr = 0; @@ -221,8 +227,11 @@ impl BatchAttr { } } - pub(crate) fn set_batch_udf(&mut self, up: &BatchUDFPolicy) { - self.filter_expression = up.filter_expression.clone(); + pub(crate) fn set_batch_udf(&mut self, up: &BatchUDFPolicy, parent: &BatchPolicy) { + self.filter_expression = up + .filter_expression + .clone() + .or(parent.filter_expression.clone()); self.read_attr = 0; self.write_attr = buffer::INFO2_WRITE; self.info_attr = 0; @@ -245,8 +254,11 @@ impl BatchAttr { } } - pub(crate) fn set_batch_delete(&mut self, dp: &BatchDeletePolicy) { - self.filter_expression = dp.filter_expression.clone(); + pub(crate) fn set_batch_delete(&mut self, dp: &BatchDeletePolicy, parent: &BatchPolicy) { + self.filter_expression = dp + .filter_expression + .clone() + .or(parent.filter_expression.clone()); self.read_attr = 0; self.write_attr = buffer::INFO2_WRITE | buffer::INFO2_RESPOND_ALL_OPS | buffer::INFO2_DELETE; diff --git a/aerospike-core/src/commands/batch_operate_command.rs b/aerospike-core/src/commands/batch_operate_command.rs index 7790bf2e..659340d3 100644 --- a/aerospike-core/src/commands/batch_operate_command.rs +++ b/aerospike-core/src/commands/batch_operate_command.rs @@ -16,15 +16,17 @@ use aerospike_rt::time::Instant; use std::collections::HashMap; use std::sync::Arc; +use crate::batch; use crate::batch::BatchOperation; use crate::batch::BatchRecordIndex; use crate::cluster::partition::Partition; use crate::cluster::{Cluster, Node}; use crate::commands::field_type::FieldType; use crate::commands::Duration; +use crate::commands::StreamCommand; use crate::commands::{self}; use crate::errors::{Error, Result}; -use crate::net::Connection; +use crate::net::{BufferedConn, Connection}; use crate::policy::{BatchPolicy, Policy, PolicyLike, Replica}; use crate::result_code; use crate::value::bytes_to_particle; @@ -128,6 +130,8 @@ impl<'a> BatchOperateCommand<'a> { .await .map_err(|e| e.chain_error("Failed to set timeout for send buffer"))?; + conn.exhausted = false; + // Send command. if let Err(err) = self.write_buffer(&mut conn).await { // IO errors are considered temporary anomalies. Retry. @@ -149,6 +153,8 @@ impl<'a> BatchOperateCommand<'a> { return Err(err); } + conn.exhausted = true; + // command has completed successfully. Exit method. return Ok(self); } @@ -199,7 +205,7 @@ impl<'a> BatchOperateCommand<'a> { async fn parse_group( batch_ops: &mut [(BatchOperation<'a>, usize)], - conn: &mut Connection, + conn: &mut BufferedConn<'_>, size: usize, ) -> Result { while conn.bytes_read() < size { @@ -214,68 +220,94 @@ impl<'a> BatchOperateCommand<'a> { batch_op.0.set_record(batch_record.record); batch_op.0.set_result_code(batch_record.result_code, false); } + Err(Error::BatchLastError(batch_index, rc, in_doubt, ref msg)) => { + let batch_op = batch_ops + .get_mut(batch_index as usize) + .expect("Invalid batch index"); + batch_op.0.set_result_code(rc, in_doubt); + return Err(Error::BatchError(batch_index, rc, in_doubt, msg.clone())); + } Err(Error::BatchError(batch_index, rc, in_doubt, ..)) => { let batch_op = batch_ops .get_mut(batch_index as usize) .expect("Invalid batch index"); batch_op.0.set_result_code(rc, in_doubt); } - Err(err @ _) => return Err(err), + Err(err) => return Err(err), } } Ok(true) } - async fn parse_record(conn: &mut Connection) -> Result> { - let batch_index = conn.buffer.read_u32(Some(14)); - let result_code = ResultCode::from(conn.buffer.read_u8(Some(5))); + async fn parse_record(conn: &mut BufferedConn<'_>) -> Result> { + // if cmd is the end marker of the response, do not proceed further + let info3 = conn.buffer().read_u8(Some(3)); + let last_record = info3 & commands::buffer::INFO3_LAST == commands::buffer::INFO3_LAST; + + let batch_index = conn.buffer().read_u32(Some(14)); + let result_code = ResultCode::from(conn.buffer().read_u8(Some(5))); + match result_code { ResultCode::Ok => (), ResultCode::KeyNotFoundError | ResultCode::FilteredOut => (), rc => { - return Err(Error::BatchError(batch_index, rc, false, conn.addr.clone())); + if last_record { + return Err(Error::BatchLastError( + batch_index, + rc, + false, + conn.conn.addr.clone(), + )); + } + + return Err(Error::BatchError( + batch_index, + rc, + false, + conn.conn.addr.clone(), + )); } }; // if cmd is the end marker of the response, do not proceed further - let info3 = conn.buffer.read_u8(Some(3)); - if info3 & commands::buffer::INFO3_LAST == commands::buffer::INFO3_LAST { + if last_record { return Ok(None); } let found_key = match result_code { ResultCode::Ok => true, ResultCode::KeyNotFoundError | ResultCode::FilteredOut => false, - rc => { - return Err(Error::BatchError(batch_index, rc, false, conn.addr.clone())); - } + _ => unreachable!(), }; - conn.buffer.skip(6); - let generation = conn.buffer.read_u32(None); - let expiration = conn.buffer.read_u32(None); - let batch_index = conn.buffer.read_u32(None); - let field_count = conn.buffer.read_u16(None) as usize; // almost certainly 0 - let op_count = conn.buffer.read_u16(None) as usize; + conn.buffer().skip(6); + let generation = conn.buffer().read_u32(None); + let expiration = conn.buffer().read_u32(None); + let batch_index = conn.buffer().read_u32(None); + let field_count = conn.buffer().read_u16(None) as usize; // almost certainly 0 + let op_count = conn.buffer().read_u16(None) as usize; - let (key, _) = Self::parse_key(conn, field_count).await?; + let (key, _) = StreamCommand::parse_key(conn, field_count).await?; let record = if found_key { let mut bins: HashMap = HashMap::with_capacity(op_count); for _ in 0..op_count { conn.read_buffer(8).await?; - let op_size = conn.buffer.read_u32(None) as usize; - conn.buffer.skip(1); - let particle_type = conn.buffer.read_u8(None); - conn.buffer.skip(1); - let name_size = conn.buffer.read_u8(None) as usize; + let op_size = conn.buffer().read_u32(None) as usize; + conn.buffer().skip(1); + let particle_type = conn.buffer().read_u8(None); + conn.buffer().skip(1); + let name_size = conn.buffer().read_u8(None) as usize; conn.read_buffer(name_size).await?; - let name = conn.buffer.read_str(name_size)?; + let name = conn.buffer().read_str(name_size)?; let particle_bytes_size = op_size - (4 + name_size); conn.read_buffer(particle_bytes_size).await?; - let value = - value::bytes_to_particle(particle_type, &mut conn.buffer, particle_bytes_size)?; + let value = value::bytes_to_particle( + particle_type, + &mut conn.buffer(), + particle_bytes_size, + )?; bins.insert(name, value); } @@ -316,13 +348,18 @@ impl<'a> BatchOperateCommand<'a> { conn: &mut Connection, ) -> Result<()> { loop { + let mut conn = BufferedConn::new(conn); + conn.set_limit(8)?; conn.read_buffer(8).await?; - let size = conn.buffer.read_msg_size(None); + let size = conn.buffer().read_msg_size(None); conn.bookmark(); - if size > 0 && !Self::parse_group(batch_ops, conn, size as usize).await? { + + conn.set_limit(size)?; + if size > 0 && !Self::parse_group(batch_ops, &mut conn, size as usize).await? { break; } } + conn.close().await; Ok(()) } @@ -334,54 +371,4 @@ impl<'a> BatchOperateCommand<'a> { conn.buffer.write_timeout(timeout); Ok(()) } - - async fn parse_key(conn: &mut Connection, field_count: usize) -> Result<(Key, Option)> { - let mut digest: [u8; 20] = [0; 20]; - let mut namespace: String = "".to_string(); - let mut set_name: String = "".to_string(); - let mut orig_key: Option = None; - let mut bval = None; - - for _ in 0..field_count { - conn.read_buffer(4).await?; - let field_len = conn.buffer.read_u32(None) as usize; - conn.read_buffer(field_len).await?; - let field_type = conn.buffer.read_u8(None); - - match field_type { - x if x == FieldType::DigestRipe as u8 => { - digest.copy_from_slice(conn.buffer.read_slice(field_len - 1)); - } - x if x == FieldType::Namespace as u8 => { - namespace = conn.buffer.read_str(field_len - 1)?; - } - x if x == FieldType::Table as u8 => { - set_name = conn.buffer.read_str(field_len - 1)?; - } - x if x == FieldType::Key as u8 => { - let particle_type = conn.buffer.read_u8(None); - let particle_bytes_size = field_len - 2; - orig_key = Some(bytes_to_particle( - particle_type, - &mut conn.buffer, - particle_bytes_size, - )?); - } - x if x == FieldType::BValArray as u8 => { - bval = Some(conn.buffer.read_le_u64(None)); - } - _ => unreachable!(), - } - } - - Ok(( - Key { - namespace, - set_name, - user_key: orig_key, - digest, - }, - bval, - )) - } } diff --git a/aerospike-core/src/commands/buffer.rs b/aerospike-core/src/commands/buffer.rs index 5f99cd6e..e611f756 100644 --- a/aerospike-core/src/commands/buffer.rs +++ b/aerospike-core/src/commands/buffer.rs @@ -116,6 +116,7 @@ pub(crate) const MAX_BUFFER_SIZE: usize = 120 * 1024 * 1024 + 8; // 120 MB + hea pub struct Buffer { pub data_buffer: Vec, pub data_offset: usize, + // pub estimated_data_offset: usize, pub reclaim_threshold: usize, } @@ -124,6 +125,7 @@ impl Buffer { Buffer { data_buffer: Vec::with_capacity(1024), data_offset: 0, + // estimated_data_offset: 0, reclaim_threshold, } } @@ -134,6 +136,7 @@ impl Buffer { pub(crate) fn size_buffer(&mut self) -> Result<()> { let offset = self.data_offset; + // self.estimated_data_offset = offset; self.resize_buffer(offset) } @@ -165,6 +168,13 @@ impl Buffer { | ((i64::from(CL_MSG_VERSION) << 56) as i64) | (i64::from(AS_MSG_TYPE) << 48); + // assert!( + // self.data_offset == self.estimated_data_offset, + // "estimated command size was not correct: est {} != {} actual", + // self.estimated_data_offset, + // self.data_offset + // ); + // reset data offset self.reset_offset(); self.write_i64(size); @@ -555,11 +565,11 @@ impl Buffer { match batch_op { BatchOperation::Read { br: _, - policy, + policy: brpolicy, bins, ops, } => { - attr.set_batch_read(policy); + attr.set_batch_read(brpolicy, policy); match (bins, ops) { (Bins::Some(bin_names), Some(ops)) if bin_names.len() > 0 && ops.len() > 0 => @@ -591,23 +601,30 @@ impl Buffer { } } } - BatchOperation::Write { br: _, policy, ops } => { - attr.set_batch_write(policy); + BatchOperation::Write { + br: _, + policy: bwpolicy, + ops, + } => { + attr.set_batch_write(bwpolicy, policy); attr.adjust_write(ops); self.write_batch_operations(key, ops, &attr, &attr.filter_expression)?; } - BatchOperation::Delete { br: _, policy } => { - attr.set_batch_delete(policy); + BatchOperation::Delete { + br: _, + policy: bdpolicy, + } => { + attr.set_batch_delete(bdpolicy, policy); self.write_batch_write(key, &attr, &attr.filter_expression, 0, 0)?; } BatchOperation::UDF { br: _, - policy, + policy: bupolicy, udf_name, function_name, args, } => { - attr.set_batch_udf(policy); + attr.set_batch_udf(bupolicy, policy); self.write_batch_write(key, &attr, &attr.filter_expression, 3, 0)?; self.write_field_string(udf_name, FieldType::UdfPackageName); self.write_field_string(function_name, FieldType::UdfFunction); @@ -1156,6 +1173,7 @@ impl Buffer { filter.clone().map_or(0, |filter| { let filter_size = filter.pack(&mut None); self.data_offset += filter_size + FIELD_HEADER_SIZE as usize; + // filter_size + FIELD_HEADER_SIZE as usize filter_size }) } @@ -1380,7 +1398,7 @@ impl Buffer { fn write_filter_expression(&mut self, filter: &FilterExpression, size: usize) { self.write_field_header(size, FieldType::FilterExp); - filter.pack(&mut Some(self)); + let _ = filter.pack(&mut Some(self)); } fn write_field_header(&mut self, size: usize, ftype: FieldType) { diff --git a/aerospike-core/src/errors.rs b/aerospike-core/src/errors.rs index 9e69e405..1628a364 100644 --- a/aerospike-core/src/errors.rs +++ b/aerospike-core/src/errors.rs @@ -94,6 +94,9 @@ pub enum Error { /// Server responded with a response code indicating an error condition for batch. #[error("BatchIndex error: Index: {0:?}, Result Code: {1:?}, In Doubt: {2}, Node: {3}")] BatchError(u32, ResultCode, bool, String), + /// Server responded with a response code indicating an error condition for batch. + #[error("BatchIndex error: Index: {0:?}, Result Code: {1:?}, In Doubt: {2}, Node: {3}")] + BatchLastError(u32, ResultCode, bool, String), /// Server responded with a response code indicating an error condition. #[error("Server error: {0:?}, In Doubt: {1}, Node: {2}")] ServerError(ResultCode, bool, String), diff --git a/aerospike-core/src/expressions/mod.rs b/aerospike-core/src/expressions/mod.rs index 52ea7461..ce407188 100644 --- a/aerospike-core/src/expressions/mod.rs +++ b/aerospike-core/src/expressions/mod.rs @@ -28,7 +28,7 @@ use std::collections::HashMap; use std::fmt::Debug; /// Expression Data Types for usage in some `FilterExpressions` on for example Map and List -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum ExpType { /// NIL Expression Type NIL = 0, @@ -52,7 +52,7 @@ pub enum ExpType { HLL = 9, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] #[doc(hidden)] pub(crate) enum ExpOp { Unknown = 0, @@ -116,7 +116,7 @@ pub(crate) enum ExpOp { #[doc(hidden)] pub const MODIFY: i64 = 0x40; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] #[doc(hidden)] pub enum ExpressionArgument { Value(Value), @@ -127,7 +127,7 @@ pub enum ExpressionArgument { /// Filter expression, which can be applied to most commands, to control which records are /// affected by the command. Filter expression are created using the functions in the /// [expressions](crate::expressions) module and its submodules. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct FilterExpression { /// The Operation code cmd: Option, diff --git a/aerospike-core/src/operations/cdt_context.rs b/aerospike-core/src/operations/cdt_context.rs index 4e5c004f..60f08859 100644 --- a/aerospike-core/src/operations/cdt_context.rs +++ b/aerospike-core/src/operations/cdt_context.rs @@ -36,7 +36,7 @@ pub enum CtxType { /// for the current level. /// An array of CTX identifies location of the list/map on multiple /// levels on nesting. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct CdtContext { /// Context Type pub id: u8, diff --git a/aerospike-core/src/policy/expiration.rs b/aerospike-core/src/policy/expiration.rs index 9ad9ccfa..58985bd2 100644 --- a/aerospike-core/src/policy/expiration.rs +++ b/aerospike-core/src/policy/expiration.rs @@ -20,7 +20,7 @@ const NEVER_EXPIRE: u32 = 0xFFFF_FFFF; // -1 as i32 const DONT_UPDATE: u32 = 0xFFFF_FFFE; // -2 as i32 /// Record expiration, also known as time-to-live (TTL). -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum Expiration { /// Set the record to expire X seconds from now Seconds(u32), diff --git a/aerospike-core/src/policy/read_touch_ttl_percent.rs b/aerospike-core/src/policy/read_touch_ttl_percent.rs index 072a264d..7423fe1f 100644 --- a/aerospike-core/src/policy/read_touch_ttl_percent.rs +++ b/aerospike-core/src/policy/read_touch_ttl_percent.rs @@ -27,7 +27,7 @@ const DONT_RESET: u32 = 0xFFFF_FFFF; // -1 as i32 /// 80, the next read within 8 hours of the record's end of life (equivalent to 2 hours after the most /// recent write) will result in a touch, resetting the TTL to another 10 hours. /// Supported in server v8+. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum ReadTouchTTL { /// 1 - 100 : Reset record TTL on reads when within this percentage of the most recent write TTL Percent(u8), diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 663c2861..f13a3c9f 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -41,6 +41,8 @@ lazy_static! { env::var("AEROSPIKE_HOSTS").unwrap_or_else(|_| String::from("127.0.0.1:3100")); static ref AEROSPIKE_NAMESPACE: String = env::var("AEROSPIKE_NAMESPACE").unwrap_or_else(|_| String::from("test")); + static ref AEROSPIKE_PROP_SET_NAME: String = + env::var("AEROSPIKE_PROP_SET_NAME").unwrap_or_else(|_| String::from("test")); static ref AEROSPIKE_CLUSTER: Option = env::var("AEROSPIKE_CLUSTER").ok(); static ref AEROSPIKE_USE_SERVICES_ALTERNATE: bool = env::var("AEROSPIKE_USE_SERVICES_ALTERNATE").is_ok(); @@ -154,6 +156,10 @@ pub fn namespace() -> &'static str { &*AEROSPIKE_NAMESPACE } +pub fn prop_setname() -> &'static str { + &*AEROSPIKE_PROP_SET_NAME +} + pub fn client_policy() -> &'static ClientPolicy { &*GLOBAL_CLIENT_POLICY } diff --git a/tests/proptests/batch_operation.rs b/tests/proptests/batch_operation.rs new file mode 100644 index 00000000..ff43c3bc --- /dev/null +++ b/tests/proptests/batch_operation.rs @@ -0,0 +1,118 @@ +use crate::common; +use crate::proptest::prelude::*; +use crate::proptest_async; + +use crate::proptests::key::*; +use crate::proptests::operation::*; +use crate::proptests::value::*; + +use aerospike::policy::*; +use aerospike::*; + +use futures::stream::StreamExt; + +use crate::proptests::operation::any_operation_readish; +use crate::proptests::{bins::*, partition_filter::*, policy::*}; + +#[derive(Clone, Debug, PartialEq)] +pub enum PropBatchOperation { + ReadBins(BatchReadPolicy, Bins), + ReadOps(BatchReadPolicy, Vec), + Write(BatchWritePolicy, Vec), + // Delete(BatchDeletePolicy, Key), + // UDF(BatchUDFPolicy, Key, String, String, Option>), +} + +impl PropBatchOperation { + pub fn to_op(&self, key: Key) -> aerospike::BatchOperation<'_> { + match self { + PropBatchOperation::ReadBins(brp, bins) => BatchOperation::read(brp, key, bins.clone()), + PropBatchOperation::ReadOps(brp, ops) => { + BatchOperation::read_ops(brp, key, ops.iter().map(|op| op.to_op()).collect()) + } + PropBatchOperation::Write(bwp, ops) => { + BatchOperation::write(bwp, key, ops.iter().map(|op| op.to_op()).collect()) + } // PropBatchOperation::Delete(bdp) => todo!(), + // PropBatchOperation::UDF(bup, package, func, vals) => todo!(), + } + } +} + +// select one batch operation and return a strategy for it. + +pub fn any_batch_operation(bin: Bin) -> impl Strategy { + prop_oneof![ + // bop_read_bins(), + // bop_read_ops(), + bop_write(), + // bop_delete(), + // bop_udf(), + ] +} + +pub fn any_batch_read_operation(bin: Bin) -> impl Strategy { + prop_oneof![bop_read_bins(), bop_read_ops(),] +} + +pub fn any_batch_write_operation(bin: Bin) -> impl Strategy { + prop_oneof![bop_write()] +} + +prop_compose! { + pub fn many_batch_operations(n: usize)(bin in bin())(ops in prop::collection::vec(any_batch_operation(bin), 1..n as usize)) -> Vec { + ops + } +} + +prop_compose! { + pub fn many_batch_read_operations(n: usize)(bin in bin())(ops in prop::collection::vec(any_batch_read_operation(bin), 1..n as usize)) -> Vec { + ops + } +} + +prop_compose! { + pub fn many_batch_write_operations(n: usize)(bin in bin())(ops in prop::collection::vec(any_batch_write_operation(bin), 1..n as usize)) -> Vec { + ops + } +} + +// Given a randomly selected BatchReadPolicy, and +// given a random set of 10 bins, then +// construct an enumeration variant that implements a strategy. +// +// Later on, we'll 'match' on this variant (maybe not us directly, but one +// of aerospike's APIs) to invoke calls back to the server. + +prop_compose! { + pub fn bop_read_bins()( + brp in batch_read_policy(), + bs in bins(10), + ) -> PropBatchOperation { + // eprintln!("bop_read_bins() called"); + PropBatchOperation::ReadBins(brp, bs) + } +} + +prop_compose! { + pub fn bop_read_ops() + (n in 1usize..20, bin in bin()) + ( + brp in batch_read_policy(), + ops in prop::collection::vec(operation_readish(bin), n) + ) -> PropBatchOperation { + // eprintln!("bop_read_ops() called"); + PropBatchOperation::ReadOps(brp, ops) + } +} + +prop_compose! { + pub fn bop_write() + (n in 1usize..2, bin in bin()) + ( + bwp in batch_write_policy(), + ops in prop::collection::vec(operation_writeish(bin), n) + ) -> PropBatchOperation { + // eprintln!("bop_write() called"); + PropBatchOperation::Write(bwp, ops) + } +} diff --git a/tests/proptests/batches.rs b/tests/proptests/batches.rs new file mode 100644 index 00000000..332c4ec9 --- /dev/null +++ b/tests/proptests/batches.rs @@ -0,0 +1,120 @@ +use crate::proptest::prelude::*; +use crate::proptest_async; +use crate::{common, proptests::key}; +use proptest::strategy::{Strategy, ValueTree}; +use proptest::test_runner::TestRunner; + +use crate::proptests::value::*; + +use aerospike::query::*; +use aerospike::*; + +use futures::stream::StreamExt; + +use crate::proptests::{ + batch_operation::*, bins::*, key::*, operation::*, partition_filter::*, policy::*, +}; + +proptest_async::proptest! { + #[test] + async fn batch_read( + i in 0..10, + batch_policy in batch_policy(30000), + ops in many_batch_read_operations(2), + ) { + let client = common::singleton_client().await; + let namespace: &str = common::namespace(); + let set_name: &str = common::prop_setname(); + + // let now = aerospike_rt::time::Instant::now(); + // eprintln!("PRPBAT001 It is now {:?}", now.elapsed()); + + // let as_ops: Vec = ops.into_iter().map(|op| op.to_op()).collect(); + let mut as_ops = vec![]; + for op in &ops { + let key = as_key!(namespace, set_name, i); + let as_op = op.to_op(key); + as_ops.push(as_op); + } + + // eprintln!("PRPBAT002 Submitting batch operation at {:?}", now.elapsed()); + let res = client.batch(&batch_policy, &as_ops).await; + // eprintln!("PRPBAT003 Batch returned in {:?}", now.elapsed()); + + match res { + // Err(Error::ServerError(ResultCode::ParameterError, _, _)) => { + // if write_policy.respond_per_each_op && ops.into_iter().find(|op| *op == PropOperation::Get).is_some() { + // return; + // } + // }, // it's fine + // Err(Error::ServerError(ResultCode::BinTypeError, _, _)) => { + // } + // Err(Error::ServerError(ResultCode::KeyNotFoundError, _, _)) => { + // }, + // Err(e @ Error::ServerError(ResultCode::KeyExistsError, _, _)) => { + // if write_policy.record_exists_action != RecordExistsAction::CreateOnly { + // panic!("{}",e); + // } + // }, + // Err(e @ Error::ServerError(ResultCode::GenerationError, _, _)) => { + // if write_policy.generation_policy != GenerationPolicy::None { + // return; // it's fine + // } + // panic!("{}", e); + // }, + Err(e) => panic!("{}", e), + Ok(res) => (), //println!("OK"), + } + } + + #[test] + async fn batch_write( + i in 0..10, + batch_policy in batch_policy(30000), + ops in many_batch_write_operations(2), + ) { + let client = common::singleton_client().await; + let namespace: &str = common::namespace(); + let set_name: &str = common::prop_setname(); + + let now = aerospike_rt::time::Instant::now(); + eprintln!("PRPBAT001 It is now {:?}", now.elapsed()); + + // let as_ops: Vec = ops.into_iter().map(|op| op.to_op()).collect(); + let mut as_ops = vec![]; + for op in &ops { + let key = as_key!(namespace, set_name, i); + let as_op = op.to_op(key); + as_ops.push(as_op); + } + + eprintln!("PRPBAT002 Submitting batch operation at {:?}", now.elapsed()); + let res = client.batch(&batch_policy, &as_ops).await; + eprintln!("PRPBAT003 Batch returned in {:?}", now.elapsed()); + + match res { + // Err(Error::ServerError(ResultCode::ParameterError, _, _)) => { + // if write_policy.respond_per_each_op && ops.into_iter().find(|op| *op == PropOperation::Get).is_some() { + // return; + // } + // }, // it's fine + // Err(Error::ServerError(ResultCode::BinTypeError, _, _)) => { + // } + // Err(Error::ServerError(ResultCode::KeyNotFoundError, _, _)) => { + // }, + // Err(e @ Error::ServerError(ResultCode::KeyExistsError, _, _)) => { + // if write_policy.record_exists_action != RecordExistsAction::CreateOnly { + // panic!("{}",e); + // } + // }, + // Err(e @ Error::ServerError(ResultCode::GenerationError, _, _)) => { + // if write_policy.generation_policy != GenerationPolicy::None { + // return; // it's fine + // } + // panic!("{}", e); + // }, + Err(e) => panic!("{}", e), + Ok(res) => (), //println!("OK"), + } + } +} diff --git a/tests/proptests/kv.rs b/tests/proptests/kv.rs index 4b4af1d8..c8b2c36e 100644 --- a/tests/proptests/kv.rs +++ b/tests/proptests/kv.rs @@ -11,7 +11,7 @@ proptest_async::proptest! { async fn put(i in 0..10, write_policy in write_policy(5000), ref bins in many_bins(255)) { let client = common::singleton_client().await; let namespace: &str = common::namespace(); - let set_name = "test"; + let set_name: &str = common::prop_setname(); let key = as_key!(namespace, set_name, i); let err = client.put(&write_policy, &key, bins).await; @@ -44,7 +44,7 @@ proptest_async::proptest! { async fn add(i in 0..1000, write_policy in write_policy_without_replace(3000), val in -1000..1000) { let client = common::singleton_client().await; let namespace: &str = common::namespace(); - let set_name = "test"; + let set_name: &str = common::prop_setname(); let key = as_key!(namespace, set_name, i); let bins = vec![ as_bin!("bin_i", val), @@ -81,7 +81,7 @@ proptest_async::proptest! { async fn append(i in 0..1000, write_policy in write_policy_without_replace(3000), s in "[\\w\\d]{1,1000}") { let client = common::singleton_client().await; let namespace: &str = common::namespace(); - let set_name = "test"; + let set_name: &str = common::prop_setname(); let key = as_key!(namespace, set_name, i); let bins = vec![ as_bin!("bin_s", s.clone()), @@ -118,7 +118,7 @@ proptest_async::proptest! { async fn prepend(i in 0..1000, write_policy in write_policy_without_replace(3000), s in "[\\w\\d]{1,1000}") { let client = common::singleton_client().await; let namespace: &str = common::namespace(); - let set_name = "test"; + let set_name: &str = common::prop_setname(); let key = as_key!(namespace, set_name, i); let bins = vec![ as_bin!("bin_s", s.clone()), @@ -155,7 +155,7 @@ proptest_async::proptest! { async fn touch(i in 0..1000, write_policy in write_policy_without_replace(3000)) { let client = common::singleton_client().await; let namespace: &str = common::namespace(); - let set_name = "test"; + let set_name: &str = common::prop_setname(); let key = as_key!(namespace, set_name, i); let err = client.touch(&write_policy, &key).await; @@ -183,7 +183,7 @@ proptest_async::proptest! { async fn delete(i in 0..1000, write_policy in write_policy_without_replace(3000)) { let client = common::singleton_client().await; let namespace: &str = common::namespace(); - let set_name = "test"; + let set_name: &str = common::prop_setname(); let key = as_key!(namespace, set_name, i); let err = client.touch(&write_policy, &key).await; @@ -212,7 +212,7 @@ proptest_async::proptest! { let client = common::singleton_client().await; let namespace: &str = common::namespace(); - let set_name = "test"; + let set_name: &str = common::prop_setname(); let key = as_key!(namespace, set_name, i); let res = client.get(&read_policy, &key, bins).await; @@ -234,7 +234,7 @@ proptest_async::proptest! { async fn exists(i in 0..1000, read_policy in read_policy(3000)) { let client = common::singleton_client().await; let namespace: &str = common::namespace(); - let set_name = "test"; + let set_name: &str = common::prop_setname(); let key = as_key!(namespace, set_name, i); let res = client.exists(&read_policy, &key).await; diff --git a/tests/proptests/mod.rs b/tests/proptests/mod.rs index bf136d45..86a64cdb 100644 --- a/tests/proptests/mod.rs +++ b/tests/proptests/mod.rs @@ -13,6 +13,8 @@ // License for the specific language governing permissions and limitations under // the License. +mod batch_operation; +mod batches; mod bins; mod filter_expression; mod key; diff --git a/tests/proptests/operation.rs b/tests/proptests/operation.rs index e721c96e..c9ac96d8 100644 --- a/tests/proptests/operation.rs +++ b/tests/proptests/operation.rs @@ -50,6 +50,11 @@ pub fn any_operation(bin: Bin) -> impl Strategy { operation(bin) } +pub fn any_operation_readish(bin: Bin) -> impl Strategy { + operation_readish(bin) +} + +// Selects an operation that is a readish or write-ish in nature. pub fn operation(bin: Bin) -> impl Strategy { prop_oneof![ // op_get(), @@ -64,6 +69,22 @@ pub fn operation(bin: Bin) -> impl Strategy { ] } +// Selects an operation that is strictly readish in nature. +pub fn operation_readish(bin: Bin) -> impl Strategy { + prop_oneof![op_get(), op_get_header(), op_get_bin(bin.clone().name),] +} + +// Selects an operation that is strictly write-ish in nature. +pub fn operation_writeish(bin: Bin) -> impl Strategy { + prop_oneof![ + op_touch(), + op_put(bin.clone()), + op_append(bin.clone()), + op_prepend(bin.clone()), + op_add(bin.clone()), + ] +} + pub fn op_get() -> impl Strategy { Just(PropOperation::Get) } diff --git a/tests/proptests/policy.rs b/tests/proptests/policy.rs index 32350579..937b49de 100644 --- a/tests/proptests/policy.rs +++ b/tests/proptests/policy.rs @@ -6,13 +6,16 @@ use aerospike::policy::BasePolicy; use aerospike::policy::Replica; use aerospike::CollectionIndexType; use aerospike::CommitLevel; +use aerospike::Concurrency; use aerospike::GenerationPolicy; use aerospike::QueryDuration; use aerospike::QueryPolicy; use aerospike::ReadTouchTTL; use aerospike::RecordExistsAction; -use aerospike::{Expiration, ReadPolicy, ScanPolicy, WritePolicy}; +use aerospike::{ + BatchPolicy, BatchReadPolicy, BatchWritePolicy, Expiration, ReadPolicy, ScanPolicy, WritePolicy, +}; use proptest::bool; use proptest::prelude::*; @@ -27,6 +30,10 @@ pub fn read_touch_ttl() -> impl Strategy { ] } +pub fn concurrency() -> impl Strategy { + prop_oneof![Just(Concurrency::Sequential), Just(Concurrency::Parallel),] +} + pub fn consistency_level() -> impl Strategy { prop_oneof![ Just(ConsistencyLevel::ConsistencyOne), @@ -66,7 +73,7 @@ pub fn replica() -> impl Strategy { prop_oneof![ Just(Replica::Master), Just(Replica::Sequence), - Just(Replica::PreferRack), + // Just(Replica::PreferRack), ] } @@ -90,10 +97,10 @@ pub fn collection_index_type() -> impl Strategy { pub fn record_exists_action() -> impl Strategy { prop_oneof![ Just(RecordExistsAction::Update), - Just(RecordExistsAction::UpdateOnly), - Just(RecordExistsAction::Replace), - Just(RecordExistsAction::ReplaceOnly), - Just(RecordExistsAction::CreateOnly), + // Just(RecordExistsAction::UpdateOnly), + // Just(RecordExistsAction::Replace), + // Just(RecordExistsAction::ReplaceOnly), + // Just(RecordExistsAction::CreateOnly), ] } @@ -127,7 +134,7 @@ pub fn base_policy(timeout_ms: u32) -> impl Strategy { duration_ms_opt(100, 500), consistency_level(), read_touch_ttl(), - true_or_false_filter_expression(), + Just(None), //true_or_false_filter_expression(), ) .prop_map( |( @@ -295,3 +302,65 @@ pub fn read_policy(timeout_ms: u32) -> impl Strategy { replica, }) } + +pub fn batch_policy(timeout_ms: u32) -> impl Strategy { + ( + base_policy(timeout_ms), + concurrency(), + any::(), + any::(), + any::(), + true_or_false_filter_expression(), + replica(), + ) + .prop_map( + |( + base_policy, + concurrency, + allow_inline, + allow_inline_ssd, + respond_all_keys, + filter_expression, + replica, + )| { + BatchPolicy { + base_policy, + concurrency, + allow_inline, + allow_inline_ssd, + respond_all_keys, + filter_expression, + replica, + } + }, + ) +} + +pub fn batch_read_policy() -> impl Strategy { + (read_touch_ttl(), true_or_false_filter_expression()).prop_map( + |(read_touch_ttl, filter_expression)| BatchReadPolicy { + read_touch_ttl, + filter_expression, + }, + ) +} + +prop_compose! { + pub fn batch_write_policy() + ( + record_exists_action in record_exists_action(), + expiration in expiration(0, 5), + durable_delete in any::(), + filter_expression in true_or_false_filter_expression(), + ) + -> BatchWritePolicy { + BatchWritePolicy { + record_exists_action, + expiration, + durable_delete, + filter_expression, + // for all other fields, assume their default values. + ..Default::default() + } + } +} diff --git a/tests/src/batch.rs b/tests/src/batch.rs index f9cdb3e9..a6c29554 100644 --- a/tests/src/batch.rs +++ b/tests/src/batch.rs @@ -218,8 +218,8 @@ async fn batch_operate_read_touch_ttl() { let _ = env_logger::try_init(); let client = common::client().await; - let namespace: &str = "test"; //common::namespace(); - let set_name = "test"; // &common::rand_str(10); + let namespace: &str = common::namespace(); + let set_name = &common::rand_str(10); let mut bpolicy = BatchPolicy::default(); bpolicy.concurrency = Concurrency::Parallel; diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile new file mode 100644 index 00000000..173cd256 --- /dev/null +++ b/tools/docker/Dockerfile @@ -0,0 +1,17 @@ +FROM rust:trixie + +# ENV AEROSPIKE_CLIENT_BRANCH=tls +# ENV AEROSPIKE_HOSTS=build-trixie.:3000 +# ENV AEROSPIKE_USE_SERVICES_ALTERNATE=false +# ENV AEROSPIKE_NAMESPACE=ns +# ENV CARGO_TEST_OPTS='--no-default-features --features async,rt-tokio -- --no-capture' +# ENV PROPTEST_CASES=1000 + +WORKDIR /app +#RUN apt update -y && \ +# apt install -y git curl build-essential + +COPY startup-sequence /app/startup-sequence +#RUN echo 'source $HOME/.cargo/env' >> $HOME/.bashrc +CMD "/app/startup-sequence" + diff --git a/tools/docker/startup-sequence b/tools/docker/startup-sequence new file mode 100755 index 00000000..fb56de9d --- /dev/null +++ b/tools/docker/startup-sequence @@ -0,0 +1,38 @@ +#!/bin/bash + +# Come up with a reasonably random namespace for this particular docker +# image when run. Through this, we avoid namespace collisions during +# long-running tests. NOTE!! If running the docker image with the +# --hostname flag, you MUST use a unique name for the hostname. However, +# if you avoid using the --hostname flag, Docker automatically randomizes +# the hostname for us. +# +# We rely upon a (suitably unique) hostname and the current PID. + +export AEROSPIKE_PROP_SET_NAME="prop-$(hostname | md5sum | cut -c 1-6)-$$" + +echo "================================================================" +echo " RECEIVED ENVIRONMENT CONFIGURATION " +echo "----------------------------------------------------------------" +echo "AEROSPIKE_CLIENT_BRANCH = ${AEROSPIKE_CLIENT_BRANCH}" +echo "AEROSPIKE_HOSTS = ${AEROSPIKE_HOSTS}" +echo "AEROSPIKE_NAMESPACE = ${AEROSPIKE_NAMESPACE}" +echo "AEROSPIKE_USE_SERVICES_ALTERNATE = ${AEROSPIKE_USE_SERVICES_ALTERNATE}" +echo "" +echo "CARGO_TEST_OPTS = ${CARGO_TEST_OPTS}" +echo "" +echo "PROPTEST_CASES = ${PROPTEST_CASES}" +echo "----------------------------------------------------------------" +echo " DERIVED ENVIRONMENT CONFIGURATION " +echo "----------------------------------------------------------------" +echo "AEROSPIKE_PROP_SET_NAME = ${AEROSPIKE_PROP_SET_NAME}" +echo "================================================================" + +source $HOME/.cargo/env + +git clone https://github.com/aerospike/aerospike-client-rust.git +cd aerospike-client-rust +git checkout ${AEROSPIKE_CLIENT_BRANCH} +cargo clean +cargo test ${CARGO_TEST_OPTS} +