Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions crates/embedded-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,26 @@ version.workspace = true
edition.workspace = true
rust-version.workspace = true

[features]
default = []
kafka = ["dep:rdkafka"]
redis-stream = ["dep:redis"]
kms = ["kronos-common/kms"]

[dependencies]
kronos-common = { path = "../common" }
sqlx = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
reqwest = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
rand = { workspace = true }
metrics = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }

rdkafka = { workspace = true, optional = true }
redis = { workspace = true, optional = true }
File renamed without changes.
178 changes: 178 additions & 0 deletions crates/embedded-worker/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use kronos_common::config::AppConfig;
use kronos_common::schema_config::SchemaConfig;
use sqlx::PgPool;

use crate::error::BuildError;
use crate::worker::{Worker, WorkerConfig};

/// Builder for a [`Worker`]. See `Worker::builder`.
pub struct WorkerBuilder {
pub(crate) pool: PgPool,
pub(crate) system_schema: String,
pub(crate) tenant_schema_prefix: String,
pub(crate) max_concurrent: usize,
pub(crate) poll_interval_ms: u64,
pub(crate) config_cache_ttl_sec: u64,
pub(crate) secret_cache_ttl_sec: u64,
pub(crate) shutdown_timeout_sec: u64,
pub(crate) encryption_key: Option<String>,
pub(crate) install_metrics_recorder: bool,
pub(crate) metrics_port: u16,
}

impl WorkerBuilder {
pub fn new(pool: PgPool) -> Self {
let defaults = SchemaConfig::library_default();
Self {
pool,
system_schema: defaults.system_schema,
tenant_schema_prefix: defaults.tenant_schema_prefix,
max_concurrent: 50,
poll_interval_ms: 200,
config_cache_ttl_sec: 60,
secret_cache_ttl_sec: 300,
shutdown_timeout_sec: 30,
encryption_key: None,
install_metrics_recorder: false,
metrics_port: 9090,
}
}

pub fn system_schema(mut self, v: String) -> Self {
self.system_schema = v;
self
}
pub fn tenant_schema_prefix(mut self, v: String) -> Self {
self.tenant_schema_prefix = v;
self
}
pub fn max_concurrent(mut self, v: usize) -> Self {
self.max_concurrent = v;
self
}
pub fn poll_interval_ms(mut self, v: u64) -> Self {
self.poll_interval_ms = v;
self
}
pub fn config_cache_ttl_sec(mut self, v: u64) -> Self {
self.config_cache_ttl_sec = v;
self
}
pub fn secret_cache_ttl_sec(mut self, v: u64) -> Self {
self.secret_cache_ttl_sec = v;
self
}
pub fn shutdown_timeout_sec(mut self, v: u64) -> Self {
self.shutdown_timeout_sec = v;
self
}
pub fn encryption_key(mut self, v: String) -> Self {
self.encryption_key = Some(v);
self
}
pub fn install_metrics_recorder(mut self, v: bool) -> Self {
self.install_metrics_recorder = v;
self
}
pub fn metrics_port(mut self, v: u16) -> Self {
self.metrics_port = v;
self
}

/// Adapter that copies env-derived config into the builder. Used by the
/// `kronos-worker` binary to preserve service-mode defaults
/// (`system_schema = "public"`, `tenant_schema_prefix = ""`).
pub fn from_app_config(mut self, cfg: &AppConfig) -> Self {
self.system_schema = cfg.schema.system_schema.clone();
self.tenant_schema_prefix = cfg.schema.tenant_schema_prefix.clone();
self.max_concurrent = cfg.worker.max_concurrent;
self.poll_interval_ms = cfg.worker.poll_interval_ms;
self.config_cache_ttl_sec = cfg.worker.config_cache_ttl_sec;
self.secret_cache_ttl_sec = cfg.worker.secret_cache_ttl_sec;
self.shutdown_timeout_sec = cfg.worker.shutdown_timeout_sec;
self.encryption_key = Some(cfg.crypto.encryption_key.clone());
self.metrics_port = cfg.metrics.port;
self
}

// Test-only accessors so unit tests don't have to go through `build()`.
#[doc(hidden)]
pub fn system_schema_for_test(&self) -> &str { &self.system_schema }
#[doc(hidden)]
pub fn tenant_schema_prefix_for_test(&self) -> &str { &self.tenant_schema_prefix }
#[doc(hidden)]
pub fn max_concurrent_for_test(&self) -> usize { self.max_concurrent }
#[doc(hidden)]
pub fn poll_interval_ms_for_test(&self) -> u64 { self.poll_interval_ms }
#[doc(hidden)]
pub fn config_cache_ttl_sec_for_test(&self) -> u64 { self.config_cache_ttl_sec }
#[doc(hidden)]
pub fn secret_cache_ttl_sec_for_test(&self) -> u64 { self.secret_cache_ttl_sec }
#[doc(hidden)]
pub fn shutdown_timeout_sec_for_test(&self) -> u64 { self.shutdown_timeout_sec }
}

impl WorkerBuilder {
/// Validate the config, probe the system schema, and produce a [`Worker`].
/// When `install_metrics_recorder(true)` was called, the metrics recorder
/// is installed on `metrics_port` exactly once before returning.
pub async fn build(self) -> Result<Worker, BuildError> {
// 1. Schema-name shape validation (no DB call).
let cfg = SchemaConfig {
system_schema: self.system_schema.clone(),
tenant_schema_prefix: self.tenant_schema_prefix.clone(),
};
cfg.validate().map_err(BuildError::InvalidSchemaConfig)?;

// 2. Encryption key required for v1.
let encryption_key = self
.encryption_key
.clone()
.ok_or(BuildError::EncryptionKeyMissing)?;

// 3. System-schema existence probe via to_regclass (null-safe; no parse error
// when schema or table is missing). system_schema is already shape-validated,
// so quoting it is safe.
let qualified_orgs = format!("\"{}\".organizations", self.system_schema);
let qualified_ws = format!("\"{}\".workspaces", self.system_schema);
let probe: (Option<String>, Option<String>) = sqlx::query_as(
"SELECT to_regclass($1)::text, to_regclass($2)::text",
)
.bind(&qualified_orgs)
.bind(&qualified_ws)
.fetch_one(&self.pool)
.await?;

if probe.0.is_none() {
return Err(BuildError::SystemSchemaMissing {
schema: self.system_schema.clone(),
table: "organizations".into(),
});
}
if probe.1.is_none() {
return Err(BuildError::SystemSchemaMissing {
schema: self.system_schema.clone(),
table: "workspaces".into(),
});
}

// 4. Optional metrics recorder install — service-binary opt-in.
if self.install_metrics_recorder {
kronos_common::metrics::install_recorder_with_listener(self.metrics_port);
}

Ok(Worker {
pool: self.pool,
cfg: WorkerConfig {
system_schema: self.system_schema,
tenant_schema_prefix: self.tenant_schema_prefix,
max_concurrent: self.max_concurrent,
poll_interval_ms: self.poll_interval_ms,
config_cache_ttl_sec: self.config_cache_ttl_sec,
secret_cache_ttl_sec: self.secret_cache_ttl_sec,
shutdown_timeout_sec: self.shutdown_timeout_sec,
encryption_key,
},
})
}
}
17 changes: 17 additions & 0 deletions crates/embedded-worker/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use thiserror::Error;

#[derive(Debug, Error)]
pub enum BuildError {
#[error("invalid schema config: {0}")]
InvalidSchemaConfig(String),

#[error("system schema {schema:?} is missing the required table {table:?}; \
run kronos-migrate or call kronos_client::migrate before starting the worker")]
SystemSchemaMissing { schema: String, table: String },

#[error("encryption_key is required but was not provided")]
EncryptionKeyMissing,

#[error("database error during worker build: {0}")]
Database(#[from] sqlx::Error),
}
43 changes: 43 additions & 0 deletions crates/embedded-worker/src/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

/// Handle to a running worker. The worker continues until `shutdown()` is
/// called (graceful) or the handle is dropped (immediate task abort).
pub struct WorkerHandle {
pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,
pub(crate) join: Option<JoinHandle<anyhow::Result<()>>>,
}

impl WorkerHandle {
/// Send the shutdown signal and wait for the worker loop to drain in-flight
/// tasks. Returns the worker's final result (`Ok(())` on clean exit).
pub async fn shutdown(mut self) -> anyhow::Result<()> {
if let Some(tx) = self.shutdown_tx.take() {
// Receiver is only dropped if the worker exited early; its result
// will be surfaced by join.await below.
let _ = tx.send(());
}
// `take()` so the `Drop` impl below doesn't also abort the (already
// draining) task once we return.
let join = self
.join
.take()
.expect("join handle present until shutdown consumes self");
match join.await {
Ok(res) => res,
Err(join_err) => Err(anyhow::anyhow!("worker task panicked: {join_err}")),
}
}
}

impl Drop for WorkerHandle {
/// Aborts the spawned worker task. `tokio::task::JoinHandle::drop` only
/// detaches — without this impl, dropping a `WorkerHandle` would leak the
/// worker (the loop would keep polling forever). Graceful shutdown is
/// opt-in via [`WorkerHandle::shutdown`]; bare drop is "fire abort and go."
fn drop(&mut self) {
if let Some(join) = self.join.as_ref() {
join.abort();
}
}
}
19 changes: 17 additions & 2 deletions crates/embedded-worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,17 @@
//! Kronos worker pipeline as an embeddable library. This crate is an empty
//! shell in Plan 1; Plan 2 moves poller/pipeline/backoff/dispatcher here.
//! Kronos worker pipeline as an embeddable library. Moved from `kronos-worker`
//! in Plan 2 of the embedded-mode initiative.

pub mod backoff;
pub mod dispatcher;
pub mod pipeline;
pub mod poller;

mod builder;
mod error;
mod handle;
mod worker;

pub use builder::WorkerBuilder;
pub use error::BuildError;
pub use handle::WorkerHandle;
pub use worker::Worker;
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use kronos_common::{
cache::{ConfigCache, SecretCache},
config::AppConfig,
db, metrics as m,
tenant::SchemaRegistry,
};
Expand All @@ -13,31 +12,37 @@ use tokio::sync::Semaphore;
use uuid::Uuid;

use crate::pipeline::{self, PipelineContext};

pub async fn run(pool: PgPool, config: AppConfig) -> anyhow::Result<()> {
use crate::worker::WorkerConfig;

pub(crate) async fn run_loop<F>(
pool: PgPool,
cfg: WorkerConfig,
shutdown: F,
) -> anyhow::Result<()>
where
F: std::future::Future<Output = ()>,
{
let worker_id = format!("worker_{}", Uuid::new_v4().simple());
let semaphore = Arc::new(Semaphore::new(config.worker.max_concurrent));
let poll_interval = Duration::from_millis(config.worker.poll_interval_ms);
let semaphore = Arc::new(Semaphore::new(cfg.max_concurrent));
let poll_interval = Duration::from_millis(cfg.poll_interval_ms);
let schema_registry = SchemaRegistry::new(
pool.clone(),
config.schema.system_schema.clone(),
cfg.system_schema.clone(),
30,
);

let ctx = Arc::new(PipelineContext {
pool: pool.clone(),
http_client: Client::new(),
config_cache: ConfigCache::new(config.worker.config_cache_ttl_sec),
secret_cache: SecretCache::new(config.worker.secret_cache_ttl_sec),
encryption_key: config.crypto.encryption_key.clone(),
config_cache: ConfigCache::new(cfg.config_cache_ttl_sec),
secret_cache: SecretCache::new(cfg.secret_cache_ttl_sec),
encryption_key: cfg.encryption_key.clone(),
});

tracing::info!(worker_id = %worker_id, "Worker polling started");

let idle = Arc::new(AtomicBool::new(false));

let shutdown = tokio::signal::ctrl_c();

// ctrl_c gives an !Unpin future
// tokio::select wants the future it polls to implement Unpin (or are pinned)
tokio::pin!(shutdown);
Expand All @@ -56,9 +61,9 @@ pub async fn run(pool: PgPool, config: AppConfig) -> anyhow::Result<()> {
tokio::select! {
_ = &mut shutdown => {
tracing::info!("Shutting down worker, waiting for in-flight tasks...");
let timeout = Duration::from_secs(config.worker.shutdown_timeout_sec);
let timeout = Duration::from_secs(cfg.shutdown_timeout_sec);
let _ = tokio::time::timeout(timeout, async {
let _all = semaphore.acquire_many(config.worker.max_concurrent as u32).await;
let _all = semaphore.acquire_many(cfg.max_concurrent as u32).await;
}).await;
tracing::info!("Worker shutdown complete");
return Ok(());
Expand Down
Loading