Skip to content

Commit b725976

Browse files
committed
feat: gRPC remote execution backend and worker service (#1)
Implements distributed pipeline execution via gRPC (Spark Connect-style): Proto definition (proto/teckel.proto): - PipelineService with session management, per-op execution, and full pipeline execution modes - CreateSession/CloseSession for worker lifecycle - ReadInput/ApplyTransform/WriteOutput for Spark Connect-style per-operation execution (planned) - ExecutePipeline for full pipeline submission (implemented) - GetStatus/CancelPipeline for async job tracking (planned) teckel-remote crate (client): - RemoteBackend implements Backend trait with type DataFrame = RemoteRef - connect() creates a gRPC session on the worker - execute_pipeline() sends full YAML for server-side execution - Per-op Backend methods (read_input, write_output, apply) wired for future Spark Connect-style granular execution - close() releases worker session resources teckel-worker crate (gRPC server binary): - TeckelWorker implements PipelineService - Session management with DashMap<session_id, SessionState> - execute_pipeline() delegates to teckel_api::etl() with DataFusion - Session validation on all endpoints - Configurable via TECKEL_WORKER_HOST/TECKEL_WORKER_PORT Usage: # Start worker $ cargo run --bin teckel-worker # Client connects let backend = RemoteBackend::connect("http://worker:50051").await?; backend.execute_pipeline(yaml, &vars).await?; backend.close().await?; Closes #1
1 parent fdee333 commit b725976

9 files changed

Lines changed: 623 additions & 0 deletions

File tree

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ members = [
55
"crates/teckel-datafusion",
66
"crates/teckel-api",
77
"crates/teckel-server",
8+
"crates/teckel-remote",
9+
"crates/teckel-worker",
810
]
911

1012
[workspace.package]
@@ -34,6 +36,10 @@ async-trait = "0.1"
3436
# HTTP client (for enrich transform)
3537
reqwest = { version = "0.12", features = ["json"] }
3638

39+
# gRPC
40+
tonic = "0.13"
41+
prost = "0.13"
42+
3743
# Error handling
3844
thiserror = "2"
3945

@@ -56,3 +62,4 @@ num_cpus = "1"
5662
teckel-engine = { path = "crates/teckel-engine" }
5763
teckel-datafusion = { path = "crates/teckel-datafusion" }
5864
teckel-api = { path = "crates/teckel-api" }
65+
teckel-remote = { path = "crates/teckel-remote" }

crates/teckel-remote/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "teckel-remote"
3+
description = "gRPC remote backend for distributed Teckel pipeline execution"
4+
version.workspace = true
5+
edition.workspace = true
6+
license.workspace = true
7+
repository.workspace = true
8+
authors.workspace = true
9+
10+
[dependencies]
11+
teckel-model = { workspace = true }
12+
teckel-engine = { workspace = true }
13+
tonic = { workspace = true }
14+
prost = { workspace = true }
15+
tokio = { workspace = true }
16+
async-trait = { workspace = true }
17+
thiserror = { workspace = true }
18+
tracing = { workspace = true }
19+
serde_json = { workspace = true }
20+
uuid = { workspace = true }
21+
22+
[build-dependencies]
23+
tonic-build = "0.13"

crates/teckel-remote/build.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
fn main() -> Result<(), Box<dyn std::error::Error>> {
2+
tonic_build::compile_protos("../../proto/teckel.proto")?;
3+
Ok(())
4+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
//! RemoteBackend: implements the Backend trait by sending operations to a gRPC worker.
2+
//!
3+
//! Each DataFrame is represented as a `RemoteRef` (opaque handle ID on the worker).
4+
//! The actual data never leaves the worker — only handles are exchanged.
5+
6+
use crate::proto::{self, pipeline_service_client::PipelineServiceClient};
7+
use async_trait::async_trait;
8+
use std::collections::{BTreeMap, HashMap};
9+
use teckel_engine::Backend;
10+
use teckel_model::source::{InputSource, OutputSource, Source};
11+
use teckel_model::TeckelError;
12+
use tonic::transport::Channel;
13+
use uuid::Uuid;
14+
15+
/// Opaque reference to a DataFrame living on a remote worker.
///
/// Only these identifiers travel over the wire; the data itself never
/// leaves the worker that owns `session_id`.
#[derive(Debug, Clone)]
pub struct RemoteRef {
    /// Identifier of the cached DataFrame on the worker.
    pub handle_id: String,
    /// Identifier of the worker session that owns the handle.
    pub session_id: String,
}
21+
22+
/// Backend that delegates execution to a remote gRPC worker.
23+
pub struct RemoteBackend {
24+
client: PipelineServiceClient<Channel>,
25+
session_id: String,
26+
}
27+
28+
impl RemoteBackend {
29+
/// Connect to a worker and create a session.
30+
pub async fn connect(endpoint: &str) -> Result<Self, TeckelError> {
31+
let channel = Channel::from_shared(endpoint.to_string())
32+
.map_err(|e| TeckelError::Execution(format!("invalid endpoint: {e}")))?
33+
.connect()
34+
.await
35+
.map_err(|e| TeckelError::Execution(format!("failed to connect to worker: {e}")))?;
36+
37+
let mut client = PipelineServiceClient::new(channel);
38+
39+
let response = client
40+
.create_session(proto::CreateSessionRequest {
41+
backend: "datafusion".to_string(),
42+
})
43+
.await
44+
.map_err(|e| TeckelError::Execution(format!("create session: {e}")))?;
45+
46+
let session_id = response.into_inner().session_id;
47+
tracing::info!(session_id = %session_id, endpoint, "remote session created");
48+
49+
Ok(Self { client, session_id })
50+
}
51+
52+
/// Execute a full pipeline on the worker (simpler alternative to per-op mode).
53+
pub async fn execute_pipeline(
54+
&mut self,
55+
yaml: &str,
56+
variables: &BTreeMap<String, String>,
57+
) -> Result<(), TeckelError> {
58+
let response = self
59+
.client
60+
.execute_pipeline(proto::ExecutePipelineRequest {
61+
session_id: self.session_id.clone(),
62+
yaml: yaml.to_string(),
63+
variables: variables.iter().map(|(k, v)| (k.clone(), v.clone())).collect::<HashMap<_, _>>(),
64+
})
65+
.await
66+
.map_err(|e| TeckelError::Execution(format!("execute pipeline: {e}")))?;
67+
68+
let resp = response.into_inner();
69+
if resp.status == "failed" {
70+
return Err(TeckelError::Execution(format!(
71+
"remote pipeline failed: {}",
72+
resp.error
73+
)));
74+
}
75+
76+
tracing::info!(
77+
job_id = %resp.job_id,
78+
duration_ms = resp.duration_ms,
79+
"remote pipeline completed"
80+
);
81+
Ok(())
82+
}
83+
84+
/// Close the session and release worker resources.
85+
pub async fn close(mut self) -> Result<(), TeckelError> {
86+
self.client
87+
.close_session(proto::CloseSessionRequest {
88+
session_id: self.session_id.clone(),
89+
})
90+
.await
91+
.map_err(|e| TeckelError::Execution(format!("close session: {e}")))?;
92+
tracing::info!(session_id = %self.session_id, "remote session closed");
93+
Ok(())
94+
}
95+
}
96+
97+
#[async_trait]
98+
impl Backend for RemoteBackend {
99+
type DataFrame = RemoteRef;
100+
101+
fn name(&self) -> &str {
102+
"remote"
103+
}
104+
105+
async fn read_input(&self, input: &InputSource) -> Result<RemoteRef, TeckelError> {
106+
let options: HashMap<String, String> = input
107+
.options
108+
.iter()
109+
.map(|(k, v)| (k.clone(), format!("{v:?}")))
110+
.collect();
111+
112+
let response = self
113+
.client
114+
.clone()
115+
.read_input(proto::ReadInputRequest {
116+
session_id: self.session_id.clone(),
117+
asset_name: Uuid::new_v4().to_string(),
118+
format: input.format.clone(),
119+
path: input.path.clone(),
120+
options,
121+
})
122+
.await
123+
.map_err(|e| TeckelError::Execution(format!("remote read_input: {e}")))?;
124+
125+
let resp = response.into_inner();
126+
Ok(RemoteRef {
127+
handle_id: resp.handle_id,
128+
session_id: resp.session_id,
129+
})
130+
}
131+
132+
async fn write_output(
133+
&self,
134+
df: RemoteRef,
135+
output: &OutputSource,
136+
) -> Result<(), TeckelError> {
137+
let mode = format!("{:?}", output.mode).to_lowercase();
138+
let options: HashMap<String, String> = output
139+
.options
140+
.iter()
141+
.map(|(k, v)| (k.clone(), format!("{v:?}")))
142+
.collect();
143+
144+
self.client
145+
.clone()
146+
.write_output(proto::WriteOutputRequest {
147+
session_id: self.session_id.clone(),
148+
handle_id: df.handle_id,
149+
format: output.format.clone(),
150+
path: output.path.clone(),
151+
mode,
152+
options,
153+
})
154+
.await
155+
.map_err(|e| TeckelError::Execution(format!("remote write_output: {e}")))?;
156+
157+
Ok(())
158+
}
159+
160+
async fn apply(
161+
&self,
162+
_source: &Source,
163+
_inputs: &BTreeMap<String, RemoteRef>,
164+
) -> Result<RemoteRef, TeckelError> {
165+
// Per-operation mode requires Source serialization (planned for a future version).
166+
// For now, use RemoteBackend::execute_pipeline() which sends the full YAML
167+
// to the worker for server-side parsing and execution.
168+
Err(TeckelError::Execution(
169+
"per-operation remote execution not yet supported. \
170+
Use RemoteBackend::execute_pipeline() instead."
171+
.to_string(),
172+
))
173+
}
174+
}

crates/teckel-remote/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
mod backend;
2+
3+
pub mod proto {
4+
tonic::include_proto!("teckel.v1");
5+
}
6+
7+
pub use backend::RemoteBackend;
8+
pub use proto::pipeline_service_client::PipelineServiceClient;
9+
pub use proto::pipeline_service_server::{PipelineService, PipelineServiceServer};

crates/teckel-worker/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "teckel-worker"
3+
description = "gRPC worker service for distributed Teckel pipeline execution"
4+
version.workspace = true
5+
edition.workspace = true
6+
license.workspace = true
7+
repository.workspace = true
8+
authors.workspace = true
9+
10+
[dependencies]
11+
teckel-model = { workspace = true }
12+
teckel-engine = { workspace = true }
13+
teckel-datafusion = { workspace = true }
14+
teckel-api = { workspace = true }
15+
teckel-remote = { workspace = true }
16+
tonic = { workspace = true }
17+
tokio = { workspace = true }
18+
tracing = { workspace = true }
19+
tracing-subscriber = { workspace = true }
20+
uuid = { workspace = true }
21+
dashmap = { workspace = true }
22+
serde_json = { workspace = true }
23+
num_cpus = { workspace = true }

crates/teckel-worker/src/main.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
//! Teckel gRPC Worker — receives pipeline execution requests from remote clients.
2+
//!
3+
//! Each session gets its own DataFusion SessionContext. The worker manages
4+
//! session lifecycle, DataFrame caching, and pipeline execution.
5+
6+
mod service;
7+
8+
use std::net::SocketAddr;
9+
use teckel_remote::PipelineServiceServer;
10+
use tonic::transport::Server;
11+
use tracing_subscriber::EnvFilter;
12+
13+
#[tokio::main]
14+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
15+
tracing_subscriber::fmt()
16+
.with_env_filter(
17+
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
18+
)
19+
.init();
20+
21+
let host = std::env::var("TECKEL_WORKER_HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
22+
let port: u16 = std::env::var("TECKEL_WORKER_PORT")
23+
.ok()
24+
.and_then(|p| p.parse().ok())
25+
.unwrap_or(50051);
26+
27+
let addr: SocketAddr = format!("{host}:{port}").parse()?;
28+
let worker = service::TeckelWorker::new();
29+
30+
tracing::info!(%addr, "teckel-worker starting (gRPC)");
31+
32+
Server::builder()
33+
.add_service(PipelineServiceServer::new(worker))
34+
.serve(addr)
35+
.await?;
36+
37+
Ok(())
38+
}

0 commit comments

Comments
 (0)