Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ jobs:
metadata
message_bus
storage
simulator
configs

license-headers:
Expand Down
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ members = [
"core/partitions",
"core/sdk",
"core/server",
"core/simulator",
"core/tools",
"examples/rust",
]
Expand Down
1 change: 1 addition & 0 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ simd-adler32: 0.3.8, "MIT",
simd-json: 0.17.0, "Apache-2.0 OR MIT",
simdutf8: 0.1.5, "Apache-2.0 OR MIT",
simple_asn1: 0.6.3, "ISC",
simulator: 0.1.0, "N/A",
siphasher: 1.0.1, "Apache-2.0 OR MIT",
slab: 0.4.11, "MIT",
smallvec: 1.15.1, "Apache-2.0 OR MIT",
Expand Down
1 change: 1 addition & 0 deletions core/common/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub trait Sender {
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SenderKind {
Tcp(TcpSender),
TcpTls(TcpTlsSender),
Expand Down
28 changes: 27 additions & 1 deletion core/common/src/types/consensus/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ pub enum Operation {
UpdatePermissions = 145,
CreatePersonalAccessToken = 146,
DeletePersonalAccessToken = 147,

Reserved = 200,
}

#[repr(C)]
Expand Down Expand Up @@ -172,6 +174,30 @@ pub struct RequestHeader {
pub reserved: [u8; 95],
}

impl Default for RequestHeader {
fn default() -> Self {
Self {
reserved: [0; 95],
checksum: 0,
checksum_body: 0,
cluster: 0,
size: 0,
epoch: 0,
view: 0,
release: 0,
protocol: 0,
command: Default::default(),
replica: 0,
reserved_frame: [0; 12],
client: 0,
request_checksum: 0,
timestamp: 0,
request: 0,
operation: Default::default(),
}
}
}

unsafe impl Pod for RequestHeader {}
unsafe impl Zeroable for RequestHeader {}

Expand Down Expand Up @@ -348,7 +374,7 @@ impl ConsensusHeader for CommitHeader {
}

#[repr(C)]
#[derive(Debug, Clone, Copy)]
#[derive(Default, Debug, Clone, Copy)]
pub struct ReplyHeader {
pub checksum: u128,
pub checksum_body: u128,
Expand Down
71 changes: 24 additions & 47 deletions core/common/src/types/consensus/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use crate::types::consensus::header::{
self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader,
use crate::{
header::RequestHeader,
types::consensus::header::{self, ConsensusHeader, PrepareHeader, PrepareOkHeader},
};
use bytes::Bytes;
use std::marker::PhantomData;
Expand Down Expand Up @@ -278,33 +279,36 @@ where
#[derive(Debug)]
#[allow(unused)]
pub enum MessageBag {
Generic(Message<GenericHeader>),
Request(Message<RequestHeader>),
Prepare(Message<PrepareHeader>),
PrepareOk(Message<PrepareOkHeader>),
Commit(Message<CommitHeader>),
Reply(Message<ReplyHeader>),
}

impl MessageBag {
#[allow(unused)]
pub fn command(&self) -> header::Command2 {
match self {
MessageBag::Generic(message) => message.header().command,
MessageBag::Request(message) => message.header().command,
MessageBag::Prepare(message) => message.header().command,
MessageBag::PrepareOk(message) => message.header().command,
MessageBag::Commit(message) => message.header().command,
MessageBag::Reply(message) => message.header().command,
}
}

#[allow(unused)]
pub fn size(&self) -> u32 {
match self {
MessageBag::Generic(message) => message.header().size(),
MessageBag::Request(message) => message.header().size(),
MessageBag::Prepare(message) => message.header().size(),
MessageBag::PrepareOk(message) => message.header().size(),
MessageBag::Commit(message) => message.header().size(),
MessageBag::Reply(message) => message.header().size(),
}
}

#[allow(unused)]
pub fn operation(&self) -> header::Operation {
match self {
MessageBag::Request(message) => message.header().operation,
MessageBag::Prepare(message) => message.header().operation,
MessageBag::PrepareOk(message) => message.header().operation,
}
}
}
Expand All @@ -326,17 +330,17 @@ where
unsafe { Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
MessageBag::Prepare(msg)
}
header::Command2::Commit => {
let msg = unsafe { Message::<header::CommitHeader>::from_buffer_unchecked(buffer) };
MessageBag::Commit(msg)
header::Command2::Request => {
let msg =
unsafe { Message::<header::RequestHeader>::from_buffer_unchecked(buffer) };
MessageBag::Request(msg)
}
header::Command2::Reply => {
let msg = unsafe { Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) };
MessageBag::Reply(msg)
header::Command2::PrepareOk => {
let msg =
unsafe { Message::<header::PrepareOkHeader>::from_buffer_unchecked(buffer) };
MessageBag::PrepareOk(msg)
}
_ => unreachable!(
"For now we only support Prepare, Commit, and Reply. In the future we will support more commands. Command2: {command:?}"
),
_ => unreachable!(),
}
}
}
Expand Down Expand Up @@ -500,33 +504,6 @@ mod tests {

assert_eq!(bag.command(), header::Command2::Prepare);
assert!(matches!(bag, MessageBag::Prepare(_)));
assert!(!matches!(bag, MessageBag::Commit(_)));
assert!(!matches!(bag, MessageBag::Reply(_)));
assert!(!matches!(bag, MessageBag::Generic(_)));
}

#[test]
fn test_message_bag_from_commit() {
let commit = header::CommitHeader::create_test();
let bag = MessageBag::from(commit);

assert_eq!(bag.command(), header::Command2::Commit);
assert!(!matches!(bag, MessageBag::Prepare(_)));
assert!(matches!(bag, MessageBag::Commit(_)));
assert!(!matches!(bag, MessageBag::Reply(_)));
assert!(!matches!(bag, MessageBag::Generic(_)));
}

#[test]
fn test_message_bag_from_reply() {
let reply = header::ReplyHeader::create_test();
let bag = MessageBag::from(reply);

assert_eq!(bag.command(), header::Command2::Reply);
assert!(!matches!(bag, MessageBag::Prepare(_)));
assert!(!matches!(bag, MessageBag::Commit(_)));
assert!(matches!(bag, MessageBag::Reply(_)));
assert!(!matches!(bag, MessageBag::Generic(_)));
}
}

Expand Down
Loading
Loading