From b812a21ebd7be9188a4f82db9e2706c3c4f1bdc7 Mon Sep 17 00:00:00 2001 From: "Ankit.Mahato" Date: Mon, 8 Jun 2026 18:28:21 +0530 Subject: [PATCH 1/3] fix: deduplicate workspace provisioning --- .../common/migrations}/workspace_v1.sql | 0 crates/common/src/db.rs | 8 ++--- crates/common/src/db/workspaces.rs | 27 ++++++++++------- crates/worker/src/client.rs | 29 ++----------------- 4 files changed, 21 insertions(+), 43 deletions(-) rename {migrations => crates/common/migrations}/workspace_v1.sql (100%) diff --git a/migrations/workspace_v1.sql b/crates/common/migrations/workspace_v1.sql similarity index 100% rename from migrations/workspace_v1.sql rename to crates/common/migrations/workspace_v1.sql diff --git a/crates/common/src/db.rs b/crates/common/src/db.rs index 181b0e1..03b5574 100644 --- a/crates/common/src/db.rs +++ b/crates/common/src/db.rs @@ -26,12 +26,8 @@ impl<'a> DbContext<'a> { } /// Build a (potentially prefixed) table name. -/// `tbl("sched", "jobs")` → `"sched_jobs"`, `tbl("", "jobs")` → `"jobs"`. +/// `tbl("sched_", "jobs")` → `"sched_jobs"`, `tbl("", "jobs")` → `"jobs"`. #[inline] pub fn tbl(prefix: &str, name: &str) -> String { - if prefix.is_empty() { - name.to_string() - } else { - format!("{prefix}_{name}") - } + format!("{prefix}{name}") } diff --git a/crates/common/src/db/workspaces.rs b/crates/common/src/db/workspaces.rs index 9bdb224..7a5649e 100644 --- a/crates/common/src/db/workspaces.rs +++ b/crates/common/src/db/workspaces.rs @@ -2,7 +2,7 @@ use crate::models::workspace::Workspace; use crate::tenant::validate_schema_name; use sqlx::PgPool; -const WORKSPACE_SCHEMA_V1: &str = include_str!("../../../../migrations/workspace_v1.sql"); +const WORKSPACE_SCHEMA_V1: &str = include_str!("../../migrations/workspace_v1.sql"); pub async fn create( pool: &PgPool, @@ -30,7 +30,7 @@ pub async fn create( .await?; // Create the schema and apply workspace DDL - provision_schema(pool, schema_name).await?; + provision_schema(pool, schema_name, "").await?; // Update schema_version sqlx::query( @@ -101,17 +101,24 @@ pub async fn resolve_schema( Ok(row.map(|r| r.0)) } -async fn provision_schema(pool: &PgPool, schema_name: &str) -> Result<(), sqlx::Error> { - let create_schema = format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema_name); - sqlx::query(&create_schema).execute(pool).await?; +pub async fn provision_schema( + pool: &PgPool, + schema_name: &str, + table_prefix: &str, +) -> Result<(), sqlx::Error> { + sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema_name)) + .execute(pool) + .await?; - let mut conn = pool.acquire().await?; + let mut conn = crate::db::scoped::scoped_connection(pool, schema_name).await?; - sqlx::query(&format!("SET search_path TO \"{}\"", schema_name)) - .execute(&mut *conn) - .await?; + let p = if table_prefix.is_empty() { + String::new() + } else { + format!("{}_", table_prefix) + }; - let ddl = WORKSPACE_SCHEMA_V1.replace("{p}", ""); + let ddl = WORKSPACE_SCHEMA_V1.replace("{p}", &p); for stmt in ddl.split(';') { let stmt = stmt.trim(); if !stmt.is_empty() { diff --git a/crates/worker/src/client.rs b/crates/worker/src/client.rs index dfdc544..53da8a7 100644 --- a/crates/worker/src/client.rs +++ b/crates/worker/src/client.rs @@ -115,13 +115,7 @@ pub struct KronosLibraryClient { } impl KronosLibraryClient { - /// Create a new client. - /// - /// - `pool`: caller-owned sqlx pool pointing at the same PostgreSQL instance. - /// - `table_prefix`: prefix for all Kronos tables (e.g. `"sched"` → `sched_jobs`). - /// Empty string means no prefix (original table names). - /// - `encryption_key`: 64 hex-char AES-256 key for secrets; pass zeros if not using secrets. - /// - `http_client`: optional reqwest client to reuse the caller's connection pool. + /// `table_prefix` must include the trailing underscore (e.g. `"sched_"`); use `""` for no prefix. pub fn new( pool: PgPool, table_prefix: &str, @@ -431,26 +425,7 @@ impl KronosClient for KronosLibraryClient { } async fn provision_workspace(&self, schema_name: &str) -> anyhow::Result<()> { - const TEMPLATE: &str = include_str!("../../../migrations/workspace_v1.sql"); - - let prefix = &self.ctx.table_prefix; - let p = if prefix.is_empty() { - String::new() - } else { - format!("{}_", prefix) - }; - - let ddl = TEMPLATE.replace("{p}", &p); - let mut conn = db::scoped::scoped_connection(&self.pool, schema_name).await?; - - for stmt in ddl.split(';') { - let stmt = stmt.trim(); - if !stmt.is_empty() { - sqlx::query(stmt).execute(&mut *conn).await?; - } - } - - Ok(()) + Ok(db::workspaces::provision_schema(&self.pool, schema_name, &self.ctx.table_prefix).await?) } async fn cancel_job(&self, schema_name: &str, job_id: &str) -> anyhow::Result<()> { From 6a1ed4e5442e3b27b11f772bee69ffd0352dbe02 Mon Sep 17 00:00:00 2001 From: "Ankit.Mahato" Date: Wed, 10 Jun 2026 14:29:45 +0530 Subject: [PATCH 2/3] feat: re-export sqlx and add pool-building constructor Embedders previously needed their own sqlx dependency for two reasons: building the PgPool to pass to KronosLibraryClient::new, and naming sqlx types (PgPool, sqlx::Error) when implementing SchemaProvider. Re-exporting sqlx from kronos-common removes the need for a separate pin (which could also drift to an incompatible version), and from_database_url + pool() let callers skip pool construction entirely while still sharing the pool. Co-Authored-By: Claude Fable 5 --- crates/common/src/lib.rs | 5 +++++ crates/worker/src/client.rs | 24 ++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index d44eec9..f8bf516 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,3 +1,8 @@ +// Re-exported so embedders don't need their own sqlx dependency: sqlx types +// (PgPool, Error) appear in this crate's public API (e.g. SchemaProvider), +// and a separately pinned sqlx version would not type-check against them. +pub use sqlx; + pub mod cache; pub mod config; pub mod crypto; diff --git a/crates/worker/src/client.rs b/crates/worker/src/client.rs index 53da8a7..3a11489 100644 --- a/crates/worker/src/client.rs +++ b/crates/worker/src/client.rs @@ -141,6 +141,30 @@ impl KronosLibraryClient { Ok(Self { pool, ctx }) } + /// Convenience constructor for callers that don't manage their own `PgPool`: + /// builds an internal pool from `database_url` and `max_connections`. + /// Use [`Self::pool`] to share the pool (e.g. with a `SchemaProvider`). + /// Callers that need finer pool control should build a `PgPool` themselves + /// and use [`Self::new`]. + pub async fn from_database_url( + database_url: &str, + max_connections: u32, + table_prefix: &str, + encryption_key: &str, + http_client: Option, + ) -> anyhow::Result { + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(max_connections) + .connect(database_url) + .await?; + Self::new(pool, table_prefix, encryption_key, http_client) + } + + /// The connection pool this client runs on. + pub fn pool(&self) -> &PgPool { + &self.pool + } + /// Create a job in the given workspace schema and return the execution_id. pub async fn create_job( &self, From 5f928aa918ad83c3b29864b6f411c17d91071c7a Mon Sep 17 00:00:00 2001 From: "Ankit.Mahato" Date: Wed, 10 Jun 2026 16:26:40 +0530 Subject: [PATCH 3/3] fix: remove async trait --- Cargo.lock | 1 - crates/common/Cargo.toml | 1 - crates/common/src/tenant.rs | 12 ++++++++---- crates/worker/src/client.rs | 39 +++++++++++++++++++++++++++++-------- crates/worker/src/lib.rs | 4 +++- 5 files changed, 42 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f9eb318..17adb56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2589,7 +2589,6 @@ dependencies = [ "actix-web", "aes-gcm", "anyhow", - "async-trait", "aws-config", "aws-sdk-kms", "base64 0.22.1", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index bb7ed7b..43284fb 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -11,7 +11,6 @@ kms = ["aws-config", "aws-sdk-kms"] aws-config = { workspace = true, optional = true } aws-sdk-kms = { workspace = true, optional = true } tokio.workspace = true -async-trait.workspace = true sqlx.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/crates/common/src/tenant.rs b/crates/common/src/tenant.rs index 10b6f7d..a88561a 100644 --- a/crates/common/src/tenant.rs +++ b/crates/common/src/tenant.rs @@ -1,5 +1,5 @@ -use async_trait::async_trait; use sqlx::PgPool; +use std::future::Future; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::RwLock; @@ -35,9 +35,14 @@ pub fn build_schema_name(org_id: &str, workspace_slug: &str) -> String { /// Trait for discovering active workspace schemas. /// Implement this to tell Kronos's worker where to find the list of workspaces. /// Kronos ships `SchemaRegistry` as the default implementation. -#[async_trait] +/// +/// Declared with `-> impl Future + Send` (rather than `async fn`) so the +/// returned future is usable inside the spawned worker task; implementors +/// can simply write `async fn get_active_schemas(&self)`. pub trait SchemaProvider: Send + Sync + 'static { - async fn get_active_schemas(&self) -> Result, sqlx::Error>; + fn get_active_schemas( + &self, + ) -> impl Future, sqlx::Error>> + Send; } /// Cached registry of active workspace schemas. @@ -68,7 +73,6 @@ impl SchemaRegistry { } } -#[async_trait] impl SchemaProvider for SchemaRegistry { async fn get_active_schemas(&self) -> Result, sqlx::Error> { // Check cache first diff --git a/crates/worker/src/client.rs b/crates/worker/src/client.rs index 3a11489..035a220 100644 --- a/crates/worker/src/client.rs +++ b/crates/worker/src/client.rs @@ -339,26 +339,49 @@ impl KronosLibraryClient { Ok(db::executions::get(&mut db, execution_id).await?) } - /// Start the background worker. Returns a JoinHandle — the caller should - /// await it (or drop it) on shutdown. + /// Start the background worker. Returns a [`WorkerHandle`] — call + /// [`WorkerHandle::shutdown`] on shutdown, then await [`WorkerHandle::join`]. /// /// Pass a `WorkerConfig` to control concurrency, poll interval, etc. - /// Pass a `CancellationToken` that the caller cancels on shutdown. pub fn start_worker( &self, schema_provider: S, - cancel: CancellationToken, worker_config: WorkerConfig, - ) -> tokio::task::JoinHandle> { + ) -> WorkerHandle { let pool = self.pool.clone(); let ctx = self.ctx.clone(); // Build an AppConfig-compatible struct from the context + worker_config let config = build_app_config(&ctx, &worker_config); - tokio::spawn(async move { - poller::run(pool, config, schema_provider, cancel).await - }) + let cancel = CancellationToken::new(); + let join = { + let cancel = cancel.clone(); + tokio::spawn(async move { + poller::run(pool, config, schema_provider, cancel).await + }) + }; + WorkerHandle { cancel, join } + } +} + +/// Handle to a running background worker. Owns the cancellation token and the +/// task handle so embedders don't need their own tokio-util dependency. +pub struct WorkerHandle { + cancel: CancellationToken, + join: tokio::task::JoinHandle>, +} + +impl WorkerHandle { + /// Signal the worker to stop. Returns immediately; the worker finishes + /// in-flight jobs (bounded by `WorkerConfig::shutdown_timeout_sec`). + pub fn shutdown(&self) { + self.cancel.cancel(); + } + + /// Wait for the worker task to finish. + pub async fn join(self) -> anyhow::Result<()> { + self.join.await? } } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index a9183c4..b71e8b0 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -4,5 +4,7 @@ pub mod dispatcher; pub mod pipeline; pub mod poller; -pub use client::{JobTrigger, KronosClient, KronosHttpClient, KronosLibraryClient, WorkerConfig}; +pub use client::{ + JobTrigger, KronosClient, KronosHttpClient, KronosLibraryClient, WorkerConfig, WorkerHandle, +}; pub use kronos_common::models::Execution;