Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ where
continue;
}

// Diagnostics-only checks are used by stages that intentionally defer
// authoritative EOF verification (e.g. sinks that flush receipts before
// running final contract checks). Once a reader has reached terminal EOF,
// stall detection is meaningless and can produce a spurious
// `reader_stalled` failure during shutdown.
if mode == ContractCheckMode::DiagnosticsOnly && self.state.is_reader_eof(index) {
continue;
}

// Continuous contract evaluation (FLOWIP-080r).
//
// We run `check_progress` independently of progress emission so that
Expand All @@ -91,12 +100,7 @@ where
self.check_progress_contracts_for_reader(progress, index, &mut status)
.await;

let should_emit_progress =
if mode == ContractCheckMode::DiagnosticsOnly && self.state.is_reader_eof(index) {
false
} else {
self.should_emit_progress(progress, index, now)
};
let should_emit_progress = self.should_emit_progress(progress, index, now);

if should_emit_progress {
self.emit_progress_for_reader(progress, index, now, &mut status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,77 @@ async fn diagnostics_only_eof_check_does_not_emit_final_or_latch_state() {
);
}

#[tokio::test]
async fn diagnostics_only_eof_check_does_not_emit_stall() {
let upstream_stage = StageId::new();
let upstream_owner = JournalOwner::stage(upstream_stage);
let upstream_journal: Arc<dyn Journal<ChainEvent>> = Arc::new(TestJournal::new(upstream_owner));
let upstreams = [(upstream_stage, "upstream".to_string(), upstream_journal)];

let mut subscription = UpstreamSubscription::new_with_names("test_owner", &upstreams)
.await
.unwrap();

let contract_stage = StageId::new();
let contract_owner = JournalOwner::stage(contract_stage);
let contract_journal: Arc<dyn Journal<ChainEvent>> = Arc::new(TestJournal::new(contract_owner));

let reader_stage = StageId::new();
let system_owner = JournalOwner::stage(reader_stage);
let system_journal: Arc<dyn Journal<SystemEvent>> = Arc::new(TestJournal::new(system_owner));

let config = ContractConfig {
progress_min_events: Count(100),
progress_max_interval: DurationMs(10_000),
stall_threshold: DurationMs(100),
stall_cooloff: DurationMs(0),
stall_checks_before_emit: 1,
};

subscription = subscription.with_contracts(ContractsWiring {
writer_id: WriterId::from(contract_stage),
contract_journal: contract_journal.clone(),
config,
system_journal: Some(system_journal.clone()),
reader_stage: Some(reader_stage),
control_middleware: Arc::new(NoControlMiddleware),
include_delivery_contract: true,
cycle_guard_config: None,
});

subscription.state.mark_reader_eof(0);

let mut reader_progress = [ReaderProgress::new(upstream_stage)];
reader_progress[0].reader_seq = SeqNo(1);
reader_progress[0].receipted_seq = SeqNo(1);
reader_progress[0].advertised_writer_seq = Some(SeqNo(1));
reader_progress[0].last_read_instant =
Some(Instant::now() - std::time::Duration::from_millis(250));

let status = subscription
.check_contracts_diagnostics_only(&mut reader_progress)
.await;

assert!(
matches!(status, ContractStatus::Healthy),
"EOF readers must not be stall-checked in diagnostics-only mode"
);
assert_eq!(reader_progress[0].consecutive_stall_checks, 0);
assert!(reader_progress[0].stalled_since.is_none());
assert!(!reader_progress[0].contract_violated);

assert!(contract_journal
.read_causally_ordered()
.await
.expect("read contract journal")
.is_empty());
assert!(system_journal
.read_causally_ordered()
.await
.expect("read system journal")
.is_empty());
}

#[tokio::test]
async fn contract_status_append_failure_keeps_final_emitted_false() {
let upstream_stage = StageId::new();
Expand Down
Loading