diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..6892e8b
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,3 @@
+[build]
+rustflags = ["-C", "target-cpu=native"]
+#"-C", "target-feature=+crt-static"
\ No newline at end of file
diff --git a/backend/src/api/admin/admin_series_handlers.rs b/backend/src/api/admin/admin_series_handlers.rs
index 9210bd7..84a210e 100644
--- a/backend/src/api/admin/admin_series_handlers.rs
+++ b/backend/src/api/admin/admin_series_handlers.rs
@@ -1,6 +1,6 @@
+use axum::Json;
 use axum::extract::{Path, Query, State};
 use axum::http::StatusCode;
-use axum::Json;
 use axum_core::__private::tracing::warn;
 use axum_core::response::{IntoResponse, Response};
 use axum_extra::extract::Multipart;
@@ -14,9 +14,9 @@ use crate::api::admin::{
 };
 use crate::api::extractor::{AdminOrHigherUser, SuperAdminUser};
 use crate::builder::startup::AppState;
-use crate::database::{NewSeriesData, Series, UpdateSeriesData};
+use crate::database::{NewSeriesData, Series, SeriesCheckTaskInfo, UpdateSeriesData};
+use crate::task_workers::check_series_worker::SeriesCheckJob;
 use crate::task_workers::repair_chapter_worker;
-use crate::task_workers::series_check_worker::SeriesCheckJob;

 // Create new series
 pub async fn create_new_series_handler(
@@ -69,10 +69,17 @@ pub async fn create_new_series_handler(
         }
     };

-    // Crate and send job to worker via priority queue
-    let job = SeriesCheckJob {
-        series: fetch_new_series,
+    let series_task = SeriesCheckTaskInfo {
+        id: fetch_new_series.id,
+        title: fetch_new_series.title,
+        current_source_url: fetch_new_series.current_source_url,
+        source_website_host: fetch_new_series.source_website_host,
+        check_interval_minutes: fetch_new_series.check_interval_minutes,
     };
+
+    // Create and send job to worker via priority queue
+    let job = SeriesCheckJob { series_task };
+
     if let Err(e) = state.worker_channels.series_check_tx.send(job).await {
         eprintln!(
             "Failed to send job to worker for series: {} {}",
diff --git a/backend/src/database/chapters.rs b/backend/src/database/chapters.rs
index 13d06d5..133b80f 100644
--- a/backend/src/database/chapters.rs
+++ b/backend/src/database/chapters.rs
@@ -1,16 +1,59 @@
 use super::*;

-/// Macros `sqlx::query!`
-/// For DML operations (INSERT, UPDATE, DELETE) or SELECTs,
-/// where you're manually processing generic `sqlx::Row`s (anonymous struct).
-///
-/// Macros `sqlx::query_as!`
-/// For mapping SELECT results directly to a defined rust struct (`#[derive(FromRow)]`),
-/// recommended for structured data retrieval.
-///
-/// Macros `sqlx::query_scalar!`
-/// For queries returning a single value (one row, one column).
-/// Highly efficient for this purpose.
+// =========================================================================
+// Public Read Used by the API to display chapters to users
+// =========================================================================
+impl DatabaseService {
+    pub async fn get_images_urls_for_chapter_series(
+        &self,
+        series_id: i32,
+        chapter_number: f32,
+    ) -> AnyhowResult<Vec<String>> {
+        let urls = sqlx::query_scalar!(
+            r#"
+            SELECT ci.image_url
+            FROM chapter_images ci
+            JOIN series_chapters mc ON ci.chapter_id = mc.id
+            WHERE mc.series_id = $1 AND mc.chapter_number = $2
+            ORDER BY ci.image_order ASC
+            "#,
+            series_id,
+            chapter_number,
+        )
+        .fetch_all(&self.pool)
+        .await
+        .context("Failed to get images URLs for chapter series")?;
+
+        Ok(urls)
+    }
+
+    // Get chapters for a specific series
+    pub async fn get_chapters_by_series_id(
+        &self,
+        series_id: i32,
+    ) -> AnyhowResult<Vec<SeriesChapter>> {
+        let chapters = sqlx::query_as!(
+            SeriesChapter,
+            r#"
+            SELECT id, series_id, chapter_number, status AS "status: _",title, source_url, created_at
+            FROM series_chapters
+            WHERE series_id = $1
+            ORDER BY chapter_number
+            DESC
+            "#,
+            series_id
+        )
+        .fetch_all(&self.pool)
+        .await
+        .context("Failed to query chapters by series ID with sqlx")?;
+
+        Ok(chapters)
+    }
+}
+
+// =========================================================================
+// Scraper Ingestion & Data Entry
+// =========================================================================
 impl DatabaseService {
     /// Adds a new chapter to the database and returns its new ID.
     /// This function assumes the chapter does not already exist (checked by source_url uniqueness).
@@ -20,16 +63,22 @@ impl DatabaseService {
         chapter_number: f32,
         title: Option<&str>,
         source_url: &str,
+        chapter_status: ChapterStatus,
     ) -> AnyhowResult<i32> {
         let new_id = sqlx::query_scalar!(
-            "INSERT INTO series_chapters (series_id, chapter_number, title, source_url)
-            VALUES ($1, $2, $3, $4)
-            ON CONFLICT (source_url) DO UPDATE SET updated_at = NOW()
+            "INSERT INTO series_chapters (series_id, chapter_number, title, source_url, status)
+            VALUES ($1, $2, $3, $4, $5)
+            ON CONFLICT (series_id, chapter_number)
+            DO UPDATE SET
+                updated_at = NOW(),
+                source_url = EXCLUDED.source_url,
+                status = EXCLUDED.status
             RETURNING id",
             series_id,
             chapter_number,
             title,
             source_url,
+            chapter_status as ChapterStatus
         )
         .fetch_one(&self.pool)
         .await
@@ -57,48 +106,65 @@ impl DatabaseService {
         Ok(new_id)
     }

-    pub async fn delete_chapter_and_images_for_chapter(
-        &self,
-        series_id: i32,
-        chapter_number: f32,
-    ) -> AnyhowResult<u64> {
-        // exclusive connection from the pool
-        let mut tx = self
-            .pool
-            .begin()
-            .await
-            .context("Failed to start transaction")?;
-
-        let chapter_id_to_delete = sqlx::query_scalar!(
-            "SELECT id FROM series_chapters WHERE series_id = $1 AND chapter_number = $2",
-            series_id,
-            chapter_number,
+    pub async fn get_max_known_chapter(&self, series_id: i32) -> AnyhowResult<f32> {
+        let result = sqlx::query_scalar!(
+            r#"
+            SELECT MAX(chapter_number)
+            FROM series_chapters
+            WHERE series_id = $1
+            "#,
+            series_id
         )
-        .fetch_optional(&mut *tx) // Run query inside transaction
+        .fetch_one(&self.pool)
         .await
-        .context("Failed to get chapter ID to delete")?;
-
-        if let Some(chapter_id) = chapter_id_to_delete {
-            sqlx::query!(
-                "DELETE FROM chapter_images WHERE chapter_id = $1",
-                chapter_id
-            )
-            .execute(&mut *tx)
-            .await
-            .context("Failed to delete chapter images")?;
+        .context("Failed to get max known chapter number")?;

-            let result = sqlx::query!("DELETE FROM series_chapters WHERE id = $1", chapter_id)
-                .execute(&mut *tx)
-                .await
-                .context("Failed to delete chapter")?;
+        Ok(result.unwrap_or(0.0))
+    }
+}

-            // If transaction was successful, commit it
-            tx.commit().await.context("Failed to commit transaction")?;
+// =========================================================================
+// Background Worker & Job Queue
+// Handling "Processing" status, locking, and job distribution
+// =========================================================================
+impl DatabaseService {
+    pub async fn find_and_lock_pending_chapters(
+        &self,
+        limit: i64,
+    ) -> AnyhowResult<Vec<DownloadJobData>> {
+        let record = sqlx::query_as!(
+            DownloadJobData,
+            r#"
+            WITH locked_rows AS (
+                SELECT id
+                FROM series_chapters
+                WHERE status = 'Pending'
+                ORDER BY created_at ASC
+                LIMIT $1
+                FOR UPDATE SKIP LOCKED
+            )
+            UPDATE series_chapters sc
+            SET
+                status = 'Processing',
+                updated_at = NOW()
+            FROM locked_rows lr, series s
+            WHERE sc.id = lr.id AND sc.series_id = s.id
+            RETURNING
+                sc.id as chapter_id,
+                sc.chapter_number,
+                sc.source_url as chapter_url,
+                s.id as series_id,
+                s.title as series_title,
+                s.source_website_host as source_host,
+                s.current_source_url as series_url
+            "#,
+            limit
+        )
+        .fetch_all(&self.pool)
+        .await
+        .context("Failed to dequeue pending chapters")?;

-            Ok(result.rows_affected())
-        } else {
-            Ok(0) // No chapter found to delete
-        }
+        Ok(record)
     }

     pub async fn update_chapter_status(
@@ -127,50 +193,4 @@ impl DatabaseService {
             }
         }
     }
-
-    pub async fn get_images_urls_for_chapter_series(
-        &self,
-        series_id: i32,
-        chapter_number: f32,
-    ) -> AnyhowResult<Vec<String>> {
-        let urls = sqlx::query_scalar!(
-            r#"
-            SELECT ci.image_url
-            FROM chapter_images ci
-            JOIN series_chapters mc ON ci.chapter_id = mc.id
-            WHERE mc.series_id = $1 AND mc.chapter_number = $2
-            ORDER BY ci.image_order ASC
-            "#,
-            series_id,
-            chapter_number,
-        )
-        .fetch_all(&self.pool)
-        .await
-        .context("Failed to get images URLs for chapter series")?;
-
-        Ok(urls)
-    }
-
-    // Get chapters for a specific series
-    pub async fn get_chapters_by_series_id(
-        &self,
-        series_id: i32,
-    ) -> AnyhowResult<Vec<SeriesChapter>> {
-        let chapters = sqlx::query_as!(
-            SeriesChapter,
-            r#"
-            SELECT id, series_id, chapter_number, status AS "status: _",title, source_url, created_at
-            FROM series_chapters
-            WHERE series_id = $1
-            ORDER BY chapter_number
-            DESC
-            "#,
-            series_id
-        )
-        .fetch_all(&self.pool)
-        .await
-        .context("Failed to query chapters by series ID with sqlx")?;
-
-        Ok(chapters)
-    }
 }
diff --git a/backend/src/database/mod.rs b/backend/src/database/mod.rs
index 96f2447..e30dec2 100644
--- a/backend/src/database/mod.rs
+++ b/backend/src/database/mod.rs
@@ -7,12 +7,13 @@ use serde::{Deserialize, Serialize};
 use sqlx::{FromRow, PgPool, Type};
 use url::Url;

+pub mod admin_actions;
 pub mod auth;
 pub mod chapters;
 pub mod comments;
 pub mod series;
-pub mod series_user_actions;
 pub mod storage;
+pub mod user_actions;
 pub mod users;

 // Type alias for database connection pool
@@ -93,6 +94,7 @@ pub struct Series {
 #[derive(Debug, Clone, PartialEq, Eq, sqlx::Type, Serialize, Deserialize)]
 #[sqlx(type_name = "chapter_status", rename_all = "PascalCase")]
 pub enum ChapterStatus {
+    Pending,
     Processing,
     Available,
     NoImagesFound,
@@ -175,6 +177,32 @@ pub enum SeriesOrderBy {
     Rating,
 }

+#[derive(Debug, Clone, FromRow)]
+pub struct SeriesCheckTaskInfo {
+    pub id: i32,
+    pub title: String,
+    pub current_source_url: String,
+    pub source_website_host: String,
+    pub check_interval_minutes: i32,
+}
+
+#[derive(Debug, FromRow)]
+pub struct DownloadJobData {
+    pub
series_id: i32, + pub series_title: String, + pub series_url: String, + pub source_host: String, + + pub chapter_id: i32, + pub chapter_number: f32, + pub chapter_url: String, +} + +#[derive(Debug, Clone, FromRow)] +pub struct SeriesDeletionJob { + pub id: i32, +} + #[derive(Debug, FromRow, Serialize, Deserialize)] pub struct CategoryTag { pub id: i32, @@ -312,6 +340,7 @@ pub struct CommentFlatRow { user_id: i32, user_username: String, user_avatar_url: Option, + user_role_id: i32, upvotes: i64, downvotes: i64, is_deleted: bool, @@ -324,6 +353,7 @@ pub struct CommentUser { pub id: i32, pub username: String, pub avatar_url: Option, + pub role_id: i32, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Type)] diff --git a/backend/src/database/series.rs b/backend/src/database/series.rs index 971a5c9..c57f461 100644 --- a/backend/src/database/series.rs +++ b/backend/src/database/series.rs @@ -1,198 +1,13 @@ -use anyhow::{anyhow, Context}; +use anyhow::Context; +use sqlx::postgres::types::PgInterval; use super::*; -/// Macros `sqlx::query!` -/// For DML operations (INSERT, UPDATE, DELETE) or SELECTs, -/// where you're manually processing generic `sqlx::Row` (anonymous struct). -/// -/// Macros `sqlx::query_as!` -/// For mapping SELECT results directly to a defined rust struct (`#[derive(FromRow)]`), -/// recommended for structured data retrieval. -/// -/// Macros `sqlx::query_scalar!` -/// For queries returning a single value (one row, one column). -/// Highly efficient for this purpose. +// ========================================================================= +// Public Read Operations +// High traffic, used by standard users/visitors. +// ========================================================================= impl DatabaseService { - pub async fn add_new_series(&self, data: &NewSeriesData<'_>) -> AnyhowResult { - let mut tx = self - .pool - .begin() - .await - .context("Failed to begin transaction")?; - - let host = get_host_from_url(Some(data.source_url)); - - let new_series_id = sqlx::query_scalar!( - r#" - INSERT INTO series - (title, original_title, description, cover_image_url, current_source_url, source_website_host, check_interval_minutes) - VALUES ($1, $2, $3, $4, $5, $6, $7) - RETURNING id"#, - data.title, - data.original_title, - data.description, - data.cover_image_url, - data.source_url, - host, - data.check_interval_minutes, - ) - .fetch_one(&mut *tx) - .await - .context("Failed to add series with sqlx")?; - - if let Some(author_names) = data.authors { - for name in author_names { - let author_id = sqlx::query_scalar!( - r#" - WITH ins AS( - INSERT INTO authors (name) - VALUES ($1) - ON CONFLICT (name) DO NOTHING - RETURNING id - ) - SELECT id FROM ins - UNION ALL - SELECT id FROM authors WHERE name = $1 - LIMIT 1 - "#, - name - ) - .fetch_one(&mut *tx) - .await - .context("Failed to find or create author with sqlx")?; - - sqlx::query!( - "INSERT INTO series_authors (series_id, author_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", - new_series_id, - author_id - ).execute(&mut *tx).await.context(format!("Failed to link author {} to ", name))?; - } - } - - if let Some(category_ids) = data.category_ids - && !category_ids.is_empty() - { - for &category_id in category_ids { - // Insert the relationship into the series_categories junction table. 
- sqlx::query!( - "INSERT INTO series_categories (series_id, category_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", - new_series_id, - category_id - ) - .execute(&mut *tx) - .await - .context(format!("Failed to link category {} to series", category_id))?; - } - } - - tx.commit().await.context("Failed to commit transaction")?; - - Ok(new_series_id) - } - - pub async fn update_series_metadata( - &self, - series_id: i32, - data: &UpdateSeriesData<'_>, - ) -> AnyhowResult { - let mut tx = self - .pool - .begin() - .await - .context("Failed to begin transaction")?; - - let host = get_host_from_url(data.source_url); - - let result = sqlx::query!( - "UPDATE series - SET - title = COALESCE($1, title), - original_title = COALESCE($2, original_title), - description = COALESCE($3, description), - cover_image_url = COALESCE($4, cover_image_url), - current_source_url = COALESCE($5, current_source_url), - source_website_host = COALESCE($6, source_website_host), - check_interval_minutes = COALESCE($7, check_interval_minutes), - updated_at = NOW() - WHERE id = $8", - data.title, - data.original_title, - data.description, - data.cover_image_url, - data.source_url, - host, - data.check_interval_minutes, - series_id - ) - .execute(&mut *tx) - .await - .context("Failed to update series with sqlx")?; - - if let Some(author_names) = data.authors { - sqlx::query!("DELETE FROM series_authors WHERE series_id = $1", series_id) - .execute(&mut *tx) - .await - .context("Failed to delete existing authors for series")?; - - for name in author_names { - let author_id = sqlx::query_scalar!( - r#" - WITH ins AS ( - INSERT INTO authors (name) VALUES ($1) - ON CONFLICT (name) DO NOTHING - RETURNING id - ) - SELECT id FROM ins - UNION ALL - SELECT id FROM authors WHERE name = $1 - LIMIT 1 - "#, - name - ) - .fetch_one(&mut *tx) - .await - .context(format!("Failed to find or create author: {}", name))?; - - sqlx::query!( - "INSERT INTO series_authors (series_id, author_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", - series_id, - author_id - ) - .execute(&mut *tx) - .await - .context(format!("Failed to link author {} to series", name))?; - } - } - - if let Some(category_ids) = data.category_ids { - sqlx::query!( - "DELETE FROM series_categories WHERE series_id = $1", - series_id - ) - .execute(&mut *tx) - .await - .context("Failed to delete existing categories for series")?; - - if !category_ids.is_empty() { - for category_id in category_ids { - sqlx::query!( - "INSERT INTO series_categories (series_id, category_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", - series_id, - category_id - ) - .execute(&mut *tx) - .await - .context(format!("Failed to link category {} to series", category_id))?; - } - } - } - - tx.commit().await.context("Failed to commit transaction")?; - - Ok(result.rows_affected()) - } - pub async fn get_series_by_id(&self, id: i32) -> AnyhowResult> { let series = sqlx::query_as!( Series, @@ -201,7 +16,8 @@ impl DatabaseService { source_website_host, views_count, bookmarks_count, total_rating_score, total_ratings_count, last_chapter_found_in_storage, processing_status as "processing_status: SeriesStatus", check_interval_minutes, last_checked_at, next_checked_at, created_at, updated_at - FROM series WHERE id = $1 + FROM series + WHERE id = $1 "#, id ) @@ -219,7 +35,8 @@ impl DatabaseService { source_website_host, views_count, bookmarks_count, total_rating_score, total_ratings_count, last_chapter_found_in_storage, processing_status as "processing_status: SeriesStatus", check_interval_minutes, last_checked_at, 
next_checked_at, created_at, updated_at - FROM series WHERE title = $1 + FROM series + WHERE title = $1 "#, title ) @@ -232,9 +49,11 @@ impl DatabaseService { // Get authors for a sepecific series pub async fn get_authors_by_series_id(&self, series_id: i32) -> AnyhowResult> { let authors_name = sqlx::query_scalar!( - r#"SELECT a.name FROM authors a + r#" + SELECT a.name FROM authors a JOIN series_authors sa ON a.id = sa.author_id - WHERE sa.series_id = $1"#, + WHERE sa.series_id = $1 + "#, series_id ) .fetch_all(&self.pool) @@ -264,217 +83,123 @@ impl DatabaseService { Ok(categories) } - // Get series search list for admin panel - pub async fn get_admin_paginated_series( - &self, - page: u32, - page_size: u32, - search_query: Option<&str>, - ) -> AnyhowResult> { - let page = page.max(1); - let limit = page_size as i64; - let offset = (page as i64 - 1) * limit; - - // Format search query, wraps the query in '%' to allow match substrings - // let formatted_search_query = search_query.map(|s| format!("%{}%", s)); - - #[derive(Debug, FromRow)] - struct QueryResult { - id: i32, - title: String, - original_title: Option, - description: String, - cover_image_url: String, - current_source_url: String, - updated_at: DateTime, - processing_status: SeriesStatus, - #[sqlx(json)] - authors: serde_json::Value, - total_items: Option, - } - - let record_list = match search_query.filter(|q| !q.trim().is_empty()) { - Some(search_match) => { - let search_match = search_match.trim(); - let similarity_threshold = 0.20_f32; - - sqlx::query_as!( - QueryResult, - r#" - WITH base_search AS ( - SELECT - s.id, s.title, s.original_title, s.description, s.cover_image_url, - s.current_source_url, s.updated_at, s.processing_status, - -- Calculate similarity score for ranking - similarity(s.title, $3) as sim_score - FROM series s - WHERE - s.title ILIKE '%' || $3 || '%' - OR - (s.title % $3 AND similarity(s.title, $3) >= $4) - ), - ranked_results AS ( - SELECT - *, - CASE - WHEN title ILIKE $3 THEN 10 - WHEN title ILIKE $3 || '%' THEN 8 - WHEN title ILIKE '%' || $3 || '%' THEN 6 - ELSE 4 - END as search_rank - FROM base_search - ), - total_count AS ( - SELECT COUNT(*) AS total FROM ranked_results - ) - SELECT - rr.id, rr.title, rr.original_title, rr.description, - rr.cover_image_url, rr.current_source_url, rr.updated_at, - rr.processing_status as "processing_status: SeriesStatus", - -- Aggregate author names into a JSON array for each series - COALESCE( - json_agg(a.name) FILTER (WHERE a.id IS NOT NULL), - '[]'::json - ) AS "authors!", - tc.total as total_items - FROM ranked_results rr - CROSS JOIN total_count tc - LEFT JOIN series_authors sa ON rr.id = sa.series_id - LEFT JOIN authors a ON sa.author_id = a.id - GROUP BY - rr.id, rr.title, rr.original_title, rr.description, rr.cover_image_url, - rr.current_source_url, rr.updated_at, rr.processing_status, - rr.search_rank, rr.sim_score, tc.total - -- Order by the best rank, then by similarity, then by ID for stable sorting - ORDER BY rr.search_rank DESC, rr.sim_score DESC, rr.id ASC - LIMIT $1 - OFFSET $2 - "#, - limit, - offset, - search_match, - similarity_threshold, - ) - .fetch_all(&self.pool) - .await - .context("Failed to query all series") - } - None => { - // No search - simple pagination - sqlx::query_as!( - QueryResult, - r#" - SELECT - s.id, s.title, s.original_title, s.description, s.cover_image_url, - s.current_source_url, s.updated_at, - s.processing_status as "processing_status: SeriesStatus", - COALESCE( - json_agg(a.name) FILTER (WHERE a.id IS NOT NULL), 
- '[]'::json - ) as "authors!", - COUNT(*) OVER () as total_items - FROM - series s - LEFT JOIN series_authors sa ON s.id = sa.series_id - LEFT JOIN authors a ON sa.author_id = a.id - GROUP BY s.id - ORDER BY s.updated_at DESC - LIMIT $1 OFFSET $2 - "#, - limit, - offset - ) - .fetch_all(&self.pool) - .await - .context("Failed to get paginated series without search") - } - }?; + pub async fn get_series_chapters_count(&self, series_id: i32) -> AnyhowResult { + let count = sqlx::query_scalar!( + "SELECT COUNT(*) FROM series_chapters WHERE series_id = $1", + series_id + ) + .fetch_one(&self.pool) + .await + .context("Failed to get series chapters count")?; - let total_items = record_list - .first() - .map_or(0, |row| row.total_items.unwrap_or(0)); + // It will return a row with 0, not NULL, even if no chapters exist + Ok(count.unwrap_or(0)) + } - let series_list = record_list - .into_iter() - .map(|r| SeriesWithAuthors { - id: r.id, - title: r.title, - original_title: r.original_title, - description: r.description, - cover_image_url: r.cover_image_url, - current_source_url: r.current_source_url, - processing_status: r.processing_status, - updated_at: r.updated_at, - authors: serde_json::from_value(r.authors).unwrap_or_default(), - }) - .collect(); + pub async fn get_list_all_categories(&self) -> AnyhowResult> { + let categories = sqlx::query_as!(CategoryTag, "SELECT id, name FROM categories") + .fetch_all(&self.pool) + .await + .context("Failed to list all categories with sqlx")?; - Ok(PaginatedResult { - items: series_list, - total_items, - }) + Ok(categories) } +} - /// Updates only the processing status of a series. - /// Marking a series as "scraping" or "error" without touching check schedules. - pub async fn update_series_processing_status( +// ========================================================================= +// System, Scraper & Background Worker +// Database locking, scheduling, and automated status updates. +// ========================================================================= +impl DatabaseService { + pub async fn find_and_lock_series_for_check( &self, - series_id: i32, - new_status: SeriesStatus, - ) -> AnyhowResult { - let result = sqlx::query!( - "UPDATE series SET processing_status = $1, updated_at = NOW() WHERE id = $2", - new_status as _, - series_id, + limit: i64, + ) -> AnyhowResult> { + let series = sqlx::query_as!( + SeriesCheckTaskInfo, + r#" + WITH candidate AS ( + SELECT id FROM series + WHERE + processing_status = $1 + AND next_checked_at <= NOW() + ORDER BY next_checked_at ASC + LIMIT $2 + FOR UPDATE SKIP LOCKED + ) + UPDATE series + SET processing_status = $3 + WHERE id IN (SELECT id FROM candidate) + RETURNING + id, + title, + current_source_url, + source_website_host, + check_interval_minutes + "#, + SeriesStatus::Ongoing as _, + limit, + SeriesStatus::Processing as _, ) - .execute(&self.pool) + .fetch_all(&self.pool) .await - .context("Failed to update series processing status with sqlx")?; + .context("Failed to find and lock series for check with sqlx")?; - Ok(result.rows_affected()) + Ok(series) } - // Called only if there's new valid content (new chapter) - pub async fn update_series_new_content_timestamp(&self, series_id: i32) -> AnyhowResult { - let result = sqlx::query!( - "UPDATE series SET updated_at = NOW() WHERE id = $1", - series_id, + pub async fn find_and_lock_series_for_job_deletion( + &self, + ) -> AnyhowResult> { + // If the row is already locked by another transaction, + // it will skip it and look for the next row. 
+ let series = sqlx::query_as!( + SeriesDeletionJob, + r#" + WITH candidate AS ( + SELECT id FROM series + WHERE processing_status = $1 + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + UPDATE series + SET processing_status = $2 + WHERE id = (SELECT id FROM candidate) + RETURNING + id + "#, + SeriesStatus::PendingDeletion as _, + SeriesStatus::Deleting as _ ) - .execute(&self.pool) + .fetch_optional(&self.pool) .await - .context("Failed to update `updated_at` timestamp")?; + .context("Failed to find and lock series for job deletion with sqlx")?; - Ok(result.rows_affected()) + Ok(series) } - // Called after a series has been processed + // Called after a series has been checked/processed pub async fn update_series_check_schedule( &self, series_id: i32, - new_status: Option, + check_interval_minutes: i32, + new_status: SeriesStatus, new_next_checked_at: Option>, ) -> AnyhowResult { - // First, get the series data asynchronously. - let series = self - .get_series_by_id(series_id) - .await? - .ok_or_else(|| anyhow!("Series with id {} not found for schedule update", series_id))?; - // Calculate the next check time if not provided let final_next_checked_at = new_next_checked_at.unwrap_or_else(|| { let mut rng = rand::rng(); - let base_interval = series.check_interval_minutes as i64; + let base_interval = check_interval_minutes as i64; // Add a random +- 5 minutes jitter to avoid all series checking at the exact same time let random_jitter = rng.random_range(-300..=300); let actual_interval_secs = (base_interval * 60) + random_jitter; Utc::now() + chrono::Duration::seconds(actual_interval_secs.max(300)) }); - let final_status = new_status.unwrap_or(series.processing_status); - let result = sqlx::query!( - "UPDATE series SET processing_status = $1, last_checked_at = NOW(), next_checked_at = $2 WHERE id = $3", - final_status as _, + "UPDATE series + SET processing_status = $1, last_checked_at = NOW(), next_checked_at = $2 WHERE id = $3", + new_status as _, final_next_checked_at, series_id, ) @@ -484,13 +209,35 @@ impl DatabaseService { Ok(result.rows_affected()) } + /// Updates only the processing status of a series. + /// Marking a series as "scraping" or "error" without touching check schedules. 
+ pub async fn update_series_processing_status( + &self, + series_id: i32, + new_status: SeriesStatus, + ) -> AnyhowResult { + let result = sqlx::query!( + "UPDATE series SET processing_status = $1, updated_at = NOW() WHERE id = $2", + new_status as _, + series_id, + ) + .execute(&self.pool) + .await + .context("Failed to update series processing status with sqlx")?; + + Ok(result.rows_affected()) + } + pub async fn update_series_last_chapter_found_in_storage( &self, series_id: i32, - chapter_number: Option, + chapter_number: f32, ) -> AnyhowResult { let result = sqlx::query!( - "UPDATE series SET last_chapter_found_in_storage = $1, updated_at = NOW() WHERE id = $2", + "UPDATE series + SET last_chapter_found_in_storage = GREATEST(COALESCE(last_chapter_found_in_storage, 0), $1), + updated_at = NOW() + WHERE id = $2", chapter_number, series_id, ).execute(&self.pool).await.context("Failed to update series last chapter found in storage with sqlx")?; @@ -498,17 +245,17 @@ impl DatabaseService { Ok(result.rows_affected()) } - pub async fn get_series_chapters_count(&self, series_id: i32) -> AnyhowResult { - let count = sqlx::query_scalar!( - "SELECT COUNT(*) FROM series_chapters WHERE series_id = $1", - series_id + // Called only if there's new valid content (new chapter) + pub async fn update_series_new_content_timestamp(&self, series_id: i32) -> AnyhowResult { + let result = sqlx::query!( + "UPDATE series SET updated_at = NOW() WHERE id = $1", + series_id, ) - .fetch_one(&self.pool) + .execute(&self.pool) .await - .context("Failed to get series chapters count")?; + .context("Failed to update `updated_at` timestamp")?; - // It will return a row with 0, not NULL, even if no chapters exist - Ok(count.unwrap_or(0)) + Ok(result.rows_affected()) } pub async fn get_image_keys_for_series_deletion( @@ -547,167 +294,22 @@ impl DatabaseService { })) } - pub async fn delete_series_by_id(&self, series_id: i32) -> AnyhowResult { - let mut tx = self - .pool - .begin() - .await - .context("Failed to start transaction for series deletion")?; - - let chapter_ids: Vec = sqlx::query_scalar!( - "SELECT id FROM series_chapters WHERE series_id = $1", - series_id - ) - .fetch_all(&mut *tx) - .await - .context("Failed to get chapter IDs for deletion")?; - - if !chapter_ids.is_empty() { - // Delete all image record for all chapters - sqlx::query!( - "DELETE FROM chapter_images WHERE chapter_id = ANY ($1)", - &chapter_ids - ) - .execute(&mut *tx) - .await - .context("Failed to delete chapter images")?; - } - - // Delete all chapter records - sqlx::query!( - "DELETE FROM series_chapters WHERE series_id = $1", - series_id - ) - .execute(&mut *tx) - .await - .context("Failed to delete series chapters")?; - - // Delete all author link records - sqlx::query!("DELETE FROM series_authors WHERE series_id = $1", series_id) - .execute(&mut *tx) - .await - .context("Failed to delete series-authors links")?; - - let result = sqlx::query!("DELETE FROM series WHERE id = $1", series_id) - .execute(&mut *tx) - .await - .context("Failed to delete series")?; - - tx.commit() - .await - .context("Failed to commit transaction for series deletion")?; - - Ok(result.rows_affected()) - } + // Query helper for delete old view logs + pub async fn cleanup_old_view_logs(&self) -> AnyhowResult { + let retention_interval = PgInterval { + months: 0, + days: 35, + microseconds: 0, + }; - pub async fn mark_series_for_deletion(&self, series_id: i32) -> AnyhowResult { let result = sqlx::query!( - "UPDATE series SET processing_status = $1, - updated_at = 
NOW() WHERE id = $2 AND processing_status NOT IN ($3, $4)", - SeriesStatus::PendingDeletion as _, - series_id, - SeriesStatus::PendingDeletion as _, - SeriesStatus::Deleting as _, + "DELETE FROM series_view_log WHERE viewed_at < NOW() - $1::interval", + retention_interval as _ ) .execute(&self.pool) .await - .context("Failed to mark series for deletion with sqlx")?; + .context("Failed to cleanup old view logs with sqlx")?; Ok(result.rows_affected()) } - - pub async fn find_and_lock_series_for_check(&self, limit: i64) -> AnyhowResult> { - let series = sqlx::query_as!( - Series, - r#" - WITH candidate AS ( - SELECT id FROM series - WHERE - processing_status = $1 - AND next_checked_at <= NOW() - ORDER BY next_checked_at ASC - LIMIT $2 - FOR UPDATE SKIP LOCKED - ) - UPDATE series - SET processing_status = $3 - WHERE id IN (SELECT id FROM candidate) - RETURNING - id, title, original_title, description, cover_image_url, current_source_url, source_website_host, - views_count, bookmarks_count, total_rating_score, total_ratings_count, last_chapter_found_in_storage, - processing_status as "processing_status: SeriesStatus", check_interval_minutes, last_checked_at, - next_checked_at, created_at, updated_at - "#, - SeriesStatus::Ongoing as _, - limit, - SeriesStatus::Processing as _, - ) - .fetch_all(&self.pool) - .await - .context("Failed to find and lock series for check with sqlx")?; - - Ok(series) - } - - pub async fn find_and_lock_series_for_job_deletion(&self) -> AnyhowResult> { - // If the row is already locked by another transaction, - // it will skip it and look for the next row. - let series = sqlx::query_as!( - Series, - r#" - WITH candidate AS ( - SELECT id FROM series - WHERE processing_status = $1 - LIMIT 1 - FOR UPDATE SKIP LOCKED - ) - UPDATE series - SET processing_status = $2 - WHERE id = (SELECT id FROM candidate) - RETURNING - id, title, original_title, description, cover_image_url, current_source_url, - source_website_host, views_count, bookmarks_count, total_rating_score, total_ratings_count, last_chapter_found_in_storage, - processing_status as "processing_status: SeriesStatus", check_interval_minutes, last_checked_at, - next_checked_at, created_at, updated_at - "#, - SeriesStatus::PendingDeletion as _, - SeriesStatus::Deleting as _ - ) - .fetch_optional(&self.pool) - .await - .context("Failed to find and lock series for job deletion with sqlx")?; - - Ok(series) - } - - pub async fn create_category_tag(&self, name: &str) -> AnyhowResult { - let category = sqlx::query_as!( - CategoryTag, - "INSERT INTO categories (name) VALUES ($1) RETURNING id, name", - name - ) - .fetch_one(&self.pool) - .await - .context("Failed to create category tag with sqlx")?; - - Ok(category) - } - - pub async fn delete_category_tag(&self, id: i32) -> AnyhowResult { - let result = sqlx::query!("DELETE FROM categories WHERE id = $1", id) - .execute(&self.pool) - .await - .context("Failed to delete category tag with sqlx")?; - - Ok(result.rows_affected()) - } - - pub async fn get_list_all_categories(&self) -> AnyhowResult> { - let categories = sqlx::query_as!(CategoryTag, "SELECT id, name FROM categories") - .fetch_all(&self.pool) - .await - .context("Failed to list all categories with sqlx")?; - - Ok(categories) - } } diff --git a/backend/src/processing/coordinator.rs b/backend/src/processing/coordinator.rs index b913cf0..14bd889 100644 --- a/backend/src/processing/coordinator.rs +++ b/backend/src/processing/coordinator.rs @@ -8,62 +8,15 @@ use tokio::task; use crate::common::utils::random_sleep_time; use 
crate::database::storage::StorageClient; -use crate::database::{ChapterStatus, DatabaseService, Series}; +use crate::database::{ChapterStatus, DatabaseService}; use crate::processing::image_encoding; use crate::scraping::model::SiteScrapingConfig; use crate::scraping::{fetcher, parser}; - -// Manage loop through a list of chapters and processes them one by one. -pub async fn process_series_chapters_from_list( - series_data: &Series, - chapters_to_process: &[parser::ChapterInfo], - http_client: &Client, - storage_client: Arc, - config: &SiteScrapingConfig, - db_service: &DatabaseService, -) -> Result> { - println!( - "[COORDINATOR] Starting batch processing for '{}'.", - series_data.title - ); - - let mut last_successfully_downloaded_chapter: Option = None; - - for chapter_info in chapters_to_process { - match process_single_chapter( - series_data, - chapter_info, - http_client, - storage_client.clone(), - config, - db_service, - ) - .await - { - Ok(Some(chapter_num)) => { - last_successfully_downloaded_chapter = Some(chapter_num); - } - Ok(None) => { - /* Chapter was processed but had no images, which is fine don't stop process */ - } - Err(e) => { - eprintln!( - "[COORDINATOR] Error processing chapter {}, stopping series: {}", - chapter_info.number, e - ); - // Decide if you want to stop the whole process on a single chapter failure - // break; - } - } - // Pause between scraping chapters - random_sleep_time(3, 6).await; - } - Ok(last_successfully_downloaded_chapter) -} +use crate::task_workers::chapter_download_worker::SeriesProcessingContext; // Process scraping and downloading single chapters pub async fn process_single_chapter( - series: &Series, + series_ctx: &SeriesProcessingContext, chapter_info: &parser::ChapterInfo, http_client: &Client, storage_client: Arc, @@ -76,15 +29,16 @@ pub async fn process_single_chapter( println!( "[COORDINATOR] Processing Chapter {} for '{}'...", - chapter_info.number, series.title + chapter_info.number, series_ctx.series_title ); let chapter_id = db_service .add_new_chapter( - series.id, + series_ctx.series_id, chapter_info.number, Some(&consistent_title), &chapter_info.url, + ChapterStatus::Processing, ) .await?; println!( @@ -111,7 +65,7 @@ pub async fn process_single_chapter( } let semaphore = Arc::new(Semaphore::new(2)); - let series_slug = slugify(&series.title); + let series_slug = slugify(&series_ctx.series_title); let mut processing_tasks = Vec::new(); // Process image @@ -238,12 +192,14 @@ pub async fn process_single_chapter( ) { // Complete chapter images (true, true) => { + // Update series status metadata db_service .update_chapter_status(chapter_id, ChapterStatus::Available) .await?; + // Update series content timestamp metadata if let Err(e) = db_service - .update_series_new_content_timestamp(series.id) + .update_series_new_content_timestamp(series_ctx.series_id) .await { eprintln!( @@ -251,6 +207,20 @@ pub async fn process_single_chapter( e ); } + + // Update series last chapter in storage metadata + db_service + .update_series_last_chapter_found_in_storage( + series_ctx.series_id, + chapter_info.number, + ) + .await?; + + println!( + "[COORDINATOR] Series '{}' metadata updated to chapter {}.", + series_ctx.series_title, chapter_info.number + ); + Ok(Some(chapter_info.number)) } // Partial/Incomplete chapter images diff --git a/backend/src/processing/orchestrator.rs b/backend/src/processing/orchestrator.rs index e7c6e3b..932cbcf 100644 --- a/backend/src/processing/orchestrator.rs +++ b/backend/src/processing/orchestrator.rs @@ -1,31 
+1,34 @@ use std::env; use std::sync::Arc; -use anyhow::{anyhow, Result}; +use anyhow::{Result, anyhow}; use reqwest::Client; use url::Url; use crate::common::utils::random_sleep_time; use crate::database::storage::StorageClient; -use crate::database::{DatabaseService, Series}; +use crate::database::{ChapterStatus, DatabaseService, SeriesCheckTaskInfo}; use crate::processing::coordinator; use crate::scraping::fetcher; use crate::scraping::model::SitesConfig; use crate::scraping::parser::{ChapterInfo, ChapterParser}; +use crate::task_workers::chapter_download_worker::SeriesProcessingContext; use crate::task_workers::repair_chapter_worker::RepairChapterMsg; // The main "engine" for checking series and scraping task. // This function can be called from anywhere, including a background task. pub async fn run_series_check( - series: Series, + series_check_task: SeriesCheckTaskInfo, http_client: Client, db_service: &DatabaseService, sites_config: Arc, - storage_client: Arc, ) -> Result<()> { - println!("[SERIES CHECK] Starting for series: '{}'", series.title); + println!( + "[SERIES CHECK] Starting for series: '{}'", + series_check_task.title + ); - let host = &series.source_website_host; + let host = &series_check_task.source_website_host; let site_config = sites_config .get_site_config(host) .ok_or_else(|| anyhow!("No scraping config for host: {}", host))?; @@ -35,37 +38,45 @@ pub async fn run_series_check( println!( "[SERIES CHECK] Fetching series main page HTML from: {}", - series.current_source_url + series_check_task.current_source_url ); - let series_page_html = fetcher::fetch_html(&http_client, &series.current_source_url).await?; + let series_page_html = + fetcher::fetch_html(&http_client, &series_check_task.current_source_url).await?; + + let max_known_db_chapter = db_service + .get_max_known_chapter(series_check_task.id) + .await?; random_sleep_time(2, 5).await; // [Quick Check] Get latest chapter println!("[SERIES CHECK] Performing quick check, get latest chapter."); - let latest_site_chapter = chapter_parser - .quick_check_extract_latest_chapter_info(&series_page_html, &series.current_source_url)?; + let latest_site_chapter = chapter_parser.quick_check_extract_latest_chapter_info( + &series_page_html, + &series_check_task.current_source_url, + )?; - let last_db_chapter_number = series.last_chapter_found_in_storage.unwrap_or(0.0); let mut chapters_to_scrape: Vec = Vec::new(); let mut needs_full_scan = false; if let Some(latest_chapter) = latest_site_chapter { println!( - "[SERIES CHECK] Latest on site: {:.2}, latest in DB: {:.2}", - latest_chapter.number, last_db_chapter_number + "[SERIES CHECK] Latest on site: {:.2}, Max known in DB: {:.2}", + latest_chapter.number, max_known_db_chapter ); // If latest chapter on site > latest in DB, we need a full scan. - if latest_chapter.number > last_db_chapter_number { + if latest_chapter.number > max_known_db_chapter { println!("[SERIES CHECK] New chapter detected by Quick Check. Triggering full scan."); needs_full_scan = true; } else { // [Count Check] If no new chapter, check for backfills or deletions println!("[SERIES CHECK] Quick Check passed. 
Performing Count Check"); let site_chapter_count = chapter_parser.count_chapter_links(&series_page_html)?; - let db_chapter_count = db_service.get_series_chapters_count(series.id).await?; + let db_chapter_count = db_service + .get_series_chapters_count(series_check_task.id) + .await?; println!( "[SERIES CHECK] Chapter on site: {}, chapters in DB: {}", @@ -85,13 +96,15 @@ pub async fn run_series_check( // [Full Scan] Only run if triggered by one of the checks above. if needs_full_scan { println!("[SERIES CHECK] Run full scan"); - let all_available_chapters = chapter_parser - .full_scan_extract_all_chapter_info(&series_page_html, &series.current_source_url)?; + let all_available_chapters = chapter_parser.full_scan_extract_all_chapter_info( + &series_page_html, + &series_check_task.current_source_url, + )?; if all_available_chapters.is_empty() { println!( "[SERIES CHECK] Full scan found no chapters for '{}'.", - series.title + series_check_task.title ); return Ok(()); } @@ -104,14 +117,14 @@ pub async fn run_series_check( // Filter chapters that are actually new to avoid re-scraping on a syncronization. chapters_to_scrape = all_available_chapters .into_iter() - .filter(|ch_info| ch_info.number > last_db_chapter_number) + .filter(|ch_info| ch_info.number > max_known_db_chapter) .collect(); } if chapters_to_scrape.is_empty() { println!( "[SERIES CHECK] No new chapters to scrape for '{}'. All are up-to-date.", - series.title + series_check_task.title ); return Ok(()); } @@ -122,24 +135,23 @@ pub async fn run_series_check( ); // Start Scraping Process for Selected Chapters - let last_info_downloaded_chapter = coordinator::process_series_chapters_from_list( - &series, - &chapters_to_scrape, - &http_client, - storage_client, - site_config, - db_service, - ) - .await?; - - // Update series metadata in the database - if let Some(last_chapter_num) = last_info_downloaded_chapter { - db_service - .update_series_last_chapter_found_in_storage(series.id, Some(last_chapter_num)) + for ch_info in chapters_to_scrape { + let convert_chapter_number = ch_info.number.to_string().replace('.', "-"); + let consistent_title = format!("{}-eng", convert_chapter_number); + + let chapter_id = db_service + .add_new_chapter( + series_check_task.id, + ch_info.number, + Some(&consistent_title), + &ch_info.url, + ChapterStatus::Pending, + ) .await?; + println!( - "[BULK SCRAPE] Updated last local chapter for '{}' to {}.", - series.title, last_chapter_num + "[SERIES CHECK] Queueing Chapter {} (ID: {})", + ch_info.number, chapter_id ); } @@ -205,8 +217,10 @@ pub async fn repair_specific_chapter_series( number: msg.chapter_number, }; + let series_ctx: SeriesProcessingContext = (&series).into(); + coordinator::process_single_chapter( - &series, + &series_ctx, &chapter_info_to_scrape, &http_client, storage_client, diff --git a/backend/src/task_workers/channels.rs b/backend/src/task_workers/channels.rs index c5c3d4b..673de44 100644 --- a/backend/src/task_workers/channels.rs +++ b/backend/src/task_workers/channels.rs @@ -4,16 +4,17 @@ use arc_swap::ArcSwap; use reqwest::Client; use tokio::sync::mpsc; -use crate::database::storage::StorageClient; use crate::database::DatabaseService; +use crate::database::storage::StorageClient; use crate::scraping::model::SitesConfig; +use crate::task_workers::chapter_download_worker::run_chapter_download_worker; +use crate::task_workers::check_series_worker::{ + SeriesCheckJob, run_series_check_scheduler, run_series_check_worker, +}; use 
crate::task_workers::delete_password_reset_token_worker::run_cleanup_password_reset_token_worker; use crate::task_workers::delete_series_worker::{run_deletion_scheduler, run_deletion_worker}; use crate::task_workers::log_view_cleanup_worker::run_log_view_cleanup_worker; -use crate::task_workers::repair_chapter_worker::{run_repair_chapter_worker, RepairChapterMsg}; -use crate::task_workers::series_check_worker::{ - run_series_check_scheduler, run_series_check_worker, SeriesCheckJob, -}; +use crate::task_workers::repair_chapter_worker::{RepairChapterMsg, run_repair_chapter_worker}; #[derive(Clone)] pub struct OnDemandChannels { @@ -41,14 +42,25 @@ pub fn setup_worker_channels( tokio::spawn(run_series_check_worker( i, db_service.clone(), - storage_client.clone(), http_client.clone(), sites_config.clone(), rx_clone, )); } - // Delete series worker channels + // Download chapter worker channels + const DOWNLOAD_WORKER_COUNT: usize = 1; + for i in 0..DOWNLOAD_WORKER_COUNT { + tokio::spawn(run_chapter_download_worker( + i, + db_service.clone(), + storage_client.clone(), + http_client.clone(), + sites_config.clone(), + )); + } + + // Deletion worker channels let (deletion_tx, deletion_rx) = mpsc::channel(16); tokio::spawn(run_deletion_scheduler(db_service.clone(), deletion_tx)); diff --git a/backend/src/task_workers/chapter_download_worker.rs b/backend/src/task_workers/chapter_download_worker.rs new file mode 100644 index 0000000..e431714 --- /dev/null +++ b/backend/src/task_workers/chapter_download_worker.rs @@ -0,0 +1,154 @@ +use std::sync::Arc; +use std::time::Duration; + +use arc_swap::ArcSwap; +use reqwest::Client; +use tokio::time::MissedTickBehavior; + +use crate::database::storage::StorageClient; +use crate::database::{ + ChapterStatus, DatabaseService, DownloadJobData, Series, SeriesCheckTaskInfo, +}; +use crate::processing::coordinator::process_single_chapter; +use crate::scraping::model::SitesConfig; +use crate::scraping::parser::ChapterInfo; + +#[derive(Debug)] +pub struct SeriesProcessingContext { + pub series_id: i32, + pub series_title: String, + pub source_url: String, +} + +impl From<&Series> for SeriesProcessingContext { + fn from(series: &Series) -> Self { + Self { + series_id: series.id, + series_title: series.title.clone(), + source_url: series.current_source_url.clone(), + } + } +} + +impl From<&SeriesCheckTaskInfo> for SeriesProcessingContext { + fn from(task: &SeriesCheckTaskInfo) -> Self { + Self { + series_id: task.id, + series_title: task.title.clone(), + source_url: task.current_source_url.clone(), + } + } +} + +impl From<&DownloadJobData> for SeriesProcessingContext { + fn from(job: &DownloadJobData) -> Self { + Self { + series_id: job.series_id, + series_title: job.series_title.clone(), + source_url: job.series_url.clone(), + } + } +} + +pub async fn run_chapter_download_worker( + worker_id: usize, + db_service: DatabaseService, + storage_client: Arc, + http_client: Client, + sites_config: Arc>, +) { + println!("[DOWNLOAD-WORKER] Starting... Worker ID {}", worker_id); + + let mut interval = tokio::time::interval(Duration::from_secs(10)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + interval.tick().await; + + let pending_jobs = match db_service.find_and_lock_pending_chapters(20).await { + Ok(jobs) => jobs, + Err(e) => { + eprintln!( + "[DOWNLOAD-WORKER] ID: {}. 
Failed to find and lock pending chapters: {}", + worker_id, e + ); + continue; + } + }; + + if pending_jobs.is_empty() { + continue; + } + + println!( + "[DOWNLOAD-SCHEDULER] Dispatching {} chapters to workers...", + pending_jobs.len() + ); + + for download_job in pending_jobs { + let storage_client = storage_client.clone(); + let db_service = db_service.clone(); + let http_client = http_client.clone(); + let sites_config = sites_config.clone(); + + tokio::spawn(async move { + let series_ctx: SeriesProcessingContext = (&download_job).into(); + + let chapter_info = ChapterInfo { + url: download_job.chapter_url.clone(), + number: download_job.chapter_number, + }; + + // Get config, if failed skip + let config_snapshot = sites_config.load(); + + let site_config = match config_snapshot.get_site_config(&download_job.source_host) { + Some(config) => config, + None => { + eprintln!( + "[DOWNLOAD-WORKER] No config for host: {}", + download_job.source_host + ); + + // Update status to error if no config available + if let Err(e) = db_service + .update_chapter_status(download_job.chapter_id, ChapterStatus::Error) + .await + { + eprintln!("[DOWNLOAD-WORKER] Failed to update status to Error: {}", e); + } + + return; + } + }; + + let result = process_single_chapter( + &series_ctx, + &chapter_info, + &http_client, + storage_client, + site_config, + &db_service, + ) + .await; + + if let Err(e) = result { + eprintln!( + "[DOWNLOAD-WORKER] Failed to process chapter {} (ID: {}): {}", + download_job.chapter_number, download_job.chapter_id, e + ); + + if let Err(db_error) = db_service + .update_chapter_status(download_job.chapter_id, ChapterStatus::Error) + .await + { + eprintln!( + "[DOWNLOAD-WORKER] Double Fault: Failed to save error status: {}", + db_error + ); + } + } + }); + } + } +} diff --git a/backend/src/task_workers/series_check_worker.rs b/backend/src/task_workers/check_series_worker.rs similarity index 69% rename from backend/src/task_workers/series_check_worker.rs rename to backend/src/task_workers/check_series_worker.rs index ff1f02a..07458c3 100644 --- a/backend/src/task_workers/series_check_worker.rs +++ b/backend/src/task_workers/check_series_worker.rs @@ -3,15 +3,15 @@ use std::time::Duration; use arc_swap::ArcSwap; use reqwest::Client; +use tokio::time::MissedTickBehavior; -use crate::database::storage::StorageClient; -use crate::database::{DatabaseService, Series, SeriesStatus}; +use crate::database::{DatabaseService, SeriesCheckTaskInfo, SeriesStatus}; use crate::processing::orchestrator; use crate::scraping::model::SitesConfig; #[derive(Debug)] pub struct SeriesCheckJob { - pub series: Series, + pub series_task: SeriesCheckTaskInfo, } // Scheduler for pooling DB @@ -19,10 +19,11 @@ pub async fn run_series_check_scheduler( db_service: DatabaseService, job_sender: async_channel::Sender, ) { - println!("[SERIES-SCHEDULER] Starting..."); + println!("[SERIES-SCHEDULER] Scanning database for series updates..."); + + // Run check every 60 seconds + let mut interval = tokio::time::interval(Duration::from_secs(60)); - // Interval to check db for job - let mut interval = tokio::time::interval(Duration::from_secs(30)); // Skip first tick interval.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -33,21 +34,23 @@ pub async fn run_series_check_scheduler( match db_service.find_and_lock_series_for_check(20).await { Ok(series_list) => { if series_list.is_empty() { - // If no job, waiting fot the next interval tick + // No series found, wait for next tick break; } - println!( - "[SERIES-SCHEDULER] 
Found batch of {} series to check", - series_list.len() - ); + for series_task in series_list { + println!( + "[SERIES-SCHEDULER] Found series for check {}, id {}", + series_task.title, series_task.id + ); - for series in series_list { - let job = SeriesCheckJob { series }; + let job = SeriesCheckJob { series_task }; // Send worker queue // If queue full, will wait (backpressure) until worker empty if job_sender.send(job).await.is_err() { - eprintln!("[SERIES-SCHEDULER] CRITICAL: Channel closed."); + eprintln!( + "[SERIES-SCHEDULER] CRITICAL: Job channel closed. Worker may have panicked." + ); return; } } @@ -64,7 +67,6 @@ pub async fn run_series_check_scheduler( pub async fn run_series_check_worker( worker_id: usize, db_service: DatabaseService, - storage_client: Arc, http_client: Client, sites_config: Arc>, job_receiver: async_channel::Receiver, @@ -72,18 +74,17 @@ pub async fn run_series_check_worker( println!("[SERIES-WORKER {}] Starting...", worker_id); while let Ok(job) = job_receiver.recv().await { - let series = job.series; + let series_task = job.series_task; println!( "[SERIES-WORKER] Checking series {}, id {}", - series.title, series.id + series_task.title, series_task.id ); let result = orchestrator::run_series_check( - series.clone(), + series_task.clone(), http_client.clone(), &db_service, sites_config.load().clone(), - storage_client.clone(), ) .await; @@ -91,7 +92,7 @@ pub async fn run_series_check_worker( let (final_status, next_check_time) = if let Err(e) = result { eprintln!( "[SERIES-WORKER] Error checking series {}:{}. Retrying later: {}", - series.title, series.id, e + series_task.title, series_task.id, e ); // If failed, retry again after 1 hour ( @@ -104,12 +105,17 @@ pub async fn run_series_check_worker( }; if let Err(e) = db_service - .update_series_check_schedule(series.id, Some(final_status), next_check_time) + .update_series_check_schedule( + series_task.id, + series_task.check_interval_minutes, + final_status, + next_check_time, + ) .await { eprintln!( "[SERIES-WORKER] CRITICAL: Failed to update schedule for series {}: {}", - series.id, e + series_task.id, e ); } } diff --git a/backend/src/task_workers/delete_series_worker.rs b/backend/src/task_workers/delete_series_worker.rs index ebb6a37..2a021d3 100644 --- a/backend/src/task_workers/delete_series_worker.rs +++ b/backend/src/task_workers/delete_series_worker.rs @@ -4,13 +4,14 @@ use std::time::Duration; use anyhow::Context; use backon::{BackoffBuilder, Retryable}; use tokio::sync::mpsc; +use tokio::time::MissedTickBehavior; use crate::database::storage::StorageClient; -use crate::database::{DatabaseService, Series, SeriesStatus}; +use crate::database::{DatabaseService, SeriesDeletionJob, SeriesStatus}; #[derive(Debug, Clone)] pub struct DeletionJob { - series: Series, + series: SeriesDeletionJob, } // Scheduler to pool database for deletion jobs @@ -23,7 +24,7 @@ pub async fn run_deletion_scheduler( // Interval pooling let mut interval = tokio::time::interval(Duration::from_secs(180)); // Skip frist tick - interval.tick().await; + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { interval.tick().await; @@ -41,7 +42,7 @@ pub async fn run_deletion_scheduler( eprintln!( "[DELETION-WORKER] CRITICAL: Receiver channel closed. Shutting down." 
); - break; + return; } } Ok(None) => { @@ -141,31 +142,37 @@ async fn execute_full_deletion( storage_client: Arc, ) -> anyhow::Result<()> { // Get all image keys - let image_keys = db_service + let deletion_image_keys = db_service .get_image_keys_for_series_deletion(series_id) .await - .context("Failed to get image keys") - // If no series found, assume no images - .unwrap_or_default(); - - let keys_to_delete: Vec = image_keys - .iter() - .flat_map(|keys| keys.all_urls()) - .filter_map(|url| storage_client.extract_object_key_from_url(url)) - .collect(); - - if !keys_to_delete.is_empty() { - storage_client - .delete_image_objects(&keys_to_delete) - .await - .context("Failed to delete image objects")?; + .context("Failed to get image keys from DB")?; + + if let Some(deletion_keys) = deletion_image_keys { + let keys_to_delete: Vec = deletion_keys + .all_urls() + .filter_map(|url| storage_client.extract_object_key_from_url(url)) + .collect(); + + if !keys_to_delete.is_empty() { + storage_client + .delete_image_objects(&keys_to_delete) + .await + .context("Failed to delete image objects")?; + } } - db_service + let rows_deleted = db_service .delete_series_by_id(series_id) .await .context("Failed to delete series")?; + if rows_deleted == 0 { + println!( + "[DELETION-WORKER] Series {} was not found in DB (rows affected: 0), assuming already deleted.", + series_id + ); + } + println!( "[WORKER] Successfully processed and deleted series {}", series_id diff --git a/backend/src/task_workers/log_view_cleanup_worker.rs b/backend/src/task_workers/log_view_cleanup_worker.rs index d634b1e..87b40eb 100644 --- a/backend/src/task_workers/log_view_cleanup_worker.rs +++ b/backend/src/task_workers/log_view_cleanup_worker.rs @@ -6,7 +6,7 @@ pub async fn run_log_view_cleanup_worker(db_service: DatabaseService) { let log_cleanup = async { let scheduler = JobScheduler::new().await?; let db_clone = db_service.clone(); - let cron_exp = "0 0 2 * * * *"; + let cron_exp = "0 0 */6 * * * *"; let cleanup_job = Job::new_async(cron_exp, move |_uuid, _locked| { let db = db_clone.clone(); diff --git a/backend/src/task_workers/mod.rs b/backend/src/task_workers/mod.rs index 152848c..76887bf 100644 --- a/backend/src/task_workers/mod.rs +++ b/backend/src/task_workers/mod.rs @@ -1,6 +1,7 @@ pub mod channels; +pub mod chapter_download_worker; +pub mod check_series_worker; pub mod delete_password_reset_token_worker; pub mod delete_series_worker; pub mod log_view_cleanup_worker; pub mod repair_chapter_worker; -pub mod series_check_worker; diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 7e66442..09e8e70 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "nightly-2026-01-08" +channel = "nightly-2026-01-07" #channel = "stable" -components = [] \ No newline at end of file +components = []