diff --git a/CHANGELOG.md b/CHANGELOG.md index b17f77d..f5101e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.0.0-rc.7] - 2026-05-07 + +- bump: to v1.0.0-rc.8 (#78) - feat: idempotency for tasks (#77) ## [1.0.0-rc.7] - 2026-04-07 diff --git a/Cargo.lock b/Cargo.lock index a80bda5..5dbca3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,7 +85,7 @@ dependencies = [ [[package]] name = "apalis-sqlite" -version = "1.0.0-rc.7" +version = "1.0.0-rc.8" dependencies = [ "apalis", "apalis-codec", @@ -93,7 +93,6 @@ dependencies = [ "apalis-sql", "apalis-workflow", "async-std", - "chrono", "futures", "futures-util", "log", @@ -1084,9 +1083,9 @@ checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "js-sys" -version = "0.3.97" +version = "0.3.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1840c94c045fbcf8ba2812c95db44499f7c64910a912551aaaa541decebcacf" +checksum = "67df7112613f8bfd9150013a0314e196f4800d3201ae742489d999db2f979f08" dependencies = [ "cfg-if", "futures-util", @@ -2538,9 +2537,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.120" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df52b6d9b87e0c74c9edfa1eb2d9bf85e5d63515474513aa50fa181b3c4f5db1" +checksum = "49ace1d07c165b0864824eee619580c4689389afa9dc9ed3a4c75040d82e6790" dependencies = [ "cfg-if", "once_cell", @@ -2551,9 +2550,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.70" +version = "0.4.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af934872acec734c2d80e6617bbb5ff4f12b052dd8e6332b0817bce889516084" +checksum = "96492d0d3ffba25305a7dc88720d250b1401d7edca02cc3bcd50633b424673b8" dependencies = [ "js-sys", "wasm-bindgen", @@ -2561,9 +2560,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.120" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78b1041f495fb322e64aca85f5756b2172e35cd459376e67f2a6c9dffcedb103" +checksum = "8e68e6f4afd367a562002c05637acb8578ff2dea1943df76afb9e83d177c8578" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2571,9 +2570,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.120" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dcd0ff20416988a18ac686d4d4d0f6aae9ebf08a389ff5d29012b05af2a1b41" +checksum = "d95a9ec35c64b2a7cb35d3fead40c4238d0940c86d107136999567a4703259f2" dependencies = [ "bumpalo", "proc-macro2", @@ -2584,9 +2583,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.120" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49757b3c82ebf16c57d69365a142940b384176c24df52a087fb748e2085359ea" +checksum = "c4e0100b01e9f0d03189a92b96772a1fb998639d981193d7dbab487302513441" dependencies = [ "unicode-ident", ] diff --git a/Cargo.toml b/Cargo.toml index d342938..b84cc21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-sqlite" -version = "1.0.0-rc.7" +version = "1.0.0-rc.8" authors = ["Njuguna Mureithi "] readme = "README.md" edition = "2024" @@ -44,9 +44,8 @@ ulid = { version = "1", features = ["serde"] } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } apalis = { version = "1.0.0-rc.9" } apalis-workflow = { version = "0.1.0-rc.9" } -apalis-codec = { version = "0.1.0-rc.9", features = ["msgpack"]} +apalis-codec = { version = "0.1.0-rc.9", features = ["msgpack"] } futures-util = "0.3.31" -chrono = "0.4" [package.metadata.docs.rs] # defines the configuration attribute `docsrs` diff --git a/README.md b/README.md index 4fa17b2..1c678cf 100644 --- a/README.md +++ b/README.md @@ -1,26 +1,47 @@ # apalis-sqlite -Background task processing for Rust using `apalis` and `sqlite`. +[![Crates.io](https://img.shields.io/crates/v/apalis-sqlite.svg)](https://crates.io/crates/apalis-sqlite) +[![Docs.rs](https://docs.rs/apalis-sqlite/badge.svg)](https://docs.rs/apalis-sqlite) +[![License: MIT OR Apache-2.0](https://img.shields.io/badge/license-MIT%20%2F%20Apache--2.0-blue.svg)](#license) +[![Build Status](https://img.shields.io/github/actions/workflow/status/apalis-dev/apalis-sqlite/ci.yml?branch=main)](https://github.com/apalis-dev/apalis-sqlite/actions) + +> [Background task processing for Rust](https://apalis.dev/), powered by [`apalis`](https://crates.io/crates/apalis) and [SQLite](https://www.sqlite.org/). + +--- ## Features -- **Reliable job queue** using SQLite as the backend. -- **Multiple storage types**: standard polling and event-driven (hooked) storage. -- **Custom codecs** for serializing/deserializing job arguments as bytes. -- **Heartbeat and orphaned job re-enqueueing** for robust job processing. -- **Integration with `apalis` workers and middleware.** +- **Reliable job queue** — SQLite-backed persistence survives process restarts; no external broker needed. +- **Durable execution** — jobs are written to disk before processing begins, so a crash mid-flight won't silently drop work. +- **Automatic retries** — configure `max_attempts` per task; failed jobs are re-enqueued with backoff and never lost. +- **Heartbeat & orphan recovery** — workers emit periodic heartbeats; jobs held by a dead worker are automatically re-enqueued. +- **Scheduled & delayed jobs** — use `run_after` to enqueue work that should only execute at or after a future point in time. +- **Priority queues** — assign integer priorities so high-urgency jobs are always picked up first. +- **Multiple polling strategies** — choose between standard polling and event-driven (hooked) storage to trade latency for CPU usage. +- **Multi-step workflows** — chain async steps into pipelines with `apalis-workflow`; each stage only runs if the previous one succeeds. +- **Shared storage** — multiplex multiple job types over a single SQLite connection. +- **Custom codecs** — pluggable serialization/deserialization of job payloads as raw bytes. +- **First-class `apalis` integration** — works seamlessly with workers, tower layers, and middleware. + +--- ## Storage Types -- [`SqliteStorage`]: Standard polling-based storage. -- [`SqliteStorageWithHook`]: Event-driven storage using SQLite update hooks for low-latency job fetching. -- [`SharedSqliteStorage`]: Shared storage for multiple job types. +| Type | Description | +| --------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------- | +| [`SqliteStorage`](https://docs.rs/apalis-sqlite/latest/apalis_sqlite/struct.SqliteStorage.html) | Standard polling-based storage. | +| [`SqliteStorageWithHook`](https://docs.rs/apalis-sqlite/latest/apalis_sqlite/struct.SqliteStorageWithHook.html) | Event-driven storage using SQLite update hooks for low-latency job fetching. | +| [`SharedSqliteStorage`](https://docs.rs/apalis-sqlite/latest/apalis_sqlite/struct.SharedSqliteStorage.html) | Shared storage supporting multiple job types over a single connection. | -The naming is designed to clearly indicate the storage mechanism and its capabilities, but under the hood the result is the `SqliteStorage` struct with different configurations. +> All types are built on top of `SqliteStorage` with different configurations applied under the hood. + +--- ## Examples -### Basic Worker Example +### Basic Worker + +Set up a pool, push jobs, and run a worker: ```rust,no_run use std::time::Duration; @@ -38,12 +59,11 @@ async fn main() { let mut start = 0; let mut items = stream::repeat_with(move || { start += 1; - let task = Task::builder(start) + Task::builder(start) .run_after(Duration::from_secs(1)) .priority(1) .max_attempts(5) - .build(); - task + .build() }) .take(10); backend.push_all(&mut items).await.unwrap(); @@ -59,7 +79,9 @@ async fn main() { } ``` -### Hooked Worker Example (Event-driven) +### Event-Driven Worker (Hooked) + +Use SQLite update hooks to react to new jobs with minimal latency: ```rust,no_run use std::time::Duration; @@ -97,7 +119,6 @@ async fn main() { .take(20) .collect::>() .await; - /// Push with just a `Pool` apalis_sqlite::sink::push_tasks(pool, config, items).await.unwrap(); } }); @@ -113,7 +134,12 @@ async fn main() { } ``` -### Workflow Example +### Sequential Workflow (Order Fulfilment Pipeline) + +Chain async steps to model a real-world order processing pipeline — validating payment, +reserving inventory, dispatching a shipment notification, and emailing the customer. +Each stage only runs if the previous one succeeds; a failure at any step returns an +error and the job can be retried from the beginning. ```rust,no_run use std::time::Duration; @@ -121,38 +147,147 @@ use std::time::Duration; use apalis::prelude::*; use apalis_sqlite::*; use apalis_workflow::*; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct Order { + id: u64, + customer_email: String, + items: Vec, + total_cents: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ChargedOrder { + order_id: u64, + customer_email: String, + transaction_id: String, + items: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct StockedItem { + order_id: u64, + sku: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct DispatchedOrder { + order_id: u64, + customer_email: String, + tracking_number: String, + item_count: usize, +} + +/// Simulates an inventory check — returns `None` for out-of-stock SKUs so +async fn check_stock(item: StockedItem) -> Option { + let out_of_stock = ["WIDGET-C", "WIDGET-D"]; + if out_of_stock.contains(&item.sku.as_str()) { + eprintln!("[{}] {} is out of stock — skipping", item.order_id, item.sku); + None + } else { + Some(item) + } +} #[tokio::main] async fn main() { - let workflow = Workflow::new("odd-numbers-workflow") - .and_then(|a: usize| async move { - Ok::, BoxDynError>((0..=a).collect::>()) + let workflow = Workflow::new("order-fulfilment") + // Step 1: Charge payment and return a record of the charged order. + .and_then(|order: Order| async move { + println!( + "[{}] Charging ${:.2} for {} item(s)...", + order.id, + order.total_cents as f64 / 100.0, + order.items.len(), + ); + // Call your payment provider (e.g. Stripe) here. + Ok::(ChargedOrder { + order_id: order.id, + customer_email: order.customer_email, + transaction_id: format!("txn_{}", order.id), + items: order.items, + }) }) - .filter_map(|x: usize| async move { - if x % 2 != 0 { Some(x) } else { None } + .and_then(|charged: ChargedOrder| async move { + let stocked = charged + .items + .into_iter() + .map(|sku| StockedItem { order_id: charged.order_id, sku }) + .collect::>(); + Ok::<(ChargedOrder, Vec), BoxDynError>(( + ChargedOrder { items: vec![], ..charged }, + stocked, + )) }) - .filter_map(|x: usize| async move { - if x % 3 != 0 { Some(x) } else { None } + .and_then(|(charged, items): (ChargedOrder, Vec)| async move { + let available = futures::future::join_all(items.into_iter().map(check_stock)) + .await + .into_iter() + .flatten() + .collect::>(); + + if available.is_empty() { + return Err("No items available to fulfil this order".into()); + } + + println!( + "[{}] {} item(s) confirmed in stock", + charged.order_id, + available.len() + ); + Ok::<(ChargedOrder, Vec), BoxDynError>((charged, available)) }) - .filter_map(|x: usize| async move { - if x % 5 != 0 { Some(x) } else { None } + // Step 3: Brief pause before handing off to the courier API. + .delay_for(Duration::from_millis(500)) + // Step 4: Dispatch the shipment and obtain a tracking number. + .and_then(|(charged, items): (ChargedOrder, Vec)| async move { + let tracking_number = format!("TRACK-{}", charged.order_id); + println!( + "[{}] Dispatched {} item(s) — tracking: {}", + charged.order_id, + items.len(), + tracking_number, + ); + // Call your courier API (e.g. EasyPost, Shippo) here. + Ok::(DispatchedOrder { + order_id: charged.order_id, + customer_email: charged.customer_email, + tracking_number, + item_count: items.len(), + }) }) - .delay_for(Duration::from_millis(1000)) - .and_then(|a: Vec| async move { - println!("Sum: {}", a.iter().sum::()); + // Step 5: Notify the customer by email. + .and_then(|dispatched: DispatchedOrder| async move { + println!( + "[{}] Emailing {} — your {} item(s) are on the way! ({})", + dispatched.order_id, + dispatched.customer_email, + dispatched.item_count, + dispatched.tracking_number, + ); + // Send a transactional email (e.g. Resend, SendGrid) here. Ok::<(), BoxDynError>(()) }); let pool = SqlitePool::connect(":memory:").await.unwrap(); SqliteStorage::setup(&pool).await.unwrap(); - let mut sqlite = SqliteStorage::new_in_queue(&pool, "test-workflow"); - - sqlite.push_start(100usize).await.unwrap(); + let mut sqlite = SqliteStorage::new_in_queue(&pool, "order-fulfilment"); + + sqlite + .push_start(Order { + id: 1001, + customer_email: "alice@example.com".into(), + items: vec!["WIDGET-A".into(), "WIDGET-B".into(), "WIDGET-C".into()], + total_cents: 7499, + }) + .await + .unwrap(); - let worker = WorkerBuilder::new("rango-tango") + let worker = WorkerBuilder::new("fulfilment-worker") .backend(sqlite) .on_event(|ctx, ev| { - println!("On Event = {:?}", ev); + println!("Event: {:?}", ev); if matches!(ev, Event::Error(_)) { ctx.stop().unwrap(); } @@ -163,27 +298,86 @@ async fn main() { } ``` -### Shared Example +### DAG Workflow -Full support for sharing the same connection. This example shows how to run multiple types with one function +Chain async steps to model a real-world ETL pipeline which runs multiple steps concurrently, then collects the results. + +```rust,no_run +use std::time::Duration; + +use apalis::prelude::*; +use apalis_codec::msgpack::MsgPackCodec; +use apalis_sqlite::SqliteStorage; +use apalis_workflow::*; +use sqlx::SqlitePool; + +async fn get_name(user_id: u32) -> Result { + Ok(user_id.to_string()) +} + +async fn get_age(user_id: u32) -> Result { + tokio::time::sleep(Duration::from_millis(800)).await; + Ok(user_id as usize + 20) +} + +async fn get_address(user_id: u32) -> Result { + tokio::time::sleep(Duration::from_millis(500)).await; + Ok(user_id as usize + 100) +} + +async fn collector( + (name, age, address): (String, usize, usize), + wrk: WorkerContext, +) -> Result { + let result = name.parse::()? + age + address; + wrk.stop().unwrap(); + Ok(result) +} + +#[tokio::main] +async fn main() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + SqliteStorage::setup(&pool).await.unwrap(); + let mut backend = SqliteStorage::new(&pool).with_codec::(); + backend.start_fan_out(vec![42, 43, 44]).await.unwrap(); + + let dag_flow = DagFlow::new("user-etl-workflow"); + let get_name = dag_flow.node(get_name); + let get_age = dag_flow.node(get_age); + let get_address = dag_flow.node(get_address); + dag_flow + .node(collector) + .depends_on((&get_name, &get_age, &get_address)); // Order and types matters here + + dag_flow.validate().unwrap(); // Ensure DAG is valid + + let worker = WorkerBuilder::new("rango-tango") + .backend(backend) + .on_event(|_c, e| { + println!("{e:?},"); + }) + .build(dag_flow); + worker.run().await.unwrap(); +} +``` + +### Shared Storage (Multiple Job Types) + +Run multiple job types over a single SQLite connection: ```rust,no_run use std::{collections::HashMap, time::Duration}; use apalis::prelude::*; use apalis_sqlite::{SharedSqliteStorage, SqliteStorage}; - use futures::stream; #[tokio::main] async fn main() { - let mut store = SharedSqliteStorage::new(":memory:"); - SqliteStorage::setup(store.pool()).await.unwrap(); let mut map_store = store.make_shared().unwrap(); - let mut int_store = store.make_shared().unwrap(); map_store @@ -213,11 +407,37 @@ async fn main() { } ``` +--- + ## Observability -You can track your jobs using [apalis-board](https://github.com/apalis-dev/apalis-board). -![Task](https://github.com/apalis-dev/apalis-board/raw/main/screenshots/task.png) +Monitor and inspect your jobs visually using [**apalis-board**](https://github.com/apalis-dev/apalis-board): + +![apalis-board task view](https://github.com/apalis-dev/apalis-board/raw/main/screenshots/task.png) + +--- + +## Related Crates + +- [`apalis`](https://github.com/apalis-dev/apalis) — the core worker and middleware framework. +- [`apalis-workflow`](https://docs.rs/apalis-workflow) — multi-step workflow support built on apalis. +- [`apalis-board`](https://github.com/apalis-dev/apalis-board) — web UI for monitoring jobs. + +--- + +## Showcase and Examples + +- [`ryot`](https://github.com/IgnisDa/ryot) - A self hosted platform for tracking various facets of your life - media, fitness and more. +- [`decomp.dev`](https://github.com/encounter/decomp.dev) - A video game decompilation tool +- [`actix-ntfy-service`](https://github.com/apalis-dev/apalis-board/tree/main/examples/actix-ntfy-service) - Basic example that shows how to publish notifications using apalis-sqlite + +--- ## License -Licensed under either of Apache License, Version 2.0 or MIT license at your option. +Licensed under either of: + +- [Apache License, Version 2.0](LICENSE-APACHE) +- [MIT License](LICENSE-MIT) + +at your option. diff --git a/examples/dag.rs b/examples/etl_workflow.rs similarity index 100% rename from examples/dag.rs rename to examples/etl_workflow.rs diff --git a/examples/order_workflow.rs b/examples/order_workflow.rs new file mode 100644 index 0000000..f54d67a --- /dev/null +++ b/examples/order_workflow.rs @@ -0,0 +1,331 @@ +use std::time::Duration; + +use apalis::prelude::*; +use apalis_sqlite::*; +use apalis_workflow::*; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, SqlitePool}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct OrderJob { + order_id: i64, +} + +#[derive(Debug, Clone, FromRow)] +struct OrderRecord { + id: i64, + customer_email: String, + total_cents: i64, + payment_txn: Option, + tracking_number: Option, + status: String, +} + +#[derive(Debug, Clone, FromRow, Serialize, Deserialize)] +struct OrderItemRecord { + id: i64, + order_id: i64, + sku: String, + in_stock: bool, +} + +#[derive(Clone)] +struct PaymentService { + // e.g. stripe::Client +} + +impl PaymentService { + async fn charge(&self, order_id: i64, amount_cents: i64) -> Result { + // Call your payment gateway here and return the transaction ID. + Ok(format!("charge order {order_id} for {amount_cents} cents")) + } + + async fn refund(&self, txn_id: &str) -> Result<(), BoxDynError> { + // Refund a previously captured transaction. + todo!("refund transaction {txn_id}") + } +} + +#[derive(Clone)] +struct InventoryService { + // e.g. reqwest::Client + base_url +} + +impl InventoryService { + async fn is_available(&self, sku: &str) -> Result { + println!("Checking stock for {sku}"); + Ok(true) + } +} + +#[derive(Clone)] +struct ShipmentService { + // e.g. fedex_sdk::Client +} + +impl ShipmentService { + async fn dispatch(&self, order_id: i64, item_count: i64) -> Result { + // Submit shipment and return a tracking number. + Ok(format!("dispatch {item_count} items for order {order_id}")) + } +} + +#[derive(Clone)] +struct EmailService { + // e.g. sendgrid::SGClient +} + +impl EmailService { + async fn send_shipment_confirmation( + &self, + to: &str, + order_id: i64, + tracking_number: &str, + ) -> Result<(), BoxDynError> { + // Send a transactional email via your provider. + println!("email {to} about order {order_id} with tracking {tracking_number}"); + Ok(()) + } +} + +async fn charge_payment( + job: OrderJob, + db: Data, + payment: Data, +) -> Result, BoxDynError> { + let order: OrderRecord = sqlx::query_as("SELECT * FROM orders WHERE id = ?") + .bind(job.order_id) + .fetch_one(&*db) + .await?; + + if order.payment_txn.is_some() { + // already charged on a previous attempt + return Err("Order is already processed".into()); + } + + let txn = payment.charge(order.id, order.total_cents).await?; + + sqlx::query("UPDATE orders SET payment_txn = ?, status = 'charged' WHERE id = ?") + .bind(&txn) + .bind(order.id) + .execute(&*db) + .await?; + + let items: Vec = + sqlx::query_as("SELECT * FROM order_items WHERE order_id = ?") + .bind(job.order_id) + .fetch_all(&*db) + .await?; + + Ok(items) +} + +async fn filter_unavailable( + item: OrderItemRecord, + inventory: Data, +) -> Result, BoxDynError> { + let available = inventory.is_available(&item.sku).await?; + if available { Ok(Some(item)) } else { Ok(None) } +} + +async fn refund_if_all_unavailable( + items: Vec, + db: Data, + payment: Data, +) -> Result { + let order_id = items[0].order_id; + let order: OrderRecord = sqlx::query_as("SELECT * FROM orders WHERE id = ?") + .bind(order_id) + .fetch_one(&*db) + .await?; + + let available_count = items.len(); + + for item in &items { + let OrderItemRecord { id, .. } = item; + sqlx::query("UPDATE order_items SET in_stock = ? WHERE id = ?") + .bind(true) + .bind(id) + .execute(&*db) + .await?; + } + + if available_count == 0 { + if let Some(txn) = &order.payment_txn { + payment.refund(txn).await?; + sqlx::query("UPDATE orders SET status = 'refunded' WHERE id = ?") + .bind(order.id) + .execute(&*db) + .await?; + } + + return Err(format!("No items in stock for order {}", order.id).into()); + } + + sqlx::query("UPDATE orders SET status = 'stock_confirmed' WHERE id = ?") + .bind(order.id) + .execute(&*db) + .await?; + + Ok(OrderJob { order_id: order_id }) +} + +async fn dispatch_shipment( + job: OrderJob, + db: Data, + shipment: Data, +) -> Result { + let order: OrderRecord = sqlx::query_as("SELECT * FROM orders WHERE id = ?") + .bind(job.order_id) + .fetch_one(&*db) + .await?; + + if order.tracking_number.is_some() { + return Ok(job); // already dispatched on a previous attempt + } + + let item_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM order_items WHERE order_id = ? AND in_stock = TRUE", + ) + .bind(job.order_id) + .fetch_one(&*db) + .await?; + + let tracking = shipment.dispatch(order.id, item_count).await?; + + sqlx::query("UPDATE orders SET tracking_number = ?, status = 'dispatched' WHERE id = ?") + .bind(&tracking) + .bind(order.id) + .execute(&*db) + .await?; + + Ok(job) +} + +async fn notify_customer( + job: OrderJob, + db: Data, + email: Data, +) -> Result<(), BoxDynError> { + let order: OrderRecord = sqlx::query_as("SELECT * FROM orders WHERE id = ?") + .bind(job.order_id) + .fetch_one(&*db) + .await?; + + if order.status == "completed" { + return Ok(()); // notification already sent on a previous attempt + } + + let tracking = order.tracking_number.as_deref().unwrap_or_default(); + + email + .send_shipment_confirmation(&order.customer_email, order.id, tracking) + .await?; + + sqlx::query("UPDATE orders SET status = 'completed' WHERE id = ?") + .bind(order.id) + .execute(&*db) + .await?; + + Ok(()) +} + +async fn setup_schema(pool: &SqlitePool) -> Result<(), sqlx::Error> { + sqlx::query( + "CREATE TABLE IF NOT EXISTS orders ( + id INTEGER PRIMARY KEY, + customer_email TEXT NOT NULL, + total_cents INTEGER NOT NULL, + payment_txn TEXT, + tracking_number TEXT, + status TEXT NOT NULL + );", + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS order_items ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + order_id INTEGER NOT NULL, + sku TEXT NOT NULL, + in_stock BOOLEAN DEFAULT FALSE, + FOREIGN KEY(order_id) REFERENCES orders(id) + );", + ) + .execute(pool) + .await?; + + Ok(()) +} + +async fn seed_order( + pool: &SqlitePool, + order_id: i64, + email: &str, + total_cents: i64, + skus: &[&str], +) -> Result<(), sqlx::Error> { + sqlx::query( + "INSERT INTO orders (id, customer_email, total_cents, status) VALUES (?, ?, ?, 'pending')", + ) + .bind(order_id) + .bind(email) + .bind(total_cents) + .execute(pool) + .await?; + + for sku in skus { + sqlx::query("INSERT INTO order_items (order_id, sku) VALUES (?, ?)") + .bind(order_id) + .bind(sku) + .execute(pool) + .await?; + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + + setup_schema(&pool).await.unwrap(); + SqliteStorage::setup(&pool).await.unwrap(); + + seed_order( + &pool, + 1001, + "alice@example.com", + 7499, + &["WIDGET-A", "WIDGET-B", "WIDGET-C"], + ) + .await + .unwrap(); + + let mut sqlite = SqliteStorage::new_in_queue(&pool, "order-fulfilment"); + sqlite + .push_start(OrderJob { order_id: 1001 }) + .await + .unwrap(); + + let workflow = Workflow::new("order-fulfilment") + .and_then(charge_payment) + .filter_map(filter_unavailable) + .and_then(refund_if_all_unavailable) + .delay_for(Duration::from_millis(500)) + .and_then(dispatch_shipment) + .and_then(notify_customer); + + let worker = WorkerBuilder::new("fulfilment-worker") + .backend(sqlite) + .data(pool) + .data(PaymentService { /* ... */ }) + .data(InventoryService { /* ... */ }) + .data(ShipmentService { /* ... */ }) + .data(EmailService { /* ... */ }) + .on_event(|_ctx, ev| println!("Event: {:?}", ev)) + .build(workflow); + + worker.run().await.unwrap(); +} diff --git a/examples/stepped.rs b/examples/stepped.rs deleted file mode 100644 index adf0ecf..0000000 --- a/examples/stepped.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::time::Duration; - -use apalis::prelude::*; -use apalis_sqlite::SqliteStorage; -use apalis_workflow::*; -use sqlx::SqlitePool; - -#[tokio::main] -async fn main() { - let pool = SqlitePool::connect(":memory:").await.unwrap(); - SqliteStorage::setup(&pool).await.unwrap(); - let mut backend = SqliteStorage::new(&pool); - backend.push_start(42).await.unwrap(); - - async fn task1(task: u32) -> String { - println!("Executing task1 with input: {}", task); - (task + 99).to_string() - } - async fn task2(task: String) -> u32 { - println!("Executing task2 with input: {}", task); - task.parse::().unwrap() + 1 - } - async fn task3(task: u32, worker: WorkerContext) { - println!("Executing task3 with input: {}", task); - assert_eq!(task, 142); - worker.stop().unwrap(); - } - let workflow = Workflow::new("test_workflow") - .and_then(task1) - .delay_for(Duration::from_secs(1)) - .and_then(task2) - .and_then(task3); - let worker = WorkerBuilder::new("rango-tango") - .backend(backend) - .on_event(|_c, e| { - println!("{e:?},"); - }) - .build(workflow); - worker.run().await.unwrap(); -} diff --git a/examples/with_ui.rs b/examples/with_ui.rs deleted file mode 100644 index f328e4d..0000000 --- a/examples/with_ui.rs +++ /dev/null @@ -1 +0,0 @@ -fn main() {} diff --git a/src/lib.rs b/src/lib.rs index f32d38e..0a72699 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -473,7 +473,6 @@ mod tests { use apalis::prelude::*; use apalis_workflow::*; - use chrono::Local; use serde::{Deserialize, Serialize}; use sqlx::SqlitePool; @@ -498,8 +497,6 @@ mod tests { .take(ITEMS); backend.push_stream(&mut items).await.unwrap(); - println!("Starting worker at {}", Local::now()); - async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> { if ITEMS == item { wrk.stop().unwrap(); diff --git a/supply-chain/config.toml b/supply-chain/config.toml index 7c5bf8e..8dd036d 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -444,7 +444,7 @@ version = "1.0.18" criteria = "safe-to-deploy" [[exemptions.js-sys]] -version = "0.3.97" +version = "0.3.98" criteria = "safe-to-deploy" [[exemptions.kv-log-macro]] @@ -1008,23 +1008,23 @@ version = "0.1.0" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen]] -version = "0.2.120" +version = "0.2.121" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen-futures]] -version = "0.4.70" +version = "0.4.71" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen-macro]] -version = "0.2.120" +version = "0.2.121" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen-macro-support]] -version = "0.2.120" +version = "0.2.121" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen-shared]] -version = "0.2.120" +version = "0.2.121" criteria = "safe-to-deploy" [[exemptions.wasm-encoder]] diff --git a/supply-chain/imports.lock b/supply-chain/imports.lock index 435a834..318a5f5 100644 --- a/supply-chain/imports.lock +++ b/supply-chain/imports.lock @@ -16,3 +16,7 @@ audited_as = "1.0.0-rc.4" [[unpublished.apalis-sqlite]] version = "1.0.0-rc.7" audited_as = "1.0.0-rc.6" + +[[unpublished.apalis-sqlite]] +version = "1.0.0-rc.8" +audited_as = "1.0.0-rc.7"