Skip to content

Commit 23b040e

Browse files
authored
feat: add sandbox env injection case via cloud control (#19383)
* feat: add sandbox env injection case via cloud control * feat: add worker ddl and cloud env mock * chore: codefmt * chore: add worker parse test * feat: support size/auto_suspend.. options for worker * feat: add options/suspend/resume for Alter Worker * fix: Normalize UNSET worker option names before planning * chore: codefmt
1 parent fa0c074 commit 23b040e

38 files changed

Lines changed: 1855 additions & 137 deletions

src/common/cloud_control/proto/resource.proto

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,68 @@ option go_package = "databend.com/cloudcontrol/proto";
33

44
package resourceproto;
55

6-
message ApplyResourceRequest {
7-
string type = 3;
8-
string script = 4;
6+
message Worker {
7+
string name = 1;
8+
map<string, string> tags = 2;
9+
string created_at = 3;
10+
string updated_at = 4;
11+
map<string, string> options = 5;
912
}
1013

11-
message ApplyResourceResponse {
12-
string endpoint = 1;
13-
map<string, string> headers = 2;
14+
message CreateWorkerRequest {
15+
string tenant_id = 1;
16+
string name = 2;
17+
bool if_not_exists = 3;
18+
map<string, string> tags = 4;
19+
string type = 5;
20+
string script = 6;
21+
map<string, string> options = 7;
1422
}
1523

16-
service ResourceService {
17-
rpc ApplyResource(ApplyResourceRequest) returns (ApplyResourceResponse);
24+
message CreateWorkerResponse {
25+
Worker worker = 1;
26+
string endpoint = 2;
27+
map<string, string> headers = 3;
28+
}
29+
30+
message AlterWorkerRequest {
31+
enum WorkerStateAction {
32+
Unspecified = 0;
33+
Suspend = 1;
34+
Resume = 2;
35+
}
36+
string tenant_id = 1;
37+
string name = 2;
38+
map<string, string> set_tags = 3;
39+
repeated string unset_tags = 4;
40+
map<string, string> set_options = 5;
41+
repeated string unset_options = 6;
42+
WorkerStateAction state_action = 7;
43+
}
44+
45+
message AlterWorkerResponse {
46+
Worker worker = 1;
47+
}
48+
49+
message DropWorkerRequest {
50+
string tenant_id = 1;
51+
string name = 2;
52+
bool if_exists = 3;
53+
}
54+
55+
message DropWorkerResponse {}
56+
57+
message ListWorkersRequest {
58+
string tenant_id = 1;
59+
}
60+
61+
message ListWorkersResponse {
62+
repeated Worker workers = 1;
63+
}
64+
65+
service WorkerService {
66+
rpc CreateWorker(CreateWorkerRequest) returns (CreateWorkerResponse);
67+
rpc AlterWorker(AlterWorkerRequest) returns (AlterWorkerResponse);
68+
rpc DropWorker(DropWorkerRequest) returns (DropWorkerResponse);
69+
rpc ListWorkers(ListWorkersRequest) returns (ListWorkersResponse);
1870
}

src/common/cloud_control/src/client_config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ impl ClientConfig {
5858
);
5959
}
6060

61-
pub fn add_resource_version_info(&mut self) {
61+
pub fn add_worker_version_info(&mut self) {
6262
self.add_metadata(
63-
crate::resource_client::RESOURCE_CLIENT_VERSION_NAME,
64-
crate::resource_client::RESOURCE_CLIENT_VERSION,
63+
crate::worker_client::WORKER_CLIENT_VERSION_NAME,
64+
crate::worker_client::WORKER_CLIENT_VERSION,
6565
);
6666
}
6767
pub fn get_metadata(&self) -> &Vec<(String, String)> {

src/common/cloud_control/src/cloud_api.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@ use databend_common_exception::ErrorCode;
2020
use databend_common_exception::Result;
2121

2222
use crate::notification_client::NotificationClient;
23-
use crate::resource_client::ResourceClient;
2423
use crate::task_client::TaskClient;
24+
use crate::worker_client::WorkerClient;
2525

2626
pub const CLOUD_REQUEST_TIMEOUT_SEC: u64 = 5; // 5 seconds
2727

2828
pub struct CloudControlApiProvider {
2929
pub task_client: Arc<TaskClient>,
3030
pub notification_client: Arc<NotificationClient>,
31-
pub resource_client: Arc<ResourceClient>,
31+
pub worker_client: Arc<WorkerClient>,
3232
pub timeout: Duration,
3333
}
3434

@@ -44,11 +44,11 @@ impl CloudControlApiProvider {
4444
let channel = endpoint.connect_lazy();
4545
let task_client = TaskClient::new(channel.clone()).await?;
4646
let notification_client = NotificationClient::new(channel.clone()).await?;
47-
let resource_client = ResourceClient::new(channel).await?;
47+
let worker_client = WorkerClient::new(channel).await?;
4848
Ok(Arc::new(CloudControlApiProvider {
4949
task_client,
5050
notification_client,
51-
resource_client,
51+
worker_client,
5252
timeout,
5353
}))
5454
}
@@ -89,8 +89,8 @@ impl CloudControlApiProvider {
8989
self.notification_client.clone()
9090
}
9191

92-
pub fn get_resource_client(&self) -> Arc<ResourceClient> {
93-
self.resource_client.clone()
92+
pub fn get_worker_client(&self) -> Arc<WorkerClient> {
93+
self.worker_client.clone()
9494
}
9595
pub fn get_timeout(&self) -> Duration {
9696
self.timeout

src/common/cloud_control/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ pub mod client_config;
1616
pub mod cloud_api;
1717
pub mod notification_client;
1818
pub mod notification_utils;
19-
pub mod resource_client;
2019
pub mod task_client;
2120
pub mod task_utils;
21+
pub mod worker_client;
2222

2323
#[allow(clippy::derive_partial_eq_without_eq)]
2424
#[allow(clippy::large_enum_variant)]

src/common/cloud_control/src/resource_client.rs

Lines changed: 0 additions & 46 deletions
This file was deleted.
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use tonic::Request;
18+
use tonic::transport::Channel;
19+
20+
use crate::pb::AlterWorkerRequest;
21+
use crate::pb::AlterWorkerResponse;
22+
use crate::pb::CreateWorkerRequest;
23+
use crate::pb::CreateWorkerResponse;
24+
use crate::pb::DropWorkerRequest;
25+
use crate::pb::DropWorkerResponse;
26+
use crate::pb::ListWorkersRequest;
27+
use crate::pb::ListWorkersResponse;
28+
use crate::pb::worker_service_client::WorkerServiceClient;
29+
30+
pub(crate) const WORKER_CLIENT_VERSION: &str = "v1";
31+
pub(crate) const WORKER_CLIENT_VERSION_NAME: &str = "WORKER_CLIENT_VERSION";
32+
33+
pub struct WorkerClient {
34+
pub client: WorkerServiceClient<Channel>,
35+
}
36+
37+
impl WorkerClient {
38+
pub async fn new(channel: Channel) -> databend_common_exception::Result<Arc<WorkerClient>> {
39+
let client = WorkerServiceClient::new(channel);
40+
Ok(Arc::new(WorkerClient { client }))
41+
}
42+
43+
pub async fn create_worker(
44+
&self,
45+
req: Request<CreateWorkerRequest>,
46+
) -> databend_common_exception::Result<CreateWorkerResponse> {
47+
let mut client = self.client.clone();
48+
let resp = client.create_worker(req).await?;
49+
Ok(resp.into_inner())
50+
}
51+
52+
pub async fn alter_worker(
53+
&self,
54+
req: Request<AlterWorkerRequest>,
55+
) -> databend_common_exception::Result<AlterWorkerResponse> {
56+
let mut client = self.client.clone();
57+
let resp = client.alter_worker(req).await?;
58+
Ok(resp.into_inner())
59+
}
60+
61+
pub async fn drop_worker(
62+
&self,
63+
req: Request<DropWorkerRequest>,
64+
) -> databend_common_exception::Result<DropWorkerResponse> {
65+
let mut client = self.client.clone();
66+
let resp = client.drop_worker(req).await?;
67+
Ok(resp.into_inner())
68+
}
69+
70+
pub async fn list_workers(
71+
&self,
72+
req: Request<ListWorkersRequest>,
73+
) -> databend_common_exception::Result<ListWorkersResponse> {
74+
let mut client = self.client.clone();
75+
let resp = client.list_workers(req).await?;
76+
Ok(resp.into_inner())
77+
}
78+
}

src/query/ast/src/ast/statements/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ mod user;
6060
mod view;
6161
mod virtual_column;
6262
mod warehouse;
63+
mod worker;
6364
mod workload;
6465

6566
pub use call::*;
@@ -110,4 +111,5 @@ pub use user::*;
110111
pub use view::*;
111112
pub use virtual_column::*;
112113
pub use warehouse::*;
114+
pub use worker::*;
113115
pub use workload::*;

src/query/ast/src/ast/statements/statement.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::ast::statements::role::AlterRoleStmt;
3535
use crate::ast::statements::settings::Settings;
3636
use crate::ast::statements::task::CreateTaskStmt;
3737
use crate::ast::statements::warehouse::ShowWarehousesStmt;
38+
use crate::ast::statements::worker::ShowWorkersStmt;
3839
use crate::ast::statements::workload::CreateWorkloadGroupStmt;
3940
use crate::ast::statements::workload::DropWorkloadGroupStmt;
4041
use crate::ast::statements::workload::RenameWorkloadGroupStmt;
@@ -153,6 +154,12 @@ pub enum Statement {
153154
AssignWarehouseNodes(AssignWarehouseNodesStmt),
154155
UnassignWarehouseNodes(UnassignWarehouseNodesStmt),
155156

157+
// Workers
158+
ShowWorkers(ShowWorkersStmt),
159+
CreateWorker(CreateWorkerStmt),
160+
AlterWorker(AlterWorkerStmt),
161+
DropWorker(DropWorkerStmt),
162+
156163
// Workloads
157164
ShowWorkloadGroups(ShowWorkloadGroupsStmt),
158165
CreateWorkloadGroup(CreateWorkloadGroupStmt),
@@ -545,6 +552,7 @@ impl Statement {
545552
| Statement::DescProcedure(..)
546553
| Statement::CallProcedure(..)
547554
| Statement::ShowWarehouses(..)
555+
| Statement::ShowWorkers(..)
548556
| Statement::ShowOnlineNodes(..)
549557
| Statement::InspectWarehouse(..) => true,
550558

@@ -628,6 +636,9 @@ impl Statement {
628636
| Statement::UnassignWarehouseNodes(..)
629637
| Statement::ResumeWarehouse(..)
630638
| Statement::SuspendWarehouse(..)
639+
| Statement::CreateWorker(..)
640+
| Statement::AlterWorker(..)
641+
| Statement::DropWorker(..)
631642
| Statement::ShowWorkloadGroups(..)
632643
| Statement::CreateWorkloadGroup(..)
633644
| Statement::DropWorkloadGroup(..)
@@ -1109,6 +1120,10 @@ impl Display for Statement {
11091120
Statement::RenameWarehouseCluster(stmt) => write!(f, "{stmt}")?,
11101121
Statement::AssignWarehouseNodes(stmt) => write!(f, "{stmt}")?,
11111122
Statement::UnassignWarehouseNodes(stmt) => write!(f, "{stmt}")?,
1123+
Statement::ShowWorkers(stmt) => write!(f, "{stmt}")?,
1124+
Statement::CreateWorker(stmt) => write!(f, "{stmt}")?,
1125+
Statement::AlterWorker(stmt) => write!(f, "{stmt}")?,
1126+
Statement::DropWorker(stmt) => write!(f, "{stmt}")?,
11121127
Statement::ShowWorkloadGroups(stmt) => write!(f, "{stmt}")?,
11131128
Statement::CreateWorkloadGroup(stmt) => write!(f, "{stmt}")?,
11141129
Statement::DropWorkloadGroup(stmt) => write!(f, "{stmt}")?,

0 commit comments

Comments
 (0)