diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..bd0be0d
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,16 @@
+{
+    "rust-analyzer.linkedProjects": [
+        "./Cargo.toml",
+    ],
+    "rust-analyzer.cargo.buildScripts.enable": true,
+    "sqltools.format": {
+        "language": "sql",
+        "uppercaseKeywords": true,
+        "linesBetweenQueries": 2,
+        "keywordCase": "upper",
+        "identifierCase": "unchanged",
+    },
+    "sqltools.formatLanguages": [
+        "sql"
+    ],
+}
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
index b309509..7187b2f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,6 +5,18 @@ edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
+[[bin]] # Bin to run the event process
+name = "event"
+path = "src/components/event.rs"
+
+[[bin]] # Bin to run the task process
+name = "task"
+path = "src/components/task.rs"
+
+[[bin]] # Bin to run the CLI
+name = "cli"
+path = "src/cli.rs"
+
 [dependencies]
 serde_json = { version = "1.0.104", features = ["preserve_order"] }
 clap = { version = "4.3.5", features = ["derive"] }
@@ -23,5 +35,31 @@ serde_derive = "1.0.164"
 diesel = { version = "2.1.0", features = ["chrono", "postgres"] }
 diesel_migrations = { version = "2.1.0", features = ["postgres"] }
 tracing = "0.1.37"
+tracing-subscriber = "0.3.17"
 prettytable-rs = "^0.10.0"
-pnet = "0.34.0"
\ No newline at end of file
+pnet = "0.34.0"
+tonic = "0.9.2"
+prost = "0.11.9"
+tokio = { version = "1.32", features = ["rt-multi-thread", "macros", "sync", "time", "io-std", "io-util"] }
+# flume = { version = "0.11.0", features = ["async"] }
+rand = "0.8.5"
+async-stream = "0.3.5"
+tokio-stream = "0.1.14"
+crossbeam = "0.8.2"
+crossbeam-channel = "0.5.8"
+crossbeam-utils = "0.8.16"
+bus = "2.4.1"
+
+[build-dependencies]
+tonic-build = "0.9.0"
+
+# [unstable]
+# profile-rustflags = true
+
+# [profile.dev]
+# # Enable the missing_docs warning for development builds
+# rustflags = ["-C", "warn=missing_docs"]
+
+# [profile.release]
+# # Enable the missing_docs warning for release builds
+# rustflags = ["-C", "warn=missing_docs"]
\ No newline at end of file
diff --git a/Readme.md b/Readme.md
index 0dcc2e5..3af3c3f 100755
--- a/Readme.md
+++ b/Readme.md
@@ -118,13 +118,13 @@ LRANGE tasks 0 -1
 - [ ] Make it distributed
   - [ ] Add support for multiple engines
     - [x] Add a testing environment with multiple engines using docker compose
-    - [ ] Create a network attached storage for sharing workflow files
-    - [ ] Implement gRPC to stream output of tasks and events
+
+    - [ ] Implement gRPC to stream output of task and event processes
   - [ ] Implement round robin algorithm for distributing workflows to engines
 - [ ] Automate container deployment using
   - [ ] Kubernetes
-  - [ ] Ansible
-- [ ] LLM integration
+
+
 
 ---
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..f3dd018
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,4 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    tonic_build::compile_protos("proto/grpc.proto")?;
+    Ok(())
+}
diff --git a/deploy/ansible/inventory.ini b/deploy/ansible/inventory.ini
new file mode 100644
index 0000000..7a272a2
--- /dev/null
+++ b/deploy/ansible/inventory.ini
@@ -0,0 +1,3 @@
+[container_hosts]
+container1_ip_address ansible_ssh_port=22
+container2_ip_address ansible_ssh_port=22
diff --git a/deploy/ansible/playbook.yml b/deploy/ansible/playbook.yml
new file mode 100644
index 0000000..989d6e9
--- /dev/null
+++ b/deploy/ansible/playbook.yml
@@ -0,0 +1,28 @@
+
+- hosts: localhost
+  gather_facts: false
+  tasks:
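+    # Sketch: this first play runs on the control node to pull the image and
+    # start the containers; "your_docker_image_name" and the container names
+    # are placeholders to be replaced per deployment.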
+    - name: Ensure the Docker image is available
+      docker_image:
+        name: your_docker_image_name
+        source: pull
+
+    - name: Create Docker containers
+      docker_container:
+        name: "{{ item }}"
+        image: your_docker_image_name
+        state: started
+      with_items:
+        - container1
+        - container2
+
+- hosts: container_hosts
+  gather_facts: false
+  tasks:
+    - name: Copy binary file to Docker containers
+      copy:
+        src: /path/to/your/binary_file
+        dest: /path/where/you/want/to/copy/binary_file
+        mode: '0755'
+      become: true
+      remote_user: your_ssh_username
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 2dc4625..60a55b7 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -76,6 +76,8 @@ services:
     networks:
       - postgres_ntw
       - engine3_ntw
+    volumes:
+      - workflow_files:/app/workflow_files
 
 networks:
   postgres_ntw:
@@ -85,4 +87,12 @@ networks:
   engine2_ntw:
     driver: bridge
   engine3_ntw:
-    driver: bridge
\ No newline at end of file
+    driver: bridge
+
+volumes:
+  workflow_files:
+    driver: local
+    driver_opts:
+      type: none
+      o: bind
+      device: ./workflow_files
diff --git a/justfile b/justfile
new file mode 100644
index 0000000..d280dc7
--- /dev/null
+++ b/justfile
@@ -0,0 +1,16 @@
+build:
+    #!/usr/bin/env sh
+    cargo build -j 6
+
+start-containers:
+    #!/usr/bin/env sh
+
+    POSTGRES_PASSWORD=$(grep -oP '(?<=POSTGRES_PASSWORD=).*' .env)
+    docker run --name workflow-redis -d redis redis-server --save 60 1 --loglevel warning
+    docker run --name workflow-postgres -e POSTGRES_PASSWORD="$POSTGRES_PASSWORD" -p 5432:5432 -d postgres
+
+stop-containers:
+    #!/usr/bin/env sh
+    docker container stop workflow-redis workflow-postgres || true
+    docker container rm workflow-redis workflow-postgres || true
+
diff --git a/lazy/config.yml b/lazy/config.yml
new file mode 100644
index 0000000..e69de29
diff --git a/proto/grpc.proto b/proto/grpc.proto
new file mode 100644
index 0000000..3f63234
--- /dev/null
+++ b/proto/grpc.proto
@@ -0,0 +1,14 @@
+syntax = "proto3";
+package grpc;
+
+service OutputStreaming {
+  rpc StreamOutput(OutputChunk) returns (stream Response);
+}
+
+message OutputChunk {
+  string content = 1;
+}
+
+message Response {
+  string message = 1;
+}
\ No newline at end of file
diff --git a/src/cli.rs b/src/cli.rs
index ad7d720..c0a6c34 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -1,4 +1,6 @@
-use anyhow::{anyhow, Error as AnyError, Result};
+//!
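+//! Command-line interface for the workflow engine. Alongside the existing
+//! commands, this change adds a `logs` subcommand that connects to the
+//! per-process gRPC servers (task: 10000, event: 10001) and streams their
+//! output.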
+
+use anyhow::{anyhow, Error as AnyError, Ok, Result};
 use clap::{Parser, Subcommand};
 use diesel::{ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, SelectableHelper};
 use dotenv::dotenv;
@@ -9,13 +11,20 @@ use std::env;
 use std::fs::File;
 use std::process::Command;
 use tracing::field;
-use workflow::engine::{create_new_engine_entry, handle_stop, run_event_process};
-use workflow::engine::{run_task_process, update_engine_status};
+use workflow::engine_utils::{create_new_engine_entry, handle_stop, update_engine_status};
 use workflow::models::{Engine, EngineStatus, Event, Task};
 use workflow::parser::process_yaml_file;
 use workflow::utils::establish_pg_connection;
 use workflow::utils::run_migrations;
 
+pub mod grpc {
+    tonic::include_proto!("grpc");
+}
+use grpc::output_streaming_client::OutputStreamingClient;
+use grpc::{OutputChunk, Response as GrpcResponse};
+
+use tonic::transport::Channel;
+
 const PRETTY_TABLE_MAX_CELL_LEN: usize = 50;
 const ENGINE_NAME: &str = "workflow-engine";
 const ENGINE_IP_ADDRESS: &str = "0.0.0.0";
@@ -41,6 +50,11 @@ enum Commands {
     StartEventProcess {
         engine_uid: i32,
     },
+    // Show logs for the event or task process
+    Logs {
+        #[clap(subcommand)]
+        subcommand: LogsSubcommands,
+    },
     Stop {},
     /// Adds workflow to the queue
     Add {
@@ -96,6 +110,16 @@ enum ShowSubcommands {
     Engine { uid: i32 },
 }
 
+#[derive(Subcommand)]
+enum LogsSubcommands {
+    // Streams logs from the task process
+    Task,
+    // Streams logs from the event process
+    Event,
+    // Streams logs from a specific engine
+    // Engine { uid: i32 },
+}
+
 #[derive(PartialEq)]
 enum ProcessType {
     Task,
@@ -109,7 +133,7 @@ fn create_and_clear_log_file(file_path: &str) -> Result<File, AnyError> {
 }
 
 fn start_process(
-    subcommand_name: &str,
+    binary_name: &str,
     process_type: ProcessType,
     engine_uid: i32,
 ) -> Result<(), AnyError> {
@@ -141,8 +165,8 @@ fn start_process(
     }
 
     let command = binding
-        .arg(subcommand_name)
-        .arg("--")
+        .arg("--bin")
+        .arg(binary_name)
         .arg(engine_uid.to_string())
         .stdout(stdout)
         .stderr(stderr);
@@ -151,7 +175,7 @@ fn start_process(
     Ok(())
 }
 
-pub fn cli() {
+pub async fn cli() {
     let cli = Cli::parse();
 
     match &cli.command {
@@ -174,15 +198,22 @@ pub fn cli() {
         }
         Commands::StartEventProcess { engine_uid } => {
             println!("StartEventProcess");
-            if let Err(e) = run_event_process(*engine_uid) {
-                println!("Failed to start event process, {}", e);
-                std::process::exit(1);
-            };
+            // if let Err(e) = run_event_process(*engine_uid) {
+            //     println!("Failed to start event process, {}", e);
+            //     std::process::exit(1);
+            // };
         }
         Commands::StartTaskProcess { engine_uid } => {
             println!("StartTaskProcess");
-            if let Err(e) = run_task_process(*engine_uid) {
-                println!("Failed to start task process, {}", e);
+            // if let Err(e) = run_task_process(*engine_uid) {
+            //     println!("Failed to start task process, {}", e);
+            //     std::process::exit(1);
+            // };
+        }
+        Commands::Logs { subcommand } => {
+            println!("Logs");
+            if let Err(e) = process_log_command(subcommand).await {
+                println!("Failed to show logs, {}", e);
                 std::process::exit(1);
             };
         }
@@ -229,6 +260,29 @@ pub fn cli() {
     std::process::exit(0);
 }
 
+async fn process_log_command(subcommand: &LogsSubcommands) -> Result<(), AnyError> {
+    match subcommand {
+        LogsSubcommands::Task => show_log("10000".to_owned()).await?,
+        LogsSubcommands::Event => show_log("10001".to_owned()).await?,
+    };
+    Ok(())
+}
+
+async fn show_log(server_port: String) -> Result<(), AnyError> {
+    let mut client =
+        OutputStreamingClient::connect(format!("http://[::1]:{}", server_port)).await?;
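+    // Assumption: the server currently ignores the request payload, so an
+    // empty OutputChunk is sent only to open the stream.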
+    let mut stream = client
+        .stream_output(OutputChunk::default())
+        .await?
+        .into_inner();
+
+    while let Some(log_message) = stream.message().await? {
+        println!("NOTE = {:?}", log_message);
+    }
+    Ok(())
+}
+
 fn get_system_ip_address() -> Result<String, AnyError> {
     // Get a vector with all network interfaces found
     let all_interfaces = interfaces();
@@ -262,13 +316,13 @@ fn process_start_command() -> Result<(), AnyError> {
     )?;
     println!("created new engine entry with uid: {}", engine_uid);
 
-    if let Err(e) = start_process("start-event-process", ProcessType::Event, engine_uid) {
-        eprintln!("Failed to start Event process: {}", e);
-        eprintln!("exiting...");
-        std::process::exit(1);
-    }
+    // if let Err(e) = start_process("event", ProcessType::Event, engine_uid) {
+    //     eprintln!("Failed to start Event process: {}", e);
+    //     eprintln!("exiting...");
+    //     std::process::exit(1);
+    // }
 
-    if let Err(e) = start_process("start-task-process", ProcessType::Task, engine_uid) {
+    if let Err(e) = start_process("task", ProcessType::Task, engine_uid) {
         eprintln!("Failed to start Task process: {}", e);
         eprintln!("exiting...");
         std::process::exit(1);
@@ -416,6 +470,13 @@ fn list_items(items: Vec) -> Result<(), AnyError> {
     Ok(())
 }
 
+#[tokio::main]
+async fn main() -> Result<(), AnyError> {
+    cli().await;
+    Ok(())
+}
+
 // fn is_redis_running() -> bool {
 //     let redis_result = create_redis_connection();
 //     if let Err(e) = redis_result {
diff --git a/src/components/event.rs b/src/components/event.rs
new file mode 100644
index 0000000..9a499c8
--- /dev/null
+++ b/src/components/event.rs
@@ -0,0 +1,290 @@
+use anyhow::Error as AnyError;
+use ctrlc::set_handler;
+use diesel::prelude::*;
+use std::path::Path;
+use std::process::{Command as ShellCommand, Stdio};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::Sender;
+use std::sync::{mpsc, Arc, RwLock};
+use std::time::Duration;
+use std::{env, str, thread};
+use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, DuplexStream};
+// use tokio::net::unix::pipe::Sender;
+use bus::{Bus, BusReader};
+use tokio::sync::mpsc as tokio_mpsc;
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+use tokio::sync::Mutex;
+use tokio_stream::{wrappers::ReceiverStream, Stream};
+use tonic::transport::Server;
+use tonic::{Request, Response, Status, Streaming};
+use tracing::{event, span, Level};
+use tracing_subscriber::FmtSubscriber;
+use workflow::models::{EventStatus, LightEvent, LightTask, ProcessStatus};
+use workflow::schema;
+use workflow::utils::{establish_pg_connection, push_tasks_to_queue};
+pub mod grpc {
+    tonic::include_proto!("grpc");
+}
+use grpc::output_streaming_server::{OutputStreaming, OutputStreamingServer};
+use grpc::{OutputChunk, Response as GrpcResponse};
+
+#[derive(Debug)]
+pub struct OutputStreamer {
+    // shared broadcast bus carrying the process output stream
+    receiver: Arc<Mutex<Bus<GrpcResponse>>>,
+}
+
+#[tonic::async_trait]
+impl OutputStreaming for OutputStreamer {
+    type StreamOutputStream = ReceiverStream<Result<GrpcResponse, Status>>;
+    async fn stream_output(
+        &self,
+        request: Request<OutputChunk>,
+    ) -> Result<Response<Self::StreamOutputStream>, Status> {
+        let (tx, rx) = tokio_mpsc::channel(4);
+        let bus_mutex = self.receiver.clone();
+        let mut bus = bus_mutex.lock().await;
+        let mut bus_rx = bus.add_rx();
+
+        tokio::spawn(async move {
+            // Forward every message broadcast on the bus to this client
+            for message in bus_rx.iter() {
+                tx.send(Ok(message)).await.unwrap();
+            }
+        });
+
+        println!("Client subscribed");
+        Ok(Response::new(ReceiverStream::new(rx)))
+    }
+}
+
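+// Fan-out note: each subscriber gets its own BusReader via `add_rx()`, so
+// every connected log client observes the same broadcast sequence.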
+// If async is used here, there is no guarantee that events will be executed
+// in order.
+async fn broadcast_message(
+    bus_mutex: Arc<Mutex<Bus<GrpcResponse>>>,
+    message: String,
+) -> Result<(), AnyError> {
+    let mut bus = bus_mutex.lock().await;
+    let _ = bus.broadcast(GrpcResponse { message });
+    Ok(())
+}
+
+async fn poll_events(
+    running: Arc<AtomicBool>,
+    engine_uid: i32,
+    bus_mutex: Arc<Mutex<Bus<GrpcResponse>>>,
+) -> Result<(), AnyError> {
+    let mut event_uids: Vec<i32> = Vec::new();
+    let pg_conn = &mut establish_pg_connection();
+
+    use workflow::schema::engines::dsl::*;
+    diesel::update(engines)
+        .filter(uid.eq(engine_uid))
+        .set(event_process_status.eq(ProcessStatus::Running.to_string()))
+        .execute(pg_conn)?;
+
+    while running.load(Ordering::SeqCst) {
+        let events: Vec<LightEvent> = schema::events::dsl::events
+            .select(LightEvent::as_select())
+            .filter(schema::events::status.ne(EventStatus::Succeeded.to_string()))
+            .load(pg_conn)?;
+
+        for event in events {
+            println!("Event: {}", event);
+            let future = broadcast_message(bus_mutex.clone(), event.trigger.to_owned());
+            let _ = future.await;
+            // async execute_event
+            let _ = execute_event(event);
+        }
+
+        if event_uids.is_empty() {
+            event!(Level::INFO, "No events to process");
+            println!("No events to process");
+            let _ = broadcast_message(bus_mutex.clone(), "No events to process".to_owned()).await;
+            thread::sleep(Duration::from_millis(2000));
+        }
+
+        let received_stop_signal_result: Result<Option<bool>, _> = engines
+            .find(engine_uid)
+            .select(stop_signal)
+            .first(pg_conn)
+            .optional();
+        match received_stop_signal_result {
+            Ok(Some(signal_on)) => {
+                if signal_on {
+                    println!("Received stop signal");
+                    let _ =
+                        broadcast_message(bus_mutex.clone(), "Received stop signal".to_owned())
+                            .await;
+                    break;
+                }
+            }
+            Ok(None) => {
+                println!("No stop signal");
+                let _ = broadcast_message(bus_mutex.clone(), "No stop signal".to_owned()).await;
+            }
+            Err(e) => {
+                eprintln!("Failed to query engine status {}", e);
+                eprintln!("exiting...");
+                let _ = broadcast_message(
+                    bus_mutex.clone(),
+                    format!("Failed to query engine status {}\nExiting ...", e),
+                )
+                .await;
+                std::process::exit(1);
+            }
+        }
+
+        event_uids.clear();
+    }
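+    // The loop exits either when Ctrl-C flips the shared AtomicBool or when
+    // the engine row's stop_signal flag is set in Postgres.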
+    if !running.load(Ordering::SeqCst) {
+        println!("\nCtrl+C signal detected. Exiting...");
+    }
+
+    diesel::update(engines)
+        .filter(uid.eq(engine_uid))
+        .set(event_process_status.eq(ProcessStatus::Stopped.to_string()))
+        .execute(pg_conn)?;
+    Ok(())
+}
+
+fn execute_event(event: LightEvent) -> Result<(), AnyError> {
+    println!("Event Executor");
+
+    let conn = &mut establish_pg_connection();
+
+    let path_basename = match Path::new(&event.trigger).file_name() {
+        Some(basename) => basename,
+        None => return Err(AnyError::msg("Failed to get path basename")),
+    };
+    let path_dirname = Path::new(&event.trigger).parent().unwrap();
+
+    let output = ShellCommand::new("bash")
+        .arg(path_basename)
+        .current_dir(path_dirname)
+        .output()
+        .expect("failed to execute process");
+
+    // If the shell command returns 0, the event was triggered successfully
+    use workflow::schema::events::dsl::*;
+    if output.status.code().unwrap() == 0 {
+        diesel::update(events.find(event.uid))
+            .set(status.eq(EventStatus::Succeeded.to_string()))
+            .execute(conn)?;
+
+        {
+            use workflow::schema::tasks::dsl::*;
+            let light_tasks: Vec<LightTask> = tasks
+                .select(LightTask::as_select())
+                .filter(event_uid.eq(event.uid))
+                .load(conn)?;
+            let _ = push_tasks_to_queue(light_tasks);
+        }
+    } else {
+        diesel::update(events.find(event.uid))
+            .set((
+                status.eq(EventStatus::Retrying.to_string()),
+                triggered_at.eq(diesel::dsl::now),
+            ))
+            .execute(conn)?;
+    };
+
+    diesel::update(schema::events::dsl::events.find(event.uid))
+        .set((
+            stdout.eq(str::from_utf8(&output.stdout)?),
+            stderr.eq(str::from_utf8(&output.stderr)?),
+        ))
+        .execute(conn)?;
+
+    println!(
+        "event id: {} , trigger: {}\nFinished executing with a status: {}",
+        event.uid, event.trigger, output.status
+    );
+    println!("##############################################");
+    println!("stdout: {}", str::from_utf8(&output.stdout)?);
+    println!("----------------------------------------------");
+    println!("stderr: {}", str::from_utf8(&output.stderr)?);
+    println!("##############################################");
+    Ok(())
+}
+
+pub fn run_process<F>(
+    process_name: &str,
+    process_fn: F,
+    engine_uid: i32,
+    bus: Arc<Mutex<Bus<GrpcResponse>>>,
+) -> Result<(), AnyError>
+where
+    F: FnOnce(Arc<AtomicBool>, i32, Arc<Mutex<Bus<GrpcResponse>>>) -> Result<(), AnyError>,
+{
+    let running = Arc::new(AtomicBool::new(true));
+    let r = running.clone();
+
+    set_handler(move || {
+        r.store(false, Ordering::SeqCst);
+    })
+    .expect("Error setting Ctrl-C handler");
+
+    if let Err(e) = process_fn(running, engine_uid, bus) {
+        eprintln!("Failed to start {} process: {}", process_name, e);
+        eprintln!("exiting...");
+        std::process::exit(1);
+    }
+    println!("{} process stopped correctly", process_name);
+
+    Ok(())
+}
+
+pub fn run_event_process(
+    engine_uid: i32,
+    bus: Arc<Mutex<Bus<GrpcResponse>>>,
+) -> Result<(), AnyError> {
+    run_process("Event", poll_events, engine_uid, bus)
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let args: Vec<String> = env::args().collect();
+    println!("args: {:?}", args);
+
+    let engine_uid = args[1].parse::<i32>().unwrap();
+    println!("engine_uid: {}", engine_uid);
+
+    let subscriber = FmtSubscriber::builder()
+        .with_max_level(Level::TRACE)
+        .finish();
+
+    tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber");
+
+    // let stdout_pipe: Arc<RwLock<Stdio>> = Arc::new(RwLock::new(Stdio::piped()));
+    // let (stdout_read, mut stdout_write) = tokio::io::duplex(1024);
+
+    // let (read, write) = tokio::io::duplex(1024);
+    // let (read_rx, _) = tokio::io::split(stdout_read);
+    // let (mut write_rx, mut read_tx) = tokio::io::split(stdout_write);
+
+    let (sender, receiver) = tokio_mpsc::unbounded_channel::<String>();
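+    // NOTE: this mpsc pair looks like unused scaffolding; output is actually
+    // broadcast through the Bus created below. The String item type is an
+    // assumption, since the channel is never read.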
+    let receiver: Arc<Mutex<UnboundedReceiver<String>>> =
+        Arc::new(tokio::sync::Mutex::new(receiver));
+
+    let bus = Bus::<GrpcResponse>::new(100);
+    let bus_mutex = Arc::new(tokio::sync::Mutex::new(bus));
+    let bus_mutex_grpc = bus_mutex.clone();
+    // let mut rx1 = bus.add_rx();
+
+    tokio::spawn(async move {
+        if let Err(e) = run_event_process(engine_uid, bus_mutex) {
+            println!("Failed to start event process, {}", e);
+            std::process::exit(1);
+        };
+    });
+
+    let addr = "[::1]:10001".parse().unwrap();
+
+    let stream = OutputStreamer {
+        receiver: bus_mutex_grpc,
+    };
+
+    let svc = OutputStreamingServer::new(stream);
+
+    Server::builder().add_service(svc).serve(addr).await?;
+
+    Ok(())
+}
diff --git a/src/engine/task.rs b/src/components/task.rs
similarity index 50%
rename from src/engine/task.rs
rename to src/components/task.rs
index d51cfa9..3a381f3 100644
--- a/src/engine/task.rs
+++ b/src/components/task.rs
@@ -1,16 +1,62 @@
-use crate::models::{Engine, EngineStatus, LightTask, ProcessStatus, TaskStatus};
-use crate::utils::{self, create_redis_connection, establish_pg_connection};
 use anyhow::Error as AnyError;
 use bincode::deserialize;
 use diesel::prelude::*;
+use rand::seq::IteratorRandom;
 use rayon::ThreadPoolBuilder;
 use redis::{Commands as RedisCommand, FromRedisValue};
+use tonic::server::ServerStreamingService;
+// use std::io::BufReader;
 use std::path::Path;
-use std::process::Command as ShellCommand;
+use std::pin::Pin;
+use std::process::{ChildStdout, Command as ShellCommand, Stdio};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
-use std::{str, thread};
+use std::{env, str, thread};
+use tokio::io::{AsyncBufReadExt, BufReader};
+use tokio::sync::mpsc;
+use tokio_stream::{wrappers::ReceiverStream, Stream};
+use tonic::transport::Server;
+use tonic::{Request, Response, Status, Streaming};
+use workflow::engine_utils::run_process;
+use workflow::models::{LightTask, ProcessStatus, TaskStatus};
+use workflow::utils::{self, create_redis_connection, establish_pg_connection};
+
+pub mod grpc {
+    tonic::include_proto!("grpc");
+}
+// use grpc::output_streaming_client::OutputStreamingClient;
+use grpc::output_streaming_server::{OutputStreaming, OutputStreamingServer};
+use grpc::{OutputChunk, Response as GrpcResponse};
+
+#[derive(Debug)]
+pub struct OutputStreamer {
+    // canned responses standing in for a real stdout pipe stream
+    features: Arc<Vec<GrpcResponse>>,
+}
+
+#[tonic::async_trait]
+impl OutputStreaming for OutputStreamer {
+    type StreamOutputStream = ReceiverStream<Result<GrpcResponse, Status>>;
+
+    async fn stream_output(
+        &self,
+        request: Request<OutputChunk>,
+    ) -> Result<Response<Self::StreamOutputStream>, Status> {
+        let (tx, rx) = mpsc::channel(4);
+        let features = self.features.clone();
+
+        // Spawn an async task to send the output data to the client
+        tokio::spawn(async move {
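+            // Placeholder behaviour: replay the canned `features` messages;
+            // real task stdout streaming is still sketched out in the
+            // commented code further down.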
+            for feature in &features[..] {
+                tx.send(Ok(feature.clone())).await.unwrap();
+            }
+        });
+
+        Ok(Response::new(ReceiverStream::new(rx)))
+    }
+}
 
 const THREAD_COUNT: usize = 4;
 
@@ -19,7 +65,7 @@ pub fn queue_processor(running: Arc<AtomicBool>, engine_uid: i32) -> Result<(),
     let pg_conn = &mut establish_pg_connection();
     let mut redis_con = create_redis_connection()?;
 
-    use crate::schema::engines::dsl::*;
+    use workflow::schema::engines::dsl::*;
 
     diesel::update(engines)
         .filter(uid.eq(engine_uid))
@@ -36,9 +82,10 @@ pub fn queue_processor(running: Arc<AtomicBool>, engine_uid: i32) -> Result<(),
                 thread_pool.spawn(move || {
                     let task: LightTask = deserialize(popped_value.as_bytes()).unwrap();
                     println!("Task: {}", task);
-                    if let Err(e) = execute_task(task) {
-                        println!("Failed to execute task {}", e);
-                    };
+                    let future = execute_task(task);
+                    // if let Err(e) = execute_task(task) {
+                    //     println!("Failed to execute task {}", e);
+                    // };
                 });
             }
             None => {
@@ -83,13 +130,12 @@ pub fn queue_processor(running: Arc<AtomicBool>, engine_uid: i32) -> Result<(),
     Ok(())
 }
 
-fn execute_task(task: LightTask) -> Result<(), AnyError> {
+async fn execute_task(task: LightTask) -> Result<(), AnyError> {
     println!("Task Executor");
-    use crate::schema::tasks::dsl::*;
+    use workflow::schema::tasks::dsl::*;
 
     let conn = &mut establish_pg_connection();
-    // todo , update task status to running
-
+    // let mut client = OutputStreamingClient::connect("http://[::1]:50051").await?;
     diesel::update(tasks.find(task.uid))
         .set((
             status.eq(TaskStatus::Running.to_string()),
@@ -103,12 +149,62 @@ fn execute_task(task: LightTask) -> Result<(), AnyError> {
     };
     let path_dirname = Path::new(&task.path).parent().unwrap();
 
-    let output = ShellCommand::new("bash")
+    let mut cmd = ShellCommand::new("bash")
         .arg(path_basename)
         .current_dir(path_dirname)
-        .output()
+        .stdout(Stdio::piped())
+        .spawn()
         .expect("failed to execute process");
 
+    let stdout_content = cmd
+        .stdout
+        .take()
+        .expect("Could not capture standard output");
+    // let reader = tokio::io::BufReader::new(stdout_content.into());
+
+    // let mut client_stream = client
+    //     .stream_output(Request::new(OutputChunk {
+    //         content: "Tonic".into(),
+    //     }))
+    //     .await?
+    //     .into_inner();
+
+    // tokio::spawn(async move {
+    //     let mut buf = String::new();
+    //     let mut reader = reader;
+    //     loop {
+    //         buf.clear();
+    //         if reader.read_line(&mut buf).await.unwrap_or(0) == 0 {
+    //             break;
+    //         }
+
+    //         let request = Request::new(grpc::OutputChunk {
+    //             content: buf.clone(),
+    //         });
+
+    //         if let Err(_) = client_stream.send(request).await {
+    //             break;
+    //         }
+    //     }
+    // });
+
+    let output = cmd.wait_with_output().expect("Failed to wait for command");
+
+    println!(
+        "task id: {} , path: {}\nFinished executing with a status: {}",
+        task.uid, task.path, output.status
+    );
+
+    // let stdout_content = str::from_utf8(stdout_content)?;
+
+    // for chunk in stdout_content.chars().collect::<Vec<char>>().chunks(10) {
+    //     let request = tonic::Request::new(mygrpc::OutputChunk {
+    //         content: chunk.iter().collect(),
+    //     });
+
+    //     client.stream_output(request).await?;
+    // }
+
     if output.status.code().unwrap() == 0 {
         diesel::update(tasks.find(task.uid))
             .set((
@@ -147,3 +243,37 @@ fn execute_task(task: LightTask) -> Result<(), AnyError> {
     println!("##############################################");
     Ok(())
 }
+
+pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> {
+    run_process("Task", queue_processor, engine_uid)
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let args: Vec<String> = env::args().collect();
+    println!("args: {:?}", args);
+
+    let engine_uid = args[1].parse::<i32>().unwrap();
+    println!("engine_uid: {}", engine_uid);
+
+    tokio::spawn(async move {
+        if let Err(e) = run_task_process(engine_uid) {
+            println!("Failed to start task process, {}", e);
+            std::process::exit(1);
+        };
+    });
+
+    let addr = "[::1]:10000".parse().unwrap();
+
+    let stream = OutputStreamer {
+        features: Arc::new(vec![GrpcResponse {
+            message: "Hello from task".into(),
+        }]),
+    };
+
+    let svc = OutputStreamingServer::new(stream);
+
+    Server::builder().add_service(svc).serve(addr).await?;
+
+    Ok(())
+}
diff --git a/src/components/test.rs b/src/components/test.rs
new file mode 100644
index 0000000..3a2a8f4
--- /dev/null
+++ b/src/components/test.rs
@@ -0,0 +1,100 @@
+use anyhow::Error as AnyError;
+use diesel::prelude::*;
+use std::path::Path;
+use std::process::{Command as ShellCommand, Stdio};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+use std::{env, str, thread};
+use tokio::io::{AsyncBufReadExt, BufReader};
+use tokio::sync::mpsc;
+use tokio_stream::{wrappers::ReceiverStream, Stream};
+use tonic::transport::Server;
+use tonic::{Request, Response, Status, Streaming};
+use workflow::engine_utils::run_process;
+use workflow::models::{EventStatus, LightEvent, LightTask, ProcessStatus};
+use workflow::schema;
+use workflow::utils::{establish_pg_connection, push_tasks_to_queue};
+
+pub mod grpc {
+    tonic::include_proto!("grpc");
+}
+use grpc::output_streaming_server::{OutputStreaming, OutputStreamingServer};
+use grpc::{OutputChunk, Response as GrpcResponse};
+
+#[derive(Debug)]
+pub struct OutputStreamer {
+    // TODO: replace with a real stdout pipe stream
+    features: Arc<Vec<GrpcResponse>>,
+}
+
+#[tonic::async_trait]
+impl OutputStreaming for OutputStreamer {
+    type StreamOutputStream = ReceiverStream<Result<GrpcResponse, Status>>;
+
+    async fn stream_output(
+        &self,
+        request: Request<OutputChunk>,
+    ) -> Result<Response<Self::StreamOutputStream>, Status> {
+        let (tx, rx) = mpsc::channel(4);
+        let features = self.features.clone();
+
+        // Spawn an async task to send the output data to the client
+        tokio::spawn(async move {
+            for feature in &features[..] {
+                tx.send(Ok(feature.clone())).await.unwrap();
+            }
+        });
+
+        Ok(Response::new(ReceiverStream::new(rx)))
+    }
+}
+
+fn run_event_process(engine_uid: i32) -> Result<(), AnyError> {
+    println!("run_event_process");
+    let path_basename = Path::new(&env::current_exe().unwrap())
+        .file_name()
+        .unwrap()
+        .to_str()
+        .unwrap()
+        .to_string();
+    let mut cmd = ShellCommand::new("bash")
+        .arg(path_basename)
+        .stdout(Stdio::piped())
+        .spawn()
+        .expect("failed to execute process");
+
+    // TODO: wire the captured stdout pipe into the gRPC stream
+
+    Ok(())
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let args: Vec<String> = env::args().collect();
+    println!("args: {:?}", args);
+
+    let engine_uid = args[1].parse::<i32>().unwrap();
+    println!("engine_uid: {}", engine_uid);
+
+    tokio::spawn(async move {
+        if let Err(e) = run_event_process(engine_uid) {
+            println!("Failed to start event process, {}", e);
+            std::process::exit(1);
+        };
+    });
+
+    let addr = "[::1]:10001".parse().unwrap();
+
+    let stream = OutputStreamer {
+        features: Arc::new(vec![GrpcResponse {
+            message: "Hello from event".into(),
+        }]),
+    };
+
+    let svc = OutputStreamingServer::new(stream);
+
+    Server::builder().add_service(svc).serve(addr).await?;
+    Ok(())
+}
diff --git a/src/engine.rs b/src/engine.rs
index bbbaeff..570f1b0 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -1,86 +1,81 @@
-use crate::models::EngineStatus;
-use crate::utils::establish_pg_connection;
-use crate::{models, schema};
 use anyhow::Error as AnyError;
-use ctrlc::set_handler;
-use diesel::PgConnection;
-use std::str;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use std::env;
+use workflow::components::event::poll_events;
+use workflow::components::task::queue_processor;
+use workflow::engine_utils::run_process;
+// mod models;
+// mod utils;
 use diesel::prelude::*;
 
-use self::event::poll_events;
-use self::task::queue_processor;
+// I want to use the poll_events function from src/event.rs
 
-mod event;
-mod task;
+use tonic::{transport::Server, Request, Response, Status};
 
-fn run_process<F>(process_name: &str, process_fn: F, engine_uid: i32) -> Result<(), AnyError>
-where
-    F: FnOnce(Arc<AtomicBool>, i32) -> Result<(), AnyError>,
-{
-    let running = Arc::new(AtomicBool::new(true));
-    let r = running.clone();
+use grpc::greeter_server::{Greeter, GreeterServer};
+use grpc::{LogMessageRequest, LogMessageResponse};
 
-    set_handler(move || {
-        r.store(false, Ordering::SeqCst);
-    })
-    .expect("Error setting Ctrl-C handler");
-
-    if let Err(e) = process_fn(running, engine_uid) {
-        eprintln!("Failed to start {} process: {}", process_name, e);
-        eprintln!("exiting...");
-        std::process::exit(1);
-    }
-    println!("{} process stopped correctly", process_name);
-
-    Ok(())
+pub mod grpc {
+    tonic::include_proto!("grpc");
 }
 
-pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> {
-    run_process("Task", queue_processor, engine_uid)
-}
+// pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> {
+//     run_process("Task", queue_processor, engine_uid)
+// }
 
-pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> {
-    run_process("Event", poll_events, engine_uid)
-}
+// pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> {
+//     run_process("Event", poll_events, engine_uid)
+// }
 
-pub fn handle_stop() -> Result<(), AnyError> {
-    diesel::update(schema::engines::table)
-        .set(schema::engines::stop_signal.eq(true))
-        .execute(&mut establish_pg_connection())?;
-    Ok(())
-}
+#[derive(Debug, Default)]
+pub struct MyGreeter {}
 
-pub fn update_engine_status(
-    conn: &mut PgConnection,
-    engine_uid: i32,
-    engine_status: EngineStatus,
-) -> Result<(), diesel::result::Error> {
-    use crate::schema::engines::dsl::*;
+#[tonic::async_trait]
+impl Greeter for MyGreeter {
+    async fn say_hello(
+        &self,
+        request: Request<LogMessageRequest>,
+    ) -> Result<Response<LogMessageResponse>, Status> {
+        println!("Got a request: {:?}", request);
 
-    diesel::update(engines)
-        .filter(uid.eq(engine_uid))
-        .set(status.eq(engine_status.to_string()))
-        .execute(conn)?;
+        let reply = grpc::LogMessageResponse {
+            message: format!("Hello {}!", request.into_inner().content).into(),
+        };
 
-    Ok(())
+        Ok(Response::new(reply))
+    }
 }
 
-pub fn create_new_engine_entry(
-    conn: &mut PgConnection,
-    name: &str,
-    ip_address: &str,
-) -> Result<i32, diesel::result::Error> {
-    use crate::schema::engines::table as engines;
-    use crate::schema::engines::uid as engine_uid;
-
-    let new_engine = models::NewEngine { name, ip_address };
-
-    //insert and return uid
-    diesel::insert_into(engines)
-        .values(&new_engine)
-        .returning(engine_uid)
-        .get_result::<i32>(conn)
+// main function that takes an engine uid argument
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let args: Vec<String> = env::args().collect();
+    println!("args: {:?}", args);
+
+    let engine_uid = args[1].parse::<i32>().unwrap();
+    println!("engine_uid: {}", engine_uid);
+
+    // tokio::spawn(async move {
+    //     if let Err(e) = run_event_process(engine_uid) {
+    //         println!("Failed to start event process, {}", e);
+    //         std::process::exit(1);
+    //     };
+    // });
+
+    // tokio::spawn(async move {
+    //     if let Err(e) = run_task_process(engine_uid) {
+    //         println!("Failed to start task process, {}", e);
+    //         std::process::exit(1);
+    //     };
+    // });
+
+    let addr = "[::1]:50051".parse()?;
+    let greeter = MyGreeter::default();
+
+    Server::builder()
+        .add_service(GreeterServer::new(greeter))
+        .serve(addr)
+        .await?;
+
+    Ok(())
 }
diff --git a/src/engine/event.rs b/src/engine/event.rs
deleted file mode 100644
index 9d01718..0000000
--- a/src/engine/event.rs
+++ /dev/null
@@ -1,134 +0,0 @@
-use crate::models::{EventStatus, LightEvent, LightTask, ProcessStatus};
-use crate::schema;
-use crate::utils::{establish_pg_connection, push_tasks_to_queue};
-use anyhow::Error as AnyError;
-use diesel::prelude::*;
-use std::path::Path;
-use std::process::Command as ShellCommand;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
-use std::time::Duration;
-use std::{str, thread};
-
-pub fn poll_events(running: Arc<AtomicBool>, engine_uid: i32) -> Result<(), AnyError> {
-    let mut event_uids: Vec<i32> = Vec::new();
-    let pg_conn = &mut establish_pg_connection();
-
-    use crate::schema::engines::dsl::*;
-
-    diesel::update(engines)
-        .filter(uid.eq(engine_uid))
-        .set(event_process_status.eq(ProcessStatus::Running.to_string()))
-        .execute(pg_conn)?;
-
-    while running.load(Ordering::SeqCst) {
-        let events: Vec<LightEvent> = schema::events::dsl::events
-            .select(LightEvent::as_select())
-            .filter(schema::events::status.ne(EventStatus::Succeeded.to_string()))
-            .load(pg_conn)?;
-
-        for event in events {
-            println!("Event: {}", event);
-            // async execute_event
-            let _ = execute_event(event);
-        }
-
-        if event_uids.is_empty() {
-            println!("No events to process");
-            thread::sleep(Duration::from_millis(2000));
-        }
-
-        let received_stop_signal_result: Result<Option<bool>, _> = engines
-            .find(engine_uid)
-            .select(stop_signal)
-            .first(pg_conn)
-            .optional();
-        match received_stop_signal_result {
-            Ok(Some(signal_on)) => {
-                if signal_on {
-                    println!("Received stop signal");
-                    break;
-                }
-            }
-            Ok(None) => {
-                println!("No stop signal");
-            }
-            Err(e) => {
-                eprintln!("Failed to query engine status {}", e);
{}", e); - eprintln!("exiting..."); - std::process::exit(1); - } - } - - event_uids.clear(); - } - if !running.load(Ordering::SeqCst) { - println!("\nCtrl+C signal detected. Exiting..."); - } - - diesel::update(engines) - .filter(uid.eq(engine_uid)) - .set(event_process_status.eq(ProcessStatus::Stopped.to_string())) - .execute(pg_conn)?; - Ok(()) -} - -fn execute_event(event: LightEvent) -> Result<(), AnyError> { - println!("Event Executor"); - - let conn = &mut establish_pg_connection(); - - let path_basename = match Path::new(&event.trigger).file_name() { - Some(basename) => basename, - None => return Err(AnyError::msg("Failed to get path basename")), - }; - let path_dirname = Path::new(&event.trigger).parent().unwrap(); - - let output = ShellCommand::new("bash") - .arg(path_basename) - .current_dir(path_dirname) - .output() - .expect("failed to execute process"); - - // if shell command return 0, then the event was triggered successfully - use crate::schema::events::dsl::*; - if output.status.code().unwrap() == 0 { - diesel::update(events.find(event.uid)) - .set(status.eq(EventStatus::Succeeded.to_string())) - .execute(conn)?; - - { - use crate::schema::tasks::dsl::*; - let light_tasks: Vec = tasks - .select(LightTask::as_select()) - .filter(event_uid.eq(event.uid)) - .load(conn)?; - let _ = push_tasks_to_queue(light_tasks); - } - } else { - diesel::update(events.find(event.uid)) - .set(( - status.eq(EventStatus::Retrying.to_string()), - triggered_at.eq(diesel::dsl::now), - )) - .execute(conn)?; - }; - - diesel::update(schema::events::dsl::events.find(event.uid)) - .set(( - stdout.eq(str::from_utf8(&output.stdout)?), - stderr.eq(str::from_utf8(&output.stderr)?), - )) - .execute(conn)?; - - println!( - "event id: {} , trigger: {}\nFinished executing with a status: {}", - event.uid, event.trigger, output.status - ); - println!("##############################################"); - println!("stdout: {}", str::from_utf8(&output.stdout)?); - println!("----------------------------------------------"); - println!("stderr: {}", str::from_utf8(&output.stderr)?); - println!("##############################################"); - Ok(()) -} diff --git a/src/engine_utils.rs b/src/engine_utils.rs new file mode 100644 index 0000000..6af005c --- /dev/null +++ b/src/engine_utils.rs @@ -0,0 +1,84 @@ +use anyhow::Error as AnyError; +use ctrlc::set_handler; +use diesel::PgConnection; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::{env, str}; +// use workflow::components::event::poll_events; +// use workflow::components::task::queue_processor; +use crate::models::{EngineStatus, NewEngine}; +use crate::schema; +use crate::utils::establish_pg_connection; +// mod models; +// mod utils; + +use diesel::prelude::*; + +pub fn run_process(process_name: &str, process_fn: F, engine_uid: i32) -> Result<(), AnyError> +where + F: FnOnce(Arc, i32) -> Result<(), AnyError>, +{ + let running = Arc::new(AtomicBool::new(true)); + let r = running.clone(); + + set_handler(move || { + r.store(false, Ordering::SeqCst); + }) + .expect("Error setting Ctrl-C handler"); + + if let Err(e) = process_fn(running, engine_uid) { + eprintln!("Failed to start {} process: {}", process_name, e); + eprintln!("exiting..."); + std::process::exit(1); + } + println!("{} process stopped correctly", process_name); + + Ok(()) +} + +// pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> { +// run_process("Task", queue_processor, engine_uid) +// } + +// pub fn run_event_process(engine_uid: i32) -> Result<(), 
+pub fn handle_stop() -> Result<(), AnyError> {
+    diesel::update(schema::engines::table)
+        .set(schema::engines::stop_signal.eq(true))
+        .execute(&mut establish_pg_connection())?;
+    Ok(())
+}
+
+pub fn update_engine_status(
+    conn: &mut PgConnection,
+    engine_uid: i32,
+    engine_status: EngineStatus,
+) -> Result<(), diesel::result::Error> {
+    use crate::schema::engines::dsl::*;
+
+    diesel::update(engines)
+        .filter(uid.eq(engine_uid))
+        .set(status.eq(engine_status.to_string()))
+        .execute(conn)?;
+
+    Ok(())
+}
+
+pub fn create_new_engine_entry(
+    conn: &mut PgConnection,
+    name: &str,
+    ip_address: &str,
+) -> Result<i32, diesel::result::Error> {
+    use crate::schema::engines::table as engines;
+    use crate::schema::engines::uid as engine_uid;
+
+    let new_engine = NewEngine { name, ip_address };
+
+    // insert and return the new engine's uid
+    diesel::insert_into(engines)
+        .values(&new_engine)
+        .returning(engine_uid)
+        .get_result::<i32>(conn)
+}
diff --git a/src/lib.rs b/src/lib.rs
index 6379791..d349bed 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,8 @@
-pub mod engine;
+// pub mod components {
+//     pub mod event;
+//     pub mod task;
+// }
+pub mod engine_utils;
 pub mod models;
 pub mod parser;
 pub mod schema;
diff --git a/src/main.rs b/src/main.rs
deleted file mode 100644
index a9e1011..0000000
--- a/src/main.rs
+++ /dev/null
@@ -1,8 +0,0 @@
-use anyhow::Error as AnyError;
-
-mod cli;
-
-fn main() -> Result<(), AnyError> {
-    cli::cli();
-    Ok(())
-}
diff --git a/tests/workflows/log_stream/ping.sh b/tests/workflows/log_stream/ping.sh
new file mode 100644
index 0000000..38d9020
--- /dev/null
+++ b/tests/workflows/log_stream/ping.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+set -e
+ping -c 1 google.com
\ No newline at end of file
diff --git a/tests/workflows/log_stream/tasks/stream.sh b/tests/workflows/log_stream/tasks/stream.sh
new file mode 100644
index 0000000..54ae910
--- /dev/null
+++ b/tests/workflows/log_stream/tasks/stream.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+for second in $(seq 1 100); do
+    echo "Stream $second"
+    sleep 1
+done
\ No newline at end of file
diff --git a/tests/workflows/log_stream/workflow.yml b/tests/workflows/log_stream/workflow.yml
new file mode 100644
index 0000000..6b0dcdd
--- /dev/null
+++ b/tests/workflows/log_stream/workflow.yml
@@ -0,0 +1,7 @@
+name: simulation of log stream
+description: simulation of log stream
+events:
+  - name: Event2
+    trigger: ./ping.sh
+    tasks:
+      - path: ./tasks/stream.sh
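+# Expected flow (sketch): the event process runs ping.sh as the trigger; on
+# exit code 0 the event is marked succeeded and tasks/stream.sh is queued,
+# whose output can then be followed with the CLI `logs` subcommand.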