10 changes: 5 additions & 5 deletions aerospike-core/src/batch/mod.rs
@@ -43,7 +43,7 @@ pub(crate) struct BatchRecordIndex {
}

/// Policy for a single batch read operation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct BatchReadPolicy {
/// read_touch_ttl determines how record TTL (time to live) is affected on reads. When enabled, the server can
/// efficiently operate as a read-based LRU cache where the least recently used records are expired.
@@ -76,7 +76,7 @@ impl Default for BatchReadPolicy {
}

/// Policy for a single batch write operation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct BatchWritePolicy {
/// RecordExistsAction qualifies how to handle writes where the record already exists.
pub record_exists_action: RecordExistsAction,
@@ -128,7 +128,7 @@ impl Default for BatchWritePolicy {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
/// Policy for a single batch delete operation.
pub struct BatchDeletePolicy {
/// GenerationPolicy qualifies how to handle record writes based on record generation.
@@ -173,7 +173,7 @@ impl Default for BatchDeletePolicy {
}

/// Policy for a single batch udf operation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct BatchUDFPolicy {
/// Desired consistency guarantee when committing a transaction on the server. The default
/// (CommitAll) indicates that the server should wait for master and all replica commits to
@@ -211,7 +211,7 @@ impl Default for BatchUDFPolicy {

/// Represents a batch operation.
/// Do not directly create the batch operations. Use the helper methods instead.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum BatchOperation<'a> {
#[doc(hidden)]
Read {
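The changes in this file are mechanical: the four per-operation batch policy structs gain `PartialEq`, and `BatchOperation` gains `Debug`, so policies and operations can now be compared and printed directly in tests and logs. A minimal sketch of what this enables, assuming `BatchReadPolicy` is re-exported from the crate root as in the current client:

```rust
// Illustrative only: relies on the Debug + Clone + PartialEq derives
// introduced in this diff. The type path is assumed, not verified here.
use aerospike_core::BatchReadPolicy;

fn main() {
    let a = BatchReadPolicy::default();
    let b = a.clone();
    assert_eq!(a, b); // possible now that PartialEq is derived
    println!("{a:?}"); // Debug was already derived on the policies
}
```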
28 changes: 20 additions & 8 deletions aerospike-core/src/commands/batch_attr.rs
@@ -82,8 +82,11 @@ impl BatchAttr {
self.send_key = false;
}

pub(crate) fn set_batch_read(&mut self, rp: &BatchReadPolicy) {
self.filter_expression = rp.filter_expression.clone();
pub(crate) fn set_batch_read(&mut self, rp: &BatchReadPolicy, parent: &BatchPolicy) {
self.filter_expression = rp
.filter_expression
.clone()
.or(parent.filter_expression.clone());
self.read_attr = buffer::INFO1_READ;

// if rp.ReadModeAP == ReadModeAPAll {
@@ -135,8 +138,11 @@
}
}

pub(crate) fn set_batch_write(&mut self, wp: &BatchWritePolicy) {
self.filter_expression = wp.filter_expression.clone();
pub(crate) fn set_batch_write(&mut self, wp: &BatchWritePolicy, parent: &BatchPolicy) {
self.filter_expression = wp
.filter_expression
.clone()
.or(parent.filter_expression.clone());
self.read_attr = 0;
self.write_attr = buffer::INFO2_WRITE | buffer::INFO2_RESPOND_ALL_OPS;
self.info_attr = 0;
@@ -221,8 +227,11 @@
}
}

pub(crate) fn set_batch_udf(&mut self, up: &BatchUDFPolicy) {
self.filter_expression = up.filter_expression.clone();
pub(crate) fn set_batch_udf(&mut self, up: &BatchUDFPolicy, parent: &BatchPolicy) {
self.filter_expression = up
.filter_expression
.clone()
.or(parent.filter_expression.clone());
self.read_attr = 0;
self.write_attr = buffer::INFO2_WRITE;
self.info_attr = 0;
@@ -245,8 +254,11 @@
}
}

pub(crate) fn set_batch_delete(&mut self, dp: &BatchDeletePolicy) {
self.filter_expression = dp.filter_expression.clone();
pub(crate) fn set_batch_delete(&mut self, dp: &BatchDeletePolicy, parent: &BatchPolicy) {
self.filter_expression = dp
.filter_expression
.clone()
.or(parent.filter_expression.clone());
self.read_attr = 0;
self.write_attr =
buffer::INFO2_WRITE | buffer::INFO2_RESPOND_ALL_OPS | buffer::INFO2_DELETE;
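The pattern repeated in all four setters is a fallback rule: the operation-level filter expression, when present, takes precedence; the parent `BatchPolicy`'s filter applies only when the operation does not set its own. A standalone sketch of just that rule, with `String` standing in for the client's `FilterExpression` type:

```rust
// Sketch of the precedence introduced above. `Option::or` returns the first
// Option if it is Some, otherwise the second -- exactly the override-or-inherit
// behavior the setters now implement.
fn effective_filter(op: Option<String>, parent: Option<String>) -> Option<String> {
    op.or(parent)
}

fn main() {
    let parent = Some("parent-filter".to_string());
    // Per-operation filter wins when both are set.
    assert_eq!(
        effective_filter(Some("op-filter".to_string()), parent.clone()),
        Some("op-filter".to_string())
    );
    // The parent's filter is inherited when the operation sets none.
    assert_eq!(effective_filter(None, parent.clone()), parent);
}
```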
145 changes: 66 additions & 79 deletions aerospike-core/src/commands/batch_operate_command.rs
@@ -16,15 +16,17 @@ use aerospike_rt::time::Instant;
use std::collections::HashMap;
use std::sync::Arc;

use crate::batch;
use crate::batch::BatchOperation;
use crate::batch::BatchRecordIndex;
use crate::cluster::partition::Partition;
use crate::cluster::{Cluster, Node};
use crate::commands::field_type::FieldType;
use crate::commands::Duration;
use crate::commands::StreamCommand;
use crate::commands::{self};
use crate::errors::{Error, Result};
use crate::net::Connection;
use crate::net::{BufferedConn, Connection};
use crate::policy::{BatchPolicy, Policy, PolicyLike, Replica};
use crate::result_code;
use crate::value::bytes_to_particle;
@@ -128,6 +130,8 @@ impl<'a> BatchOperateCommand<'a> {
.await
.map_err(|e| e.chain_error("Failed to set timeout for send buffer"))?;

conn.exhausted = false;

// Send command.
if let Err(err) = self.write_buffer(&mut conn).await {
// IO errors are considered temporary anomalies. Retry.
@@ -149,6 +153,8 @@
return Err(err);
}

conn.exhausted = true;

// command has completed successfully. Exit method.
return Ok(self);
}
@@ -199,7 +205,7 @@ impl<'a> BatchOperateCommand<'a> {

async fn parse_group(
batch_ops: &mut [(BatchOperation<'a>, usize)],
conn: &mut Connection,
conn: &mut BufferedConn<'_>,
size: usize,
) -> Result<bool> {
while conn.bytes_read() < size {
@@ -214,68 +220,94 @@
batch_op.0.set_record(batch_record.record);
batch_op.0.set_result_code(batch_record.result_code, false);
}
Err(Error::BatchLastError(batch_index, rc, in_doubt, ref msg)) => {
let batch_op = batch_ops
.get_mut(batch_index as usize)
.expect("Invalid batch index");
batch_op.0.set_result_code(rc, in_doubt);
return Err(Error::BatchError(batch_index, rc, in_doubt, msg.clone()));
}
Err(Error::BatchError(batch_index, rc, in_doubt, ..)) => {
let batch_op = batch_ops
.get_mut(batch_index as usize)
.expect("Invalid batch index");
batch_op.0.set_result_code(rc, in_doubt);
}
Err(err @ _) => return Err(err),
Err(err) => return Err(err),
}
}
Ok(true)
}

async fn parse_record(conn: &mut Connection) -> Result<Option<BatchRecordIndex>> {
let batch_index = conn.buffer.read_u32(Some(14));
let result_code = ResultCode::from(conn.buffer.read_u8(Some(5)));
async fn parse_record(conn: &mut BufferedConn<'_>) -> Result<Option<BatchRecordIndex>> {
// if cmd is the end marker of the response, do not proceed further
let info3 = conn.buffer().read_u8(Some(3));
let last_record = info3 & commands::buffer::INFO3_LAST == commands::buffer::INFO3_LAST;

let batch_index = conn.buffer().read_u32(Some(14));
let result_code = ResultCode::from(conn.buffer().read_u8(Some(5)));

match result_code {
ResultCode::Ok => (),
ResultCode::KeyNotFoundError | ResultCode::FilteredOut => (),
rc => {
return Err(Error::BatchError(batch_index, rc, false, conn.addr.clone()));
if last_record {
return Err(Error::BatchLastError(
batch_index,
rc,
false,
conn.conn.addr.clone(),
));
}

return Err(Error::BatchError(
batch_index,
rc,
false,
conn.conn.addr.clone(),
));
}
};

// if cmd is the end marker of the response, do not proceed further
let info3 = conn.buffer.read_u8(Some(3));
if info3 & commands::buffer::INFO3_LAST == commands::buffer::INFO3_LAST {
if last_record {
return Ok(None);
}

let found_key = match result_code {
ResultCode::Ok => true,
ResultCode::KeyNotFoundError | ResultCode::FilteredOut => false,
rc => {
return Err(Error::BatchError(batch_index, rc, false, conn.addr.clone()));
}
_ => unreachable!(),
};

conn.buffer.skip(6);
let generation = conn.buffer.read_u32(None);
let expiration = conn.buffer.read_u32(None);
let batch_index = conn.buffer.read_u32(None);
let field_count = conn.buffer.read_u16(None) as usize; // almost certainly 0
let op_count = conn.buffer.read_u16(None) as usize;
conn.buffer().skip(6);
let generation = conn.buffer().read_u32(None);
let expiration = conn.buffer().read_u32(None);
let batch_index = conn.buffer().read_u32(None);
let field_count = conn.buffer().read_u16(None) as usize; // almost certainly 0
let op_count = conn.buffer().read_u16(None) as usize;

let (key, _) = Self::parse_key(conn, field_count).await?;
let (key, _) = StreamCommand::parse_key(conn, field_count).await?;

let record = if found_key {
let mut bins: HashMap<String, Value> = HashMap::with_capacity(op_count);

for _ in 0..op_count {
conn.read_buffer(8).await?;
let op_size = conn.buffer.read_u32(None) as usize;
conn.buffer.skip(1);
let particle_type = conn.buffer.read_u8(None);
conn.buffer.skip(1);
let name_size = conn.buffer.read_u8(None) as usize;
let op_size = conn.buffer().read_u32(None) as usize;
conn.buffer().skip(1);
let particle_type = conn.buffer().read_u8(None);
conn.buffer().skip(1);
let name_size = conn.buffer().read_u8(None) as usize;
conn.read_buffer(name_size).await?;
let name = conn.buffer.read_str(name_size)?;
let name = conn.buffer().read_str(name_size)?;
let particle_bytes_size = op_size - (4 + name_size);
conn.read_buffer(particle_bytes_size).await?;
let value =
value::bytes_to_particle(particle_type, &mut conn.buffer, particle_bytes_size)?;
let value = value::bytes_to_particle(
particle_type,
&mut conn.buffer(),
particle_bytes_size,
)?;
bins.insert(name, value);
}

@@ -316,13 +348,18 @@
conn: &mut Connection,
) -> Result<()> {
loop {
let mut conn = BufferedConn::new(conn);
conn.set_limit(8)?;
conn.read_buffer(8).await?;
let size = conn.buffer.read_msg_size(None);
let size = conn.buffer().read_msg_size(None);
conn.bookmark();
if size > 0 && !Self::parse_group(batch_ops, conn, size as usize).await? {

conn.set_limit(size)?;
if size > 0 && !Self::parse_group(batch_ops, &mut conn, size as usize).await? {
break;
}
}
conn.close().await;
Ok(())
}

@@ -334,54 +371,4 @@
conn.buffer.write_timeout(timeout);
Ok(())
}

async fn parse_key(conn: &mut Connection, field_count: usize) -> Result<(Key, Option<u64>)> {
let mut digest: [u8; 20] = [0; 20];
let mut namespace: String = "".to_string();
let mut set_name: String = "".to_string();
let mut orig_key: Option<Value> = None;
let mut bval = None;

for _ in 0..field_count {
conn.read_buffer(4).await?;
let field_len = conn.buffer.read_u32(None) as usize;
conn.read_buffer(field_len).await?;
let field_type = conn.buffer.read_u8(None);

match field_type {
x if x == FieldType::DigestRipe as u8 => {
digest.copy_from_slice(conn.buffer.read_slice(field_len - 1));
}
x if x == FieldType::Namespace as u8 => {
namespace = conn.buffer.read_str(field_len - 1)?;
}
x if x == FieldType::Table as u8 => {
set_name = conn.buffer.read_str(field_len - 1)?;
}
x if x == FieldType::Key as u8 => {
let particle_type = conn.buffer.read_u8(None);
let particle_bytes_size = field_len - 2;
orig_key = Some(bytes_to_particle(
particle_type,
&mut conn.buffer,
particle_bytes_size,
)?);
}
x if x == FieldType::BValArray as u8 => {
bval = Some(conn.buffer.read_le_u64(None));
}
_ => unreachable!(),
}
}

Ok((
Key {
namespace,
set_name,
user_key: orig_key,
digest,
},
bval,
))
}
}
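The rewritten parse loop in this file reads one size-prefixed message group at a time through a `BufferedConn` with an explicit read limit, and stops at the record whose `info3` byte carries the `INFO3_LAST` bit; the new `BatchLastError` variant exists so an error result code on that final marker record still records the per-operation failure before terminating the group. A schematic, self-contained model of that framing logic, with assumed types and an assumed bit value (the real constant and buffer handling live in `commands::buffer` and `BufferedConn`):

```rust
// Schematic model only, not the client's actual types. A batch response is a
// sequence of size-prefixed groups; a record whose info3 flags include the
// last-record bit terminates the whole stream.
const INFO3_LAST: u8 = 0x01; // assumed bit value, for illustration

struct Group {
    info3_flags: Vec<u8>, // one info3 byte per record in the group
}

fn count_records(groups: &[Group]) -> usize {
    let mut records = 0;
    'stream: for group in groups {
        for &info3 in &group.info3_flags {
            if info3 & INFO3_LAST == INFO3_LAST {
                break 'stream; // end marker: stop without reading further
            }
            records += 1;
        }
    }
    records
}

fn main() {
    let groups = vec![
        Group { info3_flags: vec![0, 0] },
        Group { info3_flags: vec![0, INFO3_LAST] },
    ];
    // Two records in the first group, one in the second, then the marker.
    assert_eq!(count_records(&groups), 3);
}
```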