diff --git a/src/api.rs b/src/api.rs
index 4874a90..06f7d41 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -1,4 +1,4 @@
-use crate::jobs::{Job, JobQueue, JobStatus, Quadrant, VideoQuadrantSelection, WebDavConfig};
+use crate::jobs::{Job, JobProgress, JobQueue, JobStatus, Quadrant, VideoQuadrantSelection, WebDavConfig};
 use crate::webdav::WebDavClient;
 use anyhow::Result;
 use axum::{
@@ -89,6 +89,7 @@ pub async fn run_server(port: u16, data_dir: &str) -> Result<()> {
         .route("/api/jobs/{id}", patch(update_job))
         .route("/api/jobs/pending", get(get_pending_job))
         .route("/api/jobs/claim", post(claim_job))
+        .route("/api/jobs/{id}/progress", patch(update_job_progress))
         .route("/health", get(health_check))
         // Static files for worker provisioning
         .route("/assets/worker", get(serve_worker_binary))
@@ -406,6 +407,55 @@ async fn claim_job(
     }
 }
 
+#[derive(Debug, Deserialize)]
+struct UpdateProgressRequest {
+    #[serde(default)]
+    frame: Option<u64>,
+    #[serde(default)]
+    total_frames: Option<u64>,
+    #[serde(default)]
+    time: Option<String>,
+    #[serde(default)]
+    duration: Option<String>,
+    #[serde(default)]
+    speed: Option<String>,
+    #[serde(default)]
+    percent: Option<f32>,
+    #[serde(default)]
+    stage: Option<String>,
+}
+
+/// Update progress for a job
+async fn update_job_progress(
+    State(state): State<AppState>,
+    Path(id): Path<String>,
+    Json(req): Json<UpdateProgressRequest>,
+) -> Response {
+    let progress = JobProgress {
+        frame: req.frame,
+        total_frames: req.total_frames,
+        time: req.time,
+        duration: req.duration,
+        speed: req.speed,
+        percent: req.percent,
+        stage: req.stage,
+    };
+
+    let queue = state.queue.lock().unwrap();
+    match queue.update_job_progress(&id, progress) {
+        Ok(job) => {
+            Json(job).into_response()
+        }
+        Err(e) => {
+            error!("Failed to update job progress: {}", e);
+            AppError {
+                status: StatusCode::NOT_FOUND,
+                message: format!("Job not found: {}", e),
+            }.into_response()
+        }
+    }
+}
+
 async fn serve_worker_binary() -> Response {
     // Serve the Linux worker binary from ./assets/worker-linux
     let path = "./assets/worker-linux";
diff --git
a/src/hetzner.rs b/src/hetzner.rs index 6c90f79..e30f9e7 100644 --- a/src/hetzner.rs +++ b/src/hetzner.rs @@ -415,11 +415,31 @@ packages: - ffmpeg - wget +write_files: + - path: /etc/systemd/system/ffmpeg-worker.service + content: | + [Unit] + Description=FFmpeg GPC Worker + After=network.target + + [Service] + Type=simple + ExecStart=/opt/worker worker --queue-url {queue_url} + Restart=always + RestartSec=10 + StandardOutput=journal + StandardError=journal + + [Install] + WantedBy=multi-user.target + runcmd: - wget -O /root/gpc-bg.png {bg_image_url} - - wget -O /tmp/worker {binary_url} - - chmod +x /tmp/worker - - /tmp/worker worker --queue-url {queue_url} + - wget -O /opt/worker {binary_url} + - chmod +x /opt/worker + - systemctl daemon-reload + - systemctl enable ffmpeg-worker + - systemctl start ffmpeg-worker final_message: "FFmpeg worker is ready!" "# @@ -445,11 +465,31 @@ packages: ssh_authorized_keys: - {ssh_public_key} +write_files: + - path: /etc/systemd/system/ffmpeg-worker.service + content: | + [Unit] + Description=FFmpeg GPC Worker + After=network.target + + [Service] + Type=simple + ExecStart=/opt/worker worker --queue-url {queue_url} + Restart=always + RestartSec=10 + StandardOutput=journal + StandardError=journal + + [Install] + WantedBy=multi-user.target + runcmd: - wget -O /root/gpc-bg.png {bg_image_url} - - wget -O /tmp/worker {binary_url} - - chmod +x /tmp/worker - - /tmp/worker worker --queue-url {queue_url} + - wget -O /opt/worker {binary_url} + - chmod +x /opt/worker + - systemctl daemon-reload + - systemctl enable ffmpeg-worker + - systemctl start ffmpeg-worker final_message: "FFmpeg worker is ready!" 
"#
diff --git a/src/jobs.rs b/src/jobs.rs
index 1cca051..c39ea46 100644
--- a/src/jobs.rs
+++ b/src/jobs.rs
@@ -52,6 +52,24 @@ pub struct VideoQuadrantSelection {
     pub slides: Quadrant,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct JobProgress {
+    /// Current frame being processed
+    pub frame: Option<u64>,
+    /// Total frames (if known)
+    pub total_frames: Option<u64>,
+    /// Current time position in the video (e.g., "00:01:23.45")
+    pub time: Option<String>,
+    /// Total duration (e.g., "00:10:00.00")
+    pub duration: Option<String>,
+    /// Processing speed (e.g., "1.5x")
+    pub speed: Option<String>,
+    /// Percentage complete (0-100)
+    pub percent: Option<f32>,
+    /// Current stage of processing
+    pub stage: Option<String>,
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Job {
     pub id: String,
@@ -65,6 +83,8 @@ pub struct Job {
     pub error: Option<String>,
     pub worker_id: Option<String>,
     pub webdav_config: WebDavConfig,
+    #[serde(default)]
+    pub progress: Option<JobProgress>,
 }
 
 pub struct JobQueue {
@@ -126,6 +146,7 @@ impl JobQueue {
             error: None,
             worker_id: None,
             webdav_config,
+            progress: None,
         };
 
         jobs.push(job.clone());
@@ -204,6 +225,21 @@ impl JobQueue {
             None => Ok(None),
         }
     }
+
+    /// Update progress for a job
+    pub fn update_job_progress(&self, job_id: &str, progress: JobProgress) -> Result<Job> {
+        let mut jobs = self.load_jobs()?;
+        let job = jobs
+            .iter_mut()
+            .find(|j| j.id == job_id)
+            .ok_or_else(|| anyhow::anyhow!("Job not found: {}", job_id))?;
+
+        job.progress = Some(progress);
+        let job = job.clone();
+        self.save_jobs(&jobs)?;
+
+        Ok(job)
+    }
 }
 
 pub async fn run_worker(queue_url: String) -> Result<()> {
@@ -235,7 +271,7 @@ pub async fn run_worker(queue_url: String) -> Result<()> {
 }
 
 async fn claim_job(queue_url: &str, worker_id: &str) -> Result<Option<Job>> {
-    let url = format!("{}/api/jobs/claim", queue_url);
+    let url = format!("{}/jobs/claim", queue_url);
     info!("Claiming job at: {}", url);
 
     let client = reqwest::Client::new();
@@ -313,11 +349,21 @@ async fn process_job(job: Job) -> Result<()> {
     let local_output_path = format!("{}/output.mp4", temp_dir);
     info!("Local output path: {}", local_output_path);
 
+    // Report initial progress
+    if let Some(queue_url) = &job.webdav_config.queue_url {
+        let _ = update_job_progress_remote(queue_url, &job.id, JobProgress {
+            stage: Some("Starting FFmpeg".to_string()),
+            ..Default::default()
+        }).await;
+    }
+
     info!("Starting FFmpeg (output to local file)...");
 
-    // Run FFmpeg command - output to local file first
-    let ffmpeg_result = tokio::process::Command::new("ffmpeg")
+    // Run FFmpeg command with progress parsing
+    // Use -progress pipe:1 to get machine-readable progress on stdout
+    let mut child = tokio::process::Command::new("ffmpeg")
         .arg("-y") // Overwrite output
+        .arg("-progress").arg("pipe:1") // Output progress to stdout
         .arg("-i").arg(&video_url) // Input video (streaming from WebDAV)
         .arg("-i").arg(bg_image_path) // Background image
         .arg("-filter_complex").arg(&filter_complex)
@@ -329,85 +375,147 @@ async fn process_job(job: Job) -> Result<()> {
         .arg("-threads").arg("0")
         .arg("-c:a").arg("copy")
         .arg(&local_output_path)
-        .output()
-        .await;
-
-    match ffmpeg_result {
-        Ok(output) => {
-            let stdout = String::from_utf8_lossy(&output.stdout);
-            let stderr = String::from_utf8_lossy(&output.stderr);
-            info!("FFmpeg exit status: {}", output.status);
-            if !stdout.is_empty() {
-                info!("FFmpeg stdout: {}", stdout);
-            }
-            if !stderr.is_empty() {
-                info!("FFmpeg stderr: {}", stderr);
+        .stdout(std::process::Stdio::piped())
+        .stderr(std::process::Stdio::piped())
+        .spawn()?;
+
+    // Parse progress from stdout
+    let stdout = child.stdout.take();
+    let queue_url_clone = job.webdav_config.queue_url.clone();
+    let job_id_clone = job.id.clone();
+
+    // Spawn a task to read and parse progress
+    let progress_handle = tokio::spawn(async move {
+        if let Some(stdout) = stdout {
+            use tokio::io::{AsyncBufReadExt, BufReader};
+            let reader = BufReader::new(stdout);
+            let mut lines = reader.lines();
+
+            let mut current_frame: Option<u64> = None;
+            let mut current_time: Option<String> = None;
+            let mut current_speed: Option<String> = None;
+            let mut total_duration: Option<String> = None;
+            let mut last_report = std::time::Instant::now();
+
+            while let Ok(Some(line)) = lines.next_line().await {
+                // Parse FFmpeg progress output format:
+                // frame=123
+                // fps=30.0
+                // out_time=00:00:05.123456
+                // speed=1.5x
+                // progress=continue/end
+
+                if let Some(value) = line.strip_prefix("frame=") {
+                    current_frame = value.trim().parse().ok();
+                } else if let Some(value) = line.strip_prefix("out_time=") {
+                    // Format: 00:00:05.123456 - trim to 00:00:05
+                    let time = value.trim();
+                    if let Some(dot_pos) = time.rfind('.') {
+                        current_time = Some(time[..dot_pos].to_string());
+                    } else {
+                        current_time = Some(time.to_string());
+                    }
+                } else if let Some(value) = line.strip_prefix("speed=") {
+                    current_speed = Some(value.trim().to_string());
+                } else if let Some(value) = line.strip_prefix("duration=") {
+                    total_duration = Some(value.trim().to_string());
+                } else if line.starts_with("progress=") {
+                    // End of a progress block - report to server (throttled)
+                    if last_report.elapsed() >= std::time::Duration::from_secs(2) {
+                        if let Some(queue_url) = &queue_url_clone {
+                            let progress = JobProgress {
+                                frame: current_frame,
+                                total_frames: None,
+                                time: current_time.clone(),
+                                duration: total_duration.clone(),
+                                speed: current_speed.clone(),
+                                percent: None, // Could calculate from time/duration
+                                stage: Some("Encoding".to_string()),
+                            };
+                            let _ = update_job_progress_remote(queue_url, &job_id_clone, progress).await;
+                        }
+                        last_report = std::time::Instant::now();
+                    }
+                }
             }
+        }
+    });
 
-            if output.status.success() {
-                info!("FFmpeg processing successful!");
+    // Wait for FFmpeg to complete
+    let output = child.wait_with_output().await?;
 
-                // Check output file size
-                match fs::metadata(&local_output_path) {
-                    Ok(meta) => info!("Output file size: {} bytes", meta.len()),
-                    Err(e) => error!("Failed to stat output file: {}", e),
-                }
+    // Wait for progress parsing to finish
+    let _ = progress_handle.await;
 
-                // Now upload to WebDAV
-                info!("Reading output file for upload...");
-                let output_data = fs::read(&local_output_path)?;
-                info!("Read {} bytes, uploading to WebDAV...", output_data.len());
-
-                let dav_client = WebDavClient::new(&job.webdav_config)?;
-
-                // Create the output folder on WebDAV if needed
-                // job.output_path is like "processed/filename.mp4"
-                if let Some(folder_end) = job.output_path.rfind('/') {
-                    let folder = &job.output_path[..folder_end];
-                    if !folder.is_empty() {
-                        info!("Ensuring folder exists: {}", folder);
-                        if let Err(e) = dav_client.ensure_folder_exists(folder).await {
-                            warn!("Could not create folder {}: {} (may already exist)", folder, e);
-                        }
-                    }
+    let stderr = String::from_utf8_lossy(&output.stderr);
+    info!("FFmpeg exit status: {}", output.status);
+    if !stderr.is_empty() {
+        info!("FFmpeg stderr: {}", stderr);
+    }
+
+    if output.status.success() {
+        info!("FFmpeg processing successful!");
+
+        // Report upload stage
+        if let Some(queue_url) = &job.webdav_config.queue_url {
+            let _ = update_job_progress_remote(queue_url, &job.id, JobProgress {
+                stage: Some("Uploading".to_string()),
+                ..Default::default()
+            }).await;
+        }
+
+        // Check output file size
+        match fs::metadata(&local_output_path) {
+            Ok(meta) => info!("Output file size: {} bytes", meta.len()),
+            Err(e) => error!("Failed to stat output file: {}", e),
+        }
+
+        // Now upload to WebDAV
+        info!("Reading output file for upload...");
+        let output_data = fs::read(&local_output_path)?;
+        info!("Read {} bytes, uploading to WebDAV...", output_data.len());
+
+        let dav_client = WebDavClient::new(&job.webdav_config)?;
+
+        // Create the output folder on WebDAV if needed
+        // job.output_path is like "processed/filename.mp4"
+        if let Some(folder_end) = job.output_path.rfind('/') {
+            let folder = &job.output_path[..folder_end];
+            if !folder.is_empty() {
+                info!("Ensuring folder exists: {}", folder);
+                if let Err(e) = dav_client.ensure_folder_exists(folder).await {
+                    warn!("Could not create folder {}: {} (may already exist)", folder, e);
                 }
+            }
+        }
 
-                info!("Uploading to: {}", job.output_path);
-                match dav_client.upload_file(&job.output_path, output_data).await {
-                    Ok(_) => {
-                        info!("Upload successful!");
-                        info!("Job {} completed successfully", job.id);
-
-                        // Update job to completed via queue URL
-                        if let Some(queue_url) = &job.webdav_config.queue_url {
-                            info!("Updating job status to completed at: {}", queue_url);
-                            match update_job_status_remote(queue_url, &job.id, JobStatus::Completed, None).await {
-                                Ok(_) => info!("Status update successful"),
-                                Err(e) => error!("Status update failed: {}", e),
-                            }
-                        }
-                    }
-                    Err(e) => {
-                        error!("Upload FAILED: {}", e);
-                        if let Some(queue_url) = &job.webdav_config.queue_url {
-                            let _ = update_job_status_remote(queue_url, &job.id, JobStatus::Failed, None).await;
-                        }
+        info!("Uploading to: {}", job.output_path);
+        match dav_client.upload_file(&job.output_path, output_data).await {
+            Ok(_) => {
+                info!("Upload successful!");
+                info!("Job {} completed successfully", job.id);
+
+                // Update job to completed via queue URL
+                if let Some(queue_url) = &job.webdav_config.queue_url {
+                    info!("Updating job status to completed at: {}", queue_url);
+                    match update_job_status_remote(queue_url, &job.id, JobStatus::Completed, None).await {
+                        Ok(_) => info!("Status update successful"),
+                        Err(e) => error!("Status update failed: {}", e),
                     }
                 }
-            } else {
-                error!("FFmpeg FAILED with exit code: {}", output.status);
-
+            }
+            Err(e) => {
+                error!("Upload FAILED: {}", e);
                 if let Some(queue_url) = &job.webdav_config.queue_url {
                     let _ = update_job_status_remote(queue_url, &job.id, JobStatus::Failed, None).await;
                 }
             }
         }
-        Err(e) => {
-            error!("Failed to run FFmpeg: {}", e);
+    } else {
+        error!("FFmpeg FAILED with exit code: {}", output.status);
 
-            if let Some(queue_url) = &job.webdav_config.queue_url {
-                let _ = update_job_status_remote(queue_url, &job.id, JobStatus::Failed, None).await;
-            }
+        if let Some(queue_url) = &job.webdav_config.queue_url {
+            let _ = update_job_status_remote(queue_url, &job.id, JobStatus::Failed, None).await;
         }
     }
 
@@ -470,6 +578,24 @@ fn build_webdav_download_url(config: &WebDavConfig, path: &str) -> String {
     )
     .replacen("://", &format!("://{}:{}@", encode(&config.username), encode(&config.password)), 1)
 }
+
+async fn update_job_progress_remote(
+    queue_url: &str,
+    job_id: &str,
+    progress: JobProgress,
+) -> Result<()> {
+    let client = reqwest::Client::new();
+
+    client
+        .patch(format!("{}/jobs/{}/progress", queue_url, job_id))
+        .json(&progress)
+        .send()
+        .await
+        .map_err(|e| anyhow::anyhow!("Failed to update job progress: {}", e))?;
+
+    Ok(())
+}
+
 async fn update_job_status_remote(
     queue_url: &str,
     job_id: &str,
diff --git a/templates/index.html b/templates/index.html
index 1e32724..bd1c060 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -265,6 +265,37 @@
     .status-completed { background: #4ade8033; color: #4ade80; }
     .status-failed { background: #ef444433; color: #ef4444; }
 
+    .job-progress {
+        margin-top: 8px;
+    }
+
+    .progress-bar {
+        height: 6px;
+        background: #1a1a2e;
+        border-radius: 3px;
+        overflow: hidden;
+        margin-bottom: 4px;
+    }
+
+    .progress-bar-fill {
+        height: 100%;
+        background: linear-gradient(90deg, #4a9eff, #8b5cf6);
+        border-radius: 3px;
+        transition: width 0.3s ease;
+    }
+
+    .progress-info {
+        display: flex;
+        justify-content: space-between;
+        font-size: 11px;
+        color: #888;
+    }
+
+    .progress-stage {
+        color: #4a9eff;
+        font-weight: 500;
+    }
+
     .loading {
         display: inline-block;
         width: 16px;
@@ -627,15 +658,28 @@

${video.name}

} listEl.innerHTML = jobs.map(job => ` -
- ${job.status} -
-
${job.video_path.split('/').pop()}
-
${new Date(job.created_at).toLocaleString()}
-
-
- ${job.selection.presentation}/${job.selection.slides} +
+
+ ${job.status} +
+
${job.video_path.split('/').pop()}
+
${new Date(job.created_at).toLocaleString()}
+
+
+ ${job.selection.presentation}/${job.selection.slides} +
+ ${job.status === 'Processing' && job.progress ? ` +
+
+
+
+
+ ${job.progress.stage || 'Processing'} + ${job.progress.time || ''} ${job.progress.speed ? '@ ' + job.progress.speed : ''} +
+
+ ` : ''}
`).join(''); } @@ -700,12 +744,12 @@

${video.name}

         if (e.key === 'ArrowLeft') prevImage({ stopPropagation: () => {} });
     });
 
-    // Auto-refresh jobs every 10 seconds
+    // Auto-refresh jobs every 3 seconds for progress updates
     setInterval(() => {
         if (!document.getElementById('jobs-tab').classList.contains('hidden')) {
             loadJobs();
         }
-    }, 10000);
+    }, 3000);