Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions crates/dashboard/src/pages/workspace_detail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,12 @@ fn JobsTable(jobs: Vec<Job>, org_id: String, workspace_id: String, set_refresh:
let (status_job, set_status_job) = signal(Option::<String>::None);
let (versions_job, set_versions_job) = signal(Option::<String>::None);
let (cancel_error, set_cancel_error) = signal(Option::<String>::None);
// Ad-hoc invoke: holds the job selected for a one-off run.
let (invoke_job, set_invoke_job) = signal(Option::<Job>::None);
let (invoke_open, set_invoke_open) = signal(false);

let oid_invoke = org_id.clone();
let wid_invoke = workspace_id.clone();

view! {
<div class="space-y-2">
Expand Down Expand Up @@ -1276,6 +1282,9 @@ fn JobsTable(jobs: Vec<Job>, org_id: String, workspace_id: String, set_refresh:
let wid_versions = workspace_id.clone();
let is_active = job.status == "ACTIVE";
let is_cron = job.trigger == "CRON";
// Internal jobs are kronos-managed and rejected by the API for user invokes.
let can_invoke = is_active && job.endpoint_type != "INTERNAL";
let job_invoke = job.clone();
let jid_for_status = job.job_id.clone();
let jid_for_versions = job.job_id.clone();
let jid_for_execs = job.job_id.clone();
Expand Down Expand Up @@ -1315,6 +1324,14 @@ fn JobsTable(jobs: Vec<Job>, org_id: String, workspace_id: String, set_refresh:
} class="text-orange-600 hover:text-orange-800 text-xs font-medium">"Cancel"</button>
})
} else { None }}
{if can_invoke {
Some(view! {
<button on:click=move |_| {
set_invoke_job.set(Some(job_invoke.clone()));
set_invoke_open.set(true);
} class="text-green-600 hover:text-green-800 text-xs font-medium">"Invoke"</button>
})
} else { None }}
<button on:click=move |_| {
let current = status_job.get_untracked();
if current.as_deref() == Some(&jid_status) {
Expand Down Expand Up @@ -1377,10 +1394,163 @@ fn JobsTable(jobs: Vec<Job>, org_id: String, workspace_id: String, set_refresh:
</tbody>
</table>
</div>

<Modal title="Invoke Job" open=invoke_open set_open=set_invoke_open>
<InvokeJobForm org_id=oid_invoke workspace_id=wid_invoke invoke_job=invoke_job set_modal_open=set_invoke_open set_refresh=set_refresh />
</Modal>
</div>
}
}

#[component]
fn InvokeJobForm(
org_id: String,
workspace_id: String,
invoke_job: ReadSignal<Option<Job>>,
set_modal_open: WriteSignal<bool>,
set_refresh: WriteSignal<u32>,
) -> impl IntoView {
// "IMMEDIATE" fires now; "SCHEDULED" maps to a one-off DELAYED job at run_at.
let (mode, set_mode) = signal("IMMEDIATE".to_string());
let (run_at, set_run_at) = signal(String::new());
let (input_json, set_input_json) = signal(String::new());
let (error, set_error) = signal(Option::<String>::None);
let (submitting, set_submitting) = signal(false);

// Reset the form and prefill the input whenever a new job is selected.
Effect::new(move || {
if let Some(job) = invoke_job.get() {
let input_str = job
.input
.as_ref()
.map(|v| serde_json::to_string_pretty(v).unwrap_or_default())
.unwrap_or_default();
set_input_json.set(input_str);
set_mode.set("IMMEDIATE".to_string());
set_run_at.set(String::new());
set_error.set(None);
}
});

let endpoint = move || invoke_job.get().map(|j| j.endpoint.clone()).unwrap_or_default();
let job_id = move || invoke_job.get().map(|j| j.job_id.clone()).unwrap_or_default();
let source_trigger = move || invoke_job.get().map(|j| j.trigger.clone()).unwrap_or_default();

let on_submit = move |ev: leptos::ev::SubmitEvent| {
ev.prevent_default();
let oid = org_id.clone();
let wid = workspace_id.clone();
let ep = endpoint();
let jid = job_id();
let m = mode.get_untracked();
let ra = run_at.get_untracked();
let inp = input_json.get_untracked();
set_submitting.set(true);
set_error.set(None);

leptos::task::spawn_local(async move {
let input = if inp.trim().is_empty() {
None
} else {
match serde_json::from_str::<serde_json::Value>(&inp) {
Ok(v) => Some(v),
Err(e) => {
set_error.set(Some(format!("Invalid JSON input: {e}")));
set_submitting.set(false);
return;
}
}
};

// Re-using POST /v1/jobs: an ad-hoc run is a fresh IMMEDIATE/DELAYED job
// built from the source job's endpoint + (optionally overridden) input.
let scheduled = m == "SCHEDULED";
let trigger = if scheduled { "DELAYED" } else { "IMMEDIATE" };
let mut body = serde_json::json!({
"endpoint": ep,
"trigger": trigger,
"input": input,
});
let obj = body.as_object_mut().unwrap();

if scheduled {
if ra.trim().is_empty() {
set_error.set(Some("Run At is required for a scheduled invoke".into()));
set_submitting.set(false);
return;
}
let run_at_norm = normalize_run_at(&ra);
// DELAYED jobs require an idempotency key; derive a stable one from the
// source job and target time so a double-submit is naturally deduped.
let key = format!("adhoc-{jid}-{run_at_norm}");
obj.insert("run_at".into(), serde_json::Value::String(run_at_norm));
obj.insert("idempotency_key".into(), serde_json::Value::String(key));
}

match api::create_job(oid, wid, body).await {
Ok(_) => {
set_modal_open.set(false);
set_refresh.update(|c| *c += 1);
}
Err(e) => set_error.set(Some(e.to_string())),
}
set_submitting.set(false);
});
};

view! {
<form on:submit=on_submit class="space-y-4">
<Show when=move || error.get().is_some()>
<ErrorAlert message=error.get().unwrap_or_default() />
</Show>

<div class="text-sm text-gray-600">
"Run a one-off invocation of "
<code class="bg-gray-100 px-1.5 py-0.5 rounded text-xs">{move || endpoint()}</code>
". This creates a new job from the selected "
<span class="font-medium">{move || source_trigger()}</span>
" job's definition."
</div>

<div>
<label class="block text-sm font-medium text-gray-700 mb-1">"When"</label>
<select prop:value=move || mode.get()
on:change=move |ev| set_mode.set(event_target_value(&ev))
class="w-full px-3 py-2 border border-gray-300 rounded-lg text-sm focus:ring-2 focus:ring-blue-500 focus:border-blue-500 outline-none">
<option value="IMMEDIATE">"Immediately"</option>
<option value="SCHEDULED">"At a scheduled time"</option>
</select>
</div>

<Show when=move || mode.get() == "SCHEDULED">
<div>
<label class="block text-sm font-medium text-gray-700 mb-1">"Run At (treated as UTC)"</label>
<input type="datetime-local" prop:value=move || run_at.get()
on:input=move |ev| set_run_at.set(event_target_value(&ev))
class="w-full px-3 py-2 border border-gray-300 rounded-lg text-sm focus:ring-2 focus:ring-blue-500 focus:border-blue-500 outline-none" />
</div>
</Show>

<div>
<label class="block text-sm font-medium text-gray-700 mb-1">"Input (JSON, optional override)"</label>
<textarea prop:value=move || input_json.get()
on:input=move |ev| set_input_json.set(event_target_value(&ev))
class="w-full px-3 py-2 border border-gray-300 rounded-lg text-sm font-mono focus:ring-2 focus:ring-blue-500 focus:border-blue-500 outline-none"
rows="4" placeholder="{\"key\": \"value\"}"></textarea>
</div>

<div class="flex justify-end gap-3 pt-2">
<button type="button" on:click=move |_| set_modal_open.set(false)
class="px-4 py-2 border border-gray-300 text-gray-700 rounded-lg hover:bg-gray-50 text-sm font-medium transition-colors">"Cancel"</button>
<button type="submit" disabled=move || submitting.get()
class="px-4 py-2 bg-green-600 text-white rounded-lg hover:bg-green-700 disabled:opacity-50 text-sm font-medium transition-colors">
{move || if submitting.get() { "Invoking..." } else { "Invoke" }}
</button>
</div>
</form>
}
}

#[component]
fn JobStatusPanel(org_id: String, workspace_id: String, job_id: String) -> impl IntoView {
let oid = org_id.clone();
Expand Down Expand Up @@ -2110,3 +2280,17 @@ fn truncate_id(id: &str) -> String {
fn format_date(s: &str) -> String {
if s.len() >= 10 { s[..10].to_string() } else { s.to_string() }
}

// `<input type="datetime-local">` yields "YYYY-MM-DDTHH:MM" (and sometimes ":SS"),
// which is not valid RFC 3339. Normalize to a UTC timestamp the API can parse,
// treating the entered wall-clock time as UTC.
fn normalize_run_at(s: &str) -> String {
let s = s.trim();
if s.ends_with('Z') || s.contains('+') {
s.to_string()
} else if s.len() == 16 {
format!("{s}:00Z")
} else {
format!("{s}Z")
}
}