From 0d40ecd1be8cb57bf92e42e42a72fc1accefd337 Mon Sep 17 00:00:00 2001 From: MT <59728838+mt-empty@users.noreply.github.com> Date: Sun, 27 Aug 2023 01:08:53 +1000 Subject: [PATCH 1/6] init grpc --- Cargo.toml | 24 ++++- build.rs | 4 + proto/grpc.proto | 14 +++ src/cli.rs | 34 +++--- src/{engine => components}/event.rs | 89 ++++++++++++++-- src/{engine => components}/task.rs | 158 +++++++++++++++++++++++++--- src/engine.rs | 133 +++++++++++------------ src/engine_utils.rs | 84 +++++++++++++++ src/lib.rs | 6 +- src/main.rs | 8 -- 10 files changed, 439 insertions(+), 115 deletions(-) create mode 100644 build.rs create mode 100644 proto/grpc.proto rename src/{engine => components}/event.rs (63%) rename src/{engine => components}/task.rs (50%) create mode 100644 src/engine_utils.rs delete mode 100644 src/main.rs diff --git a/Cargo.toml b/Cargo.toml index b309509..16eb106 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,18 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[bin]] # Bin to run the engine +name = "event" +path = "src/components/event.rs" + +[[bin]] # Bin to run the engine +name = "task" +path = "src/components/task.rs" + +[[bin]] # Bin to run the engine +name = "cli" +path = "src/cli.rs" + [dependencies] serde_json = { version = "1.0.104", features = ["preserve_order"] } clap = { version = "4.3.5", features = ["derive"] } @@ -24,4 +36,14 @@ diesel = { version = "2.1.0", features = ["chrono", "postgres"] } diesel_migrations = { version = "2.1.0", features = ["postgres"] } tracing = "0.1.37" prettytable-rs = "^0.10.0" -pnet = "0.34.0" \ No newline at end of file +pnet = "0.34.0" +tonic = "0.9.2" +prost = "0.11.9" +tokio = { version = "1.32", features = ["rt-multi-thread", "macros", "sync", "time"] } +# futures-util-preview = "0.2.2" +rand = "0.8.5" +async-stream = "0.3.5" +tokio-stream = "0.1.14" + +[build-dependencies] +tonic-build = "0.9" \ No newline at end of file diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..f3dd018 --- /dev/null +++ b/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/grpc.proto")?; + Ok(()) +} diff --git a/proto/grpc.proto b/proto/grpc.proto new file mode 100644 index 0000000..3f63234 --- /dev/null +++ b/proto/grpc.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; +package grpc; + +service OutputStreaming { + rpc StreamOutput(OutputChunk) returns ( stream Response); +} + +message OutputChunk { + string content = 1; +} + +message Response { + string message = 1; +} \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index ad7d720..e8b6fd4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -9,8 +9,7 @@ use std::env; use std::fs::File; use std::process::Command; use tracing::field; -use workflow::engine::{create_new_engine_entry, handle_stop, run_event_process}; -use workflow::engine::{run_task_process, update_engine_status}; +use workflow::engine_utils::{create_new_engine_entry, handle_stop, update_engine_status}; use workflow::models::{Engine, EngineStatus, Event, Task}; use workflow::parser::process_yaml_file; use workflow::utils::establish_pg_connection; @@ -109,7 +108,7 @@ fn create_and_clear_log_file(file_path: &str) -> Result { } fn start_process( - subcommand_name: &str, + binary_name: &str, process_type: ProcessType, engine_uid: i32, ) -> Result<(), AnyError> { @@ -141,8 +140,8 @@ fn start_process( } let command = binding - .arg(subcommand_name) - .arg("--") + .arg("--bin") + .arg(binary_name) .arg(engine_uid.to_string()) .stdout(stdout) .stderr(stderr); @@ -174,17 +173,17 @@ pub fn cli() { } Commands::StartEventProcess { engine_uid } => { println!("StartEventProcess"); - if let Err(e) = run_event_process(*engine_uid) { - println!("Failed to start event process, {}", e); - std::process::exit(1); - }; + // if let Err(e) = run_event_process(*engine_uid) { + // println!("Failed to start event process, {}", e); + // std::process::exit(1); + // }; } Commands::StartTaskProcess { engine_uid } => { println!("StartTaskProcess"); - if let Err(e) = run_task_process(*engine_uid) { - println!("Failed to start task process, {}", e); - std::process::exit(1); - }; + // if let Err(e) = run_task_process(*engine_uid) { + // println!("Failed to start task process, {}", e); + // std::process::exit(1); + // }; } Commands::Stop {} => { println!("Stopping the engine"); @@ -262,13 +261,13 @@ fn process_start_command() -> Result<(), AnyError> { )?; println!("created new engine entry with uid: {}", engine_uid); - if let Err(e) = start_process("start-event-process", ProcessType::Event, engine_uid) { + if let Err(e) = start_process("event", ProcessType::Event, engine_uid) { eprintln!("Failed to start Event process: {}", e); eprintln!("exiting..."); std::process::exit(1); } - if let Err(e) = start_process("start-task-process", ProcessType::Task, engine_uid) { + if let Err(e) = start_process("task", ProcessType::Task, engine_uid) { eprintln!("Failed to start Task process: {}", e); eprintln!("exiting..."); std::process::exit(1); @@ -416,6 +415,11 @@ fn list_items(items: Vec) -> Result<(), AnyError> { Ok(()) } +fn main() -> Result<(), AnyError> { + cli(); + Ok(()) +} + // fn is_redis_running() -> bool { // let redis_result = create_redis_connection(); // if let Err(e) = redis_result { diff --git a/src/engine/event.rs b/src/components/event.rs similarity index 63% rename from src/engine/event.rs rename to src/components/event.rs index 9d01718..a4ad1f1 100644 --- a/src/engine/event.rs +++ b/src/components/event.rs @@ -1,6 +1,3 @@ -use crate::models::{EventStatus, LightEvent, LightTask, ProcessStatus}; -use crate::schema; -use crate::utils::{establish_pg_connection, push_tasks_to_queue}; use anyhow::Error as AnyError; use diesel::prelude::*; use std::path::Path; @@ -8,13 +5,57 @@ use std::process::Command as ShellCommand; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use std::{str, thread}; +use std::{env, str, thread}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; +use workflow::engine_utils::run_process; +use workflow::models::{EventStatus, LightEvent, LightTask, ProcessStatus}; +use workflow::schema; +use workflow::utils::{establish_pg_connection, push_tasks_to_queue}; + +pub mod grpc { + tonic::include_proto!("grpc"); +} +use grpc::output_streaming_server::{OutputStreaming, OutputStreamingServer}; +use grpc::{OutputChunk, Response as GrpcResponse}; + +#[derive(Debug)] +pub struct OutputStreamer { + // an stdout pipe steam + features: Arc>, +} + +#[tonic::async_trait] +impl OutputStreaming for OutputStreamer { + type StreamOutputStream = ReceiverStream>; + + async fn stream_output( + &self, + request: Request, + ) -> Result, Status> { + let (mut tx, rx) = mpsc::channel(4); + let features = self.features.clone(); + + // Spawn an async task to send the output data to the client + + tokio::spawn(async move { + for feature in &features[..] { + tx.send(Ok(feature.clone())).await.unwrap(); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} pub fn poll_events(running: Arc, engine_uid: i32) -> Result<(), AnyError> { let mut event_uids: Vec = Vec::new(); let pg_conn = &mut establish_pg_connection(); - use crate::schema::engines::dsl::*; + use workflow::schema::engines::dsl::*; diesel::update(engines) .filter(uid.eq(engine_uid)) @@ -91,14 +132,14 @@ fn execute_event(event: LightEvent) -> Result<(), AnyError> { .expect("failed to execute process"); // if shell command return 0, then the event was triggered successfully - use crate::schema::events::dsl::*; + use workflow::schema::events::dsl::*; if output.status.code().unwrap() == 0 { diesel::update(events.find(event.uid)) .set(status.eq(EventStatus::Succeeded.to_string())) .execute(conn)?; { - use crate::schema::tasks::dsl::*; + use workflow::schema::tasks::dsl::*; let light_tasks: Vec = tasks .select(LightTask::as_select()) .filter(event_uid.eq(event.uid)) @@ -132,3 +173,37 @@ fn execute_event(event: LightEvent) -> Result<(), AnyError> { println!("##############################################"); Ok(()) } + +pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { + run_process("Event", poll_events, engine_uid) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args: Vec = env::args().collect(); + println!("args: {:?}", args); + + let engine_uid = args[1].parse::().unwrap(); + println!("engine_uid: {}", engine_uid); + + tokio::spawn(async move { + if let Err(e) = run_event_process(engine_uid) { + println!("Failed to start event process, {}", e); + std::process::exit(1); + }; + }); + + let addr = "[::1]:10001".parse().unwrap(); + + let stream = OutputStreamer { + features: Arc::new(vec![GrpcResponse { + message: "Hello".into(), + }]), + }; + + let svc = OutputStreamingServer::new(stream); + + Server::builder().add_service(svc).serve(addr).await?; + + Ok(()) +} diff --git a/src/engine/task.rs b/src/components/task.rs similarity index 50% rename from src/engine/task.rs rename to src/components/task.rs index d51cfa9..5f967ef 100644 --- a/src/engine/task.rs +++ b/src/components/task.rs @@ -1,16 +1,62 @@ -use crate::models::{Engine, EngineStatus, LightTask, ProcessStatus, TaskStatus}; -use crate::utils::{self, create_redis_connection, establish_pg_connection}; use anyhow::Error as AnyError; use bincode::deserialize; use diesel::prelude::*; +use rand::seq::IteratorRandom; use rayon::ThreadPoolBuilder; use redis::{Commands as RedisCommand, FromRedisValue}; +use tonic::server::ServerStreamingService; +// use std::io::BufReader; use std::path::Path; -use std::process::Command as ShellCommand; +use std::pin::Pin; +use std::process::{ChildStdout, Command as ShellCommand, Stdio}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use std::{str, thread}; +use std::{env, str, thread}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; +use workflow::engine_utils::run_process; +use workflow::models::{LightTask, ProcessStatus, TaskStatus}; +use workflow::utils::{self, create_redis_connection, establish_pg_connection}; + +pub mod grpc { + tonic::include_proto!("grpc"); +} +// use grpc::output_streaming_client::OutputStreamingClient; +use grpc::output_streaming_server::{OutputStreaming, OutputStreamingServer}; +use grpc::{OutputChunk, Response as GrpcResponse}; + +#[derive(Debug)] +pub struct OutputStreamer { + // an stdout pipe steam + features: Arc>, +} + +#[tonic::async_trait] +impl OutputStreaming for OutputStreamer { + type StreamOutputStream = ReceiverStream>; + + async fn stream_output( + &self, + request: Request, + ) -> Result, Status> { + let (mut tx, rx) = mpsc::channel(4); + let features = self.features.clone(); + + // Spawn an async task to send the output data to the client + + tokio::spawn(async move { + for feature in &features[..] { + tx.send(Ok(feature.clone())).await.unwrap(); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} const THREAD_COUNT: usize = 4; @@ -19,7 +65,7 @@ pub fn queue_processor(running: Arc, engine_uid: i32) -> Result<(), let pg_conn = &mut establish_pg_connection(); let mut redis_con = create_redis_connection()?; - use crate::schema::engines::dsl::*; + use workflow::schema::engines::dsl::*; diesel::update(engines) .filter(uid.eq(engine_uid)) @@ -36,9 +82,10 @@ pub fn queue_processor(running: Arc, engine_uid: i32) -> Result<(), thread_pool.spawn(move || { let task: LightTask = deserialize(popped_value.as_bytes()).unwrap(); println!("Task: {}", task); - if let Err(e) = execute_task(task) { - println!("Failed to execute task {}", e); - }; + let future = execute_task(task); + // if let Err(e) = execute_task(task) { + // println!("Failed to execute task {}", e); + // }; }); } None => { @@ -83,13 +130,12 @@ pub fn queue_processor(running: Arc, engine_uid: i32) -> Result<(), Ok(()) } -fn execute_task(task: LightTask) -> Result<(), AnyError> { +async fn execute_task(task: LightTask) -> Result<(), AnyError> { println!("Task Executor"); - use crate::schema::tasks::dsl::*; + use workflow::schema::tasks::dsl::*; let conn = &mut establish_pg_connection(); - // todo , update task status to running - + // let mut client = OutputStreamingClient::connect("http://[::1]:50051").await?; diesel::update(tasks.find(task.uid)) .set(( status.eq(TaskStatus::Running.to_string()), @@ -103,12 +149,62 @@ fn execute_task(task: LightTask) -> Result<(), AnyError> { }; let path_dirname = Path::new(&task.path).parent().unwrap(); - let output = ShellCommand::new("bash") + let mut cmd = ShellCommand::new("bash") .arg(path_basename) .current_dir(path_dirname) - .output() + .stdout(Stdio::piped()) + .spawn() .expect("failed to execute process"); + let stdout_content = cmd + .stdout + .take() + .expect("Could not capture standard output"); + // let reader = tokio::io::BufReader::new(stdout_content.into()); + + // let mut client_stream = client + // .stream_output(Request::new(OutputChunk { + // content: "Tonic".into(), + // })) + // .await? + // .into_inner(); + + // tokio::spawn(async move { + // let mut buf = String::new(); + // let mut reader = reader; + // loop { + // buf.clear(); + // if reader.read_line(&mut buf).await.unwrap_or(0) == 0 { + // break; + // } + + // let request = Request::new(grpc::OutputChunk { + // content: buf.clone(), + // }); + + // if let Err(_) = client_stream.send(request).await { + // break; + // } + // } + // }); + + let output = cmd.wait_with_output().expect("Failed to wait for command"); + + println!( + "task id: {} , path: {}\nFinished executing with a status: {}", + task.uid, task.path, output.status + ); + + // let stdout_content = str::from_utf8(stdout_content)?; + + // for chunk in stdout_content.chars().collect::>().chunks(10) { + // let request = tonic::Request::new(mygrpc::OutputChunk { + // content: chunk.iter().collect(), + // }); + + // client.stream_output(request).await?; + // } + if output.status.code().unwrap() == 0 { diesel::update(tasks.find(task.uid)) .set(( @@ -147,3 +243,37 @@ fn execute_task(task: LightTask) -> Result<(), AnyError> { println!("##############################################"); Ok(()) } + +pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> { + run_process("Task", queue_processor, engine_uid) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args: Vec = env::args().collect(); + println!("args: {:?}", args); + + let engine_uid = args[1].parse::().unwrap(); + println!("engine_uid: {}", engine_uid); + + tokio::spawn(async move { + if let Err(e) = run_task_process(engine_uid) { + println!("Failed to start event process, {}", e); + std::process::exit(1); + }; + }); + + let addr = "[::1]:10000".parse().unwrap(); + + let stream = OutputStreamer { + features: Arc::new(vec![GrpcResponse { + message: "Hello".into(), + }]), + }; + + let svc = OutputStreamingServer::new(stream); + + Server::builder().add_service(svc).serve(addr).await?; + + Ok(()) +} diff --git a/src/engine.rs b/src/engine.rs index bbbaeff..570f1b0 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,86 +1,81 @@ -use crate::models::EngineStatus; -use crate::utils::establish_pg_connection; -use crate::{models, schema}; use anyhow::Error as AnyError; -use ctrlc::set_handler; -use diesel::PgConnection; -use std::str; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::env; +use workflow::components::event::poll_events; +use workflow::components::task::queue_processor; +use workflow::engine_utils::run_process; +// mod models; +// mod utils; use diesel::prelude::*; -use self::event::poll_events; -use self::task::queue_processor; +// I want to use the poll_events function from src/event.rs -mod event; -mod task; +use tonic::{transport::Server, Request, Response, Status}; -fn run_process(process_name: &str, process_fn: F, engine_uid: i32) -> Result<(), AnyError> -where - F: FnOnce(Arc, i32) -> Result<(), AnyError>, -{ - let running = Arc::new(AtomicBool::new(true)); - let r = running.clone(); +use grpc::greeter_server::{Greeter, GreeterServer}; +use grpc::{LogMessageRequest, LogMessageResponse}; - set_handler(move || { - r.store(false, Ordering::SeqCst); - }) - .expect("Error setting Ctrl-C handler"); - - if let Err(e) = process_fn(running, engine_uid) { - eprintln!("Failed to start {} process: {}", process_name, e); - eprintln!("exiting..."); - std::process::exit(1); - } - println!("{} process stopped correctly", process_name); - - Ok(()) +pub mod grpc { + tonic::include_proto!("grpc"); } -pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> { - run_process("Task", queue_processor, engine_uid) -} +// pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> { +// run_process("Task", queue_processor, engine_uid) +// } -pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { - run_process("Event", poll_events, engine_uid) -} +// pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { +// run_process("Event", poll_events, engine_uid) +// } -pub fn handle_stop() -> Result<(), AnyError> { - diesel::update(schema::engines::table) - .set(schema::engines::stop_signal.eq(true)) - .execute(&mut establish_pg_connection())?; - Ok(()) -} +#[derive(Debug, Default)] +pub struct MyGreeter {} -pub fn update_engine_status( - conn: &mut PgConnection, - engine_uid: i32, - engine_status: EngineStatus, -) -> Result<(), diesel::result::Error> { - use crate::schema::engines::dsl::*; +#[tonic::async_trait] +impl Greeter for MyGreeter { + async fn say_hello( + &self, + request: Request, + ) -> Result, Status> { + println!("Got a request: {:?}", request); - diesel::update(engines) - .filter(uid.eq(engine_uid)) - .set(status.eq(engine_status.to_string())) - .execute(conn)?; + let reply = grpc::LogMessageResponse { + message: format!("Hello {}!", request.into_inner().content).into(), + }; - Ok(()) + Ok(Response::new(reply)) + } } -pub fn create_new_engine_entry( - conn: &mut PgConnection, - name: &str, - ip_address: &str, -) -> Result { - use crate::schema::engines::table as engines; - use crate::schema::engines::uid as engine_uid; - - let new_engine = models::NewEngine { name, ip_address }; - - //insert and return uid - diesel::insert_into(engines) - .values(&new_engine) - .returning(engine_uid) - .get_result::(conn) +//main function that takes an argument +#[tokio::main] +async fn main() -> Result<(), Box> { + let args: Vec = env::args().collect(); + println!("args: {:?}", args); + + let engine_uid = args[1].parse::().unwrap(); + println!("engine_uid: {}", engine_uid); + + // tokio::spawn(async move { + // if let Err(e) = run_event_process(engine_uid) { + // println!("Failed to start event process, {}", e); + // std::process::exit(1); + // }; + // }); + + // tokio::spawn(async move { + // if let Err(e) = run_task_process(engine_uid) { + // println!("Failed to start event process, {}", e); + // std::process::exit(1); + // }; + // }); + + let addr = "[::1]:50051".parse()?; + let greeter = MyGreeter::default(); + + Server::builder() + .add_service(GreeterServer::new(greeter)) + .serve(addr) + .await?; + + Ok(()) } diff --git a/src/engine_utils.rs b/src/engine_utils.rs new file mode 100644 index 0000000..6af005c --- /dev/null +++ b/src/engine_utils.rs @@ -0,0 +1,84 @@ +use anyhow::Error as AnyError; +use ctrlc::set_handler; +use diesel::PgConnection; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::{env, str}; +// use workflow::components::event::poll_events; +// use workflow::components::task::queue_processor; +use crate::models::{EngineStatus, NewEngine}; +use crate::schema; +use crate::utils::establish_pg_connection; +// mod models; +// mod utils; + +use diesel::prelude::*; + +pub fn run_process(process_name: &str, process_fn: F, engine_uid: i32) -> Result<(), AnyError> +where + F: FnOnce(Arc, i32) -> Result<(), AnyError>, +{ + let running = Arc::new(AtomicBool::new(true)); + let r = running.clone(); + + set_handler(move || { + r.store(false, Ordering::SeqCst); + }) + .expect("Error setting Ctrl-C handler"); + + if let Err(e) = process_fn(running, engine_uid) { + eprintln!("Failed to start {} process: {}", process_name, e); + eprintln!("exiting..."); + std::process::exit(1); + } + println!("{} process stopped correctly", process_name); + + Ok(()) +} + +// pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> { +// run_process("Task", queue_processor, engine_uid) +// } + +// pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { +// run_process("Event", poll_events, engine_uid) +// } + +pub fn handle_stop() -> Result<(), AnyError> { + diesel::update(schema::engines::table) + .set(schema::engines::stop_signal.eq(true)) + .execute(&mut establish_pg_connection())?; + Ok(()) +} + +pub fn update_engine_status( + conn: &mut PgConnection, + engine_uid: i32, + engine_status: EngineStatus, +) -> Result<(), diesel::result::Error> { + use crate::schema::engines::dsl::*; + + diesel::update(engines) + .filter(uid.eq(engine_uid)) + .set(status.eq(engine_status.to_string())) + .execute(conn)?; + + Ok(()) +} + +pub fn create_new_engine_entry( + conn: &mut PgConnection, + name: &str, + ip_address: &str, +) -> Result { + use crate::schema::engines::table as engines; + use crate::schema::engines::uid as engine_uid; + + let new_engine = NewEngine { name, ip_address }; + + //insert and return uid + diesel::insert_into(engines) + .values(&new_engine) + .returning(engine_uid) + .get_result::(conn) +} diff --git a/src/lib.rs b/src/lib.rs index 6379791..d349bed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,8 @@ -pub mod engine; +// pub mod components { +// pub mod event; +// pub mod task; +// } +pub mod engine_utils; pub mod models; pub mod parser; pub mod schema; diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index a9e1011..0000000 --- a/src/main.rs +++ /dev/null @@ -1,8 +0,0 @@ -use anyhow::Error as AnyError; - -mod cli; - -fn main() -> Result<(), AnyError> { - cli::cli(); - Ok(()) -} From f5f30a8ca400c13c49dc99dfbde903b97e82a564 Mon Sep 17 00:00:00 2001 From: MT <59728838+mt-empty@users.noreply.github.com> Date: Sun, 27 Aug 2023 23:56:39 +1000 Subject: [PATCH 2/6] implemented grpc server and client along with new log subcommand --- src/cli.rs | 61 ++++++++++++++++++++++++++++++++++++++--- src/components/event.rs | 2 +- src/components/task.rs | 2 +- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index e8b6fd4..d47cc5a 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Error as AnyError, Result}; +use anyhow::{anyhow, Error as AnyError, Ok, Result}; use clap::{Parser, Subcommand}; use diesel::{ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, SelectableHelper}; use dotenv::dotenv; @@ -40,6 +40,10 @@ enum Commands { StartEventProcess { engine_uid: i32, }, + Logs { + #[clap(subcommand)] + subcommand: LogsSubcommands, + }, Stop {}, /// Adds workflow to the queue Add { @@ -95,6 +99,16 @@ enum ShowSubcommands { Engine { uid: i32 }, } +#[derive(Subcommand)] +enum LogsSubcommands { + // Lists all tasks + Task { uid: i32 }, + // Lists all events + Event { uid: i32 }, + // Lists all engines + // Engine { uid: i32 }, +} + #[derive(PartialEq)] enum ProcessType { Task, @@ -150,7 +164,7 @@ fn start_process( Ok(()) } -pub fn cli() { +pub async fn cli() { let cli = Cli::parse(); match &cli.command { @@ -185,6 +199,13 @@ pub fn cli() { // std::process::exit(1); // }; } + Commands::Logs { subcommand } => { + println!("Logs"); + if let Err(e) = process_log_command(subcommand).await { + println!("Failed to stop the engine, {}", e); + std::process::exit(1); + }; + } Commands::Stop {} => { println!("Stopping the engine"); //todo: handle stop for multiple engines @@ -228,6 +249,36 @@ pub fn cli() { std::process::exit(0); } +async fn process_log_command(subcommand: &LogsSubcommands) -> Result<(), AnyError> { + match subcommand { + LogsSubcommands::Task { uid } => show_log(uid.to_string()).await?, + LogsSubcommands::Event { uid } => show_log(uid.to_string()).await?, + }; + Ok(()) +} +pub mod grpc { + tonic::include_proto!("grpc"); +} +use grpc::output_streaming_client::OutputStreamingClient; +use grpc::{OutputChunk, Response as GrpcResponse}; + +use std::error::Error; +use tonic::transport::Channel; + +async fn show_log(server_id: String) -> Result<(), AnyError> { + let mut client = OutputStreamingClient::connect("http://[::1]:10000").await?; + + let mut stream = client + .stream_output(OutputChunk::default()) + .await? + .into_inner(); + + while let Some(log_message) = stream.message().await? { + println!("NOTE = {:?}", log_message); + } + Ok(()) +} + fn get_system_ip_address() -> Result { // Get a vector with all network interfaces found let all_interfaces = interfaces(); @@ -415,8 +466,10 @@ fn list_items(items: Vec) -> Result<(), AnyError> { Ok(()) } -fn main() -> Result<(), AnyError> { - cli(); +#[tokio::main] + +async fn main() -> Result<(), AnyError> { + cli().await; Ok(()) } diff --git a/src/components/event.rs b/src/components/event.rs index a4ad1f1..bcbf3f5 100644 --- a/src/components/event.rs +++ b/src/components/event.rs @@ -197,7 +197,7 @@ async fn main() -> Result<(), Box> { let stream = OutputStreamer { features: Arc::new(vec![GrpcResponse { - message: "Hello".into(), + message: "Hello from event".into(), }]), }; diff --git a/src/components/task.rs b/src/components/task.rs index 5f967ef..3a381f3 100644 --- a/src/components/task.rs +++ b/src/components/task.rs @@ -267,7 +267,7 @@ async fn main() -> Result<(), Box> { let stream = OutputStreamer { features: Arc::new(vec![GrpcResponse { - message: "Hello".into(), + message: "Hello from task".into(), }]), }; From d2db1654498335fe38b6b774aba4d425ca6bcb7a Mon Sep 17 00:00:00 2001 From: MT <59728838+mt-empty@users.noreply.github.com> Date: Tue, 29 Aug 2023 22:40:31 +1000 Subject: [PATCH 3/6] added a workflow example for streaming logs --- tests/workflows/log_stream/ping.sh | 3 +++ tests/workflows/log_stream/tasks/stream.sh | 6 ++++++ tests/workflows/log_stream/workflow.yml | 7 +++++++ 3 files changed, 16 insertions(+) create mode 100644 tests/workflows/log_stream/ping.sh create mode 100644 tests/workflows/log_stream/tasks/stream.sh create mode 100644 tests/workflows/log_stream/workflow.yml diff --git a/tests/workflows/log_stream/ping.sh b/tests/workflows/log_stream/ping.sh new file mode 100644 index 0000000..38d9020 --- /dev/null +++ b/tests/workflows/log_stream/ping.sh @@ -0,0 +1,3 @@ +#! /bin/sh +set -e +ping -c 1 google.com \ No newline at end of file diff --git a/tests/workflows/log_stream/tasks/stream.sh b/tests/workflows/log_stream/tasks/stream.sh new file mode 100644 index 0000000..54ae910 --- /dev/null +++ b/tests/workflows/log_stream/tasks/stream.sh @@ -0,0 +1,6 @@ +#! /bin/bash + +for second in $(seq 1 100); do + echo "Stream $second" + sleep 1 +done \ No newline at end of file diff --git a/tests/workflows/log_stream/workflow.yml b/tests/workflows/log_stream/workflow.yml new file mode 100644 index 0000000..6b0dcdd --- /dev/null +++ b/tests/workflows/log_stream/workflow.yml @@ -0,0 +1,7 @@ +name: simulation of log stream +description: simulation of log stream +events: + - name: Event2 + trigger: ./ping.sh + tasks: + - path: ./tasks/stream.sh From b637e26502e90fc7806af005858d44fbb60c172e Mon Sep 17 00:00:00 2001 From: MT <59728838+mt-empty@users.noreply.github.com> Date: Thu, 7 Sep 2023 22:11:02 +1000 Subject: [PATCH 4/6] experimenting with Duplex Stream --- src/components/event.rs | 100 +++++++++++++++++++++++++++++++--------- src/components/test.rs | 100 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 23 deletions(-) create mode 100644 src/components/test.rs diff --git a/src/components/event.rs b/src/components/event.rs index bcbf3f5..b9dc43a 100644 --- a/src/components/event.rs +++ b/src/components/event.rs @@ -1,17 +1,17 @@ use anyhow::Error as AnyError; +use ctrlc::set_handler; use diesel::prelude::*; use std::path::Path; -use std::process::Command as ShellCommand; +use std::process::{Command as ShellCommand, Stdio}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use std::{env, str, thread}; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::sync::mpsc; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, DuplexStream}; +use tokio::sync::{mpsc, RwLock}; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; -use workflow::engine_utils::run_process; use workflow::models::{EventStatus, LightEvent, LightTask, ProcessStatus}; use workflow::schema; use workflow::utils::{establish_pg_connection, push_tasks_to_queue}; @@ -25,25 +25,39 @@ use grpc::{OutputChunk, Response as GrpcResponse}; #[derive(Debug)] pub struct OutputStreamer { // an stdout pipe steam - features: Arc>, + read_pipe: tokio::io::ReadHalf, } #[tonic::async_trait] impl OutputStreaming for OutputStreamer { type StreamOutputStream = ReceiverStream>; - async fn stream_output( - &self, + &self, // Make self mutable here request: Request, ) -> Result, Status> { let (mut tx, rx) = mpsc::channel(4); - let features = self.features.clone(); - - // Spawn an async task to send the output data to the client - + let pipe = &self.read_pipe; tokio::spawn(async move { - for feature in &features[..] { - tx.send(Ok(feature.clone())).await.unwrap(); + let mut buf = String::new(); + loop { + let n = pipe.read_to_string(&mut buf).await; + match n { + Ok(n) => { + if n == 0 { + break; + } + } + Err(e) => { + println!("Error reading from pipe: {}", e); + break; + } + }; + tx.send(Ok(GrpcResponse { + message: buf.clone(), + })) + .await + .unwrap(); + buf.clear(); } }); @@ -51,12 +65,15 @@ impl OutputStreaming for OutputStreamer { } } -pub fn poll_events(running: Arc, engine_uid: i32) -> Result<(), AnyError> { +pub fn poll_events( + running: Arc, + engine_uid: i32, + stdout_write: &mut DuplexStream, +) -> Result<(), AnyError> { let mut event_uids: Vec = Vec::new(); let pg_conn = &mut establish_pg_connection(); use workflow::schema::engines::dsl::*; - diesel::update(engines) .filter(uid.eq(engine_uid)) .set(event_process_status.eq(ProcessStatus::Running.to_string())) @@ -70,12 +87,14 @@ pub fn poll_events(running: Arc, engine_uid: i32) -> Result<(), AnyE for event in events { println!("Event: {}", event); + stdout_write.write_all(event.trigger.as_bytes()); // async execute_event let _ = execute_event(event); } if event_uids.is_empty() { println!("No events to process"); + stdout_write.write_all(b"No events to process"); thread::sleep(Duration::from_millis(2000)); } @@ -88,15 +107,19 @@ pub fn poll_events(running: Arc, engine_uid: i32) -> Result<(), AnyE Ok(Some(signal_on)) => { if signal_on { println!("Received stop signal"); + stdout_write.write_all(b"Received stop signal"); break; } } Ok(None) => { println!("No stop signal"); + stdout_write.write_all(b"No stop signal"); } Err(e) => { eprintln!("Failed to query engine status {}", e); eprintln!("exiting..."); + stdout_write.write_all(format!("Failed to query engine status {}", e).as_bytes()); + stdout_write.write_all(b"exiting..."); std::process::exit(1); } } @@ -174,8 +197,35 @@ fn execute_event(event: LightEvent) -> Result<(), AnyError> { Ok(()) } -pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { - run_process("Event", poll_events, engine_uid) +pub fn run_process( + process_name: &str, + process_fn: F, + engine_uid: i32, + stdout_pipe: &mut DuplexStream, +) -> Result<(), AnyError> +where + F: FnOnce(Arc, i32, &mut DuplexStream) -> Result<(), AnyError>, +{ + let running = Arc::new(AtomicBool::new(true)); + let r = running.clone(); + + set_handler(move || { + r.store(false, Ordering::SeqCst); + }) + .expect("Error setting Ctrl-C handler"); + + if let Err(e) = process_fn(running, engine_uid, stdout_pipe) { + eprintln!("Failed to start {} process: {}", process_name, e); + eprintln!("exiting..."); + std::process::exit(1); + } + println!("{} process stopped correctly", process_name); + + Ok(()) +} + +pub fn run_event_process(engine_uid: i32, stdout_pipe: &mut DuplexStream) -> Result<(), AnyError> { + run_process("Event", poll_events, engine_uid, stdout_pipe) } #[tokio::main] @@ -186,8 +236,16 @@ async fn main() -> Result<(), Box> { let engine_uid = args[1].parse::().unwrap(); println!("engine_uid: {}", engine_uid); + // let stdout_pipe: Arc> = Arc::new(RwLock::new(Stdio::piped())); + // let stdout_pipe = Stdio::piped(); + let (stdout_read, mut stdout_write) = tokio::io::duplex(1024); + + // let (read, write) = tokio::io::duplex(1024); + let (read_rx, _) = tokio::io::split(stdout_read); + // let (mut write_rx, mut read_tx) = tokio::io::split(stdout_write); + tokio::spawn(async move { - if let Err(e) = run_event_process(engine_uid) { + if let Err(e) = run_event_process(engine_uid, &mut stdout_write) { println!("Failed to start event process, {}", e); std::process::exit(1); }; @@ -195,11 +253,7 @@ async fn main() -> Result<(), Box> { let addr = "[::1]:10001".parse().unwrap(); - let stream = OutputStreamer { - features: Arc::new(vec![GrpcResponse { - message: "Hello from event".into(), - }]), - }; + let stream = OutputStreamer { read_pipe: read_rx }; let svc = OutputStreamingServer::new(stream); diff --git a/src/components/test.rs b/src/components/test.rs new file mode 100644 index 0000000..3a2a8f4 --- /dev/null +++ b/src/components/test.rs @@ -0,0 +1,100 @@ +use anyhow::Error as AnyError; +use diesel::prelude::*; +use std::path::Path; +use std::process::Command as ShellCommand; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use std::{env, str, thread}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; +use workflow::engine_utils::run_process; +use workflow::models::{EventStatus, LightEvent, LightTask, ProcessStatus}; +use workflow::schema; +use workflow::utils::{establish_pg_connection, push_tasks_to_queue}; + +pub mod grpc { + tonic::include_proto!("grpc"); +} +use grpc::output_streaming_server::{OutputStreaming, OutputStreamingServer}; +use grpc::{OutputChunk, Response as GrpcResponse}; + +#[derive(Debug)] +pub struct OutputStreamer { + // TODO: an stdout pipe steam + features: Arc>, +} + +#[tonic::async_trait] +impl OutputStreaming for OutputStreamer { + type StreamOutputStream = ReceiverStream>; + + async fn stream_output( + &self, + request: Request, + ) -> Result, Status> { + let (mut tx, rx) = mpsc::channel(4); + let features = self.features.clone(); + + // Spawn an async task to send the output data to the client + + tokio::spawn(async move { + for feature in &features[..] { + tx.send(Ok(feature.clone())).await.unwrap(); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} + +fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { + println!("run_event_process"); + let path_basename = Path::new(&env::current_exe().unwrap()) + .file_name() + .unwrap() + .to_str() + .unwrap() + .to_string(); + let mut cmd = ShellCommand::new("bash") + .arg(path_basename) + .stdout(Stdio::piped()) + .spawn() + .expect("failed to execute process"); + + // TODO: an stdout pipe steam + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args: Vec = env::args().collect(); + println!("args: {:?}", args); + + let engine_uid = args[1].parse::().unwrap(); + println!("engine_uid: {}", engine_uid); + + tokio::spawn(async move { + if let Err(e) = run_event_process(engine_uid) { + println!("Failed to start event process, {}", e); + std::process::exit(1); + }; + }); + + let addr = "[::1]:10001".parse().unwrap(); + + let stream = OutputStreamer { + features: Arc::new(vec![GrpcResponse { + message: "Hello from event".into(), + }]), + }; + + let svc = OutputStreamingServer::new(stream); + + Server::builder().add_service(svc).serve(addr).await?; + Ok(()) +} From 6fe6e0b8a244bb214c9c12e7a5dff6eb9212f320 Mon Sep 17 00:00:00 2001 From: MT <59728838+mt-empty@users.noreply.github.com> Date: Sat, 16 Sep 2023 23:24:41 +1000 Subject: [PATCH 5/6] failed attempt using tokio unbounded channel --- Cargo.toml | 7 +-- src/cli.rs | 30 ++++++------ src/components/event.rs | 101 +++++++++++++++++++++++----------------- 3 files changed, 78 insertions(+), 60 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 16eb106..6b26a00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,15 +35,16 @@ serde_derive = "1.0.164" diesel = { version = "2.1.0", features = ["chrono", "postgres"] } diesel_migrations = { version = "2.1.0", features = ["postgres"] } tracing = "0.1.37" +tracing-subscriber = "0.3.17" prettytable-rs = "^0.10.0" pnet = "0.34.0" tonic = "0.9.2" prost = "0.11.9" -tokio = { version = "1.32", features = ["rt-multi-thread", "macros", "sync", "time"] } -# futures-util-preview = "0.2.2" +tokio = { version = "1.32", features = ["rt-multi-thread", "macros", "sync", "time", "io-std", "io-util"] } +# flume = { version = "0.11.0", features = ["async"] } rand = "0.8.5" async-stream = "0.3.5" tokio-stream = "0.1.14" [build-dependencies] -tonic-build = "0.9" \ No newline at end of file +tonic-build = "0.9.0" \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index d47cc5a..6794018 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -15,6 +15,14 @@ use workflow::parser::process_yaml_file; use workflow::utils::establish_pg_connection; use workflow::utils::run_migrations; +pub mod grpc { + tonic::include_proto!("grpc"); +} +use grpc::output_streaming_client::OutputStreamingClient; +use grpc::{OutputChunk, Response as GrpcResponse}; + +use tonic::transport::Channel; + const PRETTY_TABLE_MAX_CELL_LEN: usize = 50; const ENGINE_NAME: &str = "workflow-engine"; const ENGINE_IP_ADDRESS: &str = "0.0.0.0"; @@ -40,6 +48,7 @@ enum Commands { StartEventProcess { engine_uid: i32, }, + // show logs for event or task process Logs { #[clap(subcommand)] subcommand: LogsSubcommands, @@ -102,9 +111,9 @@ enum ShowSubcommands { #[derive(Subcommand)] enum LogsSubcommands { // Lists all tasks - Task { uid: i32 }, + Task, // Lists all events - Event { uid: i32 }, + Event, // Lists all engines // Engine { uid: i32 }, } @@ -251,22 +260,15 @@ pub async fn cli() { async fn process_log_command(subcommand: &LogsSubcommands) -> Result<(), AnyError> { match subcommand { - LogsSubcommands::Task { uid } => show_log(uid.to_string()).await?, - LogsSubcommands::Event { uid } => show_log(uid.to_string()).await?, + LogsSubcommands::Task => show_log("10000".to_owned()).await?, + LogsSubcommands::Event => show_log("10001".to_owned()).await?, }; Ok(()) } -pub mod grpc { - tonic::include_proto!("grpc"); -} -use grpc::output_streaming_client::OutputStreamingClient; -use grpc::{OutputChunk, Response as GrpcResponse}; - -use std::error::Error; -use tonic::transport::Channel; -async fn show_log(server_id: String) -> Result<(), AnyError> { - let mut client = OutputStreamingClient::connect("http://[::1]:10000").await?; +async fn show_log(server_port: String) -> Result<(), AnyError> { + let mut client = + OutputStreamingClient::connect(format!("http://[::1]:{}", server_port)).await?; let mut stream = client .stream_output(OutputChunk::default()) diff --git a/src/components/event.rs b/src/components/event.rs index b9dc43a..be5d929 100644 --- a/src/components/event.rs +++ b/src/components/event.rs @@ -4,18 +4,22 @@ use diesel::prelude::*; use std::path::Path; use std::process::{Command as ShellCommand, Stdio}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::mpsc::Sender; +use std::sync::{mpsc, Arc, Mutex, RwLock}; use std::time::Duration; use std::{env, str, thread}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, DuplexStream}; -use tokio::sync::{mpsc, RwLock}; +// use tokio::net::unix::pipe::Sender; +use tokio::sync::mpsc as tokio_mpsc; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; +use tracing::{event, span, Level}; +use tracing_subscriber::FmtSubscriber; use workflow::models::{EventStatus, LightEvent, LightTask, ProcessStatus}; use workflow::schema; use workflow::utils::{establish_pg_connection, push_tasks_to_queue}; - pub mod grpc { tonic::include_proto!("grpc"); } @@ -25,7 +29,7 @@ use grpc::{OutputChunk, Response as GrpcResponse}; #[derive(Debug)] pub struct OutputStreamer { // an stdout pipe steam - read_pipe: tokio::io::ReadHalf, + receiver: Arc>>, } #[tonic::async_trait] @@ -35,32 +39,18 @@ impl OutputStreaming for OutputStreamer { &self, // Make self mutable here request: Request, ) -> Result, Status> { - let (mut tx, rx) = mpsc::channel(4); - let pipe = &self.read_pipe; + let (mut tx, rx) = tokio_mpsc::channel(4); + let receiver_clone = Arc::clone(&self.receiver); tokio::spawn(async move { - let mut buf = String::new(); - loop { - let n = pipe.read_to_string(&mut buf).await; - match n { - Ok(n) => { - if n == 0 { - break; - } - } - Err(e) => { - println!("Error reading from pipe: {}", e); - break; - } - }; - tx.send(Ok(GrpcResponse { - message: buf.clone(), - })) - .await - .unwrap(); - buf.clear(); + // use receiver_clone to send the recieved data to the client + let mut receiver = receiver_clone.lock().await; + while let Some(data) = receiver.recv().await { + tx.send(Ok(data)).await.unwrap(); } + // release the lock }); + println!("Client disconnected"); Ok(Response::new(ReceiverStream::new(rx))) } } @@ -68,7 +58,7 @@ impl OutputStreaming for OutputStreamer { pub fn poll_events( running: Arc, engine_uid: i32, - stdout_write: &mut DuplexStream, + sender: UnboundedSender, ) -> Result<(), AnyError> { let mut event_uids: Vec = Vec::new(); let pg_conn = &mut establish_pg_connection(); @@ -87,14 +77,19 @@ pub fn poll_events( for event in events { println!("Event: {}", event); - stdout_write.write_all(event.trigger.as_bytes()); + let _ = sender.send(GrpcResponse { + message: event.trigger.to_owned(), + }); // async execute_event let _ = execute_event(event); } if event_uids.is_empty() { + event!(Level::INFO, "No events to process"); println!("No events to process"); - stdout_write.write_all(b"No events to process"); + let _ = sender.send(GrpcResponse { + message: "No events to process".to_owned(), + }); thread::sleep(Duration::from_millis(2000)); } @@ -107,19 +102,27 @@ pub fn poll_events( Ok(Some(signal_on)) => { if signal_on { println!("Received stop signal"); - stdout_write.write_all(b"Received stop signal"); + let _ = sender.send(GrpcResponse { + message: "Received stop signal".to_owned(), + }); break; } } Ok(None) => { println!("No stop signal"); - stdout_write.write_all(b"No stop signal"); + let _ = sender.send(GrpcResponse { + message: "No stop signal".to_owned(), + }); } Err(e) => { eprintln!("Failed to query engine status {}", e); eprintln!("exiting..."); - stdout_write.write_all(format!("Failed to query engine status {}", e).as_bytes()); - stdout_write.write_all(b"exiting..."); + let _ = sender.send(GrpcResponse { + message: format!("Failed to query engine status {}", e), + }); + let _ = sender.send(GrpcResponse { + message: "exiting...".to_owned(), + }); std::process::exit(1); } } @@ -201,10 +204,10 @@ pub fn run_process( process_name: &str, process_fn: F, engine_uid: i32, - stdout_pipe: &mut DuplexStream, + sender: UnboundedSender, ) -> Result<(), AnyError> where - F: FnOnce(Arc, i32, &mut DuplexStream) -> Result<(), AnyError>, + F: FnOnce(Arc, i32, UnboundedSender) -> Result<(), AnyError>, { let running = Arc::new(AtomicBool::new(true)); let r = running.clone(); @@ -214,7 +217,7 @@ where }) .expect("Error setting Ctrl-C handler"); - if let Err(e) = process_fn(running, engine_uid, stdout_pipe) { + if let Err(e) = process_fn(running, engine_uid, sender) { eprintln!("Failed to start {} process: {}", process_name, e); eprintln!("exiting..."); std::process::exit(1); @@ -224,8 +227,11 @@ where Ok(()) } -pub fn run_event_process(engine_uid: i32, stdout_pipe: &mut DuplexStream) -> Result<(), AnyError> { - run_process("Event", poll_events, engine_uid, stdout_pipe) +pub fn run_event_process( + engine_uid: i32, + sender: UnboundedSender, +) -> Result<(), AnyError> { + run_process("Event", poll_events, engine_uid, sender) } #[tokio::main] @@ -236,16 +242,25 @@ async fn main() -> Result<(), Box> { let engine_uid = args[1].parse::().unwrap(); println!("engine_uid: {}", engine_uid); + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + + tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber"); + // let stdout_pipe: Arc> = Arc::new(RwLock::new(Stdio::piped())); - // let stdout_pipe = Stdio::piped(); - let (stdout_read, mut stdout_write) = tokio::io::duplex(1024); + // let (stdout_read, mut stdout_write) = tokio::io::duplex(1024); // let (read, write) = tokio::io::duplex(1024); - let (read_rx, _) = tokio::io::split(stdout_read); + // let (read_rx, _) = tokio::io::split(stdout_read); // let (mut write_rx, mut read_tx) = tokio::io::split(stdout_write); + let (sender, receiver) = tokio_mpsc::unbounded_channel::(); + let receiver: Arc>> = + Arc::new(tokio::sync::Mutex::new(receiver)); + tokio::spawn(async move { - if let Err(e) = run_event_process(engine_uid, &mut stdout_write) { + if let Err(e) = run_event_process(engine_uid, sender) { println!("Failed to start event process, {}", e); std::process::exit(1); }; @@ -253,7 +268,7 @@ async fn main() -> Result<(), Box> { let addr = "[::1]:10001".parse().unwrap(); - let stream = OutputStreamer { read_pipe: read_rx }; + let stream = OutputStreamer { receiver }; let svc = OutputStreamingServer::new(stream); From edad3bc9022993c8f7829a96c212a79826bac790 Mon Sep 17 00:00:00 2001 From: MT <59728838+mt-empty@users.noreply.github.com> Date: Tue, 27 Jan 2026 01:10:56 +1100 Subject: [PATCH 6/6] WIP: Add gRPC streaming, refactor process spawning, setup Ansible deployment - Switch from UnboundedChannel to Bus for event broadcast messaging - Add gRPC log streaming for event/task processes (ports 10001/10000) - Change CLI process spawning to use separate binaries (event/task) - Add justfile for container management shortcuts - Add Ansible deployment skeleton (inventory, playbook) - Add docker-compose volume mount for workflow_files - Comment out event process spawning in start command - Add VS Code settings for rust-analyzer and sqltools --- .vscode/settings.json | 16 +++++++ Cargo.toml | 17 +++++++- Readme.md | 8 ++-- deploy/ansible/inventory.ini | 3 ++ deploy/ansible/playbook.yml | 28 ++++++++++++ docker-compose.yaml | 12 +++++- justfile | 16 +++++++ lazy/config.yml | 0 src/cli.rs | 12 +++--- src/components/event.rs | 82 +++++++++++++++++++++--------------- 10 files changed, 148 insertions(+), 46 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 deploy/ansible/inventory.ini create mode 100644 deploy/ansible/playbook.yml create mode 100644 justfile create mode 100644 lazy/config.yml diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..bd0be0d --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,16 @@ +{ + "rust-analyzer.linkedProjects": [ + "./Cargo.toml", + ], + "rust-analyzer.cargo.buildScripts.enable": true, + "sqltools.format": { + "language": "sql", + "uppercaseKeywords": true, + "linesBetweenQueries": 2, + "keywordCase": "upper", + "identifierCase": "unchanged", + }, + "sqltools.formatLanguages": [ + "sql" + ], +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 6b26a00..7187b2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,21 @@ tokio = { version = "1.32", features = ["rt-multi-thread", "macros", "sync", "ti rand = "0.8.5" async-stream = "0.3.5" tokio-stream = "0.1.14" +crossbeam = "0.8.2" +crossbeam-channel = "0.5.8" +crossbeam-utils = "0.8.16" +bus = "2.4.1" [build-dependencies] -tonic-build = "0.9.0" \ No newline at end of file +tonic-build = "0.9.0" + +# [unstable] +# profile-rustflags = true + +# [profile.dev] +# # Enable the missing_docs warning for development builds +# rustflags = ["-C", "warn=missing_docs"] + +# [profile.release] +# # Enable the missing_docs warning for release builds +# rustflags = ["-C", "warn=missing_docs"] \ No newline at end of file diff --git a/Readme.md b/Readme.md index 0dcc2e5..3af3c3f 100755 --- a/Readme.md +++ b/Readme.md @@ -118,13 +118,13 @@ LRANGE tasks 0 -1 - [ ] Make it distributed - [ ] Add support for multiple engines - [x] Add a testing environment with multiple engines using docker compose - - [ ] Create a network attached storage for sharing workflow files - - [ ] Implement gRPC to stream output of tasks and events + + - [ ] Implement gRPC to stream output of task and event processes - [ ] Implement round robin algorithm for distributing workflows to engines - [ ] Automate container deployment using - [ ] Kubernetes - - [ ] Ansible -- [ ] LLM integration + + --- diff --git a/deploy/ansible/inventory.ini b/deploy/ansible/inventory.ini new file mode 100644 index 0000000..7a272a2 --- /dev/null +++ b/deploy/ansible/inventory.ini @@ -0,0 +1,3 @@ +[container_hosts] +container1_ip_address ansible_ssh_port=22 +container2_ip_address ansible_ssh_port=22 diff --git a/deploy/ansible/playbook.yml b/deploy/ansible/playbook.yml new file mode 100644 index 0000000..989d6e9 --- /dev/null +++ b/deploy/ansible/playbook.yml @@ -0,0 +1,28 @@ + +- hosts: localhost + gather_facts: false + tasks: + - name: Ensure the Docker image is available + docker_image: + name: your_docker_image_name + source: pull + + - name: Create Docker containers + docker_container: + name: "{{ item }}" + image: your_docker_image_name + state: started + with_items: + - container1 + - container2 + +- hosts: container_hosts + gather_facts: false + tasks: + - name: Copy binary file to Docker containers + copy: + src: /path/to/your/binary_file + dest: /path/where/you/want/to/copy/binary_file + mode: '0755' + become: true + remote_user: your_ssh_username diff --git a/docker-compose.yaml b/docker-compose.yaml index 2dc4625..60a55b7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -76,6 +76,8 @@ services: networks: - postgres_ntw - engine3_ntw + volumes: + - workflow_files:/app/workflow_files networks: postgres_ntw: @@ -85,4 +87,12 @@ networks: engine2_ntw: driver: bridge engine3_ntw: - driver: bridge \ No newline at end of file + driver: bridge + +volumes: + workflow_files: + driver: local + driver_opts: + type: none + o: bind + device: ./workflow_files diff --git a/justfile b/justfile new file mode 100644 index 0000000..d280dc7 --- /dev/null +++ b/justfile @@ -0,0 +1,16 @@ +build: + #!/usr/bin/env sh + cargo build -j 6 + +start-containers: + #!/usr/bin/env sh + + POSTGRES_PASSWORD=$(grep -oP '(?<=POSTGRES_PASSWORD=).*' .env) + docker run --name workflow-redis -d redis redis-server --save 60 1 --loglevel warning + docker run --name workflow-postgres -e POSTGRES_PASSWORD="$POSTGRES_PASSWORD" -p 5432:5432 -d postgres + +stop-containers: + #!/usr/bin/env sh + docker container stop workflow-redis workflow-postgres | true + docker container rm workflow-redis workflow-postgres | true + diff --git a/lazy/config.yml b/lazy/config.yml new file mode 100644 index 0000000..e69de29 diff --git a/src/cli.rs b/src/cli.rs index 6794018..c0a6c34 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,3 +1,5 @@ +//! + use anyhow::{anyhow, Error as AnyError, Ok, Result}; use clap::{Parser, Subcommand}; use diesel::{ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, SelectableHelper}; @@ -314,11 +316,11 @@ fn process_start_command() -> Result<(), AnyError> { )?; println!("created new engine entry with uid: {}", engine_uid); - if let Err(e) = start_process("event", ProcessType::Event, engine_uid) { - eprintln!("Failed to start Event process: {}", e); - eprintln!("exiting..."); - std::process::exit(1); - } + // if let Err(e) = start_process("event", ProcessType::Event, engine_uid) { + // eprintln!("Failed to start Event process: {}", e); + // eprintln!("exiting..."); + // std::process::exit(1); + // } if let Err(e) = start_process("task", ProcessType::Task, engine_uid) { eprintln!("Failed to start Task process: {}", e); diff --git a/src/components/event.rs b/src/components/event.rs index be5d929..9a499c8 100644 --- a/src/components/event.rs +++ b/src/components/event.rs @@ -5,13 +5,15 @@ use std::path::Path; use std::process::{Command as ShellCommand, Stdio}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::Sender; -use std::sync::{mpsc, Arc, Mutex, RwLock}; +use std::sync::{mpsc, Arc, RwLock}; use std::time::Duration; use std::{env, str, thread}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, DuplexStream}; // use tokio::net::unix::pipe::Sender; +use bus::{Bus, BusReader}; use tokio::sync::mpsc as tokio_mpsc; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::Mutex; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; @@ -29,7 +31,7 @@ use grpc::{OutputChunk, Response as GrpcResponse}; #[derive(Debug)] pub struct OutputStreamer { // an stdout pipe steam - receiver: Arc>>, + receiver: Arc>>, } #[tonic::async_trait] @@ -40,14 +42,15 @@ impl OutputStreaming for OutputStreamer { request: Request, ) -> Result, Status> { let (mut tx, rx) = tokio_mpsc::channel(4); - let receiver_clone = Arc::clone(&self.receiver); + let mut bus_mutex = self.receiver.clone(); + let mut bus = bus_mutex.lock().await; + let mut bus_rx = bus.add_rx(); + tokio::spawn(async move { - // use receiver_clone to send the recieved data to the client - let mut receiver = receiver_clone.lock().await; - while let Some(data) = receiver.recv().await { - tx.send(Ok(data)).await.unwrap(); + // while I recieve messages on the bus, send them to the client + for message in bus_rx.iter() { + tx.send(Ok(message)).await.unwrap(); } - // release the lock }); println!("Client disconnected"); @@ -55,10 +58,20 @@ impl OutputStreaming for OutputStreamer { } } -pub fn poll_events( +// if async is used here, then there is no guarantee that the event will be executed in order +async fn broadcast_message( + bus_mutex: Arc>>, + message: String, +) -> Result<(), AnyError> { + let mut bus = bus_mutex.lock().await; + let _ = bus.broadcast(GrpcResponse { message }); + Ok(()) +} + +async fn poll_events( running: Arc, engine_uid: i32, - sender: UnboundedSender, + bus_mutex: Arc>>, ) -> Result<(), AnyError> { let mut event_uids: Vec = Vec::new(); let pg_conn = &mut establish_pg_connection(); @@ -77,9 +90,8 @@ pub fn poll_events( for event in events { println!("Event: {}", event); - let _ = sender.send(GrpcResponse { - message: event.trigger.to_owned(), - }); + let future = broadcast_message(bus_mutex.clone(), event.trigger.to_owned()); + let _ = future.await; // async execute_event let _ = execute_event(event); } @@ -87,9 +99,7 @@ pub fn poll_events( if event_uids.is_empty() { event!(Level::INFO, "No events to process"); println!("No events to process"); - let _ = sender.send(GrpcResponse { - message: "No events to process".to_owned(), - }); + broadcast_message(bus_mutex.clone(), "No events to process".to_owned()).await; thread::sleep(Duration::from_millis(2000)); } @@ -102,27 +112,22 @@ pub fn poll_events( Ok(Some(signal_on)) => { if signal_on { println!("Received stop signal"); - let _ = sender.send(GrpcResponse { - message: "Received stop signal".to_owned(), - }); + broadcast_message(bus_mutex.clone(), "Received stop signal".to_owned()).await; break; } } Ok(None) => { println!("No stop signal"); - let _ = sender.send(GrpcResponse { - message: "No stop signal".to_owned(), - }); + broadcast_message(bus_mutex.clone(), "No stop signal".to_owned()).await; } Err(e) => { eprintln!("Failed to query engine status {}", e); eprintln!("exiting..."); - let _ = sender.send(GrpcResponse { - message: format!("Failed to query engine status {}", e), - }); - let _ = sender.send(GrpcResponse { - message: "exiting...".to_owned(), - }); + broadcast_message( + bus_mutex.clone(), + format!("Failed to query engine status {}\nExisting ...", e), + ) + .await; std::process::exit(1); } } @@ -204,10 +209,10 @@ pub fn run_process( process_name: &str, process_fn: F, engine_uid: i32, - sender: UnboundedSender, + bus: Arc>>, ) -> Result<(), AnyError> where - F: FnOnce(Arc, i32, UnboundedSender) -> Result<(), AnyError>, + F: FnOnce(Arc, i32, Arc>>) -> Result<(), AnyError>, { let running = Arc::new(AtomicBool::new(true)); let r = running.clone(); @@ -217,7 +222,7 @@ where }) .expect("Error setting Ctrl-C handler"); - if let Err(e) = process_fn(running, engine_uid, sender) { + if let Err(e) = process_fn(running, engine_uid, bus) { eprintln!("Failed to start {} process: {}", process_name, e); eprintln!("exiting..."); std::process::exit(1); @@ -229,9 +234,9 @@ where pub fn run_event_process( engine_uid: i32, - sender: UnboundedSender, + bus: Arc>>, ) -> Result<(), AnyError> { - run_process("Event", poll_events, engine_uid, sender) + run_process("Event", poll_events, engine_uid, bus) } #[tokio::main] @@ -259,8 +264,13 @@ async fn main() -> Result<(), Box> { let receiver: Arc>> = Arc::new(tokio::sync::Mutex::new(receiver)); + let mut bus = Bus::::new(100); + let mut bus_mutex = Arc::new(tokio::sync::Mutex::new(bus)); + let bus_mutex_grpc = bus_mutex.clone(); + // let mut rx1 = bus.add_rx(); + tokio::spawn(async move { - if let Err(e) = run_event_process(engine_uid, sender) { + if let Err(e) = run_event_process(engine_uid, bus_mutex) { println!("Failed to start event process, {}", e); std::process::exit(1); }; @@ -268,7 +278,9 @@ async fn main() -> Result<(), Box> { let addr = "[::1]:10001".parse().unwrap(); - let stream = OutputStreamer { receiver }; + let stream = OutputStreamer { + receiver: bus_mutex_grpc, + }; let svc = OutputStreamingServer::new(stream);