Skip to content

Commit d4237a9

Browse files
committed
feat: unified gRPC server with gRPC-Web for browser connectivity
Migrate from split REST+gRPC to unified gRPC server with tonic-web: Proto (teckel.proto): - Renamed PipelineService → TeckelService - Added high-level RPCs: Health, ValidatePipeline, ExplainPipeline, SubmitJob, GetJob, WaitForJob, CancelJob, ListJobs - Kept low-level Spark Connect RPCs: CreateSession, CloseSession, ReadInput, ApplyTransform, WriteOutput, ExecutePipeline teckel-worker: - Unified gRPC server implementing all RPCs (high-level + low-level) - Built-in job queue with semaphore-bounded concurrency - tonic-web layer for gRPC-Web browser support - CORS enabled for cross-origin requests - accept_http1(true) for gRPC-Web compatibility - Job management with cancellation tokens and status tracking teckel-remote: - Updated to use TeckelService (renamed from PipelineService) The UI now connects via gRPC-Web (ConnectRPC) instead of REST.
1 parent a1ff8e9 commit d4237a9

7 files changed

Lines changed: 435 additions & 172 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ reqwest = { version = "0.12", features = ["json"] }
3939

4040
# gRPC
4141
tonic = "0.13"
42+
tonic-web = "0.13"
4243
prost = "0.13"
4344

4445
# Polars

crates/teckel-remote/src/backend.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! Each DataFrame is represented as a `RemoteRef` (opaque handle ID on the worker).
44
//! The actual data never leaves the worker — only handles are exchanged.
55
6-
use crate::proto::{self, pipeline_service_client::PipelineServiceClient};
6+
use crate::proto::{self, teckel_service_client::TeckelServiceClient};
77
use async_trait::async_trait;
88
use std::collections::{BTreeMap, HashMap};
99
use teckel_engine::Backend;
@@ -21,7 +21,7 @@ pub struct RemoteRef {
2121

2222
/// Backend that delegates execution to a remote gRPC worker.
2323
pub struct RemoteBackend {
24-
client: PipelineServiceClient<Channel>,
24+
client: TeckelServiceClient<Channel>,
2525
session_id: String,
2626
}
2727

@@ -34,7 +34,7 @@ impl RemoteBackend {
3434
.await
3535
.map_err(|e| TeckelError::Execution(format!("failed to connect to worker: {e}")))?;
3636

37-
let mut client = PipelineServiceClient::new(channel);
37+
let mut client = TeckelServiceClient::new(channel);
3838

3939
let response = client
4040
.create_session(proto::CreateSessionRequest {

crates/teckel-remote/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ pub mod proto {
55
}
66

77
pub use backend::RemoteBackend;
8-
pub use proto::pipeline_service_client::PipelineServiceClient;
9-
pub use proto::pipeline_service_server::{PipelineService, PipelineServiceServer};
8+
pub use proto::teckel_service_client::TeckelServiceClient;
9+
pub use proto::teckel_service_server::{TeckelService, TeckelServiceServer};

crates/teckel-worker/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@ teckel-datafusion = { workspace = true }
1414
teckel-api = { workspace = true }
1515
teckel-remote = { workspace = true }
1616
tonic = { workspace = true }
17+
tonic-web = { workspace = true }
18+
tower-http = { workspace = true }
19+
chrono = { workspace = true }
1720
tokio = { workspace = true }
1821
tracing = { workspace = true }
1922
tracing-subscriber = { workspace = true }
2023
uuid = { workspace = true }
2124
dashmap = { workspace = true }
2225
serde_json = { workspace = true }
2326
num_cpus = { workspace = true }
27+
tokio-util = { workspace = true }

crates/teckel-worker/src/main.rs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
//! Teckel gRPC Workerreceives pipeline execution requests from remote clients.
1+
//! Teckel gRPC Serverunified API for the UI and programmatic clients.
22
//!
3-
//! Each session gets its own DataFusion SessionContext. The worker manages
4-
//! session lifecycle, DataFrame caching, and pipeline execution.
3+
//! Serves both the high-level API (validate, explain, jobs) and the
4+
//! low-level Spark Connect-style API (sessions, per-op execution).
5+
//!
6+
//! Enables gRPC-Web via tonic-web for direct browser connectivity.
57
68
mod service;
79

810
use std::net::SocketAddr;
9-
use teckel_remote::PipelineServiceServer;
11+
use teckel_remote::TeckelServiceServer;
1012
use tonic::transport::Server;
13+
use tower_http::cors::{Any, CorsLayer};
1114
use tracing_subscriber::EnvFilter;
1215

1316
#[tokio::main]
@@ -18,19 +21,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1821
)
1922
.init();
2023

21-
let host = std::env::var("TECKEL_WORKER_HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
22-
let port: u16 = std::env::var("TECKEL_WORKER_PORT")
24+
let host = std::env::var("TECKEL_HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
25+
let port: u16 = std::env::var("TECKEL_PORT")
2326
.ok()
2427
.and_then(|p| p.parse().ok())
2528
.unwrap_or(50051);
29+
let max_concurrency: usize = std::env::var("TECKEL_MAX_CONCURRENCY")
30+
.ok()
31+
.and_then(|c| c.parse().ok())
32+
.unwrap_or_else(num_cpus::get);
2633

2734
let addr: SocketAddr = format!("{host}:{port}").parse()?;
28-
let worker = service::TeckelWorker::new();
35+
let worker = service::TeckelWorker::new(max_concurrency);
36+
37+
// gRPC-Web + CORS for browser access
38+
let cors = CorsLayer::new()
39+
.allow_origin(Any)
40+
.allow_methods(Any)
41+
.allow_headers(Any)
42+
.expose_headers(Any);
43+
44+
let grpc_web = tonic_web::GrpcWebLayer::new();
2945

30-
tracing::info!(%addr, "teckel-worker starting (gRPC)");
46+
tracing::info!(%addr, max_concurrency, "teckel gRPC server starting (gRPC-Web enabled)");
3147

3248
Server::builder()
33-
.add_service(PipelineServiceServer::new(worker))
49+
.accept_http1(true) // Required for gRPC-Web
50+
.layer(cors)
51+
.layer(grpc_web)
52+
.add_service(TeckelServiceServer::new(worker))
3453
.serve(addr)
3554
.await?;
3655

0 commit comments

Comments
 (0)