feat(metadata): impl Snapshot interface for Mux state machine.#2675
feat(metadata): impl Snapshot interface for Mux state machine.#2675krishvishal wants to merge 7 commits intoapache:masterfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #2675 +/- ##
==========================================
- Coverage 69.52% 69.32% -0.21%
==========================================
Files 568 569 +1
Lines 55238 55905 +667
Branches 55238 55905 +667
==========================================
+ Hits 38404 38755 +351
- Misses 14956 15247 +291
- Partials 1878 1903 +25
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
| /// # Arguments | ||
| /// * `mux` - The multiplexing state machine containing all sub-state machines | ||
| /// * `commit_number` - The VSR commit number this snapshot corresponds to | ||
| fn create<T>(mux: &MuxStateMachine<T>, commit_number: u64) -> Result<Self, Self::Error> |
There was a problem hiding this comment.
We impl StateMachine for MuxStateMachine, therefor I think we could accept as first argument stm: &mut T
| /// This is the interface that `MetadataHandle::Snapshot` must satisfy. | ||
| /// It provides methods for creating, encoding, decoding, and restoring snapshots. | ||
| #[allow(unused)] | ||
| pub trait MetadataSnapshot: Sized { |
There was a problem hiding this comment.
Lets rename this to Snapshot trait, if there already exists one (that I created as an stub), remove it.
| where | ||
| T: StateMachine + SnapshotContributor; | ||
|
|
||
| /// Get the VSR commit number this snapshot corresponds to. |
There was a problem hiding this comment.
Maybe we could use an sequence_number instead of commit number, the idea is that the sequence number would be monotonically growing on each snapshot, independent of the commit number from consensus.
| } | ||
|
|
||
| /// Consumer group member snapshot representation for serialization. | ||
| #[derive(Debug, Clone, Serialize, Deserialize)] |
There was a problem hiding this comment.
I think we don't need those intermediate structs, we can have the State structs impl Serialize and Deserialize directly.
| } | ||
| } | ||
|
|
||
| /// Recursive case for variadic tuple pattern: (Head, Tail) |
There was a problem hiding this comment.
Hmm, I am not sure if this is the simplest way to go around it.
I imagine the flow of abstraction to be identical to one of the StateMachine trait,
e.g we would have an Snapshotable trait, that would be implemented for everything that impls StateMachine trait,
since StateMachine trait is implemented both for MuxStateMachine aswell as the variadic!() tuple of state machines, the API would be called this way:
fn snapshot_metadata(&self, ....) {
let mut snapshot = MetadataSnapshot::default(); <-- This would impl the `Snapshot` trait.
// Here is the magic, `fill_snapshot` impl for `MuxStateMachine` would proxy to the `variadic!()` tuple impl,
// impl Snapshotable for variadic!(St, ... Rest)
// where St: StateMachine + FillSnapshot,
// Rest: Snapshotable,
// {
// fn fill_snapshot<S: Snapshot>(&self, snapshot: &mut S) {
// self.0.do_fill_snapshot(snapshot);
// self.1.fill_snapshot(snapshot);
// }
// fn restore_snapshot(...) {
// // ... Here similar code for restoring.
// }
// }
// You can choose whatever trait name for the `FillSnapshot` trait, smth that covers filling/recovering
// You can have associated types on the `Snapshot` trait that would be used to represent the currently filled snapshot
// (for example a binary blob), and use those as arguments into the `fill_snapshot` and `restore_snapshot` methods.
// and the snapshot from which we would restore State.
self.mux.fill_snapshot(&mut snapshot);
}
The idea is to avoid using those `const SECTION:NAME: &'str`, rather rely on the type checking to perform the walk for us.There was a problem hiding this comment.
We could alternatively look into extending the StateMachine trait, by adding extra bounds on it
trait StateMachine + Snapshotable + ...
{
}and then have single impl block.
| pub topic_name_index: Vec<((String, String), Vec<usize>)>, | ||
| } | ||
|
|
||
| impl Snapshotable for ConsumerGroupsInner { |
There was a problem hiding this comment.
Impl those for ConsumerGroup and Streams and Users, rather for their inners. You can create an metehod on the LeftRight, that exposes the read handle.
| /// Contains metadata about the snapshot and a collection of sections, | ||
| /// where each section corresponds to one state machine's serialized state. | ||
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| pub struct SnapshotEnvelope { |
There was a problem hiding this comment.
This is a valid approach, to store an type-erased bytes inside of the snapshot, but alternative approach (the one that redpanda did choose), is for the snapshot to store an intermediate representation of all the Snapshotable states and serialize those. So rather than serializing individual states, they serialize the snapshot instead.
There was a problem hiding this comment.
I didn't think about it a lot and not sure what is the trade off space between those two approaches (I guess you need more allocations with their approach, as you have to copy the data first and then serialize), where with our approach, we just serialize the state and append it to the snapshot envelope.
The snapshot system has three layers:
Snapshotableper-state-machine trait. EachStreamsInner,UsersInner,ConsumerGroupsInnerimplementsto_snapshot()/ from_snapshot() to convert between in-memory state.SnapshotContributorvisitor trait using the same recursive variadic tuple pattern asStateMachine::update. Walks (Users, (Streams, (ConsumerGroups, ()))) at compile time, collecting each state machine's serialized section into aVec<SnapshotSection>.MetadataSnapshota top-level interface for the generic snapshot type onIggyMetadata. Defines the full lifecycle: create (snapshot from mux) → encode (to bytes) → decode (from bytes) → restore (back to mux).