Skip to content

Commit 45d3cdf

Browse files
Merge pull request #214 from code0-tech/213-non-blocking-bug
Non-Blocking gRPC Server
2 parents c1f5e2b + e6c1910 commit 45d3cdf

File tree

7 files changed

+120
-78
lines changed

7 files changed

+120
-78
lines changed

Cargo.lock

Lines changed: 51 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@ version = "0.1.0"
44
edition = "2024"
55

66
[dependencies]
7-
tokio = { version = "1.44.1", features = ["rt-multi-thread"] }
7+
tokio = { version = "1.44.1", features = ["rt-multi-thread", "signal"] }
88
futures = "0.3.31"
99
log = "0.4.26"
1010
env_logger = "0.11.8"
1111
prost = "0.14.1"
1212
tonic = "0.14.1"
13-
tucana = { version = "0.0.39", features = ["all"] }
14-
code0-flow = { version = "0.0.18" }
13+
tucana = { version = "0.0.42", features = ["all"] }
14+
code0-flow = { version = "0.0.19", features = ["flow_health", "flow_validator"] }
1515
serde_json = "1.0.140"
16-
async-nats = "0.44.2"
16+
async-nats = "0.45.0"
1717
tonic-health = "0.14.1"
1818
tokio-stream = "0.1.17"
1919
uuid = { version = "1.18.0", features = ["v4"] }

src/main.rs

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::sagittarius::test_execution_client_impl::SagittariusTestExecutionServiceClient;
21
use crate::{configuration::Config as AquilaConfig, flow::get_flow_identifier};
32
use async_nats::jetstream::kv::Config;
43
use code0_flow::flow_config::load_env_file;
@@ -31,8 +30,14 @@ async fn main() {
3130

3231
//Create connection to JetStream
3332
let client = match async_nats::connect(config.nats_url.clone()).await {
34-
Ok(client) => client,
35-
Err(err) => panic!("Failed to connect to NATS server: {}", err),
33+
Ok(client) => {
34+
log::info!("Connected to NATS");
35+
client
36+
}
37+
Err(err) => {
38+
log::error!("Failed to connect to NATS: {:?}", err);
39+
panic!("Failed to connect to NATS server: {:?}", err)
40+
}
3641
};
3742

3843
let jet_stream = async_nats::jetstream::new(client.clone());
@@ -45,47 +50,60 @@ async fn main() {
4550
.await;
4651

4752
let kv_store = match jet_stream.get_key_value(config.nats_bucket.clone()).await {
48-
Ok(kv) => Arc::new(kv),
49-
Err(err) => panic!("Failed to get key-value store: {}", err),
53+
Ok(kv) => {
54+
log::info!("Connected to JetStream");
55+
Arc::new(kv)
56+
}
57+
Err(err) => {
58+
log::error!("Failed to get key-value store: {:?}", err);
59+
panic!("Failed to get key-value store: {:?}", err)
60+
}
5061
};
5162

52-
//Create connection to Sagittarius if the type is hybrid
53-
if !config.is_static() {
54-
let server = AquilaGRPCServer::new(&config);
55-
56-
match server.start().await {
57-
Ok(_) => {
58-
log::info!("Server started successfully");
59-
}
60-
Err(err) => {
61-
log::error!("Failed to start server: {:?}", err);
62-
panic!("Failed to start server");
63-
}
64-
};
63+
if config.is_static() {
64+
log::info!("Starting with static configuration");
65+
init_flows_from_json(config.flow_fallback_path, kv_store).await;
66+
return;
67+
}
6568

66-
// Connect to Sagittarius Flow Endpoint
67-
SagittariusFlowClient::new(
68-
config.backend_url.clone(),
69-
kv_store.clone(),
70-
config.runtime_token.clone(),
71-
)
72-
.await
73-
.init_flow_stream()
74-
.await;
69+
let server = AquilaGRPCServer::new(&config);
70+
let backend_url_flow = config.backend_url.clone();
71+
let runtime_token_flow = config.runtime_token.clone();
72+
let kv_for_flow = kv_store.clone();
7573

76-
// Connect to Sagittarius Execution Endpoint
77-
SagittariusTestExecutionServiceClient::new(
78-
client,
79-
kv_store,
80-
config.backend_url,
81-
config.runtime_token,
82-
)
83-
.await
84-
.logon()
85-
.await;
86-
} else {
87-
init_flows_from_json(config.flow_fallback_path, kv_store).await
74+
let mut server_task = tokio::spawn(async move {
75+
if let Err(err) = server.start().await {
76+
log::error!("gRPC server error: {:?}", err);
77+
} else {
78+
log::info!("gRPC server stopped gracefully");
79+
}
80+
});
81+
82+
let mut flow_task = tokio::spawn(async move {
83+
let mut flow_client =
84+
SagittariusFlowClient::new(backend_url_flow, kv_for_flow, runtime_token_flow).await;
85+
86+
flow_client.init_flow_stream().await;
87+
log::warn!("Flow stream task exited");
88+
});
89+
90+
tokio::select! {
91+
_ = &mut server_task => {
92+
log::warn!("gRPC server task finished, shutting down");
93+
flow_task.abort();
94+
}
95+
_ = &mut flow_task => {
96+
log::warn!("Flow stream task finished, shutting down");
97+
server_task.abort();
98+
}
99+
_ = tokio::signal::ctrl_c() => {
100+
log::info!("Ctrl+C/Exit signal received, shutting down");
101+
server_task.abort();
102+
flow_task.abort();
103+
}
88104
}
105+
106+
log::info!("Aquila shutdown complete");
89107
}
90108

91109
async fn init_flows_from_json(

src/sagittarius/flow_service_client_impl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl SagittariusFlowClient {
4242
async fn handle_response(&mut self, response: FlowResponse) {
4343
let data = match response.data {
4444
Some(data) => {
45-
log::info!("Recieved a FlowResponse");
45+
log::info!("Received a FlowResponse");
4646
data
4747
}
4848
None => {
@@ -115,7 +115,7 @@ impl SagittariusFlowClient {
115115

116116
let response = match self.client.update(request).await {
117117
Ok(res) => {
118-
log::info!("Succesfully established a Stream (for Flows)");
118+
log::info!("Successfully established a Stream (for Flows)");
119119
res
120120
}
121121
Err(status) => {

src/server/data_type_service_server_impl.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@ impl DataTypeService for AquilaDataTypeServiceServer {
1818
async fn update(
1919
&self,
2020
request: tonic::Request<tucana::aquila::DataTypeUpdateRequest>,
21-
) -> std::result::Result<tonic::Response<tucana::aquila::DataTypeUpdateResponse>, tonic::Status>
22-
{
21+
) -> Result<tonic::Response<tucana::aquila::DataTypeUpdateResponse>, tonic::Status> {
2322
let data_type_update_request = request.into_inner();
2423

2524
log::info!(
26-
"Recieved DataTypes: {:?}",
25+
"Received DataTypes: {:?}",
2726
data_type_update_request.data_types
2827
);
2928

src/server/flow_type_service_server_impl.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@ impl FlowTypeService for AquilaFlowTypeServiceServer {
1919
async fn update(
2020
&self,
2121
request: tonic::Request<tucana::aquila::FlowTypeUpdateRequest>,
22-
) -> std::result::Result<tonic::Response<tucana::aquila::FlowTypeUpdateResponse>, tonic::Status>
23-
{
22+
) -> Result<tonic::Response<tucana::aquila::FlowTypeUpdateResponse>, tonic::Status> {
2423
let flow_type_update_request = request.into_inner();
2524

2625
log::info!(
27-
"Recieved FlowTypes: {:?}",
26+
"Received FlowTypes: {:?}",
2827
flow_type_update_request.flow_types
2928
);
3029

src/server/runtime_function_service_server_impl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ impl RuntimeFunctionDefinitionService for AquilaRuntimeFunctionServiceServer {
2020
async fn update(
2121
&self,
2222
request: tonic::Request<tucana::aquila::RuntimeFunctionDefinitionUpdateRequest>,
23-
) -> std::result::Result<
23+
) -> Result<
2424
tonic::Response<tucana::aquila::RuntimeFunctionDefinitionUpdateResponse>,
2525
tonic::Status,
2626
> {
2727
let runtime_function_definition_update_request = request.into_inner();
2828

2929
log::info!(
30-
"Recieved RuntimeFunctions: {:?}",
30+
"Received RuntimeFunctions: {:?}",
3131
runtime_function_definition_update_request.runtime_functions
3232
);
3333

0 commit comments

Comments
 (0)