Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion crates/api/src/handlers/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,67 @@ pub async fn create(
}
}

/// Optional server-side filters for [`list`], parsed from the query string
/// alongside [`PaginationParams`]. Blank values (e.g. `?status=`) are treated as
/// absent so the dashboard can send empty params for an "All" selection.
#[derive(Debug, serde::Deserialize)]
pub struct JobListFilters {
pub status: Option<String>,
pub trigger_type: Option<String>,
pub endpoint: Option<String>,
pub endpoint_type: Option<String>,
}

fn blank_to_none(value: Option<String>) -> Option<String> {
value.and_then(|v| {
let trimmed = v.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
})
}

impl JobListFilters {
/// Validates the enum-like filters and converts to the DB-layer struct.
/// Invalid values are rejected up front so a typo surfaces as a 400 rather
/// than silently returning zero rows.
fn into_db_filters(self) -> Result<db::jobs::JobFilters, AppError> {
let status = blank_to_none(self.status);
if let Some(s) = &status {
if s != "ACTIVE" && s != "RETIRED" {
return Err(AppError::InvalidRequest(format!("Invalid status: {s}")));
}
}

let trigger = blank_to_none(self.trigger_type);
if let Some(t) = &trigger {
TriggerType::from_str_val(t)
.ok_or_else(|| AppError::InvalidRequest(format!("Invalid trigger: {t}")))?;
}

let endpoint_type = blank_to_none(self.endpoint_type);
if let Some(et) = &endpoint_type {
EndpointType::from_str_val(et)
.ok_or_else(|| AppError::InvalidRequest(format!("Invalid endpoint_type: {et}")))?;
}

Ok(db::jobs::JobFilters {
status,
trigger,
endpoint: blank_to_none(self.endpoint),
endpoint_type,
})
}
}

pub async fn list(
state: web::Data<AppState>,
_auth: AuthenticatedRequest,
ws: Workspace,
params: web::Query<PaginationParams>,
filters: web::Query<JobListFilters>,
) -> Result<HttpResponse, AppError> {
let prefix = state.prefix();
let mut conn = kronos_common::db::scoped::scoped_connection(&state.pool, &ws.0.schema_name)
Expand All @@ -274,7 +330,8 @@ pub async fn list(
let mut db = DbContext::new(&mut *conn, prefix);
let limit = params.effective_limit();
let cursor = params.decode_cursor();
let items = db::jobs::list(&mut db, cursor.as_deref(), limit + 1).await?;
let filters = filters.into_inner().into_db_filters()?;
let items = db::jobs::list(&mut db, cursor.as_deref(), limit + 1, &filters).await?;

let has_more = items.len() as i64 > limit;
let items: Vec<_> = items.into_iter().take(limit as usize).collect();
Expand Down
143 changes: 126 additions & 17 deletions crates/common/src/db/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,30 +157,77 @@ pub async fn get_by_idempotency(
.await
}

/// Optional, server-side filters applied to [`list`]. All fields are ANDed
/// together; `None` fields are simply not constrained. `endpoint` is matched as
/// a case-insensitive substring so the dashboard can offer a search box, while
/// the enum-like fields (`status`, `trigger`, `endpoint_type`) match exactly.
#[derive(Debug, Default, Clone)]
pub struct JobFilters {
pub status: Option<String>,
pub trigger: Option<String>,
pub endpoint: Option<String>,
pub endpoint_type: Option<String>,
}

/// Builds the `list` query and the ordered list of string binds. Kept pure (no
/// DB access) so the placeholder/bind bookkeeping can be unit tested. The final
/// `LIMIT` placeholder is left for the caller to bind as an `i64`.
fn build_list_query(t: &str, cursor: Option<&str>, filters: &JobFilters) -> (String, Vec<String>) {
let mut conditions: Vec<String> = Vec::new();
let mut binds: Vec<String> = Vec::new();
let mut n = 1;

if let Some(c) = cursor {
conditions.push(format!(
"created_at < (SELECT created_at FROM {t} WHERE job_id = ${n})"
));
binds.push(c.to_string());
n += 1;
}
if let Some(status) = &filters.status {
conditions.push(format!("status = ${n}"));
binds.push(status.clone());
n += 1;
}
if let Some(trigger) = &filters.trigger {
conditions.push(format!("trigger_type = ${n}"));
binds.push(trigger.clone());
n += 1;
}
if let Some(endpoint_type) = &filters.endpoint_type {
conditions.push(format!("endpoint_type = ${n}"));
binds.push(endpoint_type.clone());
n += 1;
}
if let Some(endpoint) = &filters.endpoint {
conditions.push(format!("endpoint ILIKE '%' || ${n} || '%'"));
binds.push(endpoint.clone());
n += 1;
Comment on lines +202 to +205

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Escape wildcard characters in endpoint search.

Line 203 treats % and _ inside user input as SQL wildcards, so endpoint-name filtering can return incorrect matches for literal names containing those characters.

Suggested fix
+fn escape_like(value: &str) -> String {
+    value
+        .replace('\\', "\\\\")
+        .replace('%', "\\%")
+        .replace('_', "\\_")
+}
@@
-    if let Some(endpoint) = &filters.endpoint {
-        conditions.push(format!("endpoint ILIKE '%' || ${n} || '%'"));
-        binds.push(endpoint.clone());
+    if let Some(endpoint) = &filters.endpoint {
+        conditions.push(format!("endpoint ILIKE '%' || ${n} || '%' ESCAPE '\\'"));
+        binds.push(escape_like(endpoint));
         n += 1;
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if let Some(endpoint) = &filters.endpoint {
conditions.push(format!("endpoint ILIKE '%' || ${n} || '%'"));
binds.push(endpoint.clone());
n += 1;
if let Some(endpoint) = &filters.endpoint {
conditions.push(format!("endpoint ILIKE '%' || ${n} || '%' ESCAPE '\\'"));
binds.push(escape_like(endpoint));
n += 1;
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/common/src/db/jobs.rs` around lines 202 - 205, The endpoint filter
currently treats user input percent and underscore as SQL wildcards; update the
block that handles filters.endpoint so you escape '%' and '_' (and backslashes)
in the endpoint value before pushing it to binds, then use an ILIKE with an
explicit ESCAPE clause; specifically, transform filters.endpoint (e.g. replace
'\' with '\\', '%' with '\%', '_' with '\_'), push the escaped string to binds,
change the condition pushed into conditions from "endpoint ILIKE '%' || ${n} ||
'%'" to include "ESCAPE '\\'" (e.g. "endpoint ILIKE '%' || ${n} || '%' ESCAPE
'\\'") and leave n increment logic as-is so the query matches literals
correctly.

}

let where_clause = if conditions.is_empty() {
String::new()
} else {
format!(" WHERE {}", conditions.join(" AND "))
};

let sql = format!("SELECT * FROM {t}{where_clause} ORDER BY created_at DESC LIMIT ${n}");
(sql, binds)
Comment on lines +181 to +215

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use a stable sort key for cursor pagination.

Line 182 and Line 214 paginate/order by created_at only, so rows sharing the same timestamp can be skipped or repeated across pages.

Suggested fix
-        conditions.push(format!(
-            "created_at < (SELECT created_at FROM {t} WHERE job_id = ${n})"
-        ));
+        conditions.push(format!(
+            "(created_at, job_id) < (SELECT created_at, job_id FROM {t} WHERE job_id = ${n})"
+        ));
@@
-    let sql = format!("SELECT * FROM {t}{where_clause} ORDER BY created_at DESC LIMIT ${n}");
+    let sql = format!(
+        "SELECT * FROM {t}{where_clause} ORDER BY created_at DESC, job_id DESC LIMIT ${n}"
+    );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
conditions.push(format!(
"created_at < (SELECT created_at FROM {t} WHERE job_id = ${n})"
));
binds.push(c.to_string());
n += 1;
}
if let Some(status) = &filters.status {
conditions.push(format!("status = ${n}"));
binds.push(status.clone());
n += 1;
}
if let Some(trigger) = &filters.trigger {
conditions.push(format!("trigger_type = ${n}"));
binds.push(trigger.clone());
n += 1;
}
if let Some(endpoint_type) = &filters.endpoint_type {
conditions.push(format!("endpoint_type = ${n}"));
binds.push(endpoint_type.clone());
n += 1;
}
if let Some(endpoint) = &filters.endpoint {
conditions.push(format!("endpoint ILIKE '%' || ${n} || '%'"));
binds.push(endpoint.clone());
n += 1;
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!(" WHERE {}", conditions.join(" AND "))
};
let sql = format!("SELECT * FROM {t}{where_clause} ORDER BY created_at DESC LIMIT ${n}");
(sql, binds)
conditions.push(format!(
"(created_at, job_id) < (SELECT created_at, job_id FROM {t} WHERE job_id = ${n})"
));
binds.push(c.to_string());
n += 1;
}
if let Some(status) = &filters.status {
conditions.push(format!("status = ${n}"));
binds.push(status.clone());
n += 1;
}
if let Some(trigger) = &filters.trigger {
conditions.push(format!("trigger_type = ${n}"));
binds.push(trigger.clone());
n += 1;
}
if let Some(endpoint_type) = &filters.endpoint_type {
conditions.push(format!("endpoint_type = ${n}"));
binds.push(endpoint_type.clone());
n += 1;
}
if let Some(endpoint) = &filters.endpoint {
conditions.push(format!("endpoint ILIKE '%' || ${n} || '%'"));
binds.push(endpoint.clone());
n += 1;
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!(" WHERE {}", conditions.join(" AND "))
};
let sql = format!(
"SELECT * FROM {t}{where_clause} ORDER BY created_at DESC, job_id DESC LIMIT ${n}"
);
(sql, binds)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/common/src/db/jobs.rs` around lines 181 - 215, The pagination uses
only created_at, which is unstable; update the cursor logic and ORDER BY to use
a deterministic tie-breaker (job_id). Replace the single-row cursor condition
"created_at < (SELECT created_at FROM {t} WHERE job_id = ${n})" with a composite
condition like "(created_at < (SELECT created_at FROM {t} WHERE job_id = ${n})
OR (created_at = (SELECT created_at FROM {t} WHERE job_id = ${n}) AND job_id <
${m}))" and add the job_id value to binds (note the new bind ${m}); then change
the ORDER BY in the sql string from "ORDER BY created_at DESC" to "ORDER BY
created_at DESC, job_id DESC" so results are stably ordered for cursor
pagination (update the binds vector where the cursor variable c is handled and
the final sql variable).

}

pub async fn list(
db: &mut DbContext<'_>,
cursor: Option<&str>,
limit: i64,
filters: &JobFilters,
) -> Result<Vec<Job>, sqlx::Error> {
let t = tbl(db.prefix, "jobs");
match cursor {
Some(c) => sqlx::query_as::<_, Job>(&format!(
"SELECT * FROM {t} WHERE created_at < (SELECT created_at FROM {t} WHERE job_id = $1)
ORDER BY created_at DESC LIMIT $2"
))
.bind(c)
.bind(limit)
.fetch_all(&mut *db.conn)
.await,
None => {
sqlx::query_as::<_, Job>(&format!(
"SELECT * FROM {t} ORDER BY created_at DESC LIMIT $1"
))
.bind(limit)
.fetch_all(&mut *db.conn)
.await
}
let (sql, binds) = build_list_query(&t, cursor, filters);
let mut query = sqlx::query_as::<_, Job>(&sql);
for bind in &binds {
query = query.bind(bind);
}
query.bind(limit).fetch_all(&mut *db.conn).await
}

pub async fn cancel(
Expand Down Expand Up @@ -430,4 +477,66 @@ mod tests {
assert!(cmd.contains("\"ws_acme\".\"sched_jobs\" j"));
assert!(cmd.contains("\"ws_acme\".\"sched_endpoints\" e"));
}

#[test]
fn list_query_without_cursor_or_filters() {
let (sql, binds) = build_list_query("jobs", None, &JobFilters::default());
assert_eq!(sql, "SELECT * FROM jobs ORDER BY created_at DESC LIMIT $1");
assert!(binds.is_empty());
}

#[test]
fn list_query_with_cursor_only() {
let (sql, binds) = build_list_query("jobs", Some("job-9"), &JobFilters::default());
assert_eq!(
sql,
"SELECT * FROM jobs WHERE created_at < (SELECT created_at FROM jobs WHERE job_id = $1) \
ORDER BY created_at DESC LIMIT $2"
);
assert_eq!(binds, vec!["job-9".to_string()]);
}

#[test]
fn list_query_binds_filters_in_order_after_cursor() {
let filters = JobFilters {
status: Some("ACTIVE".into()),
trigger: Some("CRON".into()),
endpoint_type: Some("HTTP".into()),
endpoint: Some("notify".into()),
};
let (sql, binds) = build_list_query("jobs", Some("job-9"), &filters);
// Cursor is $1, filters follow in declaration order, LIMIT is last ($6).
assert_eq!(
sql,
"SELECT * FROM jobs WHERE \
created_at < (SELECT created_at FROM jobs WHERE job_id = $1) AND \
status = $2 AND trigger_type = $3 AND endpoint_type = $4 AND \
endpoint ILIKE '%' || $5 || '%' \
ORDER BY created_at DESC LIMIT $6"
);
assert_eq!(
binds,
vec![
"job-9".to_string(),
"ACTIVE".to_string(),
"CRON".to_string(),
"HTTP".to_string(),
"notify".to_string(),
]
);
}

#[test]
fn list_query_filters_without_cursor_start_at_one() {
let filters = JobFilters {
status: Some("RETIRED".into()),
..Default::default()
};
let (sql, binds) = build_list_query("jobs", None, &filters);
assert_eq!(
sql,
"SELECT * FROM jobs WHERE status = $1 ORDER BY created_at DESC LIMIT $2"
);
assert_eq!(binds, vec!["RETIRED".to_string()]);
}
}
59 changes: 55 additions & 4 deletions crates/dashboard/src/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,57 @@ mod inner {

// -- Jobs API (workspace-scoped) --

pub async fn list_jobs(org_id: String, workspace_id: String) -> Result<Vec<Job>, String> {
/// Percent-encodes a query-string value (RFC 3986 unreserved set passes
/// through, everything else is `%XX`). Keeps the endpoint search box safe
/// for spaces and other special characters without pulling in a URL crate.
fn encode_query_value(value: &str) -> String {
let mut out = String::with_capacity(value.len());
for byte in value.bytes() {
match byte {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
out.push(byte as char)
}
_ => out.push_str(&format!("%{byte:02X}")),
}
}
out
}

fn push_query_param(qs: &mut String, key: &str, value: &str) {
if value.is_empty() {
return;
}
qs.push(if qs.is_empty() { '?' } else { '&' });
qs.push_str(key);
qs.push('=');
qs.push_str(&encode_query_value(value));
}

pub async fn list_jobs(
org_id: String,
workspace_id: String,
params: JobListQueryParams,
) -> Result<PaginatedResponse<Job>, String> {
let config = get_config();
let base = config.api_base();
let resp = Request::get(&format!("{base}/v1/jobs"))
let mut qs = String::new();
if let Some(cursor) = &params.cursor {
push_query_param(&mut qs, "cursor", cursor);
}
push_query_param(&mut qs, "limit", &params.limit.to_string());
if let Some(status) = &params.status {
push_query_param(&mut qs, "status", status);
}
if let Some(trigger) = &params.trigger {
push_query_param(&mut qs, "trigger_type", trigger);
}
if let Some(endpoint_type) = &params.endpoint_type {
push_query_param(&mut qs, "endpoint_type", endpoint_type);
}
if let Some(endpoint) = &params.endpoint {
push_query_param(&mut qs, "endpoint", endpoint);
}
let resp = Request::get(&format!("{base}/v1/jobs{qs}"))
.header("Authorization", &format!("Bearer {}", config.api_key))
.header("X-Org-Id", &org_id)
.header("X-Workspace-Id", &workspace_id)
Expand All @@ -186,7 +233,7 @@ mod inner {
));
}
let data: PaginatedResponse<Job> = resp.json().await.map_err(|e| e.to_string())?;
Ok(data.data)
Ok(data)
}

pub async fn get_job(
Expand Down Expand Up @@ -899,7 +946,11 @@ mod inner {
) -> Result<Workspace, String> {
Err("SSR: not available".to_string())
}
pub async fn list_jobs(_org_id: String, _workspace_id: String) -> Result<Vec<Job>, String> {
pub async fn list_jobs(
_org_id: String,
_workspace_id: String,
_params: JobListQueryParams,
) -> Result<PaginatedResponse<Job>, String> {
Err("SSR: not available".to_string())
}
pub async fn get_job(
Expand Down
25 changes: 25 additions & 0 deletions crates/dashboard/src/api/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,31 @@ pub struct Job {
pub created_at: String,
}

/// Query parameters for paginating and filtering the jobs list. Empty `Option`
/// fields are omitted from the request, which the API treats as "no filter".
#[derive(Debug, Clone)]
pub struct JobListQueryParams {
pub cursor: Option<String>,
pub limit: i64,
pub status: Option<String>,
pub trigger: Option<String>,
pub endpoint: Option<String>,
pub endpoint_type: Option<String>,
}

impl Default for JobListQueryParams {
fn default() -> Self {
Self {
cursor: None,
limit: 50,
status: None,
trigger: None,
endpoint: None,
endpoint_type: None,
}
}
}

// -- Endpoint --

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
Expand Down
Loading