From 59f2d2c43e1c76c08f678d650bb36e13b37aaab4 Mon Sep 17 00:00:00 2001 From: Kevin Webber Date: Tue, 24 Mar 2026 20:46:47 -0400 Subject: [PATCH 1/5] improved versioning comparator for journals/framework Signed-off-by: Kevin Webber --- .../src/journal/disk/replay_archive.rs | 10 +- .../tests/replay_archive_test.rs | 126 ++++++++++++++++++ crates/obzenflow_runtime/src/replay.rs | 2 +- 3 files changed, 134 insertions(+), 4 deletions(-) diff --git a/crates/obzenflow_infra/src/journal/disk/replay_archive.rs b/crates/obzenflow_infra/src/journal/disk/replay_archive.rs index d8a1bdd1..b6c89597 100644 --- a/crates/obzenflow_infra/src/journal/disk/replay_archive.rs +++ b/crates/obzenflow_infra/src/journal/disk/replay_archive.rs @@ -82,7 +82,7 @@ impl DiskReplayArchive { }); } - if is_newer_semver(&manifest.obzenflow_version, OBZENFLOW_VERSION) { + if !is_compatible_semver(&manifest.obzenflow_version, OBZENFLOW_VERSION) { return Err(ReplayError::VersionMismatch { archive_version: manifest.obzenflow_version.clone(), current_version: OBZENFLOW_VERSION.to_string(), @@ -337,14 +337,18 @@ fn derive_status_derivation_from_system_log(path: &Path) -> Result bool { +/// Check whether an archive's ObzenFlow version is compatible with the running +/// framework. Compatibility requires an exact major.minor match; patch-level +/// differences are tolerated. If either version string cannot be parsed the +/// check fails closed (incompatible). 
+fn is_compatible_semver(archive_version: &str, current_version: &str) -> bool { let Some(archive) = parse_semver_triplet(archive_version) else { return false; }; let Some(current) = parse_semver_triplet(current_version) else { return false; }; - archive > current + archive.0 == current.0 && archive.1 == current.1 } fn parse_semver_triplet(version: &str) -> Option<(u64, u64, u64)> { diff --git a/crates/obzenflow_infra/tests/replay_archive_test.rs b/crates/obzenflow_infra/tests/replay_archive_test.rs index 6d51496a..08ca95cd 100644 --- a/crates/obzenflow_infra/tests/replay_archive_test.rs +++ b/crates/obzenflow_infra/tests/replay_archive_test.rs @@ -153,6 +153,132 @@ async fn open_requires_completed_status_by_default() { assert!(matches!(err, ReplayError::IncompleteArchive { .. })); } +/// Write a manifest with a custom `obzenflow_version` for version-compat tests. +fn write_manifest_with_version(dir: &Path, version: &str) { + let mut stages = HashMap::new(); + stages.insert( + "returns".to_string(), + RunManifestStage { + dsl_var: "source".to_string(), + stage_type: StageType::FiniteSource, + stage_id: "stage_01H000000000000000000000000".to_string(), + data_journal_file: "FiniteSource_returns_stage_01H000000000000000000000000.log" + .to_string(), + error_journal_file: "FiniteSource_returns_error_stage_01H000000000000000000000000.log" + .to_string(), + }, + ); + + let manifest = RunManifest { + manifest_version: RUN_MANIFEST_VERSION.to_string(), + obzenflow_version: version.to_string(), + flow_id: "flow_01H000000000000000000000000".to_string(), + flow_name: "test_flow".to_string(), + created_at: Utc::now(), + replay: None, + stages, + system_journal_file: "system.log".to_string(), + }; + + let body = serde_json::to_string_pretty(&manifest).unwrap(); + std::fs::write(dir.join(RUN_MANIFEST_FILENAME), body).unwrap(); +} + +#[tokio::test] +async fn open_rejects_archive_from_newer_minor_version() { + let dir = tempdir().unwrap(); + // Bump the minor version beyond what 
the framework reports. + let (major, minor, _patch) = { + let parts: Vec = OBZENFLOW_VERSION + .split('.') + .map(|p| p.parse().unwrap()) + .collect(); + (parts[0], parts[1], parts[2]) + }; + let future_version = format!("{}.{}.0", major, minor + 1); + write_manifest_with_version(dir.path(), &future_version); + write_system_log_completed(dir.path()); + + let err = DiskReplayArchive::open(dir.path().to_path_buf(), false) + .await + .err() + .unwrap(); + assert!( + matches!(err, ReplayError::VersionMismatch { .. }), + "expected VersionMismatch for newer minor, got: {err}" + ); +} + +#[tokio::test] +async fn open_rejects_archive_from_older_minor_version() { + let dir = tempdir().unwrap(); + let (major, minor, _patch) = { + let parts: Vec = OBZENFLOW_VERSION + .split('.') + .map(|p| p.parse().unwrap()) + .collect(); + (parts[0], parts[1], parts[2]) + }; + + // A previous minor only exists if minor > 0; otherwise fall back to a different major.minor pair. + if minor == 0 { + // Step major down (saturating at 0) and pin minor to 999 to guarantee a major.minor mismatch. + let old_version = format!("{}.999.0", major.saturating_sub(1)); + write_manifest_with_version(dir.path(), &old_version); + } else { + let old_version = format!("{}.{}.0", major, minor - 1); + write_manifest_with_version(dir.path(), &old_version); + } + write_system_log_completed(dir.path()); + + let err = DiskReplayArchive::open(dir.path().to_path_buf(), false) + .await + .err() + .unwrap(); + assert!( + matches!(err, ReplayError::VersionMismatch { .. }), + "expected VersionMismatch for older minor, got: {err}" + ); +} + +#[tokio::test] +async fn open_accepts_archive_with_different_patch_version() { + let dir = tempdir().unwrap(); + let (major, minor, patch) = { + let parts: Vec = OBZENFLOW_VERSION + .split('.') + .map(|p| p.parse().unwrap()) + .collect(); + (parts[0], parts[1], parts[2]) + }; + // Use a different patch version (if current is 0, use 99; otherwise use 0). 
+ let alt_patch = if patch == 0 { 99 } else { 0 }; + let compat_version = format!("{}.{}.{}", major, minor, alt_patch); + write_manifest_with_version(dir.path(), &compat_version); + write_system_log_completed(dir.path()); + + let archive = DiskReplayArchive::open(dir.path().to_path_buf(), false) + .await + .expect("same major.minor with different patch should be accepted"); + assert_eq!(archive.status(), ArchiveStatus::Completed); +} + +#[tokio::test] +async fn open_rejects_archive_with_unparseable_version() { + let dir = tempdir().unwrap(); + write_manifest_with_version(dir.path(), "not-a-version"); + write_system_log_completed(dir.path()); + + let err = DiskReplayArchive::open(dir.path().to_path_buf(), false) + .await + .err() + .unwrap(); + assert!( + matches!(err, ReplayError::VersionMismatch { .. }), + "unparseable version should fail closed as VersionMismatch, got: {err}" + ); +} + #[tokio::test] async fn open_source_reader_errors_when_journal_missing() { let dir = tempdir().unwrap(); diff --git a/crates/obzenflow_runtime/src/replay.rs b/crates/obzenflow_runtime/src/replay.rs index 7dc1dfa4..1ba3f1c0 100644 --- a/crates/obzenflow_runtime/src/replay.rs +++ b/crates/obzenflow_runtime/src/replay.rs @@ -32,7 +32,7 @@ pub enum ReplayError { supported: &'static str, }, - #[error("Replay archive was created with newer ObzenFlow version {archive_version} (current: {current_version})")] + #[error("Replay archive version {archive_version} is incompatible with running framework version {current_version} (major.minor must match)")] VersionMismatch { archive_version: String, current_version: String, From 0d86985c7b568f801cc06968a66bc1a6c4e89511 Mon Sep 17 00:00:00 2001 From: Kevin Webber Date: Tue, 24 Mar 2026 21:38:01 -0400 Subject: [PATCH 2/5] remaining scope of wall clock tiebreak removal Signed-off-by: Kevin Webber --- .../obzenflow_core/src/event/vector_clock.rs | 15 ++++ crates/obzenflow_core/src/journal/journal.rs | 4 + .../src/journal/disk/disk_journal.rs | 73 
++++++++++++++-- .../src/journal/memory/memory_journal.rs | 10 +-- .../obzenflow_infra/tests/journal_parity.rs | 84 +++++++++++++++++-- 5 files changed, 164 insertions(+), 22 deletions(-) diff --git a/crates/obzenflow_core/src/event/vector_clock.rs b/crates/obzenflow_core/src/event/vector_clock.rs index c14c1875..e5db0e61 100644 --- a/crates/obzenflow_core/src/event/vector_clock.rs +++ b/crates/obzenflow_core/src/event/vector_clock.rs @@ -10,6 +10,8 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use crate::event::types::EventId; + /// Vector clock data structure for causal ordering /// /// This is a pure data structure representing the causal history of an event. @@ -114,6 +116,19 @@ impl CausalOrderingService { } } + /// Deterministically compare two concurrent vector clocks using `EventId` as the tie-break. + /// + /// This is a total comparator intended for *iteration* surfaces such as + /// `Journal::read_causally_ordered()`. It must not use wall-clock timestamps. + pub fn total_compare_by_event_id( + a_clock: &VectorClock, + a_event_id: &EventId, + b_clock: &VectorClock, + b_event_id: &EventId, + ) -> std::cmp::Ordering { + Self::causal_compare(a_clock, b_clock).unwrap_or_else(|| a_event_id.cmp(b_event_id)) + } + /// Calculate L1 (Manhattan) distance between two vector clocks. /// /// This represents the total number of events that happened diff --git a/crates/obzenflow_core/src/journal/journal.rs b/crates/obzenflow_core/src/journal/journal.rs index 6139a1d9..db866258 100644 --- a/crates/obzenflow_core/src/journal/journal.rs +++ b/crates/obzenflow_core/src/journal/journal.rs @@ -65,6 +65,10 @@ where /// /// Events MUST be ordered such that if A happened-before B, /// then A appears before B in the result + /// + /// For concurrent events (no happened-before relation), implementations MUST + /// return a deterministic total order. 
The current framework contract is to + /// break concurrent ties by `EventId` ordering, not by wall-clock time. async fn read_causally_ordered(&self) -> Result>, JournalError>; /// Read events causally after the given event diff --git a/crates/obzenflow_infra/src/journal/disk/disk_journal.rs b/crates/obzenflow_infra/src/journal/disk/disk_journal.rs index dc10a837..c7dd544f 100644 --- a/crates/obzenflow_infra/src/journal/disk/disk_journal.rs +++ b/crates/obzenflow_infra/src/journal/disk/disk_journal.rs @@ -378,6 +378,12 @@ impl Journal for DiskJournal { // Get writer_id from the event let writer_id = *event.writer_id(); + // Acquire the journal write lock before advancing writer clocks. + // + // This serialises append operations and ensures concurrent appends + // cannot compute the same `writer_seq` from a stale snapshot. + let _lock = self.read_write_lock.write().await; + // Get or create vector clock for this writer let mut vector_clock = { let writer_clocks = self.writer_clocks.read().await; @@ -432,9 +438,6 @@ impl Journal for DiskJournal { bytes.extend_from_slice(&json_body); bytes.push(b'\n'); - // Acquire write lock for atomic write (blocks readers) - let _lock = self.read_write_lock.write().await; - // Get current offset and reserve space let offset = self.write_offset.load(Ordering::SeqCst); self.write_offset @@ -524,11 +527,11 @@ impl Journal for DiskJournal { // Sort by vector clock for causal ordering events.sort_by(|a, b| { - CausalOrderingService::causal_compare(&a.vector_clock, &b.vector_clock).unwrap_or_else( - || { - // For concurrent events, use timestamp as tiebreaker - a.timestamp.cmp(&b.timestamp) - }, + CausalOrderingService::total_compare_by_event_id( + &a.vector_clock, + a.event.id(), + &b.vector_clock, + b.event.id(), ) }); @@ -750,6 +753,7 @@ mod tests { use super::*; use obzenflow_core::event::chain_event::{ChainEvent, ChainEventFactory}; use obzenflow_core::id::StageId; + use tokio::sync::Barrier; use uuid::Uuid; @@ -883,4 +887,57 @@ 
mod tests { // Cleanup std::fs::remove_dir_all(&test_dir).ok(); } + + #[tokio::test] + async fn test_same_writer_concurrent_appends_have_unique_writer_seq() { + let test_id = Uuid::new_v4(); + let test_dir = std::path::PathBuf::from(format!( + "target/test-logs/test_same_writer_concurrent_appends_have_unique_writer_seq_{test_id}" + )); + std::fs::create_dir_all(&test_dir).unwrap(); + + let test_stage_id = obzenflow_core::StageId::new(); + let owner = obzenflow_core::JournalOwner::stage(test_stage_id); + let log_path = test_dir.join("same_writer_concurrent.log"); + let log = Arc::new(DiskJournal::::with_owner(log_path, owner).unwrap()); + + let writer_id = WriterId::from(StageId::new()); + let writer_key = writer_id.to_string(); + + let task_count: usize = 20; + let barrier = Arc::new(Barrier::new(task_count)); + + let mut handles = Vec::with_capacity(task_count); + for i in 0..task_count { + let log_clone = log.clone(); + let barrier_clone = barrier.clone(); + let handle = tokio::spawn(async move { + barrier_clone.wait().await; + let event = ChainEventFactory::data_event( + writer_id, + "concurrent.same_writer", + serde_json::json!({ "i": i }), + ); + log_clone.append(event, None).await + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap().unwrap(); + } + + let events = log.read_causally_ordered().await.unwrap(); + assert_eq!(events.len(), task_count); + + let writer_seqs: Vec = events + .iter() + .map(|e| e.vector_clock.get(&writer_key)) + .collect(); + + let expected: Vec = (1..=task_count as u64).collect(); + assert_eq!(writer_seqs, expected); + + std::fs::remove_dir_all(&test_dir).ok(); + } } diff --git a/crates/obzenflow_infra/src/journal/memory/memory_journal.rs b/crates/obzenflow_infra/src/journal/memory/memory_journal.rs index 76f28ac4..83aebd20 100644 --- a/crates/obzenflow_infra/src/journal/memory/memory_journal.rs +++ b/crates/obzenflow_infra/src/journal/memory/memory_journal.rs @@ -167,11 +167,11 @@ impl Journal for 
MemoryJournal { // Sort by vector clock for causal ordering events_copy.sort_by(|a, b| { - CausalOrderingService::causal_compare(&a.vector_clock, &b.vector_clock).unwrap_or_else( - || { - // For concurrent events, use timestamp as tiebreaker - a.timestamp.cmp(&b.timestamp) - }, + CausalOrderingService::total_compare_by_event_id( + &a.vector_clock, + a.event.id(), + &b.vector_clock, + b.event.id(), ) }); diff --git a/crates/obzenflow_infra/tests/journal_parity.rs b/crates/obzenflow_infra/tests/journal_parity.rs index b10a1601..188e4386 100644 --- a/crates/obzenflow_infra/tests/journal_parity.rs +++ b/crates/obzenflow_infra/tests/journal_parity.rs @@ -5,12 +5,14 @@ //! Simple parity test to ensure DiskJournal and MemoryJournal behave identically use obzenflow_core::event::chain_event::{ChainEvent, ChainEventFactory}; +use obzenflow_core::event::types::EventId; use obzenflow_core::journal::journal_owner::JournalOwner; use obzenflow_core::Journal; use obzenflow_core::{StageId, WriterId}; use obzenflow_infra::journal::{DiskJournal, MemoryJournal}; use serde_json::json; use std::sync::Arc; +use std::time::Duration; use tempfile::TempDir; #[tokio::test] @@ -29,24 +31,27 @@ async fn test_journal_parity() { let writer1 = WriterId::from(StageId::new()); let writer2 = WriterId::from(StageId::new()); + let event1 = ChainEventFactory::data_event(writer1, "test.first", json!({ "value": 42 })); + let event2 = ChainEventFactory::data_event(writer1, "test.second", json!({ "value": 84 })); + let event3 = ChainEventFactory::data_event(writer2, "test.parallel", json!({ "value": 100 })); + let event4 = ChainEventFactory::data_event(writer1, "test.third", json!({ "value": 168 })); + // Test both journals with identical operations for journal in [&disk_journal, &memory_journal] { // Event 1: No parent - let event1 = ChainEventFactory::data_event(writer1, "test.first", json!({ "value": 42 })); - let envelope1 = journal.append(event1, None).await.unwrap(); + let envelope1 = 
journal.append(event1.clone(), None).await.unwrap(); // Event 2: Child of event 1 - let event2 = ChainEventFactory::data_event(writer1, "test.second", json!({ "value": 84 })); - let envelope2 = journal.append(event2, Some(&envelope1)).await.unwrap(); + let envelope2 = journal + .append(event2.clone(), Some(&envelope1)) + .await + .unwrap(); // Event 3: From different writer, no parent - let event3 = - ChainEventFactory::data_event(writer2, "test.parallel", json!({ "value": 100 })); - let _envelope3 = journal.append(event3, None).await.unwrap(); + let _envelope3 = journal.append(event3.clone(), None).await.unwrap(); // Event 4: Child of event 2 - let event4 = ChainEventFactory::data_event(writer1, "test.third", json!({ "value": 168 })); - journal.append(event4, Some(&envelope2)).await.unwrap(); + journal.append(event4.clone(), Some(&envelope2)).await.unwrap(); } // Compare results @@ -60,6 +65,10 @@ async fn test_journal_parity() { // Compare each event for (i, (disk_event, memory_event)) in disk_events.iter().zip(memory_events.iter()).enumerate() { + assert_eq!( + disk_event.event.id, memory_event.event.id, + "EventId mismatch at index {i}" + ); assert_eq!( disk_event.event.event_type(), memory_event.event.event_type(), @@ -112,3 +121,60 @@ async fn test_journal_parity() { "MemoryJournal should have 3 events after first" ); } + +#[tokio::test] +async fn test_journal_concurrent_tiebreak_is_event_id() { + let temp_dir = TempDir::new().unwrap(); + let owner = JournalOwner::stage(StageId::new()); + let log_path = temp_dir.path().join("tiebreak_test.log"); + + let disk_journal = Arc::new(DiskJournal::with_owner(log_path, owner.clone()).unwrap()) + as Arc + Send + Sync>; + let memory_journal = + Arc::new(MemoryJournal::with_owner(owner)) as Arc + Send + Sync>; + + let writer_a = WriterId::from(StageId::new()); + let writer_b = WriterId::from(StageId::new()); + + let mut a = ChainEventFactory::data_event(writer_a, "test.concurrent", json!({ "i": 1 })); + a.id = 
EventId::new(); + + let mut b = ChainEventFactory::data_event(writer_b, "test.concurrent", json!({ "i": 0 })); + b.id = EventId::new(); + + let (low, high) = if a.id < b.id { (a, b) } else { (b, a) }; + + for journal in [&disk_journal, &memory_journal] { + // Append in the opposite order of the desired causal-tie-break order. + // If timestamps were used as the concurrent tie-break, this would tend to sort as: + // high (earlier append) then low (later append). + journal.append(high.clone(), None).await.unwrap(); + tokio::time::sleep(Duration::from_millis(2)).await; + journal.append(low.clone(), None).await.unwrap(); + + let ordered = journal.read_causally_ordered().await.unwrap(); + let ordered_ids: Vec<_> = ordered.iter().map(|e| e.event.id).collect(); + assert_eq!( + ordered_ids, + vec![low.id, high.id], + "concurrent tie-break should use EventId ordering" + ); + } + + // Parity: disk and memory should agree on the deterministic order. + let disk_ids: Vec<_> = disk_journal + .read_causally_ordered() + .await + .unwrap() + .into_iter() + .map(|e| e.event.id) + .collect(); + let memory_ids: Vec<_> = memory_journal + .read_causally_ordered() + .await + .unwrap() + .into_iter() + .map(|e| e.event.id) + .collect(); + assert_eq!(disk_ids, memory_ids); +} From 6e11ef0b6c0782d9b6830e2b8d3ce6bf605c7595 Mon Sep 17 00:00:00 2001 From: Kevin Webber Date: Tue, 24 Mar 2026 22:23:07 -0400 Subject: [PATCH 3/5] additional tests and fixes Signed-off-by: Kevin Webber --- .../src/journal/journal_reader.rs | 6 +- .../obzenflow_infra/tests/journal_parity.rs | 36 +++++++++++ .../tests/replay_determinism.rs | 59 +++++++++++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) diff --git a/crates/obzenflow_core/src/journal/journal_reader.rs b/crates/obzenflow_core/src/journal/journal_reader.rs index f9117dda..d2144113 100644 --- a/crates/obzenflow_core/src/journal/journal_reader.rs +++ b/crates/obzenflow_core/src/journal/journal_reader.rs @@ -21,7 +21,11 @@ pub trait JournalReader: 
Send + Sync where T: JournalEvent, { - /// Read the next event from the current position + /// Read the next event from the current position (append order). + /// + /// This is an append-order cursor, not a causal-order iterator. Callers that need + /// deterministic causal ordering should use `Journal::read_causally_ordered()` / + /// `Journal::read_causally_after(...)` instead of `JournalReader::next()`. /// /// Returns None if no more events are available (EOF). /// This method should be efficient - O(1) regardless of journal size. diff --git a/crates/obzenflow_infra/tests/journal_parity.rs b/crates/obzenflow_infra/tests/journal_parity.rs index 188e4386..3a793cea 100644 --- a/crates/obzenflow_infra/tests/journal_parity.rs +++ b/crates/obzenflow_infra/tests/journal_parity.rs @@ -178,3 +178,39 @@ async fn test_journal_concurrent_tiebreak_is_event_id() { .collect(); assert_eq!(disk_ids, memory_ids); } + +#[tokio::test] +async fn test_read_causally_after_matches_slice_with_concurrent_events() { + let temp_dir = TempDir::new().unwrap(); + let owner = JournalOwner::stage(StageId::new()); + let log_path = temp_dir.path().join("after_slice_test.log"); + + let disk_journal = Arc::new(DiskJournal::with_owner(log_path, owner.clone()).unwrap()) + as Arc + Send + Sync>; + let memory_journal = + Arc::new(MemoryJournal::with_owner(owner)) as Arc + Send + Sync>; + + let writer_a = WriterId::from(StageId::new()); + let writer_b = WriterId::from(StageId::new()); + + let event_a = ChainEventFactory::data_event(writer_a, "test.a", json!({ "seq": "a" })); + let event_b = ChainEventFactory::data_event(writer_b, "test.b", json!({ "seq": "b" })); + let event_c = ChainEventFactory::data_event(writer_a, "test.c", json!({ "seq": "c" })); + + for journal in [&disk_journal, &memory_journal] { + let env_a = journal.append(event_a.clone(), None).await.unwrap(); + let _env_b = journal.append(event_b.clone(), None).await.unwrap(); + let _env_c = journal.append(event_c.clone(), 
Some(&env_a)).await.unwrap(); + + let ordered = journal.read_causally_ordered().await.unwrap(); + assert_eq!(ordered.len(), 3); + + let reference_id = ordered[0].event.id; + let expected_ids: Vec<_> = ordered.iter().skip(1).map(|e| e.event.id).collect(); + + let after = journal.read_causally_after(&reference_id).await.unwrap(); + let after_ids: Vec<_> = after.into_iter().map(|e| e.event.id).collect(); + + assert_eq!(after_ids, expected_ids); + } +} diff --git a/crates/obzenflow_infra/tests/replay_determinism.rs b/crates/obzenflow_infra/tests/replay_determinism.rs index d272df09..c7e4f61e 100644 --- a/crates/obzenflow_infra/tests/replay_determinism.rs +++ b/crates/obzenflow_infra/tests/replay_determinism.rs @@ -128,6 +128,26 @@ async fn run_stateful_fold_once( .collect() } +async fn run_order_sensitive_fold_once( + upstream_journal: Arc>, +) -> Vec { + let events = upstream_journal + .read_causally_ordered() + .await + .expect("read upstream journal"); + + let mut seen = Vec::new(); + for envelope in events { + if let ChainEventContent::Data { payload, .. } = &envelope.event.content { + if let Some(seq) = payload.get("seq").and_then(|v| v.as_u64()) { + seen.push(seq); + } + } + } + + seen +} + #[tokio::test(flavor = "multi_thread")] async fn stateful_replay_produces_identical_aggregates() { let upstream_stage = StageId::new(); @@ -162,6 +182,45 @@ async fn stateful_replay_produces_identical_aggregates() { assert_eq!(aggregates_run1[0]["count"], json!(3)); } +#[tokio::test(flavor = "multi_thread")] +async fn replay_determinism_covers_concurrent_writer_ordering() { + let upstream_stage = StageId::new(); + let upstream_journal: Arc> = Arc::new(MemoryJournal::with_owner( + JournalOwner::stage(upstream_stage), + )); + + let writer_a = WriterId::from(StageId::new()); + let writer_b = WriterId::from(StageId::new()); + + // Force EventId ordering to be independent of append timing: + // ensure `low` has the smaller EventId, but append `high` first (earlier timestamp). 
+ let mut low = ChainEventFactory::data_event(writer_a, "test.concurrent", json!({ "seq": 0 })); + low.id = obzenflow_core::event::types::EventId::new(); + + let mut high = ChainEventFactory::data_event(writer_b, "test.concurrent", json!({ "seq": 1 })); + high.id = obzenflow_core::event::types::EventId::new(); + + if low.id > high.id { + std::mem::swap(&mut low.id, &mut high.id); + } + + upstream_journal + .append(high, None) + .await + .expect("append high-id event"); + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + upstream_journal + .append(low, None) + .await + .expect("append low-id event"); + + let seen_run1 = run_order_sensitive_fold_once(upstream_journal.clone()).await; + let seen_run2 = run_order_sensitive_fold_once(upstream_journal.clone()).await; + + assert_eq!(seen_run1, vec![0, 1]); + assert_eq!(seen_run1, seen_run2); +} + // ========================= // Join replay harness // ========================= From 7663324dfb47a8efbd27f84c4d795a89274cb8fb Mon Sep 17 00:00:00 2001 From: Kevin Webber Date: Wed, 25 Mar 2026 00:33:26 -0400 Subject: [PATCH 4/5] fixed comparator bug; cargo fmt errors Signed-off-by: Kevin Webber --- .../obzenflow_core/src/event/vector_clock.rs | 46 +++++++++++++++++-- .../src/journal/disk/disk_journal.rs | 15 +----- .../src/journal/memory/memory_journal.rs | 14 +----- .../obzenflow_infra/tests/journal_parity.rs | 5 +- 4 files changed, 50 insertions(+), 30 deletions(-) diff --git a/crates/obzenflow_core/src/event/vector_clock.rs b/crates/obzenflow_core/src/event/vector_clock.rs index e5db0e61..a993ffb3 100644 --- a/crates/obzenflow_core/src/event/vector_clock.rs +++ b/crates/obzenflow_core/src/event/vector_clock.rs @@ -10,6 +10,8 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use crate::journal::JournalError; + use crate::event::types::EventId; /// Vector clock data structure for causal ordering @@ -116,17 +118,32 @@ impl CausalOrderingService { } } - /// Deterministically compare two 
concurrent vector clocks using `EventId` as the tie-break. + /// A deterministic scalar derived from a vector clock. + /// + /// This scalar is strictly monotonic under happened-before: if `a` happened-before `b`, then + /// `causal_rank(a) < causal_rank(b)`. + pub fn causal_rank(clock: &VectorClock) -> u128 { + clock + .clocks + .values() + .fold(0u128, |acc, &seq| acc.saturating_add(seq as u128)) + } + + /// Deterministically compare two vector clocks using a monotonic scalar plus `EventId`. /// - /// This is a total comparator intended for *iteration* surfaces such as - /// `Journal::read_causally_ordered()`. It must not use wall-clock timestamps. + /// Note: a comparator defined as "happened-before first, otherwise `EventId`" is not a strict + /// total order and must not be used with `slice::sort_by`, as it can violate transitivity and + /// trigger Rust's sort-time total-order checks. pub fn total_compare_by_event_id( a_clock: &VectorClock, a_event_id: &EventId, b_clock: &VectorClock, b_event_id: &EventId, ) -> std::cmp::Ordering { - Self::causal_compare(a_clock, b_clock).unwrap_or_else(|| a_event_id.cmp(b_event_id)) + let a_rank = Self::causal_rank(a_clock); + let b_rank = Self::causal_rank(b_clock); + + a_rank.cmp(&b_rank).then_with(|| a_event_id.cmp(b_event_id)) } /// Calculate L1 (Manhattan) distance between two vector clocks. @@ -161,4 +178,25 @@ impl CausalOrderingService { distance } + + /// Produce a deterministic causal readback order using a monotonic scalar plus `EventId`. + /// + /// This is intended for *iteration* APIs like `Journal::read_causally_ordered()` that must be + /// deterministic and must not use wall-clock timestamps. It guarantees that if `a` + /// happened-before `b`, then `a` appears before `b` in the output. + pub fn order_envelopes_by_event_id( + mut events: Vec>, + ) -> Result>, JournalError> + where + T: super::JournalEvent, + { + // Fast path. 
+ if events.len() <= 1 { + return Ok(events); + } + + events.sort_by_cached_key(|e| (Self::causal_rank(&e.vector_clock), *e.event.id())); + + Ok(events) + } } diff --git a/crates/obzenflow_infra/src/journal/disk/disk_journal.rs b/crates/obzenflow_infra/src/journal/disk/disk_journal.rs index c7dd544f..f8767445 100644 --- a/crates/obzenflow_infra/src/journal/disk/disk_journal.rs +++ b/crates/obzenflow_infra/src/journal/disk/disk_journal.rs @@ -523,19 +523,8 @@ impl Journal for DiskJournal { } async fn read_causally_ordered(&self) -> Result>, JournalError> { - let mut events = self.read_all_raw().await?; - - // Sort by vector clock for causal ordering - events.sort_by(|a, b| { - CausalOrderingService::total_compare_by_event_id( - &a.vector_clock, - a.event.id(), - &b.vector_clock, - b.event.id(), - ) - }); - - Ok(events) + let events = self.read_all_raw().await?; + CausalOrderingService::order_envelopes_by_event_id(events) } async fn read_causally_after( diff --git a/crates/obzenflow_infra/src/journal/memory/memory_journal.rs b/crates/obzenflow_infra/src/journal/memory/memory_journal.rs index 83aebd20..e90c7c8b 100644 --- a/crates/obzenflow_infra/src/journal/memory/memory_journal.rs +++ b/crates/obzenflow_infra/src/journal/memory/memory_journal.rs @@ -162,20 +162,10 @@ impl Journal for MemoryJournal { async fn read_causally_ordered(&self) -> Result>, JournalError> { let events = self.events.lock().unwrap(); - let mut events_copy = events.clone(); + let events_copy = events.clone(); drop(events); - // Sort by vector clock for causal ordering - events_copy.sort_by(|a, b| { - CausalOrderingService::total_compare_by_event_id( - &a.vector_clock, - a.event.id(), - &b.vector_clock, - b.event.id(), - ) - }); - - Ok(events_copy) + CausalOrderingService::order_envelopes_by_event_id(events_copy) } async fn read_causally_after( diff --git a/crates/obzenflow_infra/tests/journal_parity.rs b/crates/obzenflow_infra/tests/journal_parity.rs index 3a793cea..22bb72d6 100644 --- 
a/crates/obzenflow_infra/tests/journal_parity.rs +++ b/crates/obzenflow_infra/tests/journal_parity.rs @@ -51,7 +51,10 @@ async fn test_journal_parity() { let _envelope3 = journal.append(event3.clone(), None).await.unwrap(); // Event 4: Child of event 2 - journal.append(event4.clone(), Some(&envelope2)).await.unwrap(); + journal + .append(event4.clone(), Some(&envelope2)) + .await + .unwrap(); } // Compare results From 533ef183b2682a10330b5b4748d37e5e88ef4089 Mon Sep 17 00:00:00 2001 From: Kevin Webber Date: Wed, 25 Mar 2026 00:55:10 -0400 Subject: [PATCH 5/5] fixed transitivity violation; added tests Signed-off-by: Kevin Webber --- .../obzenflow_core/src/event/vector_clock.rs | 138 +++++++++++++++ .../obzenflow_infra/tests/journal_parity.rs | 159 ++++++++++++++++++ 2 files changed, 297 insertions(+) diff --git a/crates/obzenflow_core/src/event/vector_clock.rs b/crates/obzenflow_core/src/event/vector_clock.rs index a993ffb3..2b079f2f 100644 --- a/crates/obzenflow_core/src/event/vector_clock.rs +++ b/crates/obzenflow_core/src/event/vector_clock.rs @@ -200,3 +200,141 @@ impl CausalOrderingService { Ok(events) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::event::chain_event::ChainEventFactory; + use crate::event::{ChainEvent, EventEnvelope}; + use crate::{StageId, WriterId}; + use chrono::Utc; + use serde_json::json; + + fn envelope_with_event_id_and_clock( + event_id: EventId, + vector_clock: VectorClock, + ) -> EventEnvelope { + let writer_id = WriterId::from(StageId::new()); + let mut event = + ChainEventFactory::data_event(writer_id, "test.vector_clock", json!({ "ok": true })); + event.id = event_id; + + EventEnvelope { + journal_writer_id: crate::event::JournalWriterId::new(), + vector_clock, + timestamp: Utc::now(), + event, + } + } + + #[test] + fn transitivity_violation_regression_orders_deterministically() { + let w1 = WriterId::from(StageId::new()).to_string(); + let w2 = WriterId::from(StageId::new()).to_string(); + let w3 = 
WriterId::from(StageId::new()).to_string(); + + let mut clock_a = VectorClock::new(); + clock_a.clocks.insert(w1.clone(), 1); + + let mut clock_b = VectorClock::new(); + clock_b.clocks.insert(w1.clone(), 1); + clock_b.clocks.insert(w2.clone(), 1); + + let mut clock_c = VectorClock::new(); + clock_c.clocks.insert(w2.clone(), 1); + clock_c.clocks.insert(w3.clone(), 1); + + assert!(CausalOrderingService::happened_before(&clock_a, &clock_b)); + assert!(CausalOrderingService::are_concurrent(&clock_b, &clock_c)); + assert!(CausalOrderingService::are_concurrent(&clock_a, &clock_c)); + + let a_id = EventId::from_string("ZZZZZZZZZZZZZZZZZZZZZZZZZZ").unwrap(); + let b_id = EventId::from_string("00000000000000000000000000").unwrap(); + let c_id = EventId::from_string("MMMMMMMMMMMMMMMMMMMMMMMMMM").unwrap(); + + let a = envelope_with_event_id_and_clock(a_id, clock_a); + let b = envelope_with_event_id_and_clock(b_id, clock_b); + let c = envelope_with_event_id_and_clock(c_id, clock_c); + + let input = vec![c.clone(), a.clone(), b.clone()]; + let output1 = CausalOrderingService::order_envelopes_by_event_id(input.clone()).unwrap(); + let output2 = CausalOrderingService::order_envelopes_by_event_id(input).unwrap(); + + let ids1: Vec<_> = output1.iter().map(|e| e.event.id).collect(); + let ids2: Vec<_> = output2.iter().map(|e| e.event.id).collect(); + + assert_eq!(ids1, vec![a_id, b_id, c_id]); + assert_eq!(ids1, ids2); + + let idx_a = ids1.iter().position(|id| *id == a_id).unwrap(); + let idx_b = ids1.iter().position(|id| *id == b_id).unwrap(); + assert!(idx_a < idx_b); + } + + #[test] + fn order_is_stable_under_permutation() { + let w1 = WriterId::from(StageId::new()).to_string(); + let w2 = WriterId::from(StageId::new()).to_string(); + let w3 = WriterId::from(StageId::new()).to_string(); + + let mut clock_a = VectorClock::new(); + clock_a.clocks.insert(w1.clone(), 1); + + let mut clock_b = VectorClock::new(); + clock_b.clocks.insert(w1.clone(), 1); + 
clock_b.clocks.insert(w2.clone(), 1); + + let mut clock_c = VectorClock::new(); + clock_c.clocks.insert(w2.clone(), 1); + clock_c.clocks.insert(w3.clone(), 1); + + let a_id = EventId::from_string("ZZZZZZZZZZZZZZZZZZZZZZZZZZ").unwrap(); + let b_id = EventId::from_string("00000000000000000000000000").unwrap(); + let c_id = EventId::from_string("MMMMMMMMMMMMMMMMMMMMMMMMMM").unwrap(); + + let a = envelope_with_event_id_and_clock(a_id, clock_a); + let b = envelope_with_event_id_and_clock(b_id, clock_b); + let c = envelope_with_event_id_and_clock(c_id, clock_c); + + let expected = vec![a_id, b_id, c_id]; + let permutations = [ + vec![a.clone(), b.clone(), c.clone()], + vec![a.clone(), c.clone(), b.clone()], + vec![b.clone(), a.clone(), c.clone()], + vec![b.clone(), c.clone(), a.clone()], + vec![c.clone(), a.clone(), b.clone()], + vec![c.clone(), b.clone(), a.clone()], + ]; + + for permutation in permutations { + let ordered = CausalOrderingService::order_envelopes_by_event_id(permutation).unwrap(); + let ordered_ids: Vec<_> = ordered.iter().map(|e| e.event.id).collect(); + assert_eq!(ordered_ids, expected); + } + } + + #[test] + fn causal_rank_sums_components_and_respects_happened_before() { + let empty = VectorClock::new(); + assert_eq!(CausalOrderingService::causal_rank(&empty), 0); + + let mut single = VectorClock::new(); + single.clocks.insert("writer_1".to_string(), 3); + assert_eq!(CausalOrderingService::causal_rank(&single), 3); + + let mut multi = VectorClock::new(); + multi.clocks.insert("writer_1".to_string(), 2); + multi.clocks.insert("writer_2".to_string(), 3); + assert_eq!(CausalOrderingService::causal_rank(&multi), 5); + + let mut a = VectorClock::new(); + a.clocks.insert("writer_1".to_string(), 1); + + let mut b = VectorClock::new(); + b.clocks.insert("writer_1".to_string(), 1); + b.clocks.insert("writer_2".to_string(), 1); + + assert!(CausalOrderingService::happened_before(&a, &b)); + assert!(CausalOrderingService::causal_rank(&a) < 
CausalOrderingService::causal_rank(&b)); + } +} diff --git a/crates/obzenflow_infra/tests/journal_parity.rs b/crates/obzenflow_infra/tests/journal_parity.rs index 22bb72d6..a67bc01c 100644 --- a/crates/obzenflow_infra/tests/journal_parity.rs +++ b/crates/obzenflow_infra/tests/journal_parity.rs @@ -217,3 +217,162 @@ async fn test_read_causally_after_matches_slice_with_concurrent_events() { assert_eq!(after_ids, expected_ids); } } + +#[tokio::test] +async fn test_diamond_like_dag_respects_causality_and_event_id() { + let temp_dir = TempDir::new().unwrap(); + let owner = JournalOwner::stage(StageId::new()); + let log_path = temp_dir.path().join("diamond_like.log"); + + let disk_journal = Arc::new(DiskJournal::with_owner(log_path, owner.clone()).unwrap()) + as Arc + Send + Sync>; + let memory_journal = + Arc::new(MemoryJournal::with_owner(owner)) as Arc + Send + Sync>; + + let writer_root = WriterId::from(StageId::new()); + let writer_left = WriterId::from(StageId::new()); + let writer_right = WriterId::from(StageId::new()); + let writer_join = WriterId::from(StageId::new()); + + let mut root = ChainEventFactory::data_event(writer_root, "test.root", json!({ "n": 0 })); + root.id = EventId::from_string("11111111111111111111111111").unwrap(); + + let mut left = ChainEventFactory::data_event(writer_left, "test.left", json!({ "n": 1 })); + left.id = EventId::from_string("00000000000000000000000000").unwrap(); + + let mut right = ChainEventFactory::data_event(writer_right, "test.right", json!({ "n": 2 })); + right.id = EventId::from_string("ZZZZZZZZZZZZZZZZZZZZZZZZZZ").unwrap(); + + let mut merge_left = + ChainEventFactory::data_event(writer_join, "test.merge_left", json!({ "n": 3 })); + merge_left.id = EventId::from_string("22222222222222222222222222").unwrap(); + + let mut join = ChainEventFactory::data_event(writer_join, "test.join", json!({ "n": 4 })); + join.id = EventId::from_string("33333333333333333333333333").unwrap(); + + for journal in [&disk_journal, 
&memory_journal] { + let env_root = journal.append(root.clone(), None).await.unwrap(); + + let env_right = journal + .append(right.clone(), Some(&env_root)) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(2)).await; + let env_left = journal.append(left.clone(), Some(&env_root)).await.unwrap(); + + let env_merge_left = journal + .append(merge_left.clone(), Some(&env_left)) + .await + .unwrap(); + let env_join = journal + .append(join.clone(), Some(&env_right)) + .await + .unwrap(); + + assert!( + obzenflow_core::event::vector_clock::CausalOrderingService::happened_before( + &env_root.vector_clock, + &env_left.vector_clock + ) + ); + assert!( + obzenflow_core::event::vector_clock::CausalOrderingService::happened_before( + &env_root.vector_clock, + &env_right.vector_clock + ) + ); + assert!( + obzenflow_core::event::vector_clock::CausalOrderingService::are_concurrent( + &env_left.vector_clock, + &env_right.vector_clock + ) + ); + + assert!( + obzenflow_core::event::vector_clock::CausalOrderingService::happened_before( + &env_left.vector_clock, + &env_join.vector_clock + ) + ); + assert!( + obzenflow_core::event::vector_clock::CausalOrderingService::happened_before( + &env_right.vector_clock, + &env_join.vector_clock + ) + ); + assert!( + obzenflow_core::event::vector_clock::CausalOrderingService::happened_before( + &env_merge_left.vector_clock, + &env_join.vector_clock + ) + ); + + let ordered = journal.read_causally_ordered().await.unwrap(); + let ordered_ids: Vec<_> = ordered.iter().map(|e| e.event.id).collect(); + assert_eq!( + ordered_ids, + vec![root.id, left.id, right.id, merge_left.id, join.id], + ); + } +} + +#[tokio::test] +async fn test_diamond_like_dag_is_timestamp_independent_for_concurrent_siblings() { + let temp_dir = TempDir::new().unwrap(); + let owner = JournalOwner::stage(StageId::new()); + let log_path = temp_dir.path().join("diamond_like_timestamp.log"); + + let disk_journal = Arc::new(DiskJournal::with_owner(log_path, 
owner.clone()).unwrap()) + as Arc + Send + Sync>; + let memory_journal = + Arc::new(MemoryJournal::with_owner(owner)) as Arc + Send + Sync>; + + let writer_root = WriterId::from(StageId::new()); + let writer_left = WriterId::from(StageId::new()); + let writer_right = WriterId::from(StageId::new()); + let writer_join = WriterId::from(StageId::new()); + + let mut root = ChainEventFactory::data_event(writer_root, "test.root", json!({ "n": 0 })); + root.id = EventId::from_string("11111111111111111111111111").unwrap(); + + let mut left = ChainEventFactory::data_event(writer_left, "test.left", json!({ "n": 1 })); + left.id = EventId::from_string("00000000000000000000000000").unwrap(); + + let mut right = ChainEventFactory::data_event(writer_right, "test.right", json!({ "n": 2 })); + right.id = EventId::from_string("ZZZZZZZZZZZZZZZZZZZZZZZZZZ").unwrap(); + + let mut merge_left = + ChainEventFactory::data_event(writer_join, "test.merge_left", json!({ "n": 3 })); + merge_left.id = EventId::from_string("22222222222222222222222222").unwrap(); + + let mut join = ChainEventFactory::data_event(writer_join, "test.join", json!({ "n": 4 })); + join.id = EventId::from_string("33333333333333333333333333").unwrap(); + + for journal in [&disk_journal, &memory_journal] { + let env_root = journal.append(root.clone(), None).await.unwrap(); + + // Append the higher EventId sibling first, then the lower EventId sibling. + // If wall-clock timestamps were used as a concurrent tie-break, this would tend to order + // right then left. 
+ journal + .append(right.clone(), Some(&env_root)) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(2)).await; + let env_left = journal.append(left.clone(), Some(&env_root)).await.unwrap(); + + journal + .append(merge_left.clone(), Some(&env_left)) + .await + .unwrap(); + journal.append(join.clone(), Some(&env_left)).await.unwrap(); + + let ordered = journal.read_causally_ordered().await.unwrap(); + let ordered_ids: Vec<_> = ordered.iter().map(|e| e.event.id).collect(); + assert_eq!( + ordered_ids[0..3], + [root.id, left.id, right.id], + "concurrent siblings should be ordered by EventId, not by append-time timestamp" + ); + } +}