From 9b0a56c1c87c36d5e028ae1bec0557767e796e6c Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Jun 2026 03:10:02 +0000 Subject: [PATCH] feat(dashboard): add pagination and filters to the jobs page Wire up cursor-based pagination and server-side filtering for the Jobs tab in the workspace dashboard. Backend: - common/db/jobs::list now accepts a JobFilters struct (status, trigger, endpoint substring, endpoint_type), built via a pure, unit-tested query builder so the cursor + filter placeholder bookkeeping is verifiable. - api jobs::list parses a JobListFilters query struct alongside the existing pagination params, validating enum-like values up front so a typo returns 400 rather than silently empty results. - smithy ListJobsInput gains the endpoint_type query param (endpoint, trigger_type, status were already declared). Dashboard: - list_jobs sends cursor/limit/status/trigger_type/endpoint/endpoint_type via JobListQueryParams and returns the full PaginatedResponse (the cursor was previously discarded, capping the view at the first 50 jobs). - JobsTab gains a filter bar (status, trigger, endpoint type, endpoint search + clear) and pagination controls (Prev/Next with a cursor stack for back-navigation, plus a per-page size selector). Changing a filter or page size resets pagination. --- crates/api/src/handlers/jobs.rs | 59 ++++- crates/common/src/db/jobs.rs | 143 +++++++++-- crates/dashboard/src/api/client.rs | 59 ++++- crates/dashboard/src/api/models.rs | 25 ++ .../dashboard/src/pages/workspace_detail.rs | 233 ++++++++++++++++-- smithy/model/jobs.smithy | 3 + 6 files changed, 485 insertions(+), 37 deletions(-) diff --git a/crates/api/src/handlers/jobs.rs b/crates/api/src/handlers/jobs.rs index 6777bd0..3a810cf 100644 --- a/crates/api/src/handlers/jobs.rs +++ b/crates/api/src/handlers/jobs.rs @@ -261,11 +261,67 @@ pub async fn create( } } +/// Optional server-side filters for [`list`], parsed from the query string +/// alongside [`PaginationParams`]. Blank values (e.g. `?status=`) are treated as +/// absent so the dashboard can send empty params for an "All" selection. +#[derive(Debug, serde::Deserialize)] +pub struct JobListFilters { + pub status: Option, + pub trigger_type: Option, + pub endpoint: Option, + pub endpoint_type: Option, +} + +fn blank_to_none(value: Option) -> Option { + value.and_then(|v| { + let trimmed = v.trim(); + if trimmed.is_empty() { + None + } else { + Some(trimmed.to_string()) + } + }) +} + +impl JobListFilters { + /// Validates the enum-like filters and converts to the DB-layer struct. + /// Invalid values are rejected up front so a typo surfaces as a 400 rather + /// than silently returning zero rows. + fn into_db_filters(self) -> Result { + let status = blank_to_none(self.status); + if let Some(s) = &status { + if s != "ACTIVE" && s != "RETIRED" { + return Err(AppError::InvalidRequest(format!("Invalid status: {s}"))); + } + } + + let trigger = blank_to_none(self.trigger_type); + if let Some(t) = &trigger { + TriggerType::from_str_val(t) + .ok_or_else(|| AppError::InvalidRequest(format!("Invalid trigger: {t}")))?; + } + + let endpoint_type = blank_to_none(self.endpoint_type); + if let Some(et) = &endpoint_type { + EndpointType::from_str_val(et) + .ok_or_else(|| AppError::InvalidRequest(format!("Invalid endpoint_type: {et}")))?; + } + + Ok(db::jobs::JobFilters { + status, + trigger, + endpoint: blank_to_none(self.endpoint), + endpoint_type, + }) + } +} + pub async fn list( state: web::Data, _auth: AuthenticatedRequest, ws: Workspace, params: web::Query, + filters: web::Query, ) -> Result { let prefix = state.prefix(); let mut conn = kronos_common::db::scoped::scoped_connection(&state.pool, &ws.0.schema_name) @@ -274,7 +330,8 @@ pub async fn list( let mut db = DbContext::new(&mut *conn, prefix); let limit = params.effective_limit(); let cursor = params.decode_cursor(); - let items = db::jobs::list(&mut db, cursor.as_deref(), limit + 1).await?; + let filters = filters.into_inner().into_db_filters()?; + let items = db::jobs::list(&mut db, cursor.as_deref(), limit + 1, &filters).await?; let has_more = items.len() as i64 > limit; let items: Vec<_> = items.into_iter().take(limit as usize).collect(); diff --git a/crates/common/src/db/jobs.rs b/crates/common/src/db/jobs.rs index cb61ad7..4c31853 100644 --- a/crates/common/src/db/jobs.rs +++ b/crates/common/src/db/jobs.rs @@ -157,30 +157,77 @@ pub async fn get_by_idempotency( .await } +/// Optional, server-side filters applied to [`list`]. All fields are ANDed +/// together; `None` fields are simply not constrained. `endpoint` is matched as +/// a case-insensitive substring so the dashboard can offer a search box, while +/// the enum-like fields (`status`, `trigger`, `endpoint_type`) match exactly. +#[derive(Debug, Default, Clone)] +pub struct JobFilters { + pub status: Option, + pub trigger: Option, + pub endpoint: Option, + pub endpoint_type: Option, +} + +/// Builds the `list` query and the ordered list of string binds. Kept pure (no +/// DB access) so the placeholder/bind bookkeeping can be unit tested. The final +/// `LIMIT` placeholder is left for the caller to bind as an `i64`. +fn build_list_query(t: &str, cursor: Option<&str>, filters: &JobFilters) -> (String, Vec) { + let mut conditions: Vec = Vec::new(); + let mut binds: Vec = Vec::new(); + let mut n = 1; + + if let Some(c) = cursor { + conditions.push(format!( + "created_at < (SELECT created_at FROM {t} WHERE job_id = ${n})" + )); + binds.push(c.to_string()); + n += 1; + } + if let Some(status) = &filters.status { + conditions.push(format!("status = ${n}")); + binds.push(status.clone()); + n += 1; + } + if let Some(trigger) = &filters.trigger { + conditions.push(format!("trigger_type = ${n}")); + binds.push(trigger.clone()); + n += 1; + } + if let Some(endpoint_type) = &filters.endpoint_type { + conditions.push(format!("endpoint_type = ${n}")); + binds.push(endpoint_type.clone()); + n += 1; + } + if let Some(endpoint) = &filters.endpoint { + conditions.push(format!("endpoint ILIKE '%' || ${n} || '%'")); + binds.push(endpoint.clone()); + n += 1; + } + + let where_clause = if conditions.is_empty() { + String::new() + } else { + format!(" WHERE {}", conditions.join(" AND ")) + }; + + let sql = format!("SELECT * FROM {t}{where_clause} ORDER BY created_at DESC LIMIT ${n}"); + (sql, binds) +} + pub async fn list( db: &mut DbContext<'_>, cursor: Option<&str>, limit: i64, + filters: &JobFilters, ) -> Result, sqlx::Error> { let t = tbl(db.prefix, "jobs"); - match cursor { - Some(c) => sqlx::query_as::<_, Job>(&format!( - "SELECT * FROM {t} WHERE created_at < (SELECT created_at FROM {t} WHERE job_id = $1) - ORDER BY created_at DESC LIMIT $2" - )) - .bind(c) - .bind(limit) - .fetch_all(&mut *db.conn) - .await, - None => { - sqlx::query_as::<_, Job>(&format!( - "SELECT * FROM {t} ORDER BY created_at DESC LIMIT $1" - )) - .bind(limit) - .fetch_all(&mut *db.conn) - .await - } + let (sql, binds) = build_list_query(&t, cursor, filters); + let mut query = sqlx::query_as::<_, Job>(&sql); + for bind in &binds { + query = query.bind(bind); } + query.bind(limit).fetch_all(&mut *db.conn).await } pub async fn cancel( @@ -430,4 +477,66 @@ mod tests { assert!(cmd.contains("\"ws_acme\".\"sched_jobs\" j")); assert!(cmd.contains("\"ws_acme\".\"sched_endpoints\" e")); } + + #[test] + fn list_query_without_cursor_or_filters() { + let (sql, binds) = build_list_query("jobs", None, &JobFilters::default()); + assert_eq!(sql, "SELECT * FROM jobs ORDER BY created_at DESC LIMIT $1"); + assert!(binds.is_empty()); + } + + #[test] + fn list_query_with_cursor_only() { + let (sql, binds) = build_list_query("jobs", Some("job-9"), &JobFilters::default()); + assert_eq!( + sql, + "SELECT * FROM jobs WHERE created_at < (SELECT created_at FROM jobs WHERE job_id = $1) \ + ORDER BY created_at DESC LIMIT $2" + ); + assert_eq!(binds, vec!["job-9".to_string()]); + } + + #[test] + fn list_query_binds_filters_in_order_after_cursor() { + let filters = JobFilters { + status: Some("ACTIVE".into()), + trigger: Some("CRON".into()), + endpoint_type: Some("HTTP".into()), + endpoint: Some("notify".into()), + }; + let (sql, binds) = build_list_query("jobs", Some("job-9"), &filters); + // Cursor is $1, filters follow in declaration order, LIMIT is last ($6). + assert_eq!( + sql, + "SELECT * FROM jobs WHERE \ + created_at < (SELECT created_at FROM jobs WHERE job_id = $1) AND \ + status = $2 AND trigger_type = $3 AND endpoint_type = $4 AND \ + endpoint ILIKE '%' || $5 || '%' \ + ORDER BY created_at DESC LIMIT $6" + ); + assert_eq!( + binds, + vec![ + "job-9".to_string(), + "ACTIVE".to_string(), + "CRON".to_string(), + "HTTP".to_string(), + "notify".to_string(), + ] + ); + } + + #[test] + fn list_query_filters_without_cursor_start_at_one() { + let filters = JobFilters { + status: Some("RETIRED".into()), + ..Default::default() + }; + let (sql, binds) = build_list_query("jobs", None, &filters); + assert_eq!( + sql, + "SELECT * FROM jobs WHERE status = $1 ORDER BY created_at DESC LIMIT $2" + ); + assert_eq!(binds, vec!["RETIRED".to_string()]); + } } diff --git a/crates/dashboard/src/api/client.rs b/crates/dashboard/src/api/client.rs index 48ec87a..852fe7a 100644 --- a/crates/dashboard/src/api/client.rs +++ b/crates/dashboard/src/api/client.rs @@ -168,10 +168,57 @@ mod inner { // -- Jobs API (workspace-scoped) -- - pub async fn list_jobs(org_id: String, workspace_id: String) -> Result, String> { + /// Percent-encodes a query-string value (RFC 3986 unreserved set passes + /// through, everything else is `%XX`). Keeps the endpoint search box safe + /// for spaces and other special characters without pulling in a URL crate. + fn encode_query_value(value: &str) -> String { + let mut out = String::with_capacity(value.len()); + for byte in value.bytes() { + match byte { + b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => { + out.push(byte as char) + } + _ => out.push_str(&format!("%{byte:02X}")), + } + } + out + } + + fn push_query_param(qs: &mut String, key: &str, value: &str) { + if value.is_empty() { + return; + } + qs.push(if qs.is_empty() { '?' } else { '&' }); + qs.push_str(key); + qs.push('='); + qs.push_str(&encode_query_value(value)); + } + + pub async fn list_jobs( + org_id: String, + workspace_id: String, + params: JobListQueryParams, + ) -> Result, String> { let config = get_config(); let base = config.api_base(); - let resp = Request::get(&format!("{base}/v1/jobs")) + let mut qs = String::new(); + if let Some(cursor) = ¶ms.cursor { + push_query_param(&mut qs, "cursor", cursor); + } + push_query_param(&mut qs, "limit", ¶ms.limit.to_string()); + if let Some(status) = ¶ms.status { + push_query_param(&mut qs, "status", status); + } + if let Some(trigger) = ¶ms.trigger { + push_query_param(&mut qs, "trigger_type", trigger); + } + if let Some(endpoint_type) = ¶ms.endpoint_type { + push_query_param(&mut qs, "endpoint_type", endpoint_type); + } + if let Some(endpoint) = ¶ms.endpoint { + push_query_param(&mut qs, "endpoint", endpoint); + } + let resp = Request::get(&format!("{base}/v1/jobs{qs}")) .header("Authorization", &format!("Bearer {}", config.api_key)) .header("X-Org-Id", &org_id) .header("X-Workspace-Id", &workspace_id) @@ -186,7 +233,7 @@ mod inner { )); } let data: PaginatedResponse = resp.json().await.map_err(|e| e.to_string())?; - Ok(data.data) + Ok(data) } pub async fn get_job( @@ -899,7 +946,11 @@ mod inner { ) -> Result { Err("SSR: not available".to_string()) } - pub async fn list_jobs(_org_id: String, _workspace_id: String) -> Result, String> { + pub async fn list_jobs( + _org_id: String, + _workspace_id: String, + _params: JobListQueryParams, + ) -> Result, String> { Err("SSR: not available".to_string()) } pub async fn get_job( diff --git a/crates/dashboard/src/api/models.rs b/crates/dashboard/src/api/models.rs index ec6425a..b7cb127 100644 --- a/crates/dashboard/src/api/models.rs +++ b/crates/dashboard/src/api/models.rs @@ -88,6 +88,31 @@ pub struct Job { pub created_at: String, } +/// Query parameters for paginating and filtering the jobs list. Empty `Option` +/// fields are omitted from the request, which the API treats as "no filter". +#[derive(Debug, Clone)] +pub struct JobListQueryParams { + pub cursor: Option, + pub limit: i64, + pub status: Option, + pub trigger: Option, + pub endpoint: Option, + pub endpoint_type: Option, +} + +impl Default for JobListQueryParams { + fn default() -> Self { + Self { + cursor: None, + limit: 50, + status: None, + trigger: None, + endpoint: None, + endpoint_type: None, + } + } +} + // -- Endpoint -- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] diff --git a/crates/dashboard/src/pages/workspace_detail.rs b/crates/dashboard/src/pages/workspace_detail.rs index 9d22dd8..1e5880f 100644 --- a/crates/dashboard/src/pages/workspace_detail.rs +++ b/crates/dashboard/src/pages/workspace_detail.rs @@ -5,7 +5,7 @@ use leptos_router::hooks::use_params_map; use crate::app::prefixed; use crate::api::{ self, Config, CreateConfig, CreateEndpoint, CreatePayloadSpec, CreateSecret, Endpoint, - Execution, Job, PayloadSpec, UpdateConfig, UpdatePayloadSpec, + Execution, Job, JobListQueryParams, PayloadSpec, UpdateConfig, UpdatePayloadSpec, UpdateSecret, }; use crate::components::confirm::ConfirmDialog; @@ -985,18 +985,62 @@ fn UpdateSecretForm( // Jobs Tab (Enhanced) // ════════════════════════════════════════════════════════════ +/// Normalizes a raw filter string (from a select/input) into an optional value: +/// blank means "no filter". +fn filter_opt(value: String) -> Option { + let trimmed = value.trim(); + if trimmed.is_empty() { + None + } else { + Some(trimmed.to_string()) + } +} + #[component] fn JobsTab(org_id: String, workspace_id: String) -> impl IntoView { let (refresh, set_refresh) = signal(0u32); let (modal_open, set_modal_open) = signal(false); + // Filters ("" == "All"/unset). Changing any filter resets pagination. + let (status_filter, set_status_filter) = signal(String::new()); + let (trigger_filter, set_trigger_filter) = signal(String::new()); + let (endpoint_type_filter, set_endpoint_type_filter) = signal(String::new()); + let (endpoint_filter, set_endpoint_filter) = signal(String::new()); + let (page_size, set_page_size) = signal(50i64); + + // Cursor pagination. The backend cursor is forward-only, so we keep the + // cursor used for each visited page (`page_cursors[i]`) and an index into + // it to support a Previous button. Page 1 uses cursor `None`. + let (page_cursors, set_page_cursors) = signal(vec![Option::::None]); + let (page_index, set_page_index) = signal(0usize); + + // Resetting pagination whenever filters or page size change keeps the cursor + // stack consistent with the active query. + let reset_pagination = move || { + set_page_cursors.set(vec![None]); + set_page_index.set(0); + }; + let oid = org_id.clone(); let wid = workspace_id.clone(); let jobs = LocalResource::new(move || { let _ = refresh.get(); let oid = oid.clone(); let wid = wid.clone(); - api::list_jobs(oid, wid) + let cursor = page_cursors + .get() + .get(page_index.get()) + .cloned() + .flatten(); + let params = JobListQueryParams { + cursor, + limit: page_size.get(), + status: filter_opt(status_filter.get()), + trigger: filter_opt(trigger_filter.get()), + endpoint: filter_opt(endpoint_filter.get()), + endpoint_type: filter_opt(endpoint_type_filter.get()), + }; + api::list_jobs(oid, wid, params) }); let oid_render = org_id.clone(); @@ -1004,16 +1048,79 @@ fn JobsTab(org_id: String, workspace_id: String) -> impl IntoView { let oid_form = org_id.clone(); let wid_form = workspace_id.clone(); + let any_filter = move || { + !status_filter.get().is_empty() + || !trigger_filter.get().is_empty() + || !endpoint_type_filter.get().is_empty() + || !endpoint_filter.get().is_empty() + }; + view! {
-
- + // Filter bar + actions +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+ + + + +
+ +
}> @@ -1022,12 +1129,42 @@ fn JobsTab(org_id: String, workspace_id: String) -> impl IntoView { let wid = wid_render.clone(); jobs.get().map(|r| (*r).clone()).map(move |result| { match result { - Ok(jobs) => { - if jobs.is_empty() { - view! { }.into_any() + Ok(page) => { + let next_cursor = page.cursor.clone(); + if page.data.is_empty() { + let msg = if any_filter() { + "No jobs match the current filters." + } else { + "No jobs in this workspace. Create an endpoint first, then add a job." + }; + view! { +
+ + +
+ }.into_any() } else { - let jobs = jobs.clone(); - view! { }.into_any() + let jobs = page.data.clone(); + view! { +
+ + +
+ }.into_any() } } Err(e) => view! { }.into_any(), @@ -1043,6 +1180,72 @@ fn JobsTab(org_id: String, workspace_id: String) -> impl IntoView { } } +#[component] +fn JobsPagination( + next_cursor: Option, + page_size: ReadSignal, + set_page_size: WriteSignal, + set_page_cursors: WriteSignal>>, + page_index: ReadSignal, + set_page_index: WriteSignal, +) -> impl IntoView { + let has_next = next_cursor.is_some(); + let has_prev = move || page_index.get() > 0; + + let on_next = move |_| { + if let Some(nc) = next_cursor.clone() { + let idx = page_index.get_untracked(); + set_page_cursors.update(|cursors| { + // Drop any forward history, then record the next page's cursor. + cursors.truncate(idx + 1); + cursors.push(Some(nc)); + }); + set_page_index.set(idx + 1); + } + }; + + let on_prev = move |_| { + let idx = page_index.get_untracked(); + if idx > 0 { + set_page_index.set(idx - 1); + } + }; + + view! { +
+
+ "Per page:" + +
+
+ "Page " {move || page_index.get() + 1} + + +
+
+ } +} + #[component] fn CreateJobForm( org_id: String, diff --git a/smithy/model/jobs.smithy b/smithy/model/jobs.smithy index f2a7f9c..f8b0c72 100644 --- a/smithy/model/jobs.smithy +++ b/smithy/model/jobs.smithy @@ -158,6 +158,9 @@ structure ListJobsInput with [WorkspaceHeaders, PaginationQuery] { @httpQuery("status") status: JobStatusEnum + + @httpQuery("endpoint_type") + endpoint_type: EndpointTypeEnum } structure ListJobsOutput {