From 48fbf7eba84fab9d3d70a0853b348e82c4d7de18 Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 20 Aug 2025 08:51:48 +0200 Subject: [PATCH] feat: added health service --- .env-example | 5 ++++- Cargo.lock | 3 ++- Cargo.toml | 5 ++--- src/configuration.rs | 17 ++++++++++++----- src/main.rs | 34 +++++++++++++++++++++++++++------- 5 files changed, 47 insertions(+), 17 deletions(-) diff --git a/.env-example b/.env-example index c75ec79..56a4f63 100644 --- a/.env-example +++ b/.env-example @@ -1,4 +1,7 @@ ENVIRONMENT='development' MODE='dynamic' -RABBITMQ_URL='amqp://localhost:5672' +NATS_URL='amqp://localhost:5672' AQUILA_URL='http://localhost:8080' +WITH_HEALTH_SERVICE=false +GRPC_HOST='127.0.0.1' +GRPC_PORT=50051 \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 7bd3c05..8a6f607 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1709,8 +1709,9 @@ dependencies = [ "rand 0.9.2", "serde", "serde_json", - "tempfile", "tokio", + "tonic", + "tonic-health", "tucana", ] diff --git a/Cargo.toml b/Cargo.toml index 968702b..c413cd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,5 @@ base64 = "0.22.1" env_logger = "0.11.8" async-nats = "0.42.0" prost = "0.14.1" - -[dev-dependencies] -tempfile = "3.19.1" +tonic-health = "0.14.1" +tonic = "0.14.1" diff --git a/src/configuration.rs b/src/configuration.rs index 7507c46..ba0396f 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -15,11 +15,15 @@ pub struct Config { /// `hybrid` pub mode: Mode, - /// Verification Token required for internal communication - pub rabbitmq_url: String, + pub nats_url: String, - /// URL to the `Sagittarius` Server. pub aquila_url: String, + + pub with_health_service: bool, + + pub grpc_host: String, + + pub grpc_port: u16, } /// Implementation for all relevant `Aquila` startup configurations @@ -31,8 +35,11 @@ impl Config { Config { environment: env_with_default("ENVIRONMENT", Environment::Development), mode: env_with_default("MODE", Mode::STATIC), - rabbitmq_url: env_with_default("RABBITMQ_URL", String::from("amqp://localhost:5672")), - aquila_url: env_with_default("AQUILA_URL", String::from("http://localhost:8080")), + nats_url: env_with_default("RABBITMQ_URL", String::from("amqp://localhost:5672")), + aquila_url: env_with_default("AQUILA_URL", String::from("http://localhost:50051")), + with_health_service: env_with_default("WITH_HEALTH_SERVICE", false), + grpc_host: env_with_default("GRPC_HOST", "127.0.0.1".to_string()), + grpc_port: env_with_default("GRPC_PORT", 50051), } } } diff --git a/src/main.rs b/src/main.rs index 0dba951..205b013 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,9 @@ use code0_flow::flow_config::load_env_file; use context::{Context, ContextEntry, ContextResult}; use error::RuntimeError; use futures_lite::StreamExt; +use log::error; use prost::Message; +use tonic_health::pb::health_server::HealthServer; use registry::FunctionStore; use tucana::shared::value::Kind; use tucana::shared::{ExecutionFlow, ListValue, NodeFunction, Value}; @@ -170,13 +172,34 @@ async fn main() { let mut store = FunctionStore::new(); store.populate(collect()); - let client = match async_nats::connect("nats://127.0.0.1:4222").await { + let client = match async_nats::connect(config.nats_url.clone()).await { Ok(client) => client, Err(err) => { panic!("Failed to connect to NATS server: {}", err); } }; + if config.with_health_service { + let health_service = + code0_flow::flow_health::HealthService::new(config.nats_url.clone()); + let address = match format!("{}:{}", config.grpc_host, config.grpc_port).parse() { + Ok(address) => address, + Err(err) => { + error!("Failed to parse grpc address: {:?}", err); + return; + } + }; + + tokio::spawn(async move { + let _ = tonic::transport::Server::builder() + .add_service(HealthServer::new(health_service)) + .serve(address) + .await; + }); + + println!("Health server started at {}", address); + } + let _ = match client .queue_subscribe(String::from("execution.*"), "taurus".into()) .await @@ -193,12 +216,9 @@ async fn main() { } }; - let value = match handle_message(flow, &store) { - Some(value) => value, - None => Value { - kind: Some(Kind::NullValue(0)), - }, - }; + let value = handle_message(flow, &store).unwrap_or_else(|| Value { + kind: Some(Kind::NullValue(0)), + }); // Send a response to the reply subject if let Some(reply) = msg.reply {