Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,6 @@ where

// Check for stalls
let Some(last) = progress.last_read_instant else {
progress.last_read_instant = Some(now);
return;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,63 @@ async fn stall_append_failure_does_not_set_stalled_since() {
assert!(!reader_progress[0].contract_violated);
}

#[tokio::test]
async fn idle_reader_without_any_reads_does_not_emit_stall() {
tokio::time::pause();

let upstream_stage = StageId::new();
let upstream_owner = JournalOwner::stage(upstream_stage);
let upstream_journal: Arc<dyn Journal<ChainEvent>> = Arc::new(TestJournal::new(upstream_owner));
let upstreams = [(upstream_stage, "upstream".to_string(), upstream_journal)];

let mut subscription = UpstreamSubscription::new_with_names("test_owner", &upstreams)
.await
.unwrap();

let contract_stage = StageId::new();
let contract_owner = JournalOwner::stage(contract_stage);
let contract_journal: Arc<dyn Journal<ChainEvent>> = Arc::new(TestJournal::new(contract_owner));

let config = ContractConfig {
progress_min_events: Count(100),
progress_max_interval: DurationMs(10_000),
stall_threshold: DurationMs(100),
stall_cooloff: DurationMs(0),
stall_checks_before_emit: 1,
};

subscription = subscription.with_contracts(ContractsWiring {
writer_id: WriterId::from(contract_stage),
contract_journal: contract_journal.clone(),
config,
system_journal: None,
reader_stage: None,
control_middleware: Arc::new(NoControlMiddleware),
include_delivery_contract: false,
cycle_guard_config: None,
});

let mut reader_progress = [ReaderProgress::new(upstream_stage)];

for _ in 0..4 {
tokio::time::advance(std::time::Duration::from_millis(150)).await;
let status = subscription.check_contracts(&mut reader_progress).await;
assert!(
matches!(status, ContractStatus::Healthy),
"idle reader should stay healthy before any reads"
);
}

assert!(reader_progress[0].last_read_instant.is_none());
assert!(reader_progress[0].stalled_since.is_none());
assert!(!reader_progress[0].contract_violated);
assert!(contract_journal
.read_causally_ordered()
.await
.expect("read contract journal")
.is_empty());
}

#[tokio::test]
async fn multi_reader_progress_isolated_under_partial_append_failure() {
let upstream_a = StageId::new();
Expand Down
Loading