From b482becb7eae28387cb638786402f9ec274d1eb4 Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 19 Nov 2025 09:56:06 +0100 Subject: [PATCH 1/8] feat: init cron adapter --- adapter/cron/Cargo.toml | 15 +++++++ adapter/cron/src/main.rs | 87 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 adapter/cron/Cargo.toml create mode 100644 adapter/cron/src/main.rs diff --git a/adapter/cron/Cargo.toml b/adapter/cron/Cargo.toml new file mode 100644 index 0000000..041aa60 --- /dev/null +++ b/adapter/cron/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "cron" +version.workspace = true +edition.workspace = true + +[dependencies] +tokio = {workspace = true} +cron-parser = {workspace = true} +chrono = {workspace = true} +cron = {workspace = true} +async-nats = {workspace = true} +base = {workspace = true} +tucana = {workspace = true} +async-trait = {workspace = true} +anyhow = {workspace = true} \ No newline at end of file diff --git a/adapter/cron/src/main.rs b/adapter/cron/src/main.rs new file mode 100644 index 0000000..1351552 --- /dev/null +++ b/adapter/cron/src/main.rs @@ -0,0 +1,87 @@ +use std::str::FromStr; +use async_trait::async_trait; +use chrono::{Local, Utc}; +use cron::Schedule; +use base::extract_flow_setting_field; +use base::runner::{ServerContext, ServerRunner}; +use base::traits::{IdentifiableFlow, LoadConfig, Server}; + +#[derive(Default)] +struct Cron { +} + +#[derive(Clone)] +struct CronConfig {} + +impl LoadConfig for CronConfig { + fn load() -> Self { + Self {} + } +} + +#[tokio::main] +async fn main() { + let server = Cron::default(); + let runner = ServerRunner::new(server).await.unwrap(); + runner.serve().await.unwrap(); +} + +struct Time { + utc: Utc +} + +impl IdentifiableFlow for Time { + fn identify(&self, flow: &tucana::shared::ValidationFlow) -> bool { + let Some(minute) = extract_flow_setting_field(&flow.settings, "CRON_MINUTE", "minute") else { + return false; + }; + let Some(hour) = extract_flow_setting_field(&flow.settings, "CRON_MOUR", "hour") else { + return false; + }; + let Some(dom) = extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_MONTH", "day_of_month") else { + return false; + }; + let Some(month) = extract_flow_setting_field(&flow.settings, "CRON_MONTH", "month") else { + return false; + }; + let Some(dow) = extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_WEEK", "day_of_week") else { + return false; + }; + + let expression = format!("{} {} {} {} {}", minute, hour, dom, month, dow); + + + + + todo!() + } +} + +#[async_trait] +impl Server for Cron { + async fn init(&mut self, _ctx: &ServerContext) -> anyhow::Result<()> { + Ok(()) + } + + async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { + let expression = "0 * * * * *"; + let schedule = Schedule::from_str(expression).expect("Failed to parse CRON expression"); + + loop { + let now = Utc::now(); + if let Some(next) = schedule.upcoming(Utc).take(1).next() { + let until_next = next - now; + tokio::time::sleep(until_next.to_std()?).await; + println!( + "Running every minute. Current time: {}", + Local::now().format("%Y-%m-%d %H:%M:%S") + ); + } + } + Ok(()) + } + + async fn shutdown(&mut self, _ctx: &ServerContext) -> anyhow::Result<()> { + Ok(()) + } +} From ff4501f48647a4f527c0dc2702ef8cf7f3c6558e Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 19 Nov 2025 09:56:48 +0100 Subject: [PATCH 2/8] feat: added execute function that differences between pub/sub and request/respond --- crates/base/src/store.rs | 55 +++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index 872e440..274d4d8 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -6,8 +6,8 @@ use prost::Message; use tucana::shared::{ExecutionFlow, ValidationFlow, Value}; pub struct AdapterStore { - client: async_nats::Client, - kv: async_nats::jetstream::kv::Store, + pub client: async_nats::Client, + pub kv: async_nats::jetstream::kv::Store, } pub enum FlowIdentifyResult { @@ -56,7 +56,7 @@ impl AdapterStore { /// - id: The identifier to use for identifying the possible matches. Its just a fine grain identifier that can be used to identify the possible matches. For a REST Flow this will be the regex matcher, for a CRON Flow the trait just return true every time. /// /// Returns: - /// - FlowIdenfiyResult: The result of the flow identification process. This can be one of the following: + /// - FlowIdentifyResult: The result of the flow identification process. This can be one of the following: /// - None: No flows matched the identifier. /// - Single(ValidationFlow): A single flow matched the identifier. /// - Multiple(Vec): Multiple flows matched the identifier. @@ -101,6 +101,36 @@ impl AdapterStore { } } + pub async fn execute(&self, flow: ExecutionFlow, wait_for_result: bool) -> Option { + let uuid = uuid::Uuid::new_v4().to_string(); + let topic = format!("execution.{}", uuid); + let bytes = flow.encode_to_vec(); + + if wait_for_result { + match self.client.request(topic, bytes.into()).await { + Ok(message) => match Value::decode(message.payload) { + Ok(value) => Some(value), + Err(err) => { + log::error!("Failed to decode response from NATS server: {:?}", err); + None + } + }, + Err(err) => { + log::error!("Failed to send request to NATS server: {:?}", err); + None + } + } + } else { + match self.client.publish(topic, bytes.into()).await { + Ok(_) => None, + Err(err) => { + log::error!("Failed to send request to NATS server: {:?}", err); + None + } + } + } + } + /// validate_and_execute_flow /// /// This function will validate the flow. If the flow is valid, it will execute (send the flow to the execution and wait for a/multiple result/s) the flow. @@ -124,25 +154,8 @@ impl AdapterStore { }; } - let uuid = uuid::Uuid::new_v4().to_string(); let execution_flow: ExecutionFlow = Self::convert_validation_flow(flow, input_value); - let bytes = execution_flow.encode_to_vec(); - let topic = format!("execution.{}", uuid); - let result = self.client.request(topic, bytes.into()).await; - - match result { - Ok(message) => match Value::decode(message.payload) { - Ok(value) => Some(value), - Err(err) => { - log::error!("Failed to decode response from NATS server: {:?}", err); - None - } - }, - Err(err) => { - log::error!("Failed to send request to NATS server: {:?}", err); - None - } - } + self.execute(execution_flow, true).await } fn convert_validation_flow(flow: ValidationFlow, input_value: Option) -> ExecutionFlow { From bcbc9a5ae5d51c5b4eca8a7664a9848db83409dc Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 19 Nov 2025 09:57:09 +0100 Subject: [PATCH 3/8] dependencies: added chrono, cron-parser and cron --- Cargo.lock | 144 +++++++++++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 5 +- 2 files changed, 143 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e8d366..e43bd8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.18" @@ -253,7 +262,11 @@ version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ + "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", + "windows-link", ] [[package]] @@ -323,6 +336,41 @@ dependencies = [ "libc", ] +[[package]] +name = "cron" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-nats", + "async-trait", + "base", + "chrono", + "cron 0.15.0", + "cron-parser", + "tokio", + "tucana", +] + +[[package]] +name = "cron" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740" +dependencies = [ + "chrono", + "once_cell", + "winnow", +] + +[[package]] +name = "cron-parser" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "702858ce99daf23d8822fb22ec363b641b4bdcd9704182211fc113b01870f6de" +dependencies = [ + "chrono", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -748,6 +796,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "1.5.0" @@ -960,9 +1032,9 @@ checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" [[package]] name = "linux-raw-sys" -version = "0.9.3" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" [[package]] name = "litemap" @@ -1410,15 +1482,15 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.5" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" +checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ "bitflags", "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2236,12 +2308,65 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -2333,6 +2458,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.6.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/Cargo.toml b/Cargo.toml index 622a623..18083bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["crates/http", "adapter/rest", "crates/base"] +members = ["crates/http", "adapter/rest", "crates/base", "adapter/cron"] resolver = "3" [workspace.package] @@ -22,6 +22,9 @@ anyhow = "1.0.98" prost = "0.14.0" tonic-health = "0.14.0" futures-lite = "2.6.1" +chrono = "0.4.42" +cron-parser = "0.11.0" +cron = "0.15.0" [workspace.dependencies.http] path = "../draco/crates/http" From 3b76c005093468e78e4edd352422253f1f36b4b3 Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 19 Nov 2025 16:28:41 +0100 Subject: [PATCH 4/8] dependencies: removed cron-parser --- Cargo.lock | 11 ----------- Cargo.toml | 1 - adapter/cron/Cargo.toml | 2 -- 3 files changed, 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e43bd8e..5bcaf24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -341,12 +341,10 @@ name = "cron" version = "0.0.0" dependencies = [ "anyhow", - "async-nats", "async-trait", "base", "chrono", "cron 0.15.0", - "cron-parser", "tokio", "tucana", ] @@ -362,15 +360,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "cron-parser" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "702858ce99daf23d8822fb22ec363b641b4bdcd9704182211fc113b01870f6de" -dependencies = [ - "chrono", -] - [[package]] name = "crypto-common" version = "0.1.6" diff --git a/Cargo.toml b/Cargo.toml index 18083bc..4124f3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ prost = "0.14.0" tonic-health = "0.14.0" futures-lite = "2.6.1" chrono = "0.4.42" -cron-parser = "0.11.0" cron = "0.15.0" [workspace.dependencies.http] diff --git a/adapter/cron/Cargo.toml b/adapter/cron/Cargo.toml index 041aa60..0d348f5 100644 --- a/adapter/cron/Cargo.toml +++ b/adapter/cron/Cargo.toml @@ -5,10 +5,8 @@ edition.workspace = true [dependencies] tokio = {workspace = true} -cron-parser = {workspace = true} chrono = {workspace = true} cron = {workspace = true} -async-nats = {workspace = true} base = {workspace = true} tucana = {workspace = true} async-trait = {workspace = true} From 0af707686b8a99bc57126638a96069f6d4a6c2b5 Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 19 Nov 2025 16:28:52 +0100 Subject: [PATCH 5/8] feat: implemented cron matching logic --- adapter/cron/src/main.rs | 39 ++++++++++++++++++---------- adapter/rest/src/main.rs | 2 +- crates/base/src/store.rs | 55 +++++++++++++++++++--------------------- 3 files changed, 53 insertions(+), 43 deletions(-) diff --git a/adapter/cron/src/main.rs b/adapter/cron/src/main.rs index 1351552..7d6f7d7 100644 --- a/adapter/cron/src/main.rs +++ b/adapter/cron/src/main.rs @@ -1,9 +1,10 @@ use std::str::FromStr; use async_trait::async_trait; -use chrono::{Local, Utc}; +use chrono::{DateTime, Datelike, Timelike, Utc}; use cron::Schedule; use base::extract_flow_setting_field; use base::runner::{ServerContext, ServerRunner}; +use base::store::FlowIdentifyResult; use base::traits::{IdentifiableFlow, LoadConfig, Server}; #[derive(Default)] @@ -27,7 +28,7 @@ async fn main() { } struct Time { - utc: Utc + now: DateTime } impl IdentifiableFlow for Time { @@ -35,7 +36,7 @@ impl IdentifiableFlow for Time { let Some(minute) = extract_flow_setting_field(&flow.settings, "CRON_MINUTE", "minute") else { return false; }; - let Some(hour) = extract_flow_setting_field(&flow.settings, "CRON_MOUR", "hour") else { + let Some(hour) = extract_flow_setting_field(&flow.settings, "CRON_HOUR", "hour") else { return false; }; let Some(dom) = extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_MONTH", "day_of_month") else { @@ -48,12 +49,15 @@ impl IdentifiableFlow for Time { return false; }; - let expression = format!("{} {} {} {} {}", minute, hour, dom, month, dow); + let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow); + let schedule = Schedule::from_str(expression.as_str()).unwrap(); + let next = schedule.upcoming(Utc).next().unwrap(); - - - - todo!() + self.now.year() == next.year() && + self.now.month() == next.month() && + self.now.day() == next.day() && + self.now.hour() == next.hour() && + self.now.minute() == next.minute() } } @@ -66,19 +70,28 @@ impl Server for Cron { async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { let expression = "0 * * * * *"; let schedule = Schedule::from_str(expression).expect("Failed to parse CRON expression"); + let pattern = "*.*.CRON.*"; loop { let now = Utc::now(); if let Some(next) = schedule.upcoming(Utc).take(1).next() { let until_next = next - now; tokio::time::sleep(until_next.to_std()?).await; - println!( - "Running every minute. Current time: {}", - Local::now().format("%Y-%m-%d %H:%M:%S") - ); + + let time = Time { now }; + match ctx.adapter_store.get_possible_flow_match(pattern.to_string(), time).await { + FlowIdentifyResult::None => {} + FlowIdentifyResult::Single(flow) => { + ctx.adapter_store.validate_and_execute_flow(flow, None, false).await; + } + FlowIdentifyResult::Multiple(flows) => { + for flow in flows { + ctx.adapter_store.validate_and_execute_flow(flow, None, false).await; + } + }, + } } } - Ok(()) } async fn shutdown(&mut self, _ctx: &ServerContext) -> anyhow::Result<()> { diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 31eec45..56d7c21 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -96,7 +96,7 @@ async fn execute_flow( request: HttpRequest, store: Arc, ) -> Option { - match store.validate_and_execute_flow(flow, request.body).await { + match store.validate_and_execute_flow(flow, request.body, true).await { Some(result) => { let Value { kind: Some(StructValue(Struct { fields })), diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index 274d4d8..6c8ee84 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -101,10 +101,34 @@ impl AdapterStore { } } - pub async fn execute(&self, flow: ExecutionFlow, wait_for_result: bool) -> Option { + /// validate_and_execute_flow + /// + /// This function will validate the flow. If the flow is valid, it will execute (send the flow to the execution and wait for a/multiple result/s) the flow. + /// + /// Arguments: + /// - flow: The flow to be validated and executed. + /// - input_value: The input value to be used for the flow execution. + pub async fn validate_and_execute_flow( + &self, + flow: ValidationFlow, + input_value: Option, + wait_for_result: bool, + ) -> Option { + if let Some(body) = input_value.clone() { + let verify_result = verify_flow(flow.clone(), body); + + match verify_result { + Ok(()) => {} + Err(_err) => { + return None; + } + }; + } + + let execution_flow: ExecutionFlow = Self::convert_validation_flow(flow, input_value); let uuid = uuid::Uuid::new_v4().to_string(); let topic = format!("execution.{}", uuid); - let bytes = flow.encode_to_vec(); + let bytes = execution_flow.encode_to_vec(); if wait_for_result { match self.client.request(topic, bytes.into()).await { @@ -131,33 +155,6 @@ impl AdapterStore { } } - /// validate_and_execute_flow - /// - /// This function will validate the flow. If the flow is valid, it will execute (send the flow to the execution and wait for a/multiple result/s) the flow. - /// - /// Arguments: - /// - flow: The flow to be validated and executed. - /// - input_value: The input value to be used for the flow execution. - pub async fn validate_and_execute_flow( - &self, - flow: ValidationFlow, - input_value: Option, - ) -> Option { - if let Some(body) = input_value.clone() { - let verify_result = verify_flow(flow.clone(), body); - - match verify_result { - Ok(()) => {} - Err(_err) => { - return None; - } - }; - } - - let execution_flow: ExecutionFlow = Self::convert_validation_flow(flow, input_value); - self.execute(execution_flow, true).await - } - fn convert_validation_flow(flow: ValidationFlow, input_value: Option) -> ExecutionFlow { ExecutionFlow { flow_id: flow.flow_id, From 07f4211133206182c6663a3dc9f40411a8c79b7b Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 19 Nov 2025 16:36:15 +0100 Subject: [PATCH 6/8] ref: cargo fmt --- adapter/cron/src/main.rs | 48 +++++++++++++++++++++++++--------------- adapter/rest/src/main.rs | 5 ++++- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/adapter/cron/src/main.rs b/adapter/cron/src/main.rs index 7d6f7d7..9d032a5 100644 --- a/adapter/cron/src/main.rs +++ b/adapter/cron/src/main.rs @@ -1,15 +1,14 @@ -use std::str::FromStr; use async_trait::async_trait; -use chrono::{DateTime, Datelike, Timelike, Utc}; -use cron::Schedule; use base::extract_flow_setting_field; use base::runner::{ServerContext, ServerRunner}; use base::store::FlowIdentifyResult; use base::traits::{IdentifiableFlow, LoadConfig, Server}; +use chrono::{DateTime, Datelike, Timelike, Utc}; +use cron::Schedule; +use std::str::FromStr; #[derive(Default)] -struct Cron { -} +struct Cron {} #[derive(Clone)] struct CronConfig {} @@ -28,24 +27,29 @@ async fn main() { } struct Time { - now: DateTime + now: DateTime, } impl IdentifiableFlow for Time { fn identify(&self, flow: &tucana::shared::ValidationFlow) -> bool { - let Some(minute) = extract_flow_setting_field(&flow.settings, "CRON_MINUTE", "minute") else { + let Some(minute) = extract_flow_setting_field(&flow.settings, "CRON_MINUTE", "minute") + else { return false; }; let Some(hour) = extract_flow_setting_field(&flow.settings, "CRON_HOUR", "hour") else { return false; }; - let Some(dom) = extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_MONTH", "day_of_month") else { + let Some(dom) = + extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_MONTH", "day_of_month") + else { return false; }; let Some(month) = extract_flow_setting_field(&flow.settings, "CRON_MONTH", "month") else { return false; }; - let Some(dow) = extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_WEEK", "day_of_week") else { + let Some(dow) = + extract_flow_setting_field(&flow.settings, "CRON_DAY_OF_WEEK", "day_of_week") + else { return false; }; @@ -53,11 +57,11 @@ impl IdentifiableFlow for Time { let schedule = Schedule::from_str(expression.as_str()).unwrap(); let next = schedule.upcoming(Utc).next().unwrap(); - self.now.year() == next.year() && - self.now.month() == next.month() && - self.now.day() == next.day() && - self.now.hour() == next.hour() && - self.now.minute() == next.minute() + self.now.year() == next.year() + && self.now.month() == next.month() + && self.now.day() == next.day() + && self.now.hour() == next.hour() + && self.now.minute() == next.minute() } } @@ -79,16 +83,24 @@ impl Server for Cron { tokio::time::sleep(until_next.to_std()?).await; let time = Time { now }; - match ctx.adapter_store.get_possible_flow_match(pattern.to_string(), time).await { + match ctx + .adapter_store + .get_possible_flow_match(pattern.to_string(), time) + .await + { FlowIdentifyResult::None => {} FlowIdentifyResult::Single(flow) => { - ctx.adapter_store.validate_and_execute_flow(flow, None, false).await; + ctx.adapter_store + .validate_and_execute_flow(flow, None, false) + .await; } FlowIdentifyResult::Multiple(flows) => { for flow in flows { - ctx.adapter_store.validate_and_execute_flow(flow, None, false).await; + ctx.adapter_store + .validate_and_execute_flow(flow, None, false) + .await; } - }, + } } } } diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 56d7c21..8c9310f 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -96,7 +96,10 @@ async fn execute_flow( request: HttpRequest, store: Arc, ) -> Option { - match store.validate_and_execute_flow(flow, request.body, true).await { + match store + .validate_and_execute_flow(flow, request.body, true) + .await + { Some(result) => { let Value { kind: Some(StructValue(Struct { fields })), From 81c8d0dde5629e0bf29d9204b22c1a8a55a845ab Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 19 Nov 2025 21:04:18 +0100 Subject: [PATCH 7/8] docs: made cron job under construction --- README.md | 2 +- adapter/cron/src/main.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index afb6697..026092f 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ See: [Installation]() | HTTP | 🚧 | | MQTT | 📝 | | AMQP | 📝 | -| Cron-Jobs | 📝 | +| Cron-Jobs | 🚧 | **Legend:** - ✅ Done: Fully implemented and ready to use diff --git a/adapter/cron/src/main.rs b/adapter/cron/src/main.rs index 9d032a5..f29dda6 100644 --- a/adapter/cron/src/main.rs +++ b/adapter/cron/src/main.rs @@ -73,7 +73,7 @@ impl Server for Cron { async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { let expression = "0 * * * * *"; - let schedule = Schedule::from_str(expression).expect("Failed to parse CRON expression"); + let schedule = Schedule::from_str(expression)?; let pattern = "*.*.CRON.*"; loop { From 35f839ae12c5f5573642d082ecd94baac7ddd9f3 Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 19 Nov 2025 21:05:38 +0100 Subject: [PATCH 8/8] docs: removed AMQP as a pland adapter, will be done someday --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 026092f..6b829d5 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,6 @@ See: [Installation]() |----------|--------| | HTTP | 🚧 | | MQTT | 📝 | -| AMQP | 📝 | | Cron-Jobs | 🚧 | **Legend:**