From 55f9a6e17d9c85fe44d5928b73b8df1b44f7d7e4 Mon Sep 17 00:00:00 2001 From: Joseph Lee Hunsaker Date: Wed, 18 Feb 2026 16:35:48 -0700 Subject: [PATCH 1/2] feat: make redb storage optional with feature-flagged dual-mode architecture Gate redb behind `storage-redb` feature flag (default ON) in keri-core and teliox so the core protocol logic can compile and run without redb. This enables future alternative storage backends (e.g., DynamoDB for serverless). Key changes: - Split EventStorage constructors: generic `new()` (no mailbox) vs `new_redb()` (RedbDatabase with mailbox) vs `new_with_mailbox()` (inject) - Make mailbox_data an Option to support non-redb backends - Remove Any bound from EventValidator - Gate TelLogDatabase, teliox EscrowDatabase, and escrow module behind storage-redb feature - Genericize teliox escrow structs over K: EventDatabase for KEL storage - Add in-memory MemoryDatabase implementing all database traits for validation and testing - Move rkyv_adapter to database::rkyv_adapter (not under database::redb) Co-Authored-By: Claude Opus 4.6 --- .../watcher/src/watcher/watcher_data.rs | 2 +- components/witness/src/tests.rs | 6 +- components/witness/src/witness.rs | 4 +- keriox_core/Cargo.toml | 9 +- keriox_core/src/actor/error.rs | 2 + keriox_core/src/actor/simple_controller.rs | 5 +- keriox_core/src/database/memory.rs | 650 ++++++++++++++++++ keriox_core/src/database/mod.rs | 3 + keriox_core/src/database/redb/mod.rs | 2 +- keriox_core/src/database/rkyv_adapter/mod.rs | 58 ++ .../src/database/rkyv_adapter/said_wrapper.rs | 141 ++++ .../serialization_info_wrapper.rs | 58 ++ keriox_core/src/error/mod.rs | 8 +- .../src/event/event_data/interaction.rs | 2 +- keriox_core/src/event/event_data/rotation.rs | 2 +- keriox_core/src/event/sections/key_config.rs | 2 +- keriox_core/src/event/sections/seal.rs | 2 +- keriox_core/src/event_message/msg.rs | 4 +- keriox_core/src/prefix/mod.rs | 2 +- .../src/processor/escrow/reply_escrow.rs | 21 +- keriox_core/src/processor/event_storage.rs | 71 +- keriox_core/src/processor/validator.rs | 2 +- keriox_core/src/state/mod.rs | 2 +- support/teliox/Cargo.toml | 6 +- support/teliox/src/database/mod.rs | 31 +- support/teliox/src/error.rs | 4 + .../src/processor/escrow/missing_issuer.rs | 16 +- .../src/processor/escrow/missing_registry.rs | 14 +- support/teliox/src/processor/escrow/mod.rs | 16 +- .../src/processor/escrow/out_of_order.rs | 16 +- support/teliox/src/processor/mod.rs | 1 + support/teliox/src/tel/mod.rs | 2 +- 32 files changed, 1059 insertions(+), 105 deletions(-) create mode 100644 keriox_core/src/database/memory.rs create mode 100644 keriox_core/src/database/rkyv_adapter/mod.rs create mode 100644 keriox_core/src/database/rkyv_adapter/said_wrapper.rs create mode 100644 keriox_core/src/database/rkyv_adapter/serialization_info_wrapper.rs diff --git a/components/watcher/src/watcher/watcher_data.rs b/components/watcher/src/watcher/watcher_data.rs index f1311be5..6b89949c 100644 --- a/components/watcher/src/watcher/watcher_data.rs +++ b/components/watcher/src/watcher/watcher_data.rs @@ -112,7 +112,7 @@ impl WatcherData { let prefix = BasicPrefix::Ed25519NT(signer.public_key()); // watcher uses non transferable key let processor = BasicProcessor::new(events_db.clone(), Some(notification_bus)); - let storage = Arc::new(EventStorage::new(events_db)); + let storage = Arc::new(EventStorage::new_redb(events_db)); // construct witness loc scheme oobi let loc_scheme = LocationScheme::new( diff --git a/components/witness/src/tests.rs b/components/witness/src/tests.rs index 6dc5afbf..e376d67c 100644 --- a/components/witness/src/tests.rs +++ b/components/witness/src/tests.rs @@ -99,7 +99,7 @@ fn test_not_fully_witnessed() -> Result<(), Error> { let not = Notice::Event(inception_event.clone()); w.process_notice(not).unwrap(); w.event_storage - .mailbox_data + .mailbox_data.as_ref().unwrap() .get_mailbox_receipts(controller.prefix(), 0) .into_iter() .flatten() @@ -185,7 +185,7 @@ fn test_not_fully_witnessed() -> Result<(), Error> { // first_witness.respond(signer_arc.clone())?; let first_receipt = first_witness .event_storage - .mailbox_data + .mailbox_data.as_ref().unwrap() .get_mailbox_receipts(controller.prefix(), 0) .unwrap() .map(Notice::NontransferableRct) @@ -280,7 +280,7 @@ fn test_qry_rpy() -> Result<(), ActorError> { // send receipts to alice let receipt_to_alice = witness .event_storage - .mailbox_data + .mailbox_data.as_ref().unwrap() .get_mailbox_receipts(alice.prefix(), 0) .unwrap() .map(Notice::NontransferableRct) diff --git a/components/witness/src/witness.rs b/components/witness/src/witness.rs index f6b8eec5..9b555361 100644 --- a/components/witness/src/witness.rs +++ b/components/witness/src/witness.rs @@ -86,7 +86,7 @@ impl Notifier for WitnessReceiptGenerator { impl WitnessReceiptGenerator { pub fn new(signer: Arc, events_db: Arc) -> Self { - let storage = EventStorage::new(events_db.clone()); + let storage = EventStorage::new_redb(events_db.clone()); let prefix = BasicPrefix::Ed25519NT(signer.public_key()); Self { prefix, @@ -172,7 +172,7 @@ impl Witness { let events_db = Arc::new(RedbDatabase::new(&events_database_path).map_err(|_| Error::DbError)?); let mut witness_processor = WitnessProcessor::new(events_db.clone(), escrow_config); - let event_storage = Arc::new(EventStorage::new(events_db.clone())); + let event_storage = Arc::new(EventStorage::new_redb(events_db.clone())); let receipt_generator = Arc::new(WitnessReceiptGenerator::new( signer.clone(), diff --git a/keriox_core/Cargo.toml b/keriox_core/Cargo.toml index be7ed041..a1b5ffa7 100644 --- a/keriox_core/Cargo.toml +++ b/keriox_core/Cargo.toml @@ -13,11 +13,12 @@ repository.workspace = true crate-type = ["cdylib", "rlib"] [features] -default = [] +default = ["storage-redb"] +storage-redb = ["redb"] query = ["serde_cbor"] oobi = ["url", "strum_macros", "strum"] -oobi-manager = ["oobi", "query", "reqwest", "async-trait", "serde_cbor"] -mailbox = ["query", "serde_cbor"] +oobi-manager = ["oobi", "query", "storage-redb", "reqwest", "async-trait", "serde_cbor"] +mailbox = ["query", "storage-redb", "serde_cbor"] [dependencies] bytes = "1.3.0" @@ -43,7 +44,7 @@ chrono = { version = "0.4.18", features = ["serde"] } arrayref = "0.3.6" zeroize = "1.3.0" fraction = { version = "0.9", features = ["with-serde-support"] } -redb = "2.3.0" +redb = { version = "2.3.0", optional = true } # oobis dependecies async-trait = { version = "0.1.57", optional = true } diff --git a/keriox_core/src/actor/error.rs b/keriox_core/src/actor/error.rs index 06f4c7a6..15d2ad96 100644 --- a/keriox_core/src/actor/error.rs +++ b/keriox_core/src/actor/error.rs @@ -1,5 +1,6 @@ use http::StatusCode; +#[cfg(feature = "storage-redb")] use crate::database::redb::RedbError; use crate::event_message::cesr_adapter::ParseError; use crate::keys::KeysError; @@ -74,6 +75,7 @@ impl From for ActorError { } } +#[cfg(feature = "storage-redb")] impl From for ActorError { fn from(err: RedbError) -> Self { ActorError::DbError(err.to_string()) diff --git a/keriox_core/src/actor/simple_controller.rs b/keriox_core/src/actor/simple_controller.rs index 7ac2298a..17cca5c1 100644 --- a/keriox_core/src/actor/simple_controller.rs +++ b/keriox_core/src/actor/simple_controller.rs @@ -3,8 +3,10 @@ use std::{ sync::{Arc, Mutex}, }; +#[cfg(feature = "storage-redb")] +use crate::database::redb::RedbDatabase; use crate::{ - database::{redb::RedbDatabase, EscrowCreator, EventDatabase}, + database::{EscrowCreator, EventDatabase}, processor::escrow::{ maybe_out_of_order_escrow::MaybeOutOfOrderEscrow, partially_witnessed_escrow::PartiallyWitnessedEscrow, @@ -72,6 +74,7 @@ pub struct SimpleController SimpleController { +#[cfg(feature = "storage-redb")] impl SimpleController { // incept a state and keys pub fn new( diff --git a/keriox_core/src/database/memory.rs b/keriox_core/src/database/memory.rs new file mode 100644 index 00000000..3fc101b8 --- /dev/null +++ b/keriox_core/src/database/memory.rs @@ -0,0 +1,650 @@ +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use said::SelfAddressingIdentifier; + +#[cfg(feature = "query")] +use crate::query::reply_event::SignedReply; +use crate::{ + database::{ + timestamped::{Timestamped, TimestampedSignedEventMessage}, + EscrowCreator, EscrowDatabase, EventDatabase, LogDatabase, QueryParameters, + SequencedEventDatabase, + }, + error::Error, + event::KeyEvent, + event_message::{ + msg::KeriEvent, + signature::{Nontransferable, Transferable}, + signed_event_message::{ + SignedEventMessage, SignedNontransferableReceipt, SignedTransferableReceipt, + }, + }, + prefix::{IdentifierPrefix, IndexedSignature}, + state::IdentifierState, +}; + +/// In-memory implementation of EventDatabase for testing and validation. +pub struct MemoryDatabase { + /// Events stored by identifier prefix, ordered by sn + events: RwLock>>, + /// Key state per identifier + states: RwLock>, + /// Transferable receipts by (id, sn) + receipts_t: RwLock>>, + /// Non-transferable receipts by (id, sn) + receipts_nt: RwLock>>, + /// Log database + log_db: Arc, + /// Escrow counter for creating unique table names + escrow_db: Arc>>>, + #[cfg(feature = "query")] + replies: RwLock>, +} + +impl MemoryDatabase { + pub fn new() -> Self { + Self { + events: RwLock::new(HashMap::new()), + states: RwLock::new(HashMap::new()), + receipts_t: RwLock::new(HashMap::new()), + receipts_nt: RwLock::new(HashMap::new()), + log_db: Arc::new(MemoryLogDatabase::new()), + escrow_db: Arc::new(RwLock::new(HashMap::new())), + #[cfg(feature = "query")] + replies: RwLock::new(HashMap::new()), + } + } +} + +impl EventDatabase for MemoryDatabase { + type Error = Error; + type LogDatabaseType = MemoryLogDatabase; + + fn get_log_db(&self) -> Arc { + self.log_db.clone() + } + + fn add_kel_finalized_event( + &self, + event: SignedEventMessage, + id: &IdentifierPrefix, + ) -> Result<(), Self::Error> { + // Update key state + let current_state = self + .states + .read() + .unwrap() + .get(id) + .cloned() + .unwrap_or_default(); + let new_state = current_state.apply(&event.event_message)?; + self.states.write().unwrap().insert(id.clone(), new_state); + + // Log the event + self.log_db.log_event_internal(&event); + + // Store in KEL + let timestamped = Timestamped::new(event); + self.events + .write() + .unwrap() + .entry(id.clone()) + .or_default() + .push(timestamped); + + Ok(()) + } + + fn add_receipt_t( + &self, + receipt: SignedTransferableReceipt, + id: &IdentifierPrefix, + ) -> Result<(), Self::Error> { + let sn = receipt.body.sn; + let transferable = Transferable::Seal(receipt.validator_seal, receipt.signatures); + self.receipts_t + .write() + .unwrap() + .entry((id.clone(), sn)) + .or_default() + .push(transferable); + Ok(()) + } + + fn add_receipt_nt( + &self, + receipt: SignedNontransferableReceipt, + id: &IdentifierPrefix, + ) -> Result<(), Self::Error> { + let sn = receipt.body.sn; + self.receipts_nt + .write() + .unwrap() + .entry((id.clone(), sn)) + .or_default() + .push(receipt); + Ok(()) + } + + fn get_key_state(&self, id: &IdentifierPrefix) -> Option { + self.states.read().unwrap().get(id).cloned() + } + + fn get_kel_finalized_events( + &self, + params: QueryParameters, + ) -> Option> { + let events = self.events.read().unwrap(); + match params { + QueryParameters::All { id } => { + events.get(id).cloned().map(|v| v.into_iter()) + } + QueryParameters::BySn { ref id, sn } => { + events.get(id).map(|evts| { + evts.iter() + .filter(move |e| e.signed_event_message.event_message.data.get_sn() == sn) + .cloned() + .collect::>() + .into_iter() + }) + } + QueryParameters::Range { + ref id, + start, + limit, + } => events.get(id).map(|evts| { + evts.iter() + .filter(move |e| { + let sn = e.signed_event_message.event_message.data.get_sn(); + sn >= start && sn < start + limit + }) + .cloned() + .collect::>() + .into_iter() + }), + } + } + + fn get_receipts_t( + &self, + params: QueryParameters, + ) -> Option> { + let receipts = self.receipts_t.read().unwrap(); + match params { + QueryParameters::BySn { ref id, sn } => { + receipts.get(&(id.clone(), sn)).cloned().map(|v| v.into_iter()) + } + _ => None, + } + } + + fn get_receipts_nt( + &self, + params: QueryParameters, + ) -> Option> { + let receipts = self.receipts_nt.read().unwrap(); + match params { + QueryParameters::BySn { ref id, sn } => { + receipts.get(&(id.clone(), sn)).cloned().map(|v| v.into_iter()) + } + _ => None, + } + } + + fn accept_to_kel(&self, _event: &KeriEvent) -> Result<(), Self::Error> { + // In redb, this saves the event to KEL tables. For memory, events + // are already in the events map from add_kel_finalized_event. + Ok(()) + } + + #[cfg(feature = "query")] + fn save_reply(&self, reply: SignedReply) -> Result<(), Self::Error> { + let id = reply.reply.get_prefix(); + let signer = reply + .signature + .get_signer() + .ok_or_else(|| Error::SemanticError("Missing signer".into()))?; + self.replies + .write() + .unwrap() + .insert((id, signer), reply); + Ok(()) + } + + #[cfg(feature = "query")] + fn get_reply( + &self, + id: &IdentifierPrefix, + from_who: &IdentifierPrefix, + ) -> Option { + self.replies + .read() + .unwrap() + .get(&(id.clone(), from_who.clone())) + .cloned() + } +} + +/// In-memory log database for storing events by digest. +pub struct MemoryLogDatabase { + events: RwLock>, + signatures: RwLock>>, + nontrans_couplets: RwLock>>, + trans_receipts: RwLock>>, +} + +impl MemoryLogDatabase { + pub fn new() -> Self { + Self { + events: RwLock::new(HashMap::new()), + signatures: RwLock::new(HashMap::new()), + nontrans_couplets: RwLock::new(HashMap::new()), + trans_receipts: RwLock::new(HashMap::new()), + } + } + + fn log_event_internal(&self, event: &SignedEventMessage) { + if let Ok(digest) = event.event_message.digest() { + let timestamped = Timestamped::new(event.clone()); + self.events.write().unwrap().insert(digest.clone(), timestamped); + self.signatures + .write() + .unwrap() + .insert(digest, event.signatures.clone()); + } + } + + fn log_receipt_internal(&self, receipt: &SignedNontransferableReceipt) { + let digest = receipt.body.receipted_event_digest.clone(); + self.nontrans_couplets + .write() + .unwrap() + .entry(digest) + .or_default() + .extend(receipt.signatures.clone()); + } +} + +impl LogDatabase<'static> for MemoryLogDatabase { + type DatabaseType = (); + type Error = Error; + type TransactionType = (); + + fn new(_db: Arc) -> Result { + Ok(Self::new()) + } + + fn log_event( + &self, + _txn: &Self::TransactionType, + signed_event: &SignedEventMessage, + ) -> Result<(), Self::Error> { + self.log_event_internal(signed_event); + Ok(()) + } + + fn log_event_with_new_transaction( + &self, + signed_event: &SignedEventMessage, + ) -> Result<(), Self::Error> { + self.log_event_internal(signed_event); + Ok(()) + } + + fn log_receipt( + &self, + _txn: &Self::TransactionType, + signed_receipt: &SignedNontransferableReceipt, + ) -> Result<(), Self::Error> { + self.log_receipt_internal(signed_receipt); + Ok(()) + } + + fn log_receipt_with_new_transaction( + &self, + signed_receipt: &SignedNontransferableReceipt, + ) -> Result<(), Self::Error> { + self.log_receipt_internal(signed_receipt); + Ok(()) + } + + fn get_signed_event( + &self, + said: &SelfAddressingIdentifier, + ) -> Result, Self::Error> { + Ok(self.events.read().unwrap().get(said).cloned()) + } + + fn get_event( + &self, + said: &SelfAddressingIdentifier, + ) -> Result>, Self::Error> { + Ok(self + .events + .read() + .unwrap() + .get(said) + .map(|t| t.signed_event_message.event_message.clone())) + } + + fn get_signatures( + &self, + said: &SelfAddressingIdentifier, + ) -> Result>, Self::Error> { + Ok(self + .signatures + .read() + .unwrap() + .get(said) + .cloned() + .map(|v| v.into_iter())) + } + + fn get_nontrans_couplets( + &self, + said: &SelfAddressingIdentifier, + ) -> Result>, Self::Error> { + Ok(self + .nontrans_couplets + .read() + .unwrap() + .get(said) + .cloned() + .map(|v| v.into_iter())) + } + + fn get_trans_receipts( + &self, + said: &SelfAddressingIdentifier, + ) -> Result, Self::Error> { + Ok(self + .trans_receipts + .read() + .unwrap() + .get(said) + .cloned() + .unwrap_or_default() + .into_iter()) + } + + fn remove_nontrans_receipt( + &self, + _txn_mode: &Self::TransactionType, + said: &SelfAddressingIdentifier, + nontrans: impl IntoIterator, + ) -> Result<(), Self::Error> { + let to_remove: Vec<_> = nontrans.into_iter().collect(); + if let Some(existing) = self.nontrans_couplets.write().unwrap().get_mut(said) { + existing.retain(|n| !to_remove.contains(n)); + } + Ok(()) + } + + fn remove_nontrans_receipt_with_new_transaction( + &self, + said: &SelfAddressingIdentifier, + nontrans: impl IntoIterator, + ) -> Result<(), Self::Error> { + self.remove_nontrans_receipt(&(), said, nontrans) + } +} + +/// In-memory sequenced event database for escrow storage. +pub struct MemorySequencedEventDb { + data: RwLock>>, +} + +impl MemorySequencedEventDb { + pub fn new() -> Self { + Self { + data: RwLock::new(HashMap::new()), + } + } +} + +impl SequencedEventDatabase for MemorySequencedEventDb { + type DatabaseType = (); + type Error = Error; + type DigestIter = Box>; + + fn new(_db: Arc, _table_name: &'static str) -> Result { + Ok(Self::new()) + } + + fn insert( + &self, + identifier: &IdentifierPrefix, + sn: u64, + digest: &SelfAddressingIdentifier, + ) -> Result<(), Self::Error> { + self.data + .write() + .unwrap() + .entry((identifier.clone(), sn)) + .or_default() + .push(digest.clone()); + Ok(()) + } + + fn get( + &self, + identifier: &IdentifierPrefix, + sn: u64, + ) -> Result { + let data = self.data.read().unwrap(); + let items = data + .get(&(identifier.clone(), sn)) + .cloned() + .unwrap_or_default(); + Ok(Box::new(items.into_iter())) + } + + fn get_greater_than( + &self, + identifier: &IdentifierPrefix, + sn: u64, + ) -> Result { + let data = self.data.read().unwrap(); + let items: Vec<_> = data + .iter() + .filter(|((id, s), _)| id == identifier && *s >= sn) + .flat_map(|(_, v)| v.clone()) + .collect(); + Ok(Box::new(items.into_iter())) + } + + fn remove( + &self, + identifier: &IdentifierPrefix, + sn: u64, + said: &SelfAddressingIdentifier, + ) -> Result<(), Self::Error> { + if let Some(v) = self.data.write().unwrap().get_mut(&(identifier.clone(), sn)) { + v.retain(|d| d != said); + } + Ok(()) + } +} + +/// In-memory escrow database. +pub struct MemoryEscrowDb { + sequenced: Arc, + log: Arc, +} + +impl EscrowDatabase for MemoryEscrowDb { + type EscrowDatabaseType = (); + type LogDatabaseType = MemoryLogDatabase; + type Error = Error; + type EventIter = std::vec::IntoIter; + + fn new( + _escrow: Arc< + dyn SequencedEventDatabase< + DatabaseType = Self::EscrowDatabaseType, + Error = Self::Error, + DigestIter = Box>, + >, + >, + log: Arc, + ) -> Self { + // We won't use this constructor in practice; use from_parts instead + Self { + sequenced: Arc::new(MemorySequencedEventDb::new()), + log, + } + } + + fn save_digest( + &self, + id: &IdentifierPrefix, + sn: u64, + event_digest: &SelfAddressingIdentifier, + ) -> Result<(), Self::Error> { + self.sequenced.insert(id, sn, event_digest) + } + + fn insert(&self, event: &SignedEventMessage) -> Result<(), Self::Error> { + let digest = event.event_message.digest()?; + let sn = event.event_message.data.get_sn(); + let id = event.event_message.data.get_prefix(); + self.sequenced.insert(&id, sn, &digest)?; + self.log.log_event_internal(event); + Ok(()) + } + + fn insert_key_value( + &self, + id: &IdentifierPrefix, + sn: u64, + event: &SignedEventMessage, + ) -> Result<(), Self::Error> { + let digest = event.event_message.digest()?; + self.sequenced.insert(id, sn, &digest)?; + self.log.log_event_internal(event); + Ok(()) + } + + fn get( + &self, + identifier: &IdentifierPrefix, + sn: u64, + ) -> Result { + let digests = self.sequenced.get(identifier, sn)?; + let events: Vec<_> = digests + .filter_map(|d| { + self.log + .get_signed_event(&d) + .ok() + .flatten() + .map(|t| t.signed_event_message) + }) + .collect(); + Ok(events.into_iter()) + } + + fn get_from_sn( + &self, + identifier: &IdentifierPrefix, + sn: u64, + ) -> Result { + let digests = self.sequenced.get_greater_than(identifier, sn)?; + let events: Vec<_> = digests + .filter_map(|d| { + self.log + .get_signed_event(&d) + .ok() + .flatten() + .map(|t| t.signed_event_message) + }) + .collect(); + Ok(events.into_iter()) + } + + fn remove(&self, event: &KeriEvent) { + if let Ok(digest) = event.digest() { + let sn = event.data.get_sn(); + let id = event.data.get_prefix(); + let _ = self.sequenced.remove(&id, sn, &digest); + } + } + + fn contains( + &self, + id: &IdentifierPrefix, + sn: u64, + digest: &SelfAddressingIdentifier, + ) -> Result { + let digests = self.sequenced.get(id, sn)?; + Ok(digests.collect::>().contains(digest)) + } +} + +impl EscrowCreator for MemoryDatabase { + type EscrowDatabaseType = MemoryEscrowDb; + + fn create_escrow_db(&self, table_name: &'static str) -> Self::EscrowDatabaseType { + let seq = Arc::new(MemorySequencedEventDb::new()); + self.escrow_db + .write() + .unwrap() + .insert(table_name, seq.clone()); + MemoryEscrowDb { + sequenced: seq, + log: self.log_db.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use std::{convert::TryFrom, sync::Arc}; + + use cesrox::parse; + + use super::MemoryDatabase; + use crate::{ + error::Error, + event_message::signed_event_message::{Message, Notice}, + processor::{ + basic_processor::BasicProcessor, event_storage::EventStorage, Processor, + }, + }; + + #[test] + fn test_memory_db_process_icp() -> Result<(), Error> { + let db = Arc::new(MemoryDatabase::new()); + let processor = BasicProcessor::new(db.clone(), None); + let storage = EventStorage::new(db.clone()); + + // Inception event from keripy test_multisig_digprefix + let icp_raw = br#"{"v":"KERI10JSON0001e7_","t":"icp","d":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"0","kt":"2","k":["DErocgXD2RGSyvn3MObcx59jeOsEQhv2TqHirVkzrp0Q","DFXLiTjiRdSBPLL6hLa0rskIxk3dh4XwJLfctkJFLRSS","DE9YgIQVgpLwocTVrG8tidKScsQSMWwLWywNC48fhq4f"],"nt":"2","n":["EDJk5EEpC4-tQ7YDwBiKbpaZahh1QCyQOnZRF7p2i8k8","EAXfDjKvUFRj-IEB_o4y-Y_qeJAjYfZtOMD9e7vHNFss","EN8l6yJC2PxribTN0xfri6bLz34Qvj-x3cNwcV3DvT2m"],"bt":"0","b":[],"c":[],"a":[]}-AADAAD4SyJSYlsQG22MGXzRGz2PTMqpkgOyUfq7cS99sC2BCWwdVmEMKiTEeWe5kv-l_d9auxdadQuArLtAGEArW8wEABD0z_vQmFImZXfdR-0lclcpZFfkJJJNXDcUNrf7a-mGsxNLprJo-LROwDkH5m7tVrb-a1jcor2dHD9Jez-r4bQIACBFeU05ywfZycLdR0FxCvAR9BfV9im8tWe1DglezqJLf-vHRQSChY1KafbYNc96hYYpbuN90WzuCRMgV8KgRsEC"#; + let parsed = parse(icp_raw).unwrap().1; + let deserialized_icp = Message::try_from(parsed).unwrap(); + + let id = match &deserialized_icp { + Message::Notice(Notice::Event(e)) => e.event_message.data.get_prefix(), + _ => panic!("unexpected message type"), + }; + + // Process inception event + processor.process(&deserialized_icp)?; + + // Verify state was created + let state = storage.get_state(&id); + assert!(state.is_some()); + let state = state.unwrap(); + assert_eq!(state.sn, 0); + assert_eq!(state.current.public_keys.len(), 3); + + // Verify KEL has the event + let kel = storage.get_kel_messages(&id)?; + assert!(kel.is_some()); + assert_eq!(kel.unwrap().len(), 1); + + Ok(()) + } +} diff --git a/keriox_core/src/database/mod.rs b/keriox_core/src/database/mod.rs index b667d134..432a840b 100644 --- a/keriox_core/src/database/mod.rs +++ b/keriox_core/src/database/mod.rs @@ -19,7 +19,10 @@ use crate::{ #[cfg(feature = "mailbox")] pub mod mailbox; +pub mod memory; +#[cfg(feature = "storage-redb")] pub mod redb; +pub(crate) mod rkyv_adapter; pub mod timestamped; pub enum QueryParameters<'a> { diff --git a/keriox_core/src/database/redb/mod.rs b/keriox_core/src/database/redb/mod.rs index cbebe810..bc2dca8a 100644 --- a/keriox_core/src/database/redb/mod.rs +++ b/keriox_core/src/database/redb/mod.rs @@ -2,7 +2,7 @@ pub mod escrow_database; #[cfg(feature = "query")] pub(crate) mod ksn_log; pub mod loging; -pub(crate) mod rkyv_adapter; +pub(crate) use super::rkyv_adapter; /// Kel storage. (identifier, sn) -> event digest /// The `KELS` table links an identifier and sequence number to the digest of an event, diff --git a/keriox_core/src/database/rkyv_adapter/mod.rs b/keriox_core/src/database/rkyv_adapter/mod.rs new file mode 100644 index 00000000..e721e9d4 --- /dev/null +++ b/keriox_core/src/database/rkyv_adapter/mod.rs @@ -0,0 +1,58 @@ +use rkyv::{util::AlignedVec, with::With}; +use said::SelfAddressingIdentifier; +use said_wrapper::{ArchivedSAIDef, SAIDef}; + +use crate::{ + event::sections::seal::{ArchivedSourceSeal, SourceSeal}, + event_message::signature::{ + ArchivedNontransferable, ArchivedTransferable, Nontransferable, Transferable, + }, + prefix::{attached_signature::ArchivedIndexedSignature, IndexedSignature}, + state::IdentifierState, +}; + +pub(crate) mod said_wrapper; +pub(crate) mod serialization_info_wrapper; + +pub fn serialize_said(said: &SelfAddressingIdentifier) -> Result { + Ok(rkyv::to_bytes( + With::::cast(said), + )?) +} + +pub fn deserialize_said(bytes: &[u8]) -> Result { + let archived: &ArchivedSAIDef = rkyv::access(&bytes)?; + let deserialized: SelfAddressingIdentifier = + rkyv::deserialize(With::::cast(archived))?; + Ok(deserialized) +} + +pub fn deserialize_nontransferable(bytes: &[u8]) -> Result { + let archived = rkyv::access::(&bytes).unwrap(); + rkyv::deserialize::(archived) +} + +pub fn deserialize_transferable(bytes: &[u8]) -> Result { + let archived = rkyv::access::(&bytes).unwrap(); + rkyv::deserialize::(archived) +} + +pub fn deserialize_indexed_signatures( + bytes: &[u8], +) -> Result { + let archived = rkyv::access::(&bytes).unwrap(); + rkyv::deserialize::(archived) +} + +pub fn deserialize_source_seal(bytes: &[u8]) -> Result { + let archived = rkyv::access::(&bytes).unwrap(); + rkyv::deserialize::(archived) +} + +pub fn deserialize_identifier_state(bytes: &[u8]) -> Result { + let mut aligned_bytes = + AlignedVec::<{ std::mem::align_of::() }>::with_capacity(bytes.len()); + aligned_bytes.extend_from_slice(bytes); + + rkyv::from_bytes::(&aligned_bytes) +} diff --git a/keriox_core/src/database/rkyv_adapter/said_wrapper.rs b/keriox_core/src/database/rkyv_adapter/said_wrapper.rs new file mode 100644 index 00000000..51bf63c4 --- /dev/null +++ b/keriox_core/src/database/rkyv_adapter/said_wrapper.rs @@ -0,0 +1,141 @@ +use said::{ + derivation::{HashFunction, HashFunctionCode}, + SelfAddressingIdentifier, +}; + +use rkyv::{Archive, Deserialize, Serialize}; + +#[derive( + Debug, Clone, Default, Eq, Hash, Archive, rkyv::Serialize, rkyv::Deserialize, PartialEq, +)] +#[rkyv(derive(Debug))] +pub struct SaidValue { + #[rkyv(with = SAIDef)] + pub said: SelfAddressingIdentifier, +} + +impl From for SaidValue { + fn from(value: SelfAddressingIdentifier) -> Self { + Self { said: value } + } +} + +impl From for SelfAddressingIdentifier { + fn from(value: SaidValue) -> Self { + value.said + } +} + +impl serde::Serialize for SaidValue { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.said.serialize(serializer) + } +} + +impl<'de> serde::Deserialize<'de> for SaidValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + SelfAddressingIdentifier::deserialize(deserializer).map(|said| SaidValue { said }) + } +} + +#[derive(Archive, Serialize, Deserialize)] +#[rkyv(remote = SelfAddressingIdentifier)] +#[rkyv(derive(Debug))] +pub(crate) struct SAIDef { + #[rkyv(with = HashFunctionDef)] + pub derivation: HashFunction, + pub digest: Vec, +} + +impl From for SelfAddressingIdentifier { + fn from(value: SAIDef) -> Self { + Self::new(value.derivation, value.digest) + } +} + +#[derive(Archive, Serialize, Deserialize, PartialEq)] +#[rkyv(remote = HashFunction)] +#[rkyv(derive(Debug))] +struct HashFunctionDef { + #[rkyv(getter = HashFunctionDef::get_code, with = HashFunctionCodeDef)] + pub f: HashFunctionCode, +} + +impl HashFunctionDef { + fn get_code(foo: &HashFunction) -> HashFunctionCode { + foo.into() + } +} + +impl From for HashFunction { + fn from(value: HashFunctionDef) -> Self { + value.f.into() + } +} + +#[derive(Archive, Serialize, Deserialize, PartialEq)] +#[rkyv(remote = HashFunctionCode)] +#[rkyv(compare(PartialEq), derive(Debug))] +enum HashFunctionCodeDef { + Blake3_256, + Blake2B256(Vec), + Blake2S256(Vec), + SHA3_256, + SHA2_256, + Blake3_512, + SHA3_512, + Blake2B512, + SHA2_512, +} + +impl From for HashFunctionCode { + fn from(value: HashFunctionCodeDef) -> Self { + match value { + HashFunctionCodeDef::Blake3_256 => HashFunctionCode::Blake3_256, + HashFunctionCodeDef::Blake2B256(vec) => HashFunctionCode::Blake2B256(vec), + HashFunctionCodeDef::Blake2S256(vec) => HashFunctionCode::Blake2S256(vec), + HashFunctionCodeDef::SHA3_256 => HashFunctionCode::SHA3_256, + HashFunctionCodeDef::SHA2_256 => HashFunctionCode::SHA2_256, + HashFunctionCodeDef::Blake3_512 => HashFunctionCode::Blake3_512, + HashFunctionCodeDef::SHA3_512 => HashFunctionCode::SHA3_512, + HashFunctionCodeDef::Blake2B512 => HashFunctionCode::Blake2B512, + HashFunctionCodeDef::SHA2_512 => HashFunctionCode::SHA2_512, + } + } +} + +#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] +// #[rkyv( +// compare(PartialEq), +// derive(Debug), +// )] +struct OptionalSaid { + value: SaidValue, +} + +#[test] +fn test_rkyv_said_serialization() -> Result<(), rkyv::rancor::Failure> { + use rkyv::with::With; + let value: SelfAddressingIdentifier = "EJe_sKQb1otKrz6COIL8VFvBv3DEFvtKaVFGn1vm0IlL" + .parse() + .unwrap(); + + let bytes = rkyv::to_bytes(With::::cast(&value))?; + dbg!(&bytes); + let archived: &ArchivedSAIDef = rkyv::access(&bytes)?; + + let deserialized: SelfAddressingIdentifier = + rkyv::deserialize(With::::cast(archived))?; + + // let des = rkyv_adapter::deserialize_element::(&bytes); + + assert_eq!(value, deserialized); + + Ok(()) +} diff --git a/keriox_core/src/database/rkyv_adapter/serialization_info_wrapper.rs b/keriox_core/src/database/rkyv_adapter/serialization_info_wrapper.rs new file mode 100644 index 00000000..333c3138 --- /dev/null +++ b/keriox_core/src/database/rkyv_adapter/serialization_info_wrapper.rs @@ -0,0 +1,58 @@ +use said::{sad::SerializationFormats, version::SerializationInfo}; + +#[derive( + Debug, + Clone, + serde::Serialize, + serde::Deserialize, + Default, + rkyv::Archive, + rkyv::Serialize, + rkyv::Deserialize, + PartialEq, +)] +pub(crate) struct SerializationInfoValue { + #[rkyv(with = SerializationInfoDef)] + info: SerializationInfo, +} + +#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize, PartialEq)] +#[rkyv(remote = SerializationInfo)] +pub(crate) struct SerializationInfoDef { + pub protocol_code: String, + pub major_version: u8, + pub minor_version: u8, + pub size: usize, + #[rkyv(with = SerializationFormatsDef)] + pub kind: SerializationFormats, +} + +impl From for SerializationInfo { + fn from(value: SerializationInfoDef) -> Self { + SerializationInfo { + protocol_code: value.protocol_code, + major_version: value.major_version, + minor_version: value.minor_version, + size: value.size, + kind: value.kind, + } + } +} + +#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize, PartialEq)] +#[rkyv(remote = SerializationFormats)] +pub enum SerializationFormatsDef { + JSON, + MGPK, + CBOR, +} + +impl From for SerializationFormats { + fn from(value: SerializationFormatsDef) -> Self { + match value { + SerializationFormatsDef::JSON => Self::JSON, + SerializationFormatsDef::MGPK => Self::MGPK, + SerializationFormatsDef::CBOR => Self::CBOR, + } + } +} diff --git a/keriox_core/src/error/mod.rs b/keriox_core/src/error/mod.rs index f44594c9..52fbaf09 100644 --- a/keriox_core/src/error/mod.rs +++ b/keriox_core/src/error/mod.rs @@ -2,10 +2,11 @@ use said::version::error::Error as VersionError; use serde::{Deserialize, Serialize}; use thiserror::Error; +#[cfg(feature = "storage-redb")] +use crate::database::redb::RedbError; use crate::{ - database::redb::RedbError, event::sections::key_config::SignatureError, - event_message::cesr_adapter::ParseError, prefix::IdentifierPrefix, - processor::validator::VerificationError, + event::sections::key_config::SignatureError, event_message::cesr_adapter::ParseError, + prefix::IdentifierPrefix, processor::validator::VerificationError, }; pub mod serializer_error; @@ -128,6 +129,7 @@ impl From for Error { } } +#[cfg(feature = "storage-redb")] impl From for Error { fn from(_: RedbError) -> Self { Error::DbError diff --git a/keriox_core/src/event/event_data/interaction.rs b/keriox_core/src/event/event_data/interaction.rs index 9d6169dd..761c2a10 100644 --- a/keriox_core/src/event/event_data/interaction.rs +++ b/keriox_core/src/event/event_data/interaction.rs @@ -1,5 +1,5 @@ use super::super::sections::seal::*; -use crate::database::redb::rkyv_adapter::said_wrapper::SaidValue; +use crate::database::rkyv_adapter::said_wrapper::SaidValue; use crate::error::Error; use crate::state::{EventSemantics, IdentifierState}; use said::SelfAddressingIdentifier; diff --git a/keriox_core/src/event/event_data/rotation.rs b/keriox_core/src/event/event_data/rotation.rs index e42fac61..d91e9218 100644 --- a/keriox_core/src/event/event_data/rotation.rs +++ b/keriox_core/src/event/event_data/rotation.rs @@ -1,6 +1,6 @@ use super::super::sections::{seal::*, KeyConfig, RotationWitnessConfig}; use crate::{ - database::redb::rkyv_adapter::said_wrapper::SaidValue, + database::rkyv_adapter::said_wrapper::SaidValue, error::Error, prefix::BasicPrefix, state::{EventSemantics, IdentifierState, LastEstablishmentData, WitnessConfig}, diff --git a/keriox_core/src/event/sections/key_config.rs b/keriox_core/src/event/sections/key_config.rs index 0a4d662f..9d63105a 100644 --- a/keriox_core/src/event/sections/key_config.rs +++ b/keriox_core/src/event/sections/key_config.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use super::threshold::SignatureThreshold; use crate::{ - database::redb::rkyv_adapter::said_wrapper::SaidValue, + database::rkyv_adapter::said_wrapper::SaidValue, prefix::{attached_signature::Index, BasicPrefix, IndexedSignature}, }; diff --git a/keriox_core/src/event/sections/seal.rs b/keriox_core/src/event/sections/seal.rs index 445c15b5..e918ce35 100644 --- a/keriox_core/src/event/sections/seal.rs +++ b/keriox_core/src/event/sections/seal.rs @@ -1,6 +1,6 @@ use std::fmt::{self, Display}; -use crate::{database::redb::rkyv_adapter::said_wrapper::SaidValue, prefix::IdentifierPrefix}; +use crate::{database::rkyv_adapter::said_wrapper::SaidValue, prefix::IdentifierPrefix}; use said::SelfAddressingIdentifier; use serde::{Deserialize, Serialize}; use serde_hex::{Compact, SerHex}; diff --git a/keriox_core/src/event_message/msg.rs b/keriox_core/src/event_message/msg.rs index a8eb4436..64198564 100644 --- a/keriox_core/src/event_message/msg.rs +++ b/keriox_core/src/event_message/msg.rs @@ -5,8 +5,8 @@ use said::{ use serde::{Deserialize, Serialize}; use super::{EventTypeTag, Typeable}; -use crate::database::redb::rkyv_adapter::said_wrapper::SaidValue; -use crate::database::redb::rkyv_adapter::serialization_info_wrapper::SerializationInfoDef; +use crate::database::rkyv_adapter::said_wrapper::SaidValue; +use crate::database::rkyv_adapter::serialization_info_wrapper::SerializationInfoDef; use crate::error::Error; pub type KeriEvent = TypedEvent; diff --git a/keriox_core/src/prefix/mod.rs b/keriox_core/src/prefix/mod.rs index a14826dd..4b6983ec 100644 --- a/keriox_core/src/prefix/mod.rs +++ b/keriox_core/src/prefix/mod.rs @@ -1,5 +1,5 @@ use crate::{ - database::redb::rkyv_adapter::said_wrapper::SaidValue, + database::rkyv_adapter::said_wrapper::SaidValue, event::sections::key_config::SignatureError, }; diff --git a/keriox_core/src/processor/escrow/reply_escrow.rs b/keriox_core/src/processor/escrow/reply_escrow.rs index 2dc9be4b..0300fba3 100644 --- a/keriox_core/src/processor/escrow/reply_escrow.rs +++ b/keriox_core/src/processor/escrow/reply_escrow.rs @@ -2,15 +2,14 @@ use std::sync::Arc; use said::SelfAddressingIdentifier; +#[cfg(feature = "storage-redb")] +use crate::database::redb::{ + escrow_database::SnKeyDatabase, + ksn_log::{AcceptedKsn, KsnLogDatabase}, + RedbDatabase, RedbError, +}; use crate::{ - database::{ - redb::{ - escrow_database::SnKeyDatabase, - ksn_log::{AcceptedKsn, KsnLogDatabase}, - RedbDatabase, RedbError, - }, - EventDatabase, SequencedEventDatabase, - }, + database::{EventDatabase, SequencedEventDatabase}, error::Error, prefix::IdentifierPrefix, processor::{ @@ -20,6 +19,7 @@ use crate::{ query::reply_event::{ReplyEvent, ReplyRoute, SignedReply}, }; +#[cfg(feature = "storage-redb")] #[derive(Clone)] pub struct ReplyEscrow { events_db: Arc, @@ -27,6 +27,7 @@ pub struct ReplyEscrow { escrowed_reply: Arc, } +#[cfg(feature = "storage-redb")] impl ReplyEscrow { pub fn new(events_db: Arc) -> Self { let acc = Arc::new(AcceptedKsn::new(events_db.db.clone()).unwrap()); @@ -41,6 +42,7 @@ impl ReplyEscrow { } } } +#[cfg(feature = "storage-redb")] impl Notifier for ReplyEscrow { fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> { match notification { @@ -63,6 +65,7 @@ impl Notifier for ReplyEscrow { } } +#[cfg(feature = "storage-redb")] impl ReplyEscrow { pub fn process_reply_escrow( &self, @@ -100,11 +103,13 @@ impl ReplyEscrow { } } +#[cfg(feature = "storage-redb")] pub struct SnKeyReplyEscrow { escrow: Arc, log: Arc, } +#[cfg(feature = "storage-redb")] impl SnKeyReplyEscrow { pub(crate) fn new(escrow: Arc, log: Arc) -> Self { Self { escrow, log } diff --git a/keriox_core/src/processor/event_storage.rs b/keriox_core/src/processor/event_storage.rs index 666afd48..dea15067 100644 --- a/keriox_core/src/processor/event_storage.rs +++ b/keriox_core/src/processor/event_storage.rs @@ -20,7 +20,7 @@ use crate::{ }; #[cfg(feature = "mailbox")] use crate::{ - database::{mailbox::MailboxData, redb::RedbDatabase}, + database::mailbox::MailboxData, query::mailbox::QueryArgsMbx, }; use crate::{ @@ -37,27 +37,37 @@ use crate::mailbox::MailboxResponse; pub struct EventStorage { pub events_db: Arc, #[cfg(feature = "mailbox")] - pub mailbox_data: MailboxData, + pub mailbox_data: Option, } -impl EventStorage { +impl EventStorage { pub fn new(events_db: Arc) -> Self { - #[cfg(feature = "mailbox")] - { - if let Some(redb_db) = - (events_db.as_ref() as &dyn std::any::Any).downcast_ref::() - { - let mailbox_data = MailboxData::new(redb_db.db.clone()).unwrap(); - Self { - events_db, - mailbox_data, - } - } else { - panic!("Expected RedbDatabase for mailbox feature"); - } + Self { + events_db, + #[cfg(feature = "mailbox")] + mailbox_data: None, + } + } +} + +#[cfg(feature = "mailbox")] +impl EventStorage { + pub fn new_redb(events_db: Arc) -> Self { + let mailbox_data = MailboxData::new(events_db.db.clone()).unwrap(); + Self { + events_db, + mailbox_data: Some(mailbox_data), + } + } +} + +#[cfg(feature = "mailbox")] +impl EventStorage { + pub fn new_with_mailbox(events_db: Arc, mailbox_data: MailboxData) -> Self { + Self { + events_db, + mailbox_data: Some(mailbox_data), } - #[cfg(not(feature = "mailbox"))] - Self { events_db } } } @@ -173,13 +183,20 @@ impl EventStorage { } } + #[cfg(feature = "mailbox")] + fn mailbox(&self) -> Result<&MailboxData, Error> { + self.mailbox_data + .as_ref() + .ok_or_else(|| Error::SemanticError("Mailbox not initialized".into())) + } + #[cfg(feature = "mailbox")] pub fn add_mailbox_multisig( &self, receipient: &IdentifierPrefix, to_forward: SignedEventMessage, ) -> Result<(), Error> { - self.mailbox_data + self.mailbox()? .add_mailbox_multisig(receipient, to_forward)?; Ok(()) @@ -191,7 +208,7 @@ impl EventStorage { receipient: &IdentifierPrefix, to_forward: SignedEventMessage, ) -> Result<(), Error> { - self.mailbox_data + self.mailbox()? .add_mailbox_delegate(receipient, to_forward)?; Ok(()) @@ -200,7 +217,7 @@ impl EventStorage { #[cfg(feature = "mailbox")] pub fn add_mailbox_receipt(&self, receipt: SignedNontransferableReceipt) -> Result<(), Error> { let id = receipt.body.prefix.clone(); - self.mailbox_data.add_mailbox_receipt(&id, receipt)?; + self.mailbox()?.add_mailbox_receipt(&id, receipt)?; Ok(()) } @@ -208,34 +225,32 @@ impl EventStorage { #[cfg(feature = "mailbox")] pub fn add_mailbox_reply(&self, reply: SignedEventMessage) -> Result<(), Error> { let id = reply.event_message.data.get_prefix(); - self.mailbox_data.add_mailbox_reply(&id, reply)?; + self.mailbox()?.add_mailbox_reply(&id, reply)?; Ok(()) } #[cfg(feature = "mailbox")] pub fn get_mailbox_messages(&self, args: &QueryArgsMbx) -> Result { + let mailbox = self.mailbox()?; let id = args.i.clone(); // query receipts - let receipt = match self - .mailbox_data + let receipt = match mailbox .get_mailbox_receipts(&id, args.topics.receipt as u64) { Some(receipts) => receipts.collect(), None => vec![], }; - let multisig = match self - .mailbox_data + let multisig = match mailbox .get_mailbox_multisig(&id, args.topics.multisig as u64) { Some(multisig) => multisig.collect(), None => vec![], }; - let delegate = match self - .mailbox_data + let delegate = match mailbox .get_mailbox_delegate(&id, args.topics.delegate as u64) { Some(delegate) => delegate.collect(), diff --git a/keriox_core/src/processor/validator.rs b/keriox_core/src/processor/validator.rs index 5982ee1a..140bbe81 100644 --- a/keriox_core/src/processor/validator.rs +++ b/keriox_core/src/processor/validator.rs @@ -60,7 +60,7 @@ pub struct EventValidator { event_storage: EventStorage, } -impl EventValidator { +impl EventValidator { pub fn new(event_database: Arc) -> Self { Self { event_storage: EventStorage::new(event_database), diff --git a/keriox_core/src/state/mod.rs b/keriox_core/src/state/mod.rs index d687d9a2..e0eefd15 100644 --- a/keriox_core/src/state/mod.rs +++ b/keriox_core/src/state/mod.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use crate::{ - database::redb::rkyv_adapter::said_wrapper::SaidValue, + database::rkyv_adapter::said_wrapper::SaidValue, error::Error, event::{ event_data::EventData, diff --git a/support/teliox/Cargo.toml b/support/teliox/Cargo.toml index b2e92f3c..3b208cbe 100644 --- a/support/teliox/Cargo.toml +++ b/support/teliox/Cargo.toml @@ -9,6 +9,10 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["storage-redb"] +storage-redb = ["keri-core/storage-redb", "redb"] + [dependencies] keri-core = {path = "../../keriox_core", version= "0.17.9", features = ["query"]} said = { version = "0.4.0" } @@ -21,7 +25,7 @@ serde-hex = "0.1" chrono = { version = "0.4.18", features = ["serde"] } arrayref = "0.3.6" serde_cbor = "0.11.1" -redb = "2.6.0" +redb = { version = "2.6.0", optional = true } [dev-dependencies] diff --git a/support/teliox/src/database/mod.rs b/support/teliox/src/database/mod.rs index efdcfaab..1db25915 100644 --- a/support/teliox/src/database/mod.rs +++ b/support/teliox/src/database/mod.rs @@ -1,13 +1,12 @@ use crate::{error::Error, event::verifiable_event::VerifiableEvent}; -use ::redb::Database; -use keri_core::{database::redb::WriteTxnMode, prefix::IdentifierPrefix}; +use keri_core::prefix::IdentifierPrefix; +#[cfg(feature = "storage-redb")] use said::SelfAddressingIdentifier; -use std::{ - fs::{create_dir_all, exists}, - path::Path, - sync::Arc, -}; +use std::path::Path; + +#[cfg(feature = "storage-redb")] pub(crate) mod digest_key_database; +#[cfg(feature = "storage-redb")] pub mod redb; pub trait TelEventDatabase { @@ -28,15 +27,23 @@ pub trait TelEventDatabase { ) -> Option>; } +#[cfg(feature = "storage-redb")] pub trait TelLogDatabase { - fn log_event(&self, event: &VerifiableEvent, transaction: &WriteTxnMode) -> Result<(), Error>; + fn log_event( + &self, + event: &VerifiableEvent, + transaction: &keri_core::database::redb::WriteTxnMode, + ) -> Result<(), Error>; fn get(&self, digest: &SelfAddressingIdentifier) -> Result, Error>; } -pub struct EscrowDatabase(pub(crate) Arc); +#[cfg(feature = "storage-redb")] +pub struct EscrowDatabase(pub(crate) std::sync::Arc<::redb::Database>); +#[cfg(feature = "storage-redb")] impl EscrowDatabase { pub fn new(file_path: &Path) -> Result { + use std::fs::{create_dir_all, exists}; // Create file if not exists if !std::fs::exists(file_path).map_err(|e| Error::EscrowDatabaseError(e.to_string()))? { if let Some(parent) = file_path.parent() { @@ -46,8 +53,8 @@ impl EscrowDatabase { } } } - let db = - Database::create(file_path).map_err(|e| Error::EscrowDatabaseError(e.to_string()))?; - Ok(Self(Arc::new(db))) + let db = ::redb::Database::create(file_path) + .map_err(|e| Error::EscrowDatabaseError(e.to_string()))?; + Ok(Self(std::sync::Arc::new(db))) } } diff --git a/support/teliox/src/error.rs b/support/teliox/src/error.rs index 678aa1fc..5d5716c8 100644 --- a/support/teliox/src/error.rs +++ b/support/teliox/src/error.rs @@ -44,24 +44,28 @@ pub enum Error { RwLockingError, } +#[cfg(feature = "storage-redb")] impl From for Error { fn from(_: redb::TransactionError) -> Self { Error::RedbError } } +#[cfg(feature = "storage-redb")] impl From for Error { fn from(_: redb::TableError) -> Self { Error::RedbError } } +#[cfg(feature = "storage-redb")] impl From for Error { fn from(_: redb::CommitError) -> Self { Error::RedbError } } +#[cfg(feature = "storage-redb")] impl From for Error { fn from(_: redb::StorageError) -> Self { Error::RedbError diff --git a/support/teliox/src/processor/escrow/missing_issuer.rs b/support/teliox/src/processor/escrow/missing_issuer.rs index 38debb07..727c3245 100644 --- a/support/teliox/src/processor/escrow/missing_issuer.rs +++ b/support/teliox/src/processor/escrow/missing_issuer.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use keri_core::{ - database::redb::{RedbDatabase, WriteTxnMode}, + database::{redb::WriteTxnMode, EventDatabase}, processor::{ event_storage::EventStorage, notification::{Notification, NotificationBus, Notifier}, @@ -22,19 +22,19 @@ use crate::{ }, }; -pub struct MissingIssuerEscrow { - kel_reference: Arc>, +pub struct MissingIssuerEscrow { + kel_reference: Arc>, tel_reference: Arc>, publisher: TelNotificationBus, escrowed_missing_issuer: DigestKeyDatabase, } -impl MissingIssuerEscrow { +impl MissingIssuerEscrow { pub fn new( db: Arc, escrow_db: &EscrowDatabase, duration: Duration, - kel_reference: Arc>, + kel_reference: Arc>, bus: TelNotificationBus, ) -> Self { let escrow = DigestKeyDatabase::new(escrow_db.0.clone(), "missing_issuer_escrow"); @@ -48,7 +48,7 @@ impl MissingIssuerEscrow { } } } -impl Notifier for MissingIssuerEscrow { +impl Notifier for MissingIssuerEscrow { fn notify( &self, notification: &Notification, @@ -71,7 +71,7 @@ impl Notifier for MissingIssuerEscrow { } } -impl TelNotifier for MissingIssuerEscrow { +impl TelNotifier for MissingIssuerEscrow { fn notify( &self, notification: &TelNotification, @@ -93,7 +93,7 @@ impl TelNotifier for MissingIssuerEscrow MissingIssuerEscrow { +impl MissingIssuerEscrow { /// Reprocess escrowed events that need issuer event of given digest for acceptance. pub fn process_missing_issuer_escrow( &self, diff --git a/support/teliox/src/processor/escrow/missing_registry.rs b/support/teliox/src/processor/escrow/missing_registry.rs index dda3ee91..a13499a5 100644 --- a/support/teliox/src/processor/escrow/missing_registry.rs +++ b/support/teliox/src/processor/escrow/missing_registry.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use keri_core::{ - database::redb::{RedbDatabase, WriteTxnMode}, + database::{redb::WriteTxnMode, EventDatabase}, prefix::IdentifierPrefix, processor::event_storage::EventStorage, }; @@ -18,17 +18,17 @@ use crate::{ }, }; -pub struct MissingRegistryEscrow { +pub struct MissingRegistryEscrow { tel_reference: Arc>, - kel_reference: Arc>, + kel_reference: Arc>, // Key is the registry id, value is the escrowed tel events digests escrowed_missing_registry: DigestKeyDatabase, } -impl MissingRegistryEscrow { +impl MissingRegistryEscrow { pub fn new( tel_reference: Arc, - kel_reference: Arc>, + kel_reference: Arc>, escrow_db: &EscrowDatabase, duration: Duration, ) -> Self { @@ -42,7 +42,7 @@ impl MissingRegistryEscrow { } } -impl TelNotifier for MissingRegistryEscrow { +impl TelNotifier for MissingRegistryEscrow { fn notify( &self, notification: &TelNotification, @@ -69,7 +69,7 @@ impl TelNotifier for MissingRegistryEscrow } } -impl MissingRegistryEscrow { +impl MissingRegistryEscrow { pub fn process_missing_registry( &self, bus: &TelNotificationBus, diff --git a/support/teliox/src/processor/escrow/mod.rs b/support/teliox/src/processor/escrow/mod.rs index 36424a61..a5cc5f1a 100644 --- a/support/teliox/src/processor/escrow/mod.rs +++ b/support/teliox/src/processor/escrow/mod.rs @@ -1,9 +1,9 @@ use std::{sync::Arc, time::Duration}; -use keri_core::{database::redb::RedbDatabase, processor::event_storage::EventStorage}; +use keri_core::{database::EventDatabase, processor::event_storage::EventStorage}; use crate::{ - database::{redb::RedbTelDatabase, EscrowDatabase}, + database::{EscrowDatabase, TelEventDatabase, TelLogDatabase}, error::Error, processor::notification::TelNotificationKind, }; @@ -19,16 +19,16 @@ pub mod missing_issuer; pub mod missing_registry; pub mod out_of_order; -pub fn default_escrow_bus( - tel_storage: Arc, - kel_storage: Arc>, +pub fn default_escrow_bus( + tel_storage: Arc, + kel_storage: Arc>, tel_escrow_db: EscrowDatabase, ) -> Result< ( TelNotificationBus, - Arc>, - Arc>, - Arc>, + Arc>, + Arc>, + Arc>, ), Error, > { diff --git a/support/teliox/src/processor/escrow/out_of_order.rs b/support/teliox/src/processor/escrow/out_of_order.rs index 44f0650c..83d73e77 100644 --- a/support/teliox/src/processor/escrow/out_of_order.rs +++ b/support/teliox/src/processor/escrow/out_of_order.rs @@ -2,8 +2,8 @@ use std::{sync::Arc, time::Duration}; use keri_core::{ database::{ - redb::{escrow_database::SnKeyDatabase, RedbDatabase, WriteTxnMode}, - SequencedEventDatabase, + redb::{escrow_database::SnKeyDatabase, WriteTxnMode}, + EventDatabase, SequencedEventDatabase, }, prefix::IdentifierPrefix, processor::event_storage::EventStorage, @@ -19,17 +19,17 @@ use crate::{ }, }; -pub struct OutOfOrderEscrow { +pub struct OutOfOrderEscrow { tel_reference: Arc>, - kel_reference: Arc>, + kel_reference: Arc>, tel_log: Arc, escrowed_out_of_order: SnKeyDatabase, } -impl OutOfOrderEscrow { +impl OutOfOrderEscrow { pub fn new( tel_reference: Arc, - kel_reference: Arc>, + kel_reference: Arc>, escrow_db: &EscrowDatabase, duration: Duration, ) -> Self { @@ -44,7 +44,7 @@ impl OutOfOrderEscrow { } } -impl TelNotifier for OutOfOrderEscrow { +impl TelNotifier for OutOfOrderEscrow { fn notify( &self, notification: &TelNotification, @@ -72,7 +72,7 @@ impl TelNotifier for OutOfOrderEscrow { } } -impl OutOfOrderEscrow { +impl OutOfOrderEscrow { pub fn process_out_of_order_events( &self, bus: &TelNotificationBus, diff --git a/support/teliox/src/processor/mod.rs b/support/teliox/src/processor/mod.rs index cf88b7dc..12e10521 100644 --- a/support/teliox/src/processor/mod.rs +++ b/support/teliox/src/processor/mod.rs @@ -15,6 +15,7 @@ use self::{ validator::TelEventValidator, }; +#[cfg(feature = "storage-redb")] pub mod escrow; pub mod notification; pub mod storage; diff --git a/support/teliox/src/tel/mod.rs b/support/teliox/src/tel/mod.rs index 1de31bfc..e63d0acf 100644 --- a/support/teliox/src/tel/mod.rs +++ b/support/teliox/src/tel/mod.rs @@ -12,7 +12,7 @@ use crate::{ state::{vc_state::TelState, ManagerTelState}, }; use keri_core::{ - database::{redb::RedbDatabase, EventDatabase}, prefix::IdentifierPrefix, processor::event_storage::EventStorage, + database::EventDatabase, prefix::IdentifierPrefix, processor::event_storage::EventStorage, }; use said::SelfAddressingIdentifier; From 75c8a505faa75c038f960a860d990950a3b5b3cc Mon Sep 17 00:00:00 2001 From: Joseph Lee Hunsaker Date: Wed, 18 Feb 2026 18:59:23 -0700 Subject: [PATCH 2/2] feat: abstract NotificationBus for swappable dispatch Replace the concrete HashMap-based NotificationBus with a trait-based dispatch architecture. NotificationBus is now a Clone-able wrapper around Arc, enabling alternative notification backends (e.g. SQS for serverless) without adding generic type parameters anywhere in the codebase. - Add NotificationDispatch trait with dispatch() and register_observer() - Extract current HashMap logic into InProcessDispatch (RwLock + OnceLock) - Add NotificationBus::from_dispatch() for custom implementations - Change register_observer from &mut self to &self across all processors - Add RwLockingError variant to keri-core Error enum Co-Authored-By: Claude Opus 4.6 --- components/controller/src/known_events.rs | 2 +- .../watcher/src/watcher/watcher_data.rs | 2 +- components/witness/src/witness.rs | 2 +- components/witness/src/witness_processor.rs | 4 +- keriox_core/src/error/mod.rs | 3 + keriox_core/src/processor/basic_processor.rs | 2 +- .../escrow/maybe_out_of_order_escrow.rs | 2 +- keriox_core/src/processor/escrow/mod.rs | 2 +- .../escrow/partially_signed_escrow.rs | 4 +- .../escrow/partially_witnessed_escrow.rs | 8 +- .../src/processor/escrow/reply_escrow.rs | 2 +- keriox_core/src/processor/escrow_tests.rs | 6 +- keriox_core/src/processor/mod.rs | 4 +- keriox_core/src/processor/notification.rs | 111 +++++++++++++++--- .../src/processor/escrow/missing_issuer.rs | 2 +- 15 files changed, 117 insertions(+), 39 deletions(-) diff --git a/components/controller/src/known_events.rs b/components/controller/src/known_events.rs index 5acfa3bc..2b9d0174 100644 --- a/components/controller/src/known_events.rs +++ b/components/controller/src/known_events.rs @@ -65,7 +65,7 @@ impl KnownEvents { let oobi_manager = OobiManager::new(event_database.clone()); let ( - mut notification_bus, + notification_bus, ( _out_of_order_escrow, _partially_signed_escrow, diff --git a/components/watcher/src/watcher/watcher_data.rs b/components/watcher/src/watcher/watcher_data.rs index 6b89949c..2db44ae0 100644 --- a/components/watcher/src/watcher/watcher_data.rs +++ b/components/watcher/src/watcher/watcher_data.rs @@ -99,7 +99,7 @@ impl WatcherData { let oobi_manager = OobiManager::new(events_db.clone()); - let (mut notification_bus, _) = default_escrow_bus(events_db.clone(), escrow_config); + let (notification_bus, _) = default_escrow_bus(events_db.clone(), escrow_config); let reply_escrow = Arc::new(ReplyEscrow::new(events_db.clone())); notification_bus.register_observer( reply_escrow.clone(), diff --git a/components/witness/src/witness.rs b/components/witness/src/witness.rs index 9b555361..82a47497 100644 --- a/components/witness/src/witness.rs +++ b/components/witness/src/witness.rs @@ -171,7 +171,7 @@ impl Witness { let events_db = Arc::new(RedbDatabase::new(&events_database_path).map_err(|_| Error::DbError)?); - let mut witness_processor = WitnessProcessor::new(events_db.clone(), escrow_config); + let witness_processor = WitnessProcessor::new(events_db.clone(), escrow_config); let event_storage = Arc::new(EventStorage::new_redb(events_db.clone())); let receipt_generator = Arc::new(WitnessReceiptGenerator::new( diff --git a/components/witness/src/witness_processor.rs b/components/witness/src/witness_processor.rs index 88fe9e19..57fb1755 100644 --- a/components/witness/src/witness_processor.rs +++ b/components/witness/src/witness_processor.rs @@ -22,7 +22,7 @@ pub struct WitnessProcessor { impl Processor for WitnessProcessor { type Database = RedbDatabase; fn register_observer( - &mut self, + &self, observer: Arc, notifications: &[JustNotification], ) -> Result<(), Error> { @@ -62,7 +62,7 @@ impl Default for WitnessEscrowConfig { impl WitnessProcessor { pub fn new(redb: Arc, escrow_config: WitnessEscrowConfig) -> Self { - let mut bus = NotificationBus::new(); + let bus = NotificationBus::new(); let partially_signed_escrow = Arc::new(PartiallySignedEscrow::new( redb.clone(), escrow_config.partially_signed_timeout, diff --git a/keriox_core/src/error/mod.rs b/keriox_core/src/error/mod.rs index 52fbaf09..cea964ee 100644 --- a/keriox_core/src/error/mod.rs +++ b/keriox_core/src/error/mod.rs @@ -79,6 +79,9 @@ pub enum Error { #[error("mutex is poisoned")] MutexPoisoned, + #[error("RwLock poisoned")] + RwLockingError, + #[error("Incorrect event digest")] IncorrectDigest, diff --git a/keriox_core/src/processor/basic_processor.rs b/keriox_core/src/processor/basic_processor.rs index 66311249..13931563 100644 --- a/keriox_core/src/processor/basic_processor.rs +++ b/keriox_core/src/processor/basic_processor.rs @@ -18,7 +18,7 @@ pub struct BasicProcessor(EventProcessor); impl Processor for BasicProcessor { type Database = D; fn register_observer( - &mut self, + &self, observer: Arc, notification: &[JustNotification], ) -> Result<(), Error> { diff --git a/keriox_core/src/processor/escrow/maybe_out_of_order_escrow.rs b/keriox_core/src/processor/escrow/maybe_out_of_order_escrow.rs index 74308d96..6b6c3231 100644 --- a/keriox_core/src/processor/escrow/maybe_out_of_order_escrow.rs +++ b/keriox_core/src/processor/escrow/maybe_out_of_order_escrow.rs @@ -111,7 +111,7 @@ fn test_out_of_order() -> Result<(), Error> { let events_db_path = NamedTempFile::new().unwrap(); let redb = RedbDatabase::new(events_db_path.path()).unwrap(); let events_db = Arc::new(redb); - let mut processor = BasicProcessor::new(events_db.clone(), None); + let processor = BasicProcessor::new(events_db.clone(), None); // Register out of order escrow, to save and reprocess out of order events let new_ooo = Arc::new(MaybeOutOfOrderEscrow::new( diff --git a/keriox_core/src/processor/escrow/mod.rs b/keriox_core/src/processor/escrow/mod.rs index f6b9bd2d..80d99e8d 100644 --- a/keriox_core/src/processor/escrow/mod.rs +++ b/keriox_core/src/processor/escrow/mod.rs @@ -54,7 +54,7 @@ pub fn default_escrow_bus( where D: EventDatabase + EscrowCreator + Sync + Send + 'static, { - let mut bus = NotificationBus::new(); + let bus = NotificationBus::new(); // Register out of order escrow, to save and reprocess out of order events let ooo_escrow = Arc::new(MaybeOutOfOrderEscrow::new( diff --git a/keriox_core/src/processor/escrow/partially_signed_escrow.rs b/keriox_core/src/processor/escrow/partially_signed_escrow.rs index 66909e62..6501d4ee 100644 --- a/keriox_core/src/processor/escrow/partially_signed_escrow.rs +++ b/keriox_core/src/processor/escrow/partially_signed_escrow.rs @@ -199,7 +199,7 @@ mod tests { let path = witness_root.path(); let events_db_path = NamedTempFile::new().unwrap(); let events_db = Arc::new(RedbDatabase::new(events_db_path.path()).unwrap()); - let mut processor = BasicProcessor::new(events_db.clone(), None); + let processor = BasicProcessor::new(events_db.clone(), None); // Register out of order escrow, to save and reprocess out of order events let ooo_escrow = Arc::new(MaybeOutOfOrderEscrow::new( @@ -271,7 +271,7 @@ mod tests { std::fs::create_dir_all(path).unwrap(); let events_db_path = NamedTempFile::new().unwrap(); let events_db = Arc::new(RedbDatabase::new(events_db_path.path()).unwrap()); - let mut processor = BasicProcessor::new(events_db.clone(), None); + let processor = BasicProcessor::new(events_db.clone(), None); // Register partially signed escrow, to save and reprocess partially signed events let ps_escrow = Arc::new(PartiallySignedEscrow::new( diff --git a/keriox_core/src/processor/escrow/partially_witnessed_escrow.rs b/keriox_core/src/processor/escrow/partially_witnessed_escrow.rs index 124abe96..b3a6b08a 100644 --- a/keriox_core/src/processor/escrow/partially_witnessed_escrow.rs +++ b/keriox_core/src/processor/escrow/partially_witnessed_escrow.rs @@ -383,7 +383,7 @@ mod tests { let redb = RedbDatabase::new(events_db_path.path()).unwrap(); let log_db = redb.log_db.clone(); let events_db = Arc::new(redb); - let mut event_processor = BasicProcessor::new(events_db.clone(), None); + let event_processor = BasicProcessor::new(events_db.clone(), None); let event_storage = EventStorage::new(Arc::clone(&events_db)); // Register not fully witnessed escrow, to save and reprocess events @@ -528,7 +528,7 @@ mod tests { let redb = RedbDatabase::new(events_db_path.path()).unwrap(); let log_db = redb.log_db.clone(); let events_db = Arc::new(redb); - let mut event_processor = BasicProcessor::new(events_db.clone(), None); + let event_processor = BasicProcessor::new(events_db.clone(), None); let event_storage = EventStorage::new(Arc::clone(&events_db)); // Register not fully witnessed escrow, to save and reprocess events @@ -671,7 +671,7 @@ mod tests { let redb = RedbDatabase::new(events_db_path.path()).unwrap(); let log_db = redb.log_db.clone(); let events_db = Arc::new(redb); - let mut event_processor = BasicProcessor::new(events_db.clone(), None); + let event_processor = BasicProcessor::new(events_db.clone(), None); let event_storage = EventStorage::new(Arc::clone(&events_db)); // Register not fully witnessed escrow, to save and reprocess events @@ -775,7 +775,7 @@ mod tests { let redb = RedbDatabase::new(events_db_path.path()).unwrap(); let log_db = redb.log_db.clone(); let events_db = Arc::new(redb); - let mut event_processor = BasicProcessor::new(events_db.clone(), None); + let event_processor = BasicProcessor::new(events_db.clone(), None); let event_storage = EventStorage::new(Arc::clone(&events_db)); // Register not fully witnessed escrow, to save and reprocess events diff --git a/keriox_core/src/processor/escrow/reply_escrow.rs b/keriox_core/src/processor/escrow/reply_escrow.rs index 0300fba3..889b221f 100644 --- a/keriox_core/src/processor/escrow/reply_escrow.rs +++ b/keriox_core/src/processor/escrow/reply_escrow.rs @@ -196,7 +196,7 @@ mod tests { fs::create_dir_all(root.path()).unwrap(); let events_db_path = NamedTempFile::new().unwrap(); let events_db = Arc::new(RedbDatabase::new(events_db_path.path()).unwrap()); - let mut event_processor = BasicProcessor::new(events_db.clone(), None); + let event_processor = BasicProcessor::new(events_db.clone(), None); let rpy_escrow = Arc::new(ReplyEscrow::new(events_db.clone())); event_processor.register_observer( rpy_escrow.clone(), diff --git a/keriox_core/src/processor/escrow_tests.rs b/keriox_core/src/processor/escrow_tests.rs index 1baa45aa..e1619de9 100644 --- a/keriox_core/src/processor/escrow_tests.rs +++ b/keriox_core/src/processor/escrow_tests.rs @@ -50,7 +50,7 @@ fn test_out_of_order_cleanup() -> Result<(), Error> { let events_db_path = NamedTempFile::new().unwrap(); let redb = RedbDatabase::new(events_db_path.path()).unwrap(); let events_db = Arc::new(redb); - let mut processor = BasicProcessor::new(events_db.clone(), None); + let processor = BasicProcessor::new(events_db.clone(), None); // Register out of order escrow, to save and reprocess out of order events let ooo_escrow = Arc::new(MaybeOutOfOrderEscrow::new( @@ -145,7 +145,7 @@ fn test_partially_sign_escrow_cleanup() -> Result<(), Error> { std::fs::create_dir_all(path).unwrap(); let events_db_path = NamedTempFile::new().unwrap(); let events_db = Arc::new(RedbDatabase::new(events_db_path.path()).unwrap()); - let mut processor = BasicProcessor::new(events_db.clone(), None); + let processor = BasicProcessor::new(events_db.clone(), None); // Register partially signed escrow, to save and reprocess partially signed events let ps_escrow = Arc::new(PartiallySignedEscrow::new( @@ -230,7 +230,7 @@ pub fn test_partially_witnessed_escrow_cleanup() -> Result<(), Error> { let redb = RedbDatabase::new(events_db_path.path()).unwrap(); let log_db = redb.log_db.clone(); let events_db = Arc::new(redb); - let mut event_processor = BasicProcessor::new(events_db.clone(), None); + let event_processor = BasicProcessor::new(events_db.clone(), None); let event_storage = EventStorage::new(Arc::clone(&events_db)); // Register not fully witnessed escrow, to save and reprocess events let partially_witnessed_escrow = Arc::new(PartiallyWitnessedEscrow::new( diff --git a/keriox_core/src/processor/mod.rs b/keriox_core/src/processor/mod.rs index e5d250ff..9c81cf7f 100644 --- a/keriox_core/src/processor/mod.rs +++ b/keriox_core/src/processor/mod.rs @@ -38,7 +38,7 @@ pub trait Processor { fn process_op_reply(&self, reply: &SignedReply) -> Result<(), Error>; fn register_observer( - &mut self, + &self, observer: Arc, notifications: &[JustNotification], ) -> Result<(), Error>; @@ -92,7 +92,7 @@ impl EventProcessor { } pub fn register_observer( - &mut self, + &self, observer: Arc, notifications: Vec, ) -> Result<(), Error> { diff --git a/keriox_core/src/processor/notification.rs b/keriox_core/src/processor/notification.rs index 41e77069..4658214c 100644 --- a/keriox_core/src/processor/notification.rs +++ b/keriox_core/src/processor/notification.rs @@ -1,4 +1,7 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, OnceLock, RwLock}, +}; #[cfg(feature = "query")] use crate::query::reply_event::SignedReply; @@ -10,36 +13,108 @@ use crate::{ }, }; +/// Internal dispatch strategy — the swappable part. +/// Implement this trait to change how notifications are delivered +/// (e.g. in-process HashMap, SQS queue, etc.). +pub trait NotificationDispatch: Send + Sync { + fn dispatch(&self, notification: &Notification) -> Result<(), Error>; + fn register_observer( + &self, + observer: Arc, + notifications: Vec, + ) -> Result<(), Error>; +} + +/// In-process dispatch: preserves the original HashMap-based behavior. +/// Uses `RwLock` for interior mutability so `register_observer` takes `&self`. +struct InProcessDispatch { + observers: RwLock>>>, + /// Back-reference to the owning `NotificationBus` so we can pass it + /// to `Notifier::notify()` callbacks. + bus: OnceLock, +} + +impl InProcessDispatch { + fn new() -> Self { + Self { + observers: RwLock::new(HashMap::new()), + bus: OnceLock::new(), + } + } +} + +impl NotificationDispatch for InProcessDispatch { + fn dispatch(&self, notification: &Notification) -> Result<(), Error> { + let observers = self + .observers + .read() + .map_err(|_| Error::RwLockingError)?; + let bus = self.bus.get().ok_or_else(|| { + Error::SemanticError("InProcessDispatch: bus back-reference not set".into()) + })?; + if let Some(obs) = observers.get(¬ification.into()) { + for esc in obs.iter() { + esc.notify(notification, bus)?; + } + } + Ok(()) + } + + fn register_observer( + &self, + observer: Arc, + notifications: Vec, + ) -> Result<(), Error> { + let mut observers = self + .observers + .write() + .map_err(|_| Error::RwLockingError)?; + for notification in notifications { + observers + .entry(notification) + .or_default() + .push(observer.clone()); + } + Ok(()) + } +} + +/// Clone-able notification bus that delegates to an internal dispatch strategy. +#[derive(Clone)] pub struct NotificationBus { - observers: HashMap>>, + inner: Arc, } impl NotificationBus { + /// Create a new bus with the default in-process dispatch. pub fn new() -> Self { - Self { - observers: HashMap::new(), - } + let dispatch = Arc::new(InProcessDispatch::new()); + let bus = Self { + inner: dispatch.clone(), + }; + // Set the back-reference so InProcessDispatch can pass &NotificationBus + // to Notifier::notify() callbacks. + let _ = dispatch.bus.set(bus.clone()); + bus } + + /// Create a bus backed by a custom dispatch implementation. + pub fn from_dispatch(dispatch: Arc) -> Self { + Self { inner: dispatch } + } + pub fn register_observer( - &mut self, + &self, escrow: Arc, notification: Vec, ) { - notification.into_iter().for_each(|notification| { - self.observers - .entry(notification) - .or_default() - .push(escrow.clone()); - }); + // register_observer on InProcessDispatch should not fail in practice, + // but if it does we silently ignore to preserve the existing API signature. + let _ = self.inner.register_observer(escrow, notification); } pub fn notify(&self, notification: &Notification) -> Result<(), Error> { - if let Some(obs) = self.observers.get(¬ification.into()) { - for esc in obs.iter() { - esc.notify(notification, self)?; - } - }; - Ok(()) + self.inner.dispatch(notification) } } diff --git a/support/teliox/src/processor/escrow/missing_issuer.rs b/support/teliox/src/processor/escrow/missing_issuer.rs index 727c3245..9600f6bb 100644 --- a/support/teliox/src/processor/escrow/missing_issuer.rs +++ b/support/teliox/src/processor/escrow/missing_issuer.rs @@ -184,7 +184,7 @@ mod tests { // Setup issuer key event log. Without ixn events tel event's can't be validated. let keri_root = Builder::new().prefix("test-db").tempfile().unwrap(); let keri_db = Arc::new(RedbDatabase::new(keri_root.path()).unwrap()); - let mut keri_processor = BasicProcessor::new(keri_db.clone(), None); + let keri_processor = BasicProcessor::new(keri_db.clone(), None); let keri_storage = Arc::new(EventStorage::new(keri_db.clone())); let issuer_kel = r#"{"v":"KERI10JSON00012b_","t":"icp","d":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","i":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","s":"0","kt":"1","k":["DHdoiqT1iac2HI6-HfCYcc01Piz2FTTPvZDFt6vADioD"],"nt":"1","n":["EH8IzIWeQFiUr3rr2dh8xAiW9Akwl6EooDt8iduQYyq_"],"bt":"0","b":[],"c":[],"a":[]}-AABAABvFFeXb9uW2G16o3C9xJZvY3a_utMPxd4NIUcGWRTqykMO1NzKwjsA_AQrOEwgO5jselWHREcK6vcAxRfv6-QC{"v":"KERI10JSON00013a_","t":"ixn","d":"EMOzEVoFjbkS3ZS5JtmJO4LeZ4gydbr8iXNrEQAt1OR2","i":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","s":"1","p":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","a":[{"i":"EF3TVac5quxrbLGLKAHF21laISjMgjYQAIg3OsTen969","s":"0","d":"ENIKpuUkjM-1K2Sv_TZwF_k8FTVkefAgy8sIpiFp0uWh"}]}-AABAACvrSS_EZUMKQ6Ax8FaB_Sf99O0y6MmfoRDBKMphVWWtuCOlFQm6N0XrTwtYxO3pO0AEZkJ1vzu52-RDK-w3YAN{"v":"KERI10JSON00013a_","t":"ixn","d":"EDvnfU2yMZUXEy9D_22YOkeSZOq6YG9zfItawvx3GR_6","i":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","s":"2","p":"EMOzEVoFjbkS3ZS5JtmJO4LeZ4gydbr8iXNrEQAt1OR2","a":[{"i":"EC8Oej-3HAUpBY_kxzBK3B-0RV9j4dXw1H0NRKxJg7g-","s":"0","d":"EDBM1ys50vEJxRzvBjTOrmOhokELjVtozXy3ZbJ8-KFk"}]}-AABAAABtEQ7SoGt2IcZBMX0GaEaMqGdMsrGpj1fABDKgE5dA7s7AGXTkWrZjzA4GXkGXuOspi6upqBhpxr6d5ySeKQH"#;