Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ kms = ["aws-config", "aws-sdk-kms"]
aws-config = { workspace = true, optional = true }
aws-sdk-kms = { workspace = true, optional = true }
tokio.workspace = true
async-trait.workspace = true
sqlx.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
File renamed without changes.
8 changes: 2 additions & 6 deletions crates/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ impl<'a> DbContext<'a> {
}

/// Build a (potentially prefixed) table name.
/// `tbl("sched", "jobs")` → `"sched_jobs"`, `tbl("", "jobs")` → `"jobs"`.
/// `tbl("sched_", "jobs")` → `"sched_jobs"`, `tbl("", "jobs")` → `"jobs"`.
#[inline]
pub fn tbl(prefix: &str, name: &str) -> String {
if prefix.is_empty() {
name.to_string()
} else {
format!("{prefix}_{name}")
}
format!("{prefix}{name}")
}
27 changes: 17 additions & 10 deletions crates/common/src/db/workspaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::models::workspace::Workspace;
use crate::tenant::validate_schema_name;
use sqlx::PgPool;

const WORKSPACE_SCHEMA_V1: &str = include_str!("../../../../migrations/workspace_v1.sql");
const WORKSPACE_SCHEMA_V1: &str = include_str!("../../migrations/workspace_v1.sql");

pub async fn create(
pool: &PgPool,
Expand Down Expand Up @@ -30,7 +30,7 @@ pub async fn create(
.await?;

// Create the schema and apply workspace DDL
provision_schema(pool, schema_name).await?;
provision_schema(pool, schema_name, "").await?;

// Update schema_version
sqlx::query(
Expand Down Expand Up @@ -101,17 +101,24 @@ pub async fn resolve_schema(
Ok(row.map(|r| r.0))
}

async fn provision_schema(pool: &PgPool, schema_name: &str) -> Result<(), sqlx::Error> {
let create_schema = format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema_name);
sqlx::query(&create_schema).execute(pool).await?;
pub async fn provision_schema(
pool: &PgPool,
schema_name: &str,
table_prefix: &str,
) -> Result<(), sqlx::Error> {
sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema_name))

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use bind parameters here, @mahatoankitkumar, instead of string concatenation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

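No — PostgreSQL doesn't support bind parameters for identifiers (schema names, table names); bind parameters can only stand in for data values, so the identifier has to be validated before it is interpolated into the statement.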

.execute(pool)
.await?;
Comment on lines +104 to +111

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Add input validation before SQL interpolation in the now-public function.

provision_schema is now a public API, but schema_name is interpolated into SQL at line 109 before scoped_connection performs its validation at line 113. Callers like KronosLibraryClient::provision_workspace pass external input directly, creating a SQL injection vector through quoted identifier escapes.

Validate at function entry, consistent with the create function's pattern.

🛡️ Proposed fix to add validation
 pub async fn provision_schema(
     pool: &PgPool,
     schema_name: &str,
     table_prefix: &str,
 ) -> Result<(), sqlx::Error> {
+    use crate::tenant::validate_table_prefix;
+    assert!(
+        validate_schema_name(schema_name),
+        "Invalid schema name: {}",
+        schema_name
+    );
+    assert!(
+        validate_table_prefix(table_prefix),
+        "Invalid table prefix: {}",
+        table_prefix
+    );
+
     sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema_name))
         .execute(pool)
         .await?;
🧰 Tools
🪛 OpenGrep (1.22.0)

[ERROR] 109-109: SQL query built via format!() passed to a database method. Use parameterized queries with bind parameters instead.

(coderabbit.sql-injection.rust-format-query)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/common/src/db/workspaces.rs` around lines 104 - 111, provision_schema
currently interpolates schema_name into SQL before validation, exposing a SQL
injection risk; update provision_schema to validate schema_name at function
entry (reusing the same validation logic/pattern used by create and
scoped_connection) and reject or sanitize invalid names before calling
sqlx::query(format!(...)); ensure callers such as
KronosLibraryClient::provision_workspace can only pass validated names and keep
the CREATE SCHEMA statement unchanged aside from using the already-validated
schema_name.

Source: Linters/SAST tools


let mut conn = pool.acquire().await?;
let mut conn = crate::db::scoped::scoped_connection(pool, schema_name).await?;

sqlx::query(&format!("SET search_path TO \"{}\"", schema_name))
.execute(&mut *conn)
.await?;
let p = if table_prefix.is_empty() {
String::new()
} else {
format!("{}_", table_prefix)
};

let ddl = WORKSPACE_SCHEMA_V1.replace("{p}", "");
let ddl = WORKSPACE_SCHEMA_V1.replace("{p}", &p);
for stmt in ddl.split(';') {
let stmt = stmt.trim();
if !stmt.is_empty() {
Expand Down
5 changes: 5 additions & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// Re-exported so embedders don't need their own sqlx dependency: sqlx types
// (PgPool, Error) appear in this crate's public API (e.g. SchemaProvider),
// and a separately pinned sqlx version would not type-check against them.
pub use sqlx;

pub mod cache;
pub mod config;
pub mod crypto;
Expand Down
12 changes: 8 additions & 4 deletions crates/common/src/tenant.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use sqlx::PgPool;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -35,9 +35,14 @@ pub fn build_schema_name(org_id: &str, workspace_slug: &str) -> String {
/// Trait for discovering active workspace schemas.
/// Implement this to tell Kronos's worker where to find the list of workspaces.
/// Kronos ships `SchemaRegistry` as the default implementation.
#[async_trait]
///
/// Declared with `-> impl Future + Send` (rather than `async fn`) so the
/// returned future is usable inside the spawned worker task; implementors
/// can simply write `async fn get_active_schemas(&self)`.
pub trait SchemaProvider: Send + Sync + 'static {
async fn get_active_schemas(&self) -> Result<Vec<String>, sqlx::Error>;
fn get_active_schemas(
&self,
) -> impl Future<Output = Result<Vec<String>, sqlx::Error>> + Send;
}

/// Cached registry of active workspace schemas.
Expand Down Expand Up @@ -68,7 +73,6 @@ impl SchemaRegistry {
}
}

#[async_trait]
impl SchemaProvider for SchemaRegistry {
async fn get_active_schemas(&self) -> Result<Vec<String>, sqlx::Error> {
// Check cache first
Expand Down
92 changes: 57 additions & 35 deletions crates/worker/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,7 @@ pub struct KronosLibraryClient {
}

impl KronosLibraryClient {
/// Create a new client.
///
/// - `pool`: caller-owned sqlx pool pointing at the same PostgreSQL instance.
/// - `table_prefix`: prefix for all Kronos tables (e.g. `"sched"` → `sched_jobs`).
/// Empty string means no prefix (original table names).
/// - `encryption_key`: 64 hex-char AES-256 key for secrets; pass zeros if not using secrets.
/// - `http_client`: optional reqwest client to reuse the caller's connection pool.
/// `table_prefix` must include the trailing underscore (e.g. `"sched_"`); use `""` for no prefix.
pub fn new(
pool: PgPool,
table_prefix: &str,
Expand All @@ -147,6 +141,30 @@ impl KronosLibraryClient {
Ok(Self { pool, ctx })
}

/// Builds a client that creates its own connection pool.
///
/// Opens a PostgreSQL pool against `database_url`, capped at
/// `max_connections` connections, and forwards it together with
/// `table_prefix`, `encryption_key`, and `http_client` to [`Self::new`].
/// The pool stays reachable through [`Self::pool`] (e.g. to share it with a
/// `SchemaProvider`). Callers that need finer pool control should build a
/// `PgPool` themselves and call [`Self::new`] directly.
///
/// # Errors
///
/// Returns an error if connecting to `database_url` fails or if
/// [`Self::new`] returns an error.
pub async fn from_database_url(
    database_url: &str,
    max_connections: u32,
    table_prefix: &str,
    encryption_key: &str,
    http_client: Option<Client>,
) -> anyhow::Result<Self> {
    // Configure the pool first, then connect; the only knob exposed here is
    // the connection cap.
    let pool_options = sqlx::postgres::PgPoolOptions::new().max_connections(max_connections);
    let pool = pool_options.connect(database_url).await?;

    Self::new(pool, table_prefix, encryption_key, http_client)
}

/// Returns a shared reference to the connection pool this client runs on.
///
/// The reference borrows from the client; clone the `PgPool` if it needs to
/// be handed to another component.
pub fn pool(&self) -> &PgPool {
&self.pool
}

/// Create a job in the given workspace schema and return the execution_id.
pub async fn create_job(
&self,
Expand Down Expand Up @@ -321,26 +339,49 @@ impl KronosLibraryClient {
Ok(db::executions::get(&mut db, execution_id).await?)
}

/// Start the background worker. Returns a JoinHandle — the caller should
/// await it (or drop it) on shutdown.
/// Start the background worker. Returns a [`WorkerHandle`] — call
/// [`WorkerHandle::shutdown`] on shutdown, then await [`WorkerHandle::join`].
///
/// Pass a `WorkerConfig` to control concurrency, poll interval, etc.
/// Pass a `CancellationToken` that the caller cancels on shutdown.
pub fn start_worker<S: SchemaProvider>(
&self,
schema_provider: S,
cancel: CancellationToken,
worker_config: WorkerConfig,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
) -> WorkerHandle {
let pool = self.pool.clone();
let ctx = self.ctx.clone();

// Build an AppConfig-compatible struct from the context + worker_config
let config = build_app_config(&ctx, &worker_config);

tokio::spawn(async move {
poller::run(pool, config, schema_provider, cancel).await
})
let cancel = CancellationToken::new();
let join = {
let cancel = cancel.clone();
tokio::spawn(async move {
poller::run(pool, config, schema_provider, cancel).await
})
};
WorkerHandle { cancel, join }
}
}

/// Handle to a running background worker. Owns the cancellation token and the
/// task handle so embedders don't need their own tokio-util dependency.
///
/// Use [`WorkerHandle::shutdown`] to request a stop and
/// [`WorkerHandle::join`] to wait for the worker task to finish.
pub struct WorkerHandle {
// Token cancelled by `shutdown` to signal the worker to stop.
cancel: CancellationToken,
// Handle of the spawned worker task; awaited by `join`.
join: tokio::task::JoinHandle<anyhow::Result<()>>,
}

impl WorkerHandle {
    /// Requests that the worker stop by cancelling its token.
    ///
    /// Returns immediately; the worker finishes in-flight jobs (bounded by
    /// `WorkerConfig::shutdown_timeout_sec`). Call [`WorkerHandle::join`]
    /// afterwards to wait for it to exit.
    pub fn shutdown(&self) {
        self.cancel.cancel();
    }

    /// Waits for the worker task to finish and returns its result.
    ///
    /// # Errors
    ///
    /// Returns an error if the task itself could not be joined, or the
    /// error the worker task returned.
    pub async fn join(self) -> anyhow::Result<()> {
        // Only the task handle is needed from here on.
        let Self { join: task, .. } = self;
        // The outer `?` surfaces a join failure; the inner result is the
        // worker's own outcome and is returned as-is.
        task.await?
    }
}

Expand Down Expand Up @@ -431,26 +472,7 @@ impl KronosClient for KronosLibraryClient {
}

async fn provision_workspace(&self, schema_name: &str) -> anyhow::Result<()> {
const TEMPLATE: &str = include_str!("../../../migrations/workspace_v1.sql");

let prefix = &self.ctx.table_prefix;
let p = if prefix.is_empty() {
String::new()
} else {
format!("{}_", prefix)
};

let ddl = TEMPLATE.replace("{p}", &p);
let mut conn = db::scoped::scoped_connection(&self.pool, schema_name).await?;

for stmt in ddl.split(';') {
let stmt = stmt.trim();
if !stmt.is_empty() {
sqlx::query(stmt).execute(&mut *conn).await?;
}
}

Ok(())
Ok(db::workspaces::provision_schema(&self.pool, schema_name, &self.ctx.table_prefix).await?)
}

async fn cancel_job(&self, schema_name: &str, job_id: &str) -> anyhow::Result<()> {
Expand Down
4 changes: 3 additions & 1 deletion crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ pub mod dispatcher;
pub mod pipeline;
pub mod poller;

pub use client::{JobTrigger, KronosClient, KronosHttpClient, KronosLibraryClient, WorkerConfig};
pub use client::{
JobTrigger, KronosClient, KronosHttpClient, KronosLibraryClient, WorkerConfig, WorkerHandle,
};
pub use kronos_common::models::Execution;