Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7445,22 +7445,6 @@ async fn test_abort_flow_after_task_finishes() {
.times(1)
.in_sequence(&mut evaluation_seq)
.returning(|_| Ok(TransformStatus::UpToDate));
// bar: after foo finishes task 0
mock_transform_flow_evaluator
.expect_evaluate_transform_status()
.times(1)
.in_sequence(&mut evaluation_seq)
.returning(|_| {
Ok(TransformStatus::NewInputDataAvailable {
input_advancements: vec![odf::metadata::ExecuteTransformInput {
dataset_id: odf::DatasetID::new_seeded_ed25519(b"foo"),
new_block_hash: Some(odf::Multihash::from_digest_sha3_256(b"foo-new-slice")),
prev_block_hash: Some(odf::Multihash::from_digest_sha3_256(b"foo-old-slice")),
prev_offset: Some(5),
new_offset: Some(8),
}],
})
});

let harness = FlowHarness::with_overrides(FlowHarnessOverrides {
mock_dataset_changes: Some(mock_dataset_changes),
Expand Down Expand Up @@ -7903,7 +7887,7 @@ async fn test_respect_last_success_time_for_derived_dataset_when_activate_config
let mut mock_dataset_changes = MockDatasetIncrementQueryService::new();
mock_dataset_changes
.expect_get_increment_between()
.times(5)
.times(3)
.returning(|_, _, _| {
Ok(MetadataChainIncrementInterval {
num_blocks: 1,
Expand Down
82 changes: 10 additions & 72 deletions src/domain/task-system/services/tests/tests/test_task_agent_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,12 @@ async fn test_run_single_task() {
let mut mock_outbox = MockOutbox::new();
TaskAgentHarness::add_outbox_task_expectations(&mut mock_outbox, TaskID::new(0));

// Expect logical plan runner to run probe
let mut mock_task_planner = MockTaskDefinitionPlanner::new();
let mut mock_task_runner = MockTaskRunner::new();
TaskAgentHarness::add_plan_probe_plan_expectations(
&mut mock_task_planner,
LogicalPlanProbe::default(),
1,
);
TaskAgentHarness::add_run_probe_plan_expectations(
&mut mock_task_runner,
LogicalPlanProbe::default(),
1,
);

// Schedule the only task
let harness = TaskAgentHarness::new(mock_outbox, mock_task_planner, mock_task_runner);
let harness = TaskAgentHarness::new(
mock_outbox,
MockTaskDefinitionPlanner::new(),
MockTaskRunner::new(),
);
let task_id = harness
.schedule_probe_task(LogicalPlanProbe::default())
.await;
Expand All @@ -123,22 +113,12 @@ async fn test_run_two_of_three_tasks() {
TaskAgentHarness::add_outbox_task_expectations(&mut mock_outbox, TaskID::new(0));
TaskAgentHarness::add_outbox_task_expectations(&mut mock_outbox, TaskID::new(1));

// Expect logical plan runner to run probe twice
let mut mock_task_planner = MockTaskDefinitionPlanner::new();
let mut mock_task_runner = MockTaskRunner::new();
TaskAgentHarness::add_plan_probe_plan_expectations(
&mut mock_task_planner,
LogicalPlanProbe::default(),
2,
);
TaskAgentHarness::add_run_probe_plan_expectations(
&mut mock_task_runner,
LogicalPlanProbe::default(),
2,
// Schedule 3 tasks (use real probe planner/runner registered in harness)
let harness = TaskAgentHarness::new(
mock_outbox,
MockTaskDefinitionPlanner::new(),
MockTaskRunner::new(),
);

// Schedule 3 tasks
let harness = TaskAgentHarness::new(mock_outbox, mock_task_planner, mock_task_runner);
let task_id_1 = harness
.schedule_probe_task(LogicalPlanProbe::default())
.await;
Expand Down Expand Up @@ -304,48 +284,6 @@ impl TaskAgentHarness {
.times(1)
.returning(|_, _, _| Ok(()));
}
fn add_plan_probe_plan_expectations(
mock_task_planner: &mut MockTaskDefinitionPlanner,
probe: LogicalPlanProbe,
times: usize,
) {
let probe_clone = probe.clone();

mock_task_planner
.expect_prepare_task_definition()
.withf(move |_task_id, plan| {
plan.plan_type == LogicalPlanProbe::TYPE_ID
&& LogicalPlanProbe::from_logical_plan(plan).unwrap() == probe_clone
})
.times(times)
.returning(move |_, _| {
Ok(TaskDefinition::new(TaskDefinitionProbe {
probe: probe.clone(),
}))
});
}

fn add_run_probe_plan_expectations(
mock_task_runner: &mut MockTaskRunner,
probe: LogicalPlanProbe,
times: usize,
) {
let probe_plan = probe.clone();

mock_task_runner
.expect_run_task()
.withf(move |td| {
td.downcast_ref::<TaskDefinitionProbe>()
.is_some_and(|task_probe| task_probe.probe == probe_plan)
})
.times(times)
.returning(move |_| {
Ok(probe
.end_with_outcome
.clone()
.unwrap_or(TaskOutcome::Success(TaskResult::empty())))
});
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading