Skip to content

Commit 52f3377

Browse files
Merge pull request #69 from code0-tech/68-add-health-server
feat: added health service
2 parents 01cbd50 + 48fbf7e commit 52f3377

File tree

5 files changed

+47
-17
lines changed

5 files changed

+47
-17
lines changed

.env-example

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
ENVIRONMENT='development'
22
MODE='dynamic'
3-
RABBITMQ_URL='amqp://localhost:5672'
3+
NATS_URL='amqp://localhost:5672'
44
AQUILA_URL='http://localhost:8080'
5+
WITH_HEALTH_SERVICE=false
6+
GRPC_HOST='127.0.0.1'
7+
GRPC_PORT=50051

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,5 @@ base64 = "0.22.1"
1616
env_logger = "0.11.8"
1717
async-nats = "0.42.0"
1818
prost = "0.14.1"
19-
20-
[dev-dependencies]
21-
tempfile = "3.19.1"
19+
tonic-health = "0.14.1"
20+
tonic = "0.14.1"

src/configuration.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@ pub struct Config {
1515
/// `hybrid`
1616
pub mode: Mode,
1717

18-
/// Verification Token required for internal communication
19-
pub rabbitmq_url: String,
18+
pub nats_url: String,
2019

21-
/// URL to the `Sagittarius` Server.
2220
pub aquila_url: String,
21+
22+
pub with_health_service: bool,
23+
24+
pub grpc_host: String,
25+
26+
pub grpc_port: u16,
2327
}
2428

2529
/// Implementation for all relevant `Aquila` startup configurations
@@ -31,8 +35,11 @@ impl Config {
3135
Config {
3236
environment: env_with_default("ENVIRONMENT", Environment::Development),
3337
mode: env_with_default("MODE", Mode::STATIC),
34-
rabbitmq_url: env_with_default("RABBITMQ_URL", String::from("amqp://localhost:5672")),
35-
aquila_url: env_with_default("AQUILA_URL", String::from("http://localhost:8080")),
38+
nats_url: env_with_default("RABBITMQ_URL", String::from("amqp://localhost:5672")),
39+
aquila_url: env_with_default("AQUILA_URL", String::from("http://localhost:50051")),
40+
with_health_service: env_with_default("WITH_HEALTH_SERVICE", false),
41+
grpc_host: env_with_default("GRPC_HOST", "127.0.0.1".to_string()),
42+
grpc_port: env_with_default("GRPC_PORT", 50051),
3643
}
3744
}
3845
}

src/main.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ use code0_flow::flow_config::load_env_file;
1010
use context::{Context, ContextEntry, ContextResult};
1111
use error::RuntimeError;
1212
use futures_lite::StreamExt;
13+
use log::error;
1314
use prost::Message;
15+
use tonic_health::pb::health_server::HealthServer;
1416
use registry::FunctionStore;
1517
use tucana::shared::value::Kind;
1618
use tucana::shared::{ExecutionFlow, ListValue, NodeFunction, Value};
@@ -170,13 +172,34 @@ async fn main() {
170172
let mut store = FunctionStore::new();
171173
store.populate(collect());
172174

173-
let client = match async_nats::connect("nats://127.0.0.1:4222").await {
175+
let client = match async_nats::connect(config.nats_url.clone()).await {
174176
Ok(client) => client,
175177
Err(err) => {
176178
panic!("Failed to connect to NATS server: {}", err);
177179
}
178180
};
179181

182+
if config.with_health_service {
183+
let health_service =
184+
code0_flow::flow_health::HealthService::new(config.nats_url.clone());
185+
let address = match format!("{}:{}", config.grpc_host, config.grpc_port).parse() {
186+
Ok(address) => address,
187+
Err(err) => {
188+
error!("Failed to parse grpc address: {:?}", err);
189+
return;
190+
}
191+
};
192+
193+
tokio::spawn(async move {
194+
let _ = tonic::transport::Server::builder()
195+
.add_service(HealthServer::new(health_service))
196+
.serve(address)
197+
.await;
198+
});
199+
200+
println!("Health server started at {}", address);
201+
}
202+
180203
let _ = match client
181204
.queue_subscribe(String::from("execution.*"), "taurus".into())
182205
.await
@@ -193,12 +216,9 @@ async fn main() {
193216
}
194217
};
195218

196-
let value = match handle_message(flow, &store) {
197-
Some(value) => value,
198-
None => Value {
199-
kind: Some(Kind::NullValue(0)),
200-
},
201-
};
219+
let value = handle_message(flow, &store).unwrap_or_else(|| Value {
220+
kind: Some(Kind::NullValue(0)),
221+
});
202222

203223
// Send a response to the reply subject
204224
if let Some(reply) = msg.reply {

0 commit comments

Comments
 (0)