Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions crates/obzenflow_core/src/event/vector_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

use crate::journal::JournalError;

use crate::event::types::EventId;

/// Vector clock data structure for causal ordering
///
/// This is a pure data structure representing the causal history of an event.
Expand Down Expand Up @@ -114,6 +118,34 @@ impl CausalOrderingService {
}
}

/// A deterministic scalar derived from a vector clock.
///
/// The rank is the (saturating) sum of all per-writer sequence numbers. It is
/// strictly monotonic under happened-before: if `a` happened-before `b`, every
/// component of `a` is <= the corresponding component of `b` with at least one
/// strictly smaller, so `causal_rank(a) < causal_rank(b)`.
pub fn causal_rank(clock: &VectorClock) -> u128 {
    let mut rank: u128 = 0;
    // Sum every writer's sequence number; saturating_add keeps the rank
    // well-defined even in the (practically unreachable) overflow case.
    for &seq in clock.clocks.values() {
        rank = rank.saturating_add(seq as u128);
    }
    rank
}

/// Deterministically compare two vector clocks using a monotonic scalar plus `EventId`.
///
/// The primary key is `causal_rank`, which respects happened-before; ties between
/// concurrent events are broken by `EventId`, yielding a strict total order.
///
/// Note: a comparator defined as "happened-before first, otherwise `EventId`" is not a
/// strict total order and must not be used with `slice::sort_by`, as it can violate
/// transitivity and trigger Rust's sort-time total-order checks.
pub fn total_compare_by_event_id(
    a_clock: &VectorClock,
    a_event_id: &EventId,
    b_clock: &VectorClock,
    b_event_id: &EventId,
) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    // Compare by rank first; only equal ranks fall through to the EventId tiebreak.
    match Self::causal_rank(a_clock).cmp(&Self::causal_rank(b_clock)) {
        Ordering::Equal => a_event_id.cmp(b_event_id),
        ordering => ordering,
    }
}

/// Calculate L1 (Manhattan) distance between two vector clocks.
///
/// This represents the total number of events that happened
Expand Down Expand Up @@ -146,4 +178,163 @@ impl CausalOrderingService {

distance
}

/// Produce a deterministic causal readback order using a monotonic scalar plus `EventId`.
///
/// Intended for *iteration* APIs like `Journal::read_causally_ordered()` that must be
/// deterministic and must not use wall-clock timestamps. Guarantees that if `a`
/// happened-before `b`, then `a` appears before `b` in the output; concurrent events
/// are ordered by `EventId`.
pub fn order_envelopes_by_event_id<T>(
    mut events: Vec<super::EventEnvelope<T>>,
) -> Result<Vec<super::EventEnvelope<T>>, JournalError>
where
    T: super::JournalEvent,
{
    // Zero or one events need no reordering; only sort when there is real work.
    if events.len() > 1 {
        // sort_by_cached_key computes each (rank, id) key exactly once, and the
        // (rank, EventId) tuple compare is a strict total order, so this is safe
        // for Rust's sort implementation.
        events.sort_by_cached_key(|envelope| {
            (Self::causal_rank(&envelope.vector_clock), *envelope.event.id())
        });
    }

    Ok(events)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::event::chain_event::ChainEventFactory;
use crate::event::{ChainEvent, EventEnvelope};
use crate::{StageId, WriterId};
use chrono::Utc;
use serde_json::json;

/// Build a test `EventEnvelope<ChainEvent>` with a caller-chosen `EventId` and
/// `VectorClock`, so tests can control both the causal history and the
/// `EventId` tiebreak deterministically.
///
/// The `timestamp` is `Utc::now()` — nondeterministic on purpose, so any
/// accidental timestamp-based ordering in the code under test would surface as
/// test flakiness rather than pass silently.
fn envelope_with_event_id_and_clock(
    event_id: EventId,
    vector_clock: VectorClock,
) -> EventEnvelope<ChainEvent> {
    let writer_id = WriterId::from(StageId::new());
    // Start from a factory-built event, then overwrite its id with the
    // caller-supplied one.
    let mut event =
        ChainEventFactory::data_event(writer_id, "test.vector_clock", json!({ "ok": true }));
    event.id = event_id;

    EventEnvelope {
        journal_writer_id: crate::event::JournalWriterId::new(),
        vector_clock,
        timestamp: Utc::now(),
        event,
    }
}

#[test]
fn transitivity_violation_regression_orders_deterministically() {
    // Regression scenario: A happened-before B, while C is concurrent with
    // both. A naive "happened-before first, otherwise EventId" comparator is
    // not transitive over this triple; the rank-based ordering must still be
    // deterministic and respect A < B.
    let w1 = WriterId::from(StageId::new()).to_string();
    let w2 = WriterId::from(StageId::new()).to_string();
    let w3 = WriterId::from(StageId::new()).to_string();

    // A: {w1:1} — rank 1.
    let mut clock_a = VectorClock::new();
    clock_a.clocks.insert(w1.clone(), 1);

    // B: {w1:1, w2:1} — dominates A, so A happened-before B. Rank 2.
    let mut clock_b = VectorClock::new();
    clock_b.clocks.insert(w1.clone(), 1);
    clock_b.clocks.insert(w2.clone(), 1);

    // C: {w2:1, w3:1} — concurrent with both A and B. Rank 2.
    let mut clock_c = VectorClock::new();
    clock_c.clocks.insert(w2.clone(), 1);
    clock_c.clocks.insert(w3.clone(), 1);

    // Sanity-check the causal relations the scenario depends on.
    assert!(CausalOrderingService::happened_before(&clock_a, &clock_b));
    assert!(CausalOrderingService::are_concurrent(&clock_b, &clock_c));
    assert!(CausalOrderingService::are_concurrent(&clock_a, &clock_c));

    // EventIds are chosen adversarially: A's id sorts LAST lexically, so a
    // pure-EventId ordering would put A after B, violating happened-before.
    let a_id = EventId::from_string("ZZZZZZZZZZZZZZZZZZZZZZZZZZ").unwrap();
    let b_id = EventId::from_string("00000000000000000000000000").unwrap();
    let c_id = EventId::from_string("MMMMMMMMMMMMMMMMMMMMMMMMMM").unwrap();

    let a = envelope_with_event_id_and_clock(a_id, clock_a);
    let b = envelope_with_event_id_and_clock(b_id, clock_b);
    let c = envelope_with_event_id_and_clock(c_id, clock_c);

    // Run the ordering twice on the same input to check determinism.
    let input = vec![c.clone(), a.clone(), b.clone()];
    let output1 = CausalOrderingService::order_envelopes_by_event_id(input.clone()).unwrap();
    let output2 = CausalOrderingService::order_envelopes_by_event_id(input).unwrap();

    let ids1: Vec<_> = output1.iter().map(|e| e.event.id).collect();
    let ids2: Vec<_> = output2.iter().map(|e| e.event.id).collect();

    // Expected: A (rank 1) first; B and C tie on rank 2 and break by EventId
    // ("000…" < "MMM…"), so B precedes C.
    assert_eq!(ids1, vec![a_id, b_id, c_id]);
    assert_eq!(ids1, ids2);

    // Explicitly assert the happened-before pair is ordered correctly.
    let idx_a = ids1.iter().position(|id| *id == a_id).unwrap();
    let idx_b = ids1.iter().position(|id| *id == b_id).unwrap();
    assert!(idx_a < idx_b);
}

#[test]
fn order_is_stable_under_permutation() {
    // The deterministic ordering must be a function of the events alone:
    // every input permutation of {A, B, C} must produce the same output.
    let w1 = WriterId::from(StageId::new()).to_string();
    let w2 = WriterId::from(StageId::new()).to_string();
    let w3 = WriterId::from(StageId::new()).to_string();

    // Same clock shapes as the transitivity regression test:
    // A happened-before B; C is concurrent with both.
    let mut clock_a = VectorClock::new();
    clock_a.clocks.insert(w1.clone(), 1);

    let mut clock_b = VectorClock::new();
    clock_b.clocks.insert(w1.clone(), 1);
    clock_b.clocks.insert(w2.clone(), 1);

    let mut clock_c = VectorClock::new();
    clock_c.clocks.insert(w2.clone(), 1);
    clock_c.clocks.insert(w3.clone(), 1);

    // EventIds chosen so lexical order disagrees with causal order for A.
    let a_id = EventId::from_string("ZZZZZZZZZZZZZZZZZZZZZZZZZZ").unwrap();
    let b_id = EventId::from_string("00000000000000000000000000").unwrap();
    let c_id = EventId::from_string("MMMMMMMMMMMMMMMMMMMMMMMMMM").unwrap();

    let a = envelope_with_event_id_and_clock(a_id, clock_a);
    let b = envelope_with_event_id_and_clock(b_id, clock_b);
    let c = envelope_with_event_id_and_clock(c_id, clock_c);

    // All 3! = 6 permutations of the three envelopes.
    let expected = vec![a_id, b_id, c_id];
    let permutations = [
        vec![a.clone(), b.clone(), c.clone()],
        vec![a.clone(), c.clone(), b.clone()],
        vec![b.clone(), a.clone(), c.clone()],
        vec![b.clone(), c.clone(), a.clone()],
        vec![c.clone(), a.clone(), b.clone()],
        vec![c.clone(), b.clone(), a.clone()],
    ];

    // Every permutation must collapse to the same canonical order.
    for permutation in permutations {
        let ordered = CausalOrderingService::order_envelopes_by_event_id(permutation).unwrap();
        let ordered_ids: Vec<_> = ordered.iter().map(|e| e.event.id).collect();
        assert_eq!(ordered_ids, expected);
    }
}

#[test]
fn causal_rank_sums_components_and_respects_happened_before() {
    // An empty clock carries no history: its rank is zero.
    assert_eq!(CausalOrderingService::causal_rank(&VectorClock::new()), 0);

    // One writer contributes its sequence number directly.
    let mut single = VectorClock::new();
    single.clocks.insert("writer_1".to_string(), 3);
    assert_eq!(CausalOrderingService::causal_rank(&single), 3);

    // Multiple writers: the rank is the sum of all components (2 + 3 = 5).
    let mut multi = VectorClock::new();
    multi.clocks.insert("writer_1".to_string(), 2);
    multi.clocks.insert("writer_2".to_string(), 3);
    assert_eq!(CausalOrderingService::causal_rank(&multi), 5);

    // `later` dominates `earlier` componentwise, so `earlier` happened-before
    // `later` — and the rank must be strictly smaller for `earlier`.
    let mut earlier = VectorClock::new();
    earlier.clocks.insert("writer_1".to_string(), 1);

    let mut later = VectorClock::new();
    later.clocks.insert("writer_1".to_string(), 1);
    later.clocks.insert("writer_2".to_string(), 1);

    assert!(CausalOrderingService::happened_before(&earlier, &later));
    assert!(
        CausalOrderingService::causal_rank(&earlier)
            < CausalOrderingService::causal_rank(&later)
    );
}
}
4 changes: 4 additions & 0 deletions crates/obzenflow_core/src/journal/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ where
///
/// Events MUST be ordered such that if A happened-before B,
/// then A appears before B in the result
///
/// For concurrent events (no happened-before relation), implementations MUST
/// return a deterministic total order. The current framework contract is to
/// break concurrent ties by `EventId` ordering, not by wall-clock time.
async fn read_causally_ordered(&self) -> Result<Vec<EventEnvelope<T>>, JournalError>;

/// Read events causally after the given event
Expand Down
6 changes: 5 additions & 1 deletion crates/obzenflow_core/src/journal/journal_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ pub trait JournalReader<T>: Send + Sync
where
T: JournalEvent,
{
/// Read the next event from the current position
/// Read the next event from the current position (append order).
///
/// This is an append-order cursor, not a causal-order iterator. Callers that need
/// deterministic causal ordering should use `Journal::read_causally_ordered()` /
/// `Journal::read_causally_after(...)` instead of `JournalReader::next()`.
///
/// Returns None if no more events are available (EOF).
/// This method should be efficient - O(1) regardless of journal size.
Expand Down
78 changes: 62 additions & 16 deletions crates/obzenflow_infra/src/journal/disk/disk_journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,12 @@ impl<T: JournalEvent + 'static> Journal<T> for DiskJournal<T> {
// Get writer_id from the event
let writer_id = *event.writer_id();

// Acquire the journal write lock before advancing writer clocks.
//
// This serialises append operations and ensures concurrent appends
// cannot compute the same `writer_seq` from a stale snapshot.
let _lock = self.read_write_lock.write().await;

// Get or create vector clock for this writer
let mut vector_clock = {
let writer_clocks = self.writer_clocks.read().await;
Expand Down Expand Up @@ -432,9 +438,6 @@ impl<T: JournalEvent + 'static> Journal<T> for DiskJournal<T> {
bytes.extend_from_slice(&json_body);
bytes.push(b'\n');

// Acquire write lock for atomic write (blocks readers)
let _lock = self.read_write_lock.write().await;

// Get current offset and reserve space
let offset = self.write_offset.load(Ordering::SeqCst);
self.write_offset
Expand Down Expand Up @@ -520,19 +523,8 @@ impl<T: JournalEvent + 'static> Journal<T> for DiskJournal<T> {
}

async fn read_causally_ordered(&self) -> Result<Vec<EventEnvelope<T>>, JournalError> {
let mut events = self.read_all_raw().await?;

// Sort by vector clock for causal ordering
events.sort_by(|a, b| {
CausalOrderingService::causal_compare(&a.vector_clock, &b.vector_clock).unwrap_or_else(
|| {
// For concurrent events, use timestamp as tiebreaker
a.timestamp.cmp(&b.timestamp)
},
)
});

Ok(events)
let events = self.read_all_raw().await?;
CausalOrderingService::order_envelopes_by_event_id(events)
}

async fn read_causally_after(
Expand Down Expand Up @@ -750,6 +742,7 @@ mod tests {
use super::*;
use obzenflow_core::event::chain_event::{ChainEvent, ChainEventFactory};
use obzenflow_core::id::StageId;
use tokio::sync::Barrier;

use uuid::Uuid;

Expand Down Expand Up @@ -883,4 +876,57 @@ mod tests {
// Cleanup
std::fs::remove_dir_all(&test_dir).ok();
}

#[tokio::test]
async fn test_same_writer_concurrent_appends_have_unique_writer_seq() {
    // Regression test: concurrent appends from the SAME writer must each get
    // a unique, gap-free writer sequence number. Before the append path held
    // the journal write lock while advancing writer clocks, two tasks could
    // read the same stale clock snapshot and compute duplicate writer_seqs.
    let test_id = Uuid::new_v4();
    // Unique per-run directory so parallel test runs don't collide.
    let test_dir = std::path::PathBuf::from(format!(
        "target/test-logs/test_same_writer_concurrent_appends_have_unique_writer_seq_{test_id}"
    ));
    std::fs::create_dir_all(&test_dir).unwrap();

    let test_stage_id = obzenflow_core::StageId::new();
    let owner = obzenflow_core::JournalOwner::stage(test_stage_id);
    let log_path = test_dir.join("same_writer_concurrent.log");
    let log = Arc::new(DiskJournal::<ChainEvent>::with_owner(log_path, owner).unwrap());

    // All events share one writer so they contend on the same clock entry.
    let writer_id = WriterId::from(StageId::new());
    let writer_key = writer_id.to_string();

    let task_count: usize = 20;
    // Barrier releases all tasks at once to maximize append contention.
    let barrier = Arc::new(Barrier::new(task_count));

    let mut handles = Vec::with_capacity(task_count);
    for i in 0..task_count {
        let log_clone = log.clone();
        let barrier_clone = barrier.clone();
        let handle = tokio::spawn(async move {
            // Wait until every task is spawned, then append simultaneously.
            barrier_clone.wait().await;
            let event = ChainEventFactory::data_event(
                writer_id,
                "concurrent.same_writer",
                serde_json::json!({ "i": i }),
            );
            log_clone.append(event, None).await
        });
        handles.push(handle);
    }

    // Both the task join and the append result must succeed.
    for handle in handles {
        handle.await.unwrap().unwrap();
    }

    let events = log.read_causally_ordered().await.unwrap();
    assert_eq!(events.len(), task_count);

    // Extract each event's sequence number for our single writer, in causal
    // readback order.
    let writer_seqs: Vec<u64> = events
        .iter()
        .map(|e| e.vector_clock.get(&writer_key))
        .collect();

    // With one writer, causal order equals sequence order, so the seqs must
    // be exactly 1..=task_count with no duplicates and no gaps.
    let expected: Vec<u64> = (1..=task_count as u64).collect();
    assert_eq!(writer_seqs, expected);

    std::fs::remove_dir_all(&test_dir).ok();
}
}
10 changes: 7 additions & 3 deletions crates/obzenflow_infra/src/journal/disk/replay_archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl DiskReplayArchive {
});
}

if is_newer_semver(&manifest.obzenflow_version, OBZENFLOW_VERSION) {
if !is_compatible_semver(&manifest.obzenflow_version, OBZENFLOW_VERSION) {
return Err(ReplayError::VersionMismatch {
archive_version: manifest.obzenflow_version.clone(),
current_version: OBZENFLOW_VERSION.to_string(),
Expand Down Expand Up @@ -337,14 +337,18 @@ fn derive_status_derivation_from_system_log(path: &Path) -> Result<StatusDerivat
})
}

fn is_newer_semver(archive_version: &str, current_version: &str) -> bool {
/// Check whether an archive's ObzenFlow version is compatible with the running
/// framework.
///
/// Compatibility requires an exact major.minor match; patch-level differences
/// are tolerated. If either version string fails to parse, the check fails
/// closed and the versions are treated as incompatible.
fn is_compatible_semver(archive_version: &str, current_version: &str) -> bool {
    match (
        parse_semver_triplet(archive_version),
        parse_semver_triplet(current_version),
    ) {
        // Compare major and minor only; the patch component is ignored.
        (Some((archive_major, archive_minor, _)), Some((current_major, current_minor, _))) => {
            archive_major == current_major && archive_minor == current_minor
        }
        // At least one side unparseable: fail closed.
        _ => false,
    }
}

fn parse_semver_triplet(version: &str) -> Option<(u64, u64, u64)> {
Expand Down
14 changes: 2 additions & 12 deletions crates/obzenflow_infra/src/journal/memory/memory_journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,10 @@ impl<T: JournalEvent + 'static> Journal<T> for MemoryJournal<T> {

async fn read_causally_ordered(&self) -> Result<Vec<EventEnvelope<T>>, JournalError> {
let events = self.events.lock().unwrap();
let mut events_copy = events.clone();
let events_copy = events.clone();
drop(events);

// Sort by vector clock for causal ordering
events_copy.sort_by(|a, b| {
CausalOrderingService::causal_compare(&a.vector_clock, &b.vector_clock).unwrap_or_else(
|| {
// For concurrent events, use timestamp as tiebreaker
a.timestamp.cmp(&b.timestamp)
},
)
});

Ok(events_copy)
CausalOrderingService::order_envelopes_by_event_id(events_copy)
}

async fn read_causally_after(
Expand Down
Loading
Loading