Skip to content
Merged
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/metadata/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,7 @@ journal = { workspace = true }
left-right = { workspace = true }
message_bus = { workspace = true }
paste = { workspace = true }
rmp-serde = { workspace = true }
serde = { workspace = true, features = ["derive"] }
slab = { workspace = true }
tracing = { workspace = true }
55 changes: 55 additions & 0 deletions core/metadata/src/impls/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
use crate::stm::StateMachine;
use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError};
use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
use iggy_common::{
header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader},
Expand All @@ -24,6 +25,60 @@ use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
use tracing::{debug, warn};

#[derive(Debug, Clone)]
#[allow(unused)]
pub struct IggySnapshot {
snapshot: MetadataSnapshot,
}

#[allow(unused)]
impl IggySnapshot {
pub fn new(sequence_number: u64) -> Self {
Self {
snapshot: MetadataSnapshot::new(sequence_number),
}
}

pub fn snapshot(&self) -> &MetadataSnapshot {
&self.snapshot
}
}

impl Snapshot for IggySnapshot {
type Error = SnapshotError;
type SequenceNumber = u64;
type Timestamp = u64;
type Inner = MetadataSnapshot;

fn create<T>(stm: &T, sequence_number: u64) -> Result<Self, SnapshotError>
where
T: FillSnapshot<MetadataSnapshot>,
{
let mut snapshot = MetadataSnapshot::new(sequence_number);

stm.fill_snapshot(&mut snapshot)?;

Ok(Self { snapshot })
}

fn encode(&self) -> Result<Vec<u8>, SnapshotError> {
self.snapshot.encode()
}

fn decode(bytes: &[u8]) -> Result<Self, SnapshotError> {
let snapshot = MetadataSnapshot::decode(bytes)?;
Ok(Self { snapshot })
}

fn sequence_number(&self) -> u64 {
Comment thread
numinnex marked this conversation as resolved.
self.snapshot.sequence_number
}

fn created_at(&self) -> u64 {
Comment thread
numinnex marked this conversation as resolved.
self.snapshot.created_at
}
}

pub trait Metadata<C>
where
C: Consensus,
Expand Down
28 changes: 28 additions & 0 deletions core/metadata/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ impl StreamStats {
self.zero_out_messages_count();
self.zero_out_segments_count();
}

pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
(
self.size_bytes.load(Ordering::Relaxed),
self.messages_count.load(Ordering::Relaxed),
self.segments_count.load(Ordering::Relaxed),
)
}

pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) {
self.size_bytes.store(size_bytes, Ordering::Relaxed);
self.messages_count.store(messages_count, Ordering::Relaxed);
self.segments_count.store(segments_count, Ordering::Relaxed);
}
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -219,6 +233,20 @@ impl TopicStats {
self.zero_out_messages_count();
self.zero_out_segments_count();
}

pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
(
self.size_bytes.load(Ordering::Relaxed),
self.messages_count.load(Ordering::Relaxed),
self.segments_count.load(Ordering::Relaxed),
)
}

pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, segments_count: u32) {
self.size_bytes.store(size_bytes, Ordering::Relaxed);
self.messages_count.store(messages_count, Ordering::Relaxed);
self.segments_count.store(segments_count, Ordering::Relaxed);
}
}

#[derive(Default, Debug)]
Expand Down
146 changes: 144 additions & 2 deletions core/metadata/src/stm/consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
// under the License.

use crate::stm::StateHandler;
use crate::{collect_handlers, define_state};
use crate::stm::snapshot::Snapshotable;
use crate::{collect_handlers, define_state, impl_fill_restore};

use ahash::AHashMap;
use iggy_common::create_consumer_group::CreateConsumerGroup;
use iggy_common::delete_consumer_group::DeleteConsumerGroup;
use iggy_common::{IdKind, Identifier};
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug, Clone)]
pub struct ConsumerGroupMember {
Expand Down Expand Up @@ -231,3 +234,142 @@ impl StateHandler for DeleteConsumerGroup {
}
}
}

/// Consumer group member snapshot representation for serialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
Comment thread
numinnex marked this conversation as resolved.
pub struct ConsumerGroupMemberSnapshot {
pub id: usize,
pub client_id: u32,
pub partitions: Vec<usize>,
pub partition_index: usize,
}

/// Consumer group snapshot representation for serialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerGroupSnapshot {
pub id: usize,
pub name: String,
pub partitions: Vec<usize>,
pub members: Vec<(usize, ConsumerGroupMemberSnapshot)>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerGroupsSnapshot {
pub items: Vec<(usize, ConsumerGroupSnapshot)>,
pub topic_index: Vec<((usize, usize), Vec<usize>)>,
pub topic_name_index: Vec<((String, String), Vec<usize>)>,
}

impl Snapshotable for ConsumerGroups {
type Snapshot = ConsumerGroupsSnapshot;

fn to_snapshot(&self) -> Self::Snapshot {
self.inner.read(|inner| {
let items: Vec<(usize, ConsumerGroupSnapshot)> = inner
.items
.iter()
.map(|(group_id, group)| {
let members: Vec<(usize, ConsumerGroupMemberSnapshot)> = group
.members
.iter()
.map(|(member_id, member)| {
(
member_id,
ConsumerGroupMemberSnapshot {
id: member.id,
client_id: member.client_id,
partitions: member.partitions.clone(),
partition_index: member.partition_index.load(Ordering::Relaxed),
},
)
})
.collect();

(
group_id,
ConsumerGroupSnapshot {
id: group.id,
name: group.name.to_string(),
partitions: group.partitions.clone(),
members,
},
)
})
.collect();

let topic_index: Vec<((usize, usize), Vec<usize>)> = inner
.topic_index
.iter()
.map(|(&k, v)| (k, v.clone()))
.collect();

let topic_name_index: Vec<((String, String), Vec<usize>)> = inner
.topic_name_index
.iter()
.map(|((s, t), v)| ((s.to_string(), t.to_string()), v.clone()))
.collect();

ConsumerGroupsSnapshot {
items,
topic_index,
topic_name_index,
}
})
}

fn from_snapshot(
snapshot: Self::Snapshot,
) -> Result<Self, crate::stm::snapshot::SnapshotError> {
let mut name_index: AHashMap<Arc<str>, usize> = AHashMap::new();
let mut group_entries: Vec<(usize, ConsumerGroup)> = Vec::new();

for (slab_key, group_snap) in snapshot.items {
let member_entries: Vec<(usize, ConsumerGroupMember)> = group_snap
.members
.into_iter()
.map(|(member_key, member_snap)| {
let member = ConsumerGroupMember {
id: member_snap.id,
client_id: member_snap.client_id,
partitions: member_snap.partitions,
partition_index: Arc::new(AtomicUsize::new(member_snap.partition_index)),
};
(member_key, member)
})
.collect();
let members: Slab<ConsumerGroupMember> = member_entries.into_iter().collect();

let group_name: Arc<str> = Arc::from(group_snap.name.as_str());
let group = ConsumerGroup {
id: group_snap.id,
name: group_name.clone(),
partitions: group_snap.partitions,
members,
};

name_index.insert(group_name, slab_key);
group_entries.push((slab_key, group));
}

let items = group_entries.into_iter().collect();

let topic_index: AHashMap<(usize, usize), Vec<usize>> =
snapshot.topic_index.into_iter().collect();

let topic_name_index: AHashMap<(Arc<str>, Arc<str>), Vec<usize>> = snapshot
.topic_name_index
.into_iter()
.map(|((s, t), v)| ((Arc::from(s.as_str()), Arc::from(t.as_str())), v))
.collect();

let inner = ConsumerGroupsInner {
name_index,
topic_index,
topic_name_index,
items,
};
Ok(inner.into())
}
}

impl_fill_restore!(ConsumerGroups, consumer_groups);
14 changes: 14 additions & 0 deletions core/metadata/src/stm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

pub mod consumer_group;
pub mod mux;
pub mod snapshot;
pub mod stream;
pub mod user;

Expand Down Expand Up @@ -87,6 +88,19 @@ where
read: Arc<ReadHandle<T>>,
}

impl<T, C> LeftRight<T, C>
where
T: Absorb<C>,
{
pub fn read<F, R>(&self, f: F) -> R
where
F: FnOnce(&T) -> R,
{
let guard = self.read.enter().expect("read handle should be accessible");
f(&*guard)
}
}
Comment thread
hubcio marked this conversation as resolved.

impl<T> From<T> for LeftRight<T, <T as Command>::Cmd>
where
T: Absorb<<T as Command>::Cmd> + Clone + Command,
Expand Down
Loading
Loading