From 005efa62c65c77effcc422784614eab53048a993 Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Wed, 20 May 2026 15:56:46 +0200 Subject: [PATCH 1/7] feat: coordination mode aware crank --- magicblock-task-scheduler/src/service.rs | 31 ++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index fa0b19550..6ad4d9eeb 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -6,7 +6,9 @@ use std::{ use futures_util::StreamExt; use magicblock_config::config::TaskSchedulerConfig; -use magicblock_core::link::transactions::ScheduledTasksRx; +use magicblock_core::{ + coordination_mode::CoordinationMode, link::transactions::ScheduledTasksRx, +}; use magicblock_ledger::LatestBlock; use magicblock_program::{ args::{CancelTaskRequest, TaskRequest}, @@ -123,6 +125,20 @@ impl TaskSchedulerService { pub async fn start( mut self, ) -> TaskSchedulerResult>> { + if self.is_primary_mode().await { + self.load_persisted_tasks().await?; + Ok(tokio::spawn(self.run())) + } else { + debug!("Task scheduler on standby mode does not start"); + Ok(tokio::spawn(async move { Ok(()) })) + } + } + + async fn load_persisted_tasks(&mut self) -> TaskSchedulerResult<()> { + self.task_queue.clear(); + self.task_queue_keys.clear(); + self.task_execution_retries.clear(); + // Reschedule all tasks that are due let tasks = self.db.get_tasks().await?; let now = chrono::Utc::now().timestamp_millis(); @@ -159,7 +175,7 @@ impl TaskSchedulerService { self.task_queue_keys.insert(task_id, key); } - Ok(tokio::spawn(self.run())) + Ok(()) } async fn process_request( @@ -479,6 +495,17 @@ impl TaskSchedulerService { .await .map_err(Box::new)?) } + + /// Waits until the coordination mode is not StartingUp. + /// Should be fast because task scheduler is started after the ledger replay completes. + async fn is_primary_mode(&self) -> bool { + let mut mode = CoordinationMode::current(); + while mode == CoordinationMode::StartingUp { + tokio::time::sleep(Duration::from_millis(100)).await; + mode = CoordinationMode::current(); + } + mode == CoordinationMode::Primary + } } fn is_valid_task_interval(interval: i64) -> bool { From cb607bd2242ffd2b26a11317eee4f517c5d1a912 Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Wed, 20 May 2026 16:21:26 +0200 Subject: [PATCH 2/7] test: service does not start --- magicblock-task-scheduler/src/service.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index 6ad4d9eeb..8dcf5f8f1 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -519,6 +519,7 @@ fn is_retryable_task_execution_error(error: &TaskSchedulerError) -> bool { #[cfg(test)] mod tests { + use magicblock_core::coordination_mode::switch_to_replica_mode; use magicblock_program::{ args::ScheduleTaskRequest, validator::generate_validator_authority_if_needed, @@ -729,4 +730,22 @@ mod tests { .unwrap(); handle.abort(); } + + #[tokio::test] + async fn test_task_scheduler_does_not_start_on_standby_mode() { + magicblock_core::logger::init_for_tests(); + switch_to_replica_mode(); + + let (_tx, rx) = mpsc::unbounded_channel(); + let db = SchedulerDatabase::new(":memory:").unwrap(); + let service = test_service(db.clone(), rx); + let handle = service.start().await.unwrap(); + + // Handle should join immediately because it's in standby mode + timeout(Duration::from_secs(1), async move { handle.await }) + .await + .unwrap() + .unwrap() + .unwrap(); + } } From 9bd4171aae9f9a2f892c62d161b8ff27f0343ab6 Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Wed, 20 May 2026 16:41:31 +0200 Subject: [PATCH 3/7] style: lint --- magicblock-task-scheduler/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index 8dcf5f8f1..31591cec5 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -742,7 +742,7 @@ mod tests { let handle = service.start().await.unwrap(); // Handle should join immediately because it's in standby mode - timeout(Duration::from_secs(1), async move { handle.await }) + timeout(Duration::from_secs(1), handle) .await .unwrap() .unwrap() From 4a745a2797689aea9661110c9a78c84d8c9e218e Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Wed, 20 May 2026 16:43:05 +0200 Subject: [PATCH 4/7] feat: cancellation aware --- magicblock-task-scheduler/src/service.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index 31591cec5..9951fe2cc 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -501,7 +501,10 @@ impl TaskSchedulerService { async fn is_primary_mode(&self) -> bool { let mut mode = CoordinationMode::current(); while mode == CoordinationMode::StartingUp { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::select! { + _ = self.token.cancelled() => return false, + _ = tokio::time::sleep(Duration::from_millis(100)) => {} + } mode = CoordinationMode::current(); } mode == CoordinationMode::Primary From 22e83825204398fc09607955c467b66a8b6b977b Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Thu, 21 May 2026 10:34:08 +0200 Subject: [PATCH 5/7] feat: serialize tests --- Cargo.lock | 1 + magicblock-task-scheduler/Cargo.toml | 3 +++ magicblock-task-scheduler/src/service.rs | 12 +++++++++++- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 32e18451a..c71c1d726 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4336,6 +4336,7 @@ dependencies = [ "magicblock-ledger", "magicblock-program", "rusqlite", + "serial_test", "solana-instruction 3.4.0", "solana-message 3.1.0", "solana-pubkey 3.0.0", diff --git a/magicblock-task-scheduler/Cargo.toml b/magicblock-task-scheduler/Cargo.toml index 0295a5f9d..fa0ede3d6 100644 --- a/magicblock-task-scheduler/Cargo.toml +++ b/magicblock-task-scheduler/Cargo.toml @@ -28,3 +28,6 @@ solana-transaction-error = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true, features = ["time"] } + +[dev-dependencies] +serial_test = { workspace = true } diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index 9951fe2cc..45d90a80a 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -522,11 +522,14 @@ fn is_retryable_task_execution_error(error: &TaskSchedulerError) -> bool { #[cfg(test)] mod tests { - use magicblock_core::coordination_mode::switch_to_replica_mode; + use magicblock_core::coordination_mode::{ + switch_to_primary_mode, switch_to_replica_mode, + }; use magicblock_program::{ args::ScheduleTaskRequest, validator::generate_validator_authority_if_needed, }; + use serial_test::serial; use solana_pubkey::Pubkey; use tokio::{sync::mpsc, time::timeout}; @@ -553,6 +556,7 @@ mod tests { } } + #[serial] #[tokio::test] async fn test_schedule_invalid_tasks() { magicblock_core::logger::init_for_tests(); @@ -603,6 +607,7 @@ mod tests { handle.abort(); } + #[serial] #[tokio::test] async fn test_remove_invalid_tasks_on_startup() { magicblock_core::logger::init_for_tests(); @@ -651,6 +656,7 @@ mod tests { handle.abort(); } + #[serial] #[tokio::test] async fn test_completed_tasks_are_removed_on_startup() { magicblock_core::logger::init_for_tests(); @@ -698,6 +704,7 @@ mod tests { handle.abort(); } + #[serial] #[tokio::test] async fn test_failed_records_are_cleaned_up_periodically() { magicblock_core::logger::init_for_tests(); @@ -734,6 +741,7 @@ mod tests { handle.abort(); } + #[serial] #[tokio::test] async fn test_task_scheduler_does_not_start_on_standby_mode() { magicblock_core::logger::init_for_tests(); @@ -744,6 +752,8 @@ mod tests { let service = test_service(db.clone(), rx); let handle = service.start().await.unwrap(); + switch_to_primary_mode(); + // Handle should join immediately because it's in standby mode timeout(Duration::from_secs(1), handle) .await From bc9162e058a572c14ad703f54dfb38b31a7ee3f1 Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Thu, 21 May 2026 11:36:17 +0200 Subject: [PATCH 6/7] fix: set primary for tests --- magicblock-task-scheduler/src/service.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index 45d90a80a..2f18e2b75 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -560,6 +560,7 @@ mod tests { #[tokio::test] async fn test_schedule_invalid_tasks() { magicblock_core::logger::init_for_tests(); + switch_to_primary_mode(); generate_validator_authority_if_needed(); let (tx, rx) = mpsc::unbounded_channel(); @@ -611,6 +612,7 @@ mod tests { #[tokio::test] async fn test_remove_invalid_tasks_on_startup() { magicblock_core::logger::init_for_tests(); + switch_to_primary_mode(); let (_tx, rx) = mpsc::unbounded_channel(); let db = SchedulerDatabase::new(":memory:").unwrap(); @@ -660,6 +662,7 @@ mod tests { #[tokio::test] async fn test_completed_tasks_are_removed_on_startup() { magicblock_core::logger::init_for_tests(); + switch_to_primary_mode(); let (_tx, rx) = mpsc::unbounded_channel(); let db = SchedulerDatabase::new(":memory:").unwrap(); @@ -708,6 +711,7 @@ mod tests { #[tokio::test] async fn test_failed_records_are_cleaned_up_periodically() { magicblock_core::logger::init_for_tests(); + switch_to_primary_mode(); let (_tx, rx) = mpsc::unbounded_channel(); let db = SchedulerDatabase::new(":memory:").unwrap(); From b13a200be5377f6e6b8e445d8cc75d2319720205 Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Thu, 21 May 2026 15:46:12 +0200 Subject: [PATCH 7/7] fix: swap test names --- magicblock-task-scheduler/src/service.rs | 147 ++++++++++++----------- 1 file changed, 74 insertions(+), 73 deletions(-) diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index d5e6a7720..1d30a15e0 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -742,6 +742,20 @@ impl TaskSchedulerService { .unwrap_or(TASK_EXECUTION_RETRY_MAX_DELAY) .min(TASK_EXECUTION_RETRY_MAX_DELAY) } + + /// Waits until the coordination mode is not StartingUp. + /// Should be fast because task scheduler is started after the ledger replay completes. + async fn is_primary_mode(&self) -> bool { + let mut mode = CoordinationMode::current(); + while mode == CoordinationMode::StartingUp { + tokio::select! { + _ = self.token.cancelled() => return false, + _ = tokio::time::sleep(Duration::from_millis(100)) => {} + } + mode = CoordinationMode::current(); + } + mode == CoordinationMode::Primary + } } fn is_valid_task_interval(interval: i64) -> bool { @@ -758,20 +772,6 @@ fn next_execution_millis( } else { last_execution_millis + execution_interval_millis } - - /// Waits until the coordination mode is not StartingUp. - /// Should be fast because task scheduler is started after the ledger replay completes. - async fn is_primary_mode(&self) -> bool { - let mut mode = CoordinationMode::current(); - while mode == CoordinationMode::StartingUp { - tokio::select! { - _ = self.token.cancelled() => return false, - _ = tokio::time::sleep(Duration::from_millis(100)) => {} - } - mode = CoordinationMode::current(); - } - mode == CoordinationMode::Primary - } } fn delay_until_millis(execution_millis: i64, now: i64) -> Duration { @@ -1005,65 +1005,6 @@ mod tests { magicblock_core::logger::init_for_tests(); switch_to_primary_mode(); - let (_tx, rx) = mpsc::unbounded_channel(); - let db = SchedulerDatabase::new(":memory:").unwrap(); - - db.insert_failed_scheduling(1, "schedule failed".to_string()) - .await - .unwrap(); - db.insert_failed_task(2, "task failed".to_string()) - .await - .unwrap(); - tokio::time::sleep(Duration::from_millis(2)).await; - - let mut service = test_service(db.clone(), rx); - service.failed_task_retention = Duration::from_millis(1); - service.failed_task_cleanup_interval = Duration::from_millis(5); - - let handle = service.start().await.unwrap(); - - timeout(Duration::from_secs(1), async move { - loop { - if db.get_failed_schedulings().await?.is_empty() - && db.get_failed_tasks().await?.is_empty() - { - return Ok::<_, TaskSchedulerError>(()); - } - tokio::time::sleep(Duration::from_millis(5)).await; - } - }) - .await - .unwrap() - .unwrap(); - handle.abort(); - } - - #[serial] - #[tokio::test] - async fn test_task_scheduler_does_not_start_on_standby_mode() { - magicblock_core::logger::init_for_tests(); - switch_to_replica_mode(); - - let (_tx, rx) = mpsc::unbounded_channel(); - let db = SchedulerDatabase::new(":memory:").unwrap(); - let service = test_service(db.clone(), rx); - let handle = service.start().await.unwrap(); - - switch_to_primary_mode(); - - // Handle should join immediately because it's in standby mode - timeout(Duration::from_secs(1), handle) - .await - .unwrap() - .unwrap() - .unwrap(); - } - - #[serial] - #[tokio::test] - async fn test_failed_records_are_cleaned_up_periodically() { - magicblock_core::logger::init_for_tests(); - let (_tx, rx) = mpsc::unbounded_channel(); let db = SchedulerDatabase::new(":memory:").unwrap(); let authority = Pubkey::new_unique(); @@ -1110,4 +1051,64 @@ mod tests { assert_eq!(queued.updated_at, replacement.updated_at); assert_eq!(queued.executions_left, replacement.executions_left); } + + #[serial] + #[tokio::test] + async fn test_task_scheduler_does_not_start_on_standby_mode() { + magicblock_core::logger::init_for_tests(); + switch_to_replica_mode(); + + let (_tx, rx) = mpsc::unbounded_channel(); + let db = SchedulerDatabase::new(":memory:").unwrap(); + let service = test_service(db.clone(), rx); + let handle = service.start().await.unwrap(); + + switch_to_primary_mode(); + + // Handle should join immediately because it's in standby mode + timeout(Duration::from_secs(1), handle) + .await + .unwrap() + .unwrap() + .unwrap(); + } + + #[serial] + #[tokio::test] + async fn test_failed_records_are_cleaned_up_periodically() { + magicblock_core::logger::init_for_tests(); + switch_to_primary_mode(); + + let (_tx, rx) = mpsc::unbounded_channel(); + let db = SchedulerDatabase::new(":memory:").unwrap(); + + db.insert_failed_scheduling(1, "schedule failed".to_string()) + .await + .unwrap(); + db.insert_failed_task(2, "task failed".to_string()) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(2)).await; + + let mut service = test_service(db.clone(), rx); + service.failed_task_retention = Duration::from_millis(1); + service.failed_task_cleanup_interval = Duration::from_millis(5); + + let handle = service.start().await.unwrap(); + + timeout(Duration::from_secs(1), async move { + loop { + if db.get_failed_schedulings().await?.is_empty() + && db.get_failed_tasks().await?.is_empty() + { + return Ok::<_, TaskSchedulerError>(()); + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + }) + .await + .unwrap() + .unwrap(); + handle.abort(); + } }