Skip to content

Commit 495e705

Browse files
feat: abstract NotificationBus for swappable dispatch
Replace the concrete HashMap-based NotificationBus with a trait-based dispatch architecture. NotificationBus is now a Clone-able wrapper around Arc<dyn NotificationDispatch>, enabling alternative notification backends (e.g. SQS for serverless) without adding generic type parameters anywhere in the codebase. - Add NotificationDispatch trait with dispatch() and register_observer() - Extract current HashMap logic into InProcessDispatch (RwLock + OnceLock) - Add NotificationBus::from_dispatch() for custom implementations - Change register_observer from &mut self to &self across all processors - Add RwLockingError variant to keri-core Error enum Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a80c403 commit 495e705

15 files changed

Lines changed: 117 additions & 39 deletions

File tree

components/controller/src/known_events.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ impl KnownEvents {
6565
let oobi_manager = OobiManager::new(event_database.clone());
6666

6767
let (
68-
mut notification_bus,
68+
notification_bus,
6969
(
7070
_out_of_order_escrow,
7171
_partially_signed_escrow,

components/watcher/src/watcher/watcher_data.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl WatcherData {
9999

100100
let oobi_manager = OobiManager::new(events_db.clone());
101101

102-
let (mut notification_bus, _) = default_escrow_bus(events_db.clone(), escrow_config);
102+
let (notification_bus, _) = default_escrow_bus(events_db.clone(), escrow_config);
103103
let reply_escrow = Arc::new(ReplyEscrow::new(events_db.clone()));
104104
notification_bus.register_observer(
105105
reply_escrow.clone(),

components/witness/src/witness.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ impl Witness {
171171

172172
let events_db =
173173
Arc::new(RedbDatabase::new(&events_database_path).map_err(|_| Error::DbError)?);
174-
let mut witness_processor = WitnessProcessor::new(events_db.clone(), escrow_config);
174+
let witness_processor = WitnessProcessor::new(events_db.clone(), escrow_config);
175175
let event_storage = Arc::new(EventStorage::new_redb(events_db.clone()));
176176

177177
let receipt_generator = Arc::new(WitnessReceiptGenerator::new(

components/witness/src/witness_processor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub struct WitnessProcessor {
2222
impl Processor for WitnessProcessor {
2323
type Database = RedbDatabase;
2424
fn register_observer(
25-
&mut self,
25+
&self,
2626
observer: Arc<dyn Notifier + Send + Sync>,
2727
notifications: &[JustNotification],
2828
) -> Result<(), Error> {
@@ -62,7 +62,7 @@ impl Default for WitnessEscrowConfig {
6262

6363
impl WitnessProcessor {
6464
pub fn new(redb: Arc<RedbDatabase>, escrow_config: WitnessEscrowConfig) -> Self {
65-
let mut bus = NotificationBus::new();
65+
let bus = NotificationBus::new();
6666
let partially_signed_escrow = Arc::new(PartiallySignedEscrow::new(
6767
redb.clone(),
6868
escrow_config.partially_signed_timeout,

keriox_core/src/error/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ pub enum Error {
7979
#[error("mutex is poisoned")]
8080
MutexPoisoned,
8181

82+
#[error("RwLock poisoned")]
83+
RwLockingError,
84+
8285
#[error("Incorrect event digest")]
8386
IncorrectDigest,
8487

keriox_core/src/processor/basic_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct BasicProcessor<D: EventDatabase>(EventProcessor<D>);
1818
impl<D: EventDatabase + 'static> Processor for BasicProcessor<D> {
1919
type Database = D;
2020
fn register_observer(
21-
&mut self,
21+
&self,
2222
observer: Arc<dyn Notifier + Send + Sync>,
2323
notification: &[JustNotification],
2424
) -> Result<(), Error> {

keriox_core/src/processor/escrow/maybe_out_of_order_escrow.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ fn test_out_of_order() -> Result<(), Error> {
111111
let events_db_path = NamedTempFile::new().unwrap();
112112
let redb = RedbDatabase::new(events_db_path.path()).unwrap();
113113
let events_db = Arc::new(redb);
114-
let mut processor = BasicProcessor::new(events_db.clone(), None);
114+
let processor = BasicProcessor::new(events_db.clone(), None);
115115

116116
// Register out of order escrow, to save and reprocess out of order events
117117
let new_ooo = Arc::new(MaybeOutOfOrderEscrow::new(

keriox_core/src/processor/escrow/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub fn default_escrow_bus<D>(
5454
where
5555
D: EventDatabase + EscrowCreator + Sync + Send + 'static,
5656
{
57-
let mut bus = NotificationBus::new();
57+
let bus = NotificationBus::new();
5858

5959
// Register out of order escrow, to save and reprocess out of order events
6060
let ooo_escrow = Arc::new(MaybeOutOfOrderEscrow::new(

keriox_core/src/processor/escrow/partially_signed_escrow.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ mod tests {
199199
let path = witness_root.path();
200200
let events_db_path = NamedTempFile::new().unwrap();
201201
let events_db = Arc::new(RedbDatabase::new(events_db_path.path()).unwrap());
202-
let mut processor = BasicProcessor::new(events_db.clone(), None);
202+
let processor = BasicProcessor::new(events_db.clone(), None);
203203

204204
// Register out of order escrow, to save and reprocess out of order events
205205
let ooo_escrow = Arc::new(MaybeOutOfOrderEscrow::new(
@@ -271,7 +271,7 @@ mod tests {
271271
std::fs::create_dir_all(path).unwrap();
272272
let events_db_path = NamedTempFile::new().unwrap();
273273
let events_db = Arc::new(RedbDatabase::new(events_db_path.path()).unwrap());
274-
let mut processor = BasicProcessor::new(events_db.clone(), None);
274+
let processor = BasicProcessor::new(events_db.clone(), None);
275275

276276
// Register partially signed escrow, to save and reprocess partially signed events
277277
let ps_escrow = Arc::new(PartiallySignedEscrow::new(

keriox_core/src/processor/escrow/partially_witnessed_escrow.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ mod tests {
383383
let redb = RedbDatabase::new(events_db_path.path()).unwrap();
384384
let log_db = redb.log_db.clone();
385385
let events_db = Arc::new(redb);
386-
let mut event_processor = BasicProcessor::new(events_db.clone(), None);
386+
let event_processor = BasicProcessor::new(events_db.clone(), None);
387387
let event_storage = EventStorage::new(Arc::clone(&events_db));
388388

389389
// Register not fully witnessed escrow, to save and reprocess events
@@ -528,7 +528,7 @@ mod tests {
528528
let redb = RedbDatabase::new(events_db_path.path()).unwrap();
529529
let log_db = redb.log_db.clone();
530530
let events_db = Arc::new(redb);
531-
let mut event_processor = BasicProcessor::new(events_db.clone(), None);
531+
let event_processor = BasicProcessor::new(events_db.clone(), None);
532532
let event_storage = EventStorage::new(Arc::clone(&events_db));
533533

534534
// Register not fully witnessed escrow, to save and reprocess events
@@ -671,7 +671,7 @@ mod tests {
671671
let redb = RedbDatabase::new(events_db_path.path()).unwrap();
672672
let log_db = redb.log_db.clone();
673673
let events_db = Arc::new(redb);
674-
let mut event_processor = BasicProcessor::new(events_db.clone(), None);
674+
let event_processor = BasicProcessor::new(events_db.clone(), None);
675675
let event_storage = EventStorage::new(Arc::clone(&events_db));
676676

677677
// Register not fully witnessed escrow, to save and reprocess events
@@ -775,7 +775,7 @@ mod tests {
775775
let redb = RedbDatabase::new(events_db_path.path()).unwrap();
776776
let log_db = redb.log_db.clone();
777777
let events_db = Arc::new(redb);
778-
let mut event_processor = BasicProcessor::new(events_db.clone(), None);
778+
let event_processor = BasicProcessor::new(events_db.clone(), None);
779779
let event_storage = EventStorage::new(Arc::clone(&events_db));
780780

781781
// Register not fully witnessed escrow, to save and reprocess events

0 commit comments

Comments
 (0)