diff --git a/.claude/skills/add-background-task/SKILL.md b/.claude/skills/add-background-task/SKILL.md new file mode 100644 index 00000000000..34b1d569561 --- /dev/null +++ b/.claude/skills/add-background-task/SKILL.md @@ -0,0 +1,99 @@ +--- +name: add-background-task +description: Add a new Nexus background task. Use when the user wants to create a periodic background task in Nexus that runs on a timer. +--- + +# Add a Nexus background task + +All background tasks live in Nexus. A task implements the `BackgroundTask` trait (`nexus/src/app/background/mod.rs`), runs on a configurable period, and reports status as `serde_json::Value`. + +## General approach + +There are many existing background tasks in `nexus/src/app/background/tasks/`. Before writing anything, read a few tasks that are similar in shape to the one you're adding (e.g., a simple periodic cleanup vs. a task that watches a channel). Use those as models for structure, naming, logging, error handling, and status reporting. The goal is to conform to the patterns already in use, not to invent new ones. + +## Checklist + +These are the touch points for adding a new background task. Follow them in order. + +### 1. Status type (`nexus/types/src/internal_api/background.rs`) + +Define a struct for the task's activation status. Derive `Clone, Debug, Deserialize, Serialize, PartialEq, Eq`. For errors, use `Option` if the task can only fail in one way per activation, or `Vec` if it accumulates multiple independent errors. Match what similar tasks do. + +### 2. Task implementation (`nexus/src/app/background/tasks/.rs`) + +Create the task module. The struct holds whatever state it needs (typically `Arc` plus config). Implement `BackgroundTask::activate` by delegating to an `actually_activate` helper, then serialize the status to `serde_json::Value`. The `actually_activate` pattern makes unit testing easy without going through the trait. + +`actually_activate` can either build and return the status (`async fn actually_activate(&mut self, opctx) -> YourStatus`), or take a mutable reference to one (`async fn actually_activate(&mut self, opctx, status: &mut YourStatus) -> Result<(), Error>`). The first is simpler and works well when the task either fully succeeds or fully fails. The second is better when the task can partially complete (e.g., it loops over work items): `activate` creates the status struct up front, passes it in, and serializes it afterward regardless of `Ok`/`Err`, so any progress already recorded in `status` (items processed, partial counts, earlier errors) is preserved even if the method bails out with `?` later. + +Logging conventions: `debug` when there's nothing to do, `info` when routine work was done, `warn` when the work done indicates something is wrong (e.g., cleaning up after a crash), `error` on failure. Log errors as structured fields with the `; &err` slog syntax (which uses the `SlogInlineError` trait), not by interpolating into the message string. For the error string in the status struct, use `InlineErrorChain::new(&err).to_string()` (from `slog_error_chain`) to capture the full cause chain. Status error strings should not repeat the task name — omdb already shows which task you're looking at. + +If the task takes config values that need conversion or validation (e.g., converting a `Duration` to `TimeDelta`, or checking a numeric range), do it once in `new()` and store the validated form. Don't re-validate on every activation — if the config is invalid, panic in `new()` with a message that includes the invalid value. + +Include a unit test in the same file using `TestDatabase::new_with_datastore` that calls `actually_activate` directly. If the task has a datastore method, a single test exercising the task end-to-end (including the limit/batching behavior) is sufficient — don't add a redundant test for the datastore method separately unless it has complex logic worth testing in isolation. + +### 3. Register the module (`nexus/src/app/background/tasks/mod.rs`) + +Add `pub mod ;` in alphabetical order. + +### 4. Activator (`nexus/background-task-interface/src/init.rs`) + +Add `pub task_: Activator` to the `BackgroundTasks` struct, maintaining alphabetical order among the task fields. + +### 5. Config (`nexus-config/src/nexus_config.rs`) + +Add a config struct (e.g., `YourTaskConfig`) with at minimum `period_secs: Duration` (using `#[serde_as(as = "DurationSeconds")]`). If the task does bounded work per activation, name the limit field `max__per_activation` (e.g., `max_deleted_per_activation`, `max_timed_out_per_activation`) to match existing conventions. Add the field to `BackgroundTaskConfig`. Update the test config literal and expected parse output at the bottom of the file. + +### 6. Config files + +Add the new config fields to all of these: +- `nexus/examples/config.toml` +- `nexus/examples/config-second.toml` +- `nexus/tests/config.test.toml` +- `smf/nexus/single-sled/config-partial.toml` +- `smf/nexus/multi-sled/config-partial.toml` + +### 7. Wire up in `nexus/src/app/background/init.rs` + +- Import the task module. +- Add `Activator::new()` in the `BackgroundTasks` constructor. +- Destructure it in the `start` method. +- Call `driver.register(TaskDefinition { ... })` with the task. The last task registered should pass `datastore` by move (not `.clone()`), so adjust the previous last task if needed. +- If extra data is needed from `BackgroundTasksData`, add the field there and plumb it from `nexus/src/app/mod.rs`. + +### 8. Schema migration (if needed) + +If the task needs a new index or schema change to support its query, add a migration under `schema/crdb/`. See `schema/crdb/README.adoc` for the procedure. Also update `dbinit.sql` and bump the version in `nexus/db-model/src/schema_versions.rs`. + +### 9. Datastore method (if needed) + +If the task needs a new query, add it in the appropriate `nexus/db-queries/src/db/datastore/` file. Prefer the Diesel typed DSL over raw SQL (`diesel::sql_query`) for queries and test helpers. Only fall back to raw SQL when the DSL genuinely can't express the query. + +If the task modifies rows that other code paths also modify, think about races: what happens if both run concurrently on the same row? Both paths should typically guard their writes so only one succeeds. + +### 10. omdb output (`dev-tools/omdb/src/bin/omdb/nexus.rs`) + +Add a `print_task_` function and wire it into the match in `print_task_details` (alphabetical order). Import the status type. Use the `const_max_len` + `WIDTH` pattern to align columns: + +```rust +const LABEL: &str = "label:"; +const WIDTH: usize = const_max_len(&[LABEL, ...]) + 1; +println!(" {LABEL: { print_task_attached_subnet_manager_status(details); } + "audit_log_timeout_incomplete" => { + print_task_audit_log_timeout_incomplete(details); + } "blueprint_planner" => { print_task_blueprint_planner(details); } @@ -1273,6 +1277,9 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { "read_only_region_replacement_start" => { print_task_read_only_region_replacement_start(details); } + "reconfigurator_config_watcher" => { + print_task_reconfigurator_config_watcher(details); + } "region_replacement" => { print_task_region_replacement(details); } @@ -2323,6 +2330,16 @@ fn print_task_read_only_region_replacement_start(details: &serde_json::Value) { } } +fn print_task_reconfigurator_config_watcher(details: &serde_json::Value) { + match details.get("config_updated").and_then(|v| v.as_bool()) { + Some(updated) => println!(" config updated: {updated}"), + None => eprintln!( + "warning: failed to interpret task details: {:?}", + details + ), + } +} + fn print_task_region_replacement(details: &serde_json::Value) { match serde_json::from_value::(details.clone()) { Err(error) => eprintln!( @@ -2671,6 +2688,38 @@ fn print_task_saga_recovery(details: &serde_json::Value) { } } +fn print_task_audit_log_timeout_incomplete(details: &serde_json::Value) { + match serde_json::from_value::( + details.clone(), + ) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), + Ok(status) => { + const TIMED_OUT: &str = "timed_out:"; + const CUTOFF: &str = "cutoff:"; + const MAX_UPDATE: &str = "max_timed_out_per_activation:"; + const ERROR: &str = "error:"; + const WIDTH: usize = + const_max_len(&[TIMED_OUT, CUTOFF, MAX_UPDATE, ERROR]) + 1; + + println!(" {TIMED_OUT:(details.clone()) { Err(error) => eprintln!( diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index dffb7910ece..02ac948f4a2 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -38,6 +38,11 @@ task: "attached_subnet_manager" distributes attached subnets to sleds and switch +task: "audit_log_timeout_incomplete" + transitions stale incomplete audit log entries to timeout status so they + become visible in the audit log + + task: "bfd_manager" Manages bidirectional fowarding detection (BFD) configuration on rack switches @@ -288,6 +293,11 @@ task: "attached_subnet_manager" distributes attached subnets to sleds and switch +task: "audit_log_timeout_incomplete" + transitions stale incomplete audit log entries to timeout status so they + become visible in the audit log + + task: "bfd_manager" Manages bidirectional fowarding detection (BFD) configuration on rack switches @@ -525,6 +535,11 @@ task: "attached_subnet_manager" distributes attached subnets to sleds and switch +task: "audit_log_timeout_incomplete" + transitions stale incomplete audit log entries to timeout status so they + become visible in the audit log + + task: "bfd_manager" Manages bidirectional fowarding detection (BFD) configuration on rack switches diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index 986509a32d0..98140113cdc 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -273,6 +273,11 @@ task: "attached_subnet_manager" distributes attached subnets to sleds and switch +task: "audit_log_timeout_incomplete" + transitions stale incomplete audit log entries to timeout status so they + become visible in the audit log + + task: "bfd_manager" Manages bidirectional fowarding detection (BFD) configuration on rack switches @@ -593,6 +598,14 @@ task: "attached_subnet_manager" no dendrite instances found no sleds found +task: "audit_log_timeout_incomplete" + configured period: every m + last completed activation: , triggered by + started at (s ago) and ran for ms + timed_out: 0 + cutoff: + max_timed_out_per_activation: 1000 + task: "bfd_manager" configured period: every s last completed activation: , triggered by @@ -782,7 +795,7 @@ task: "reconfigurator_config_watcher" configured period: every s last completed activation: , triggered by started at (s ago) and ran for ms -warning: unknown background task: "reconfigurator_config_watcher" (don't know how to interpret details: Object {"config_updated": Bool(false)}) + config updated: task: "region_replacement" configured period: every days h m s @@ -1194,6 +1207,14 @@ task: "attached_subnet_manager" no dendrite instances found no sleds found +task: "audit_log_timeout_incomplete" + configured period: every m + last completed activation: , triggered by + started at (s ago) and ran for ms + timed_out: 0 + cutoff: + max_timed_out_per_activation: 1000 + task: "bfd_manager" configured period: every s last completed activation: , triggered by @@ -1383,7 +1404,7 @@ task: "reconfigurator_config_watcher" configured period: every s last completed activation: , triggered by started at (s ago) and ran for ms -warning: unknown background task: "reconfigurator_config_watcher" (don't know how to interpret details: Object {"config_updated": Bool(false)}) + config updated: task: "region_replacement" configured period: every days h m s diff --git a/dev-tools/omdb/tests/test_all_output.rs b/dev-tools/omdb/tests/test_all_output.rs index cd1e4e15fd1..624bd71f794 100644 --- a/dev-tools/omdb/tests/test_all_output.rs +++ b/dev-tools/omdb/tests/test_all_output.rs @@ -333,6 +333,10 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { redactor.extra_variable_length("cockroachdb_version", &crdb_version); } + // The `reconfigurator_config_watcher` task's output depends on + // whether it has had time to complete an activation. + redactor.field("config updated:", r"\w+"); + // The `tuf_artifact_replication` task's output depends on how // many sleds happened to register with Nexus before its first // execution. These redactions work around the issue described in diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index f8375137a70..8fe8f417b61 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -437,6 +437,8 @@ pub struct BackgroundTaskConfig { pub attached_subnet_manager: AttachedSubnetManagerConfig, /// configuration for console session cleanup task pub session_cleanup: SessionCleanupConfig, + /// configuration for audit log incomplete timeout task + pub audit_log_timeout_incomplete: AuditLogTimeoutIncompleteConfig, } #[serde_as] @@ -450,6 +452,21 @@ pub struct SessionCleanupConfig { pub max_delete_per_activation: u32, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct AuditLogTimeoutIncompleteConfig { + /// period (in seconds) for periodic activations of this task + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, + + /// how old an incomplete entry must be before it is timed out + #[serde_as(as = "DurationSeconds")] + pub timeout_secs: Duration, + + /// max rows per SQL statement + pub max_timed_out_per_activation: u32, +} + #[serde_as] #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct DnsTasksConfig { @@ -1276,6 +1293,9 @@ mod test { attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 + audit_log_timeout_incomplete.period_secs = 600 + audit_log_timeout_incomplete.timeout_secs = 14400 + audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] type = "random" seed = 0 @@ -1544,6 +1564,12 @@ mod test { period_secs: Duration::from_secs(300), max_delete_per_activation: 10_000, }, + audit_log_timeout_incomplete: + AuditLogTimeoutIncompleteConfig { + period_secs: Duration::from_secs(600), + timeout_secs: Duration::from_secs(14400), + max_timed_out_per_activation: 1000, + }, }, multicast: MulticastConfig { enabled: false }, default_region_allocation_strategy: @@ -1652,6 +1678,9 @@ mod test { attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 + audit_log_timeout_incomplete.period_secs = 600 + audit_log_timeout_incomplete.timeout_secs = 14400 + audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] type = "random" diff --git a/nexus/background-task-interface/src/init.rs b/nexus/background-task-interface/src/init.rs index 7223af8c071..1bacb6f259f 100644 --- a/nexus/background-task-interface/src/init.rs +++ b/nexus/background-task-interface/src/init.rs @@ -37,6 +37,7 @@ pub struct BackgroundTasks { pub task_instance_reincarnation: Activator, pub task_service_firewall_propagation: Activator, pub task_abandoned_vmm_reaper: Activator, + pub task_audit_log_timeout_incomplete: Activator, pub task_vpc_route_manager: Activator, pub task_saga_recovery: Activator, pub task_lookup_region_port: Activator, diff --git a/nexus/db-model/src/audit_log.rs b/nexus/db-model/src/audit_log.rs index f8a794ba12d..7db6486e1ef 100644 --- a/nexus/db-model/src/audit_log.rs +++ b/nexus/db-model/src/audit_log.rs @@ -224,7 +224,7 @@ impl From for AuditLogEntryInit { /// `audit_log_complete` is a view on `audit_log` filtering for rows with /// non-null `time_completed`, not its own table. -#[derive(Queryable, Selectable, Clone, Debug)] +#[derive(Queryable, Selectable, Clone, Debug, PartialEq)] #[diesel(table_name = audit_log_complete)] pub struct AuditLogEntry { pub id: Uuid, @@ -276,15 +276,15 @@ pub enum AuditLogCompletion { /// error, and I don't think we even have API timeouts) but rather that the /// attempts to complete the log entry failed (or were never even attempted /// because, e.g., Nexus crashed during the operation), and this entry had - /// to be cleaned up later by a background job (which doesn't exist yet) - /// after a timeout. Note we represent this result status as "Unknown" in - /// the external API because timeout is an implementation detail and makes - /// it sound like the operation timed out. + /// to be cleaned up later by a background job after a timeout. Note we + /// represent this result status as "Unknown" in the external API because + /// timeout is an implementation detail and makes it sound like the + /// operation timed out. Timeout, } #[derive(AsChangeset, Clone)] -#[diesel(table_name = audit_log)] +#[diesel(table_name = audit_log, treat_none_as_null = true)] pub struct AuditLogCompletionUpdate { pub time_completed: DateTime, pub result_kind: AuditLogResultKind, diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index 5e0fdd10b09..1e44f0c327b 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -16,7 +16,7 @@ use std::{collections::BTreeMap, sync::LazyLock}; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. -pub const SCHEMA_VERSION: Version = Version::new(240, 0, 0); +pub const SCHEMA_VERSION: Version = Version::new(241, 0, 0); /// List of all past database schema versions, in *reverse* order /// @@ -28,6 +28,7 @@ static KNOWN_VERSIONS: LazyLock> = LazyLock::new(|| { // | leaving the first copy as an example for the next person. // v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(241, "audit-log-incomplete-timeout"), KnownVersion::new(240, "multicast-drop-mvlan"), KnownVersion::new(239, "fm-alert-request"), KnownVersion::new(238, "fewer-nullable-columns"), diff --git a/nexus/db-queries/src/db/datastore/audit_log.rs b/nexus/db-queries/src/db/datastore/audit_log.rs index 5d983e8f178..b244f93b16f 100644 --- a/nexus/db-queries/src/db/datastore/audit_log.rs +++ b/nexus/db-queries/src/db/datastore/audit_log.rs @@ -6,6 +6,7 @@ use super::DataStore; use crate::authz; use crate::context::OpContext; use crate::db; +use crate::db::model::AuditLogCompletion; use crate::db::model::AuditLogCompletionUpdate; use crate::db::model::AuditLogEntry; use crate::db::model::AuditLogEntryInit; @@ -106,7 +107,7 @@ impl DataStore { }) } - // set duration and result on an existing entry + /// Complete an audit log entry with the request's outcome. pub async fn audit_log_entry_complete( &self, opctx: &OpContext, @@ -115,14 +116,69 @@ impl DataStore { ) -> UpdateResult<()> { use nexus_db_schema::schema::audit_log; opctx.authorize(authz::Action::CreateChild, &authz::AUDIT_LOG).await?; - diesel::update(audit_log::table) + let rows = diesel::update(audit_log::table) .filter(audit_log::id.eq(entry.id)) + // This filter is very important: once an entry has been completed + // — whether by the normal request path or by the background + // timeout task — it is part of the immutable audit log. Overwriting + // it would change its `time_completed` timestamp, shifting its + // position in the `(time_completed, id)` pagination order and + // violating the guarantee that past query results are stable. + .filter(audit_log::time_completed.is_null()) .set(completion) .execute_async(&*self.pool_connection_authorized(opctx).await?) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + if rows == 0 { + warn!( + opctx.log, + "audit log entry already completed \ + (likely timed out by background task)"; + "entry_id" => %entry.id, + ); + } Ok(()) } + + /// Transition stale incomplete audit log rows to `result_kind = timeout`. + /// + /// Finds up to `limit` rows where `time_completed IS NULL` and + /// `time_started < cutoff`, atomically sets them to the timeout + /// completion state, and returns the number of rows updated. + pub async fn audit_log_timeout_incomplete( + &self, + opctx: &OpContext, + cutoff: DateTime, + limit: u32, + ) -> Result { + use nexus_db_schema::schema::audit_log; + opctx.authorize(authz::Action::CreateChild, &authz::AUDIT_LOG).await?; + + // Diesel's ValidSubselect doesn't allow a subquery from the same + // table being updated, so we alias audit_log for the subquery. + let audit_log_sub = + diesel::alias!(nexus_db_schema::schema::audit_log as audit_log_sub); + + let stale_ids = audit_log_sub + .filter(audit_log_sub.field(audit_log::time_completed).is_null()) + .filter(audit_log_sub.field(audit_log::time_started).lt(cutoff)) + .order(( + audit_log_sub.field(audit_log::time_started), + audit_log_sub.field(audit_log::id), + )) + .limit(i64::from(limit)) + .select(audit_log_sub.field(audit_log::id)); + + let completion: AuditLogCompletionUpdate = + AuditLogCompletion::Timeout.into(); + + diesel::update(audit_log::table) + .filter(audit_log::id.eq_any(stale_ids)) + .set(completion) + .execute_async(&*self.pool_connection_authorized(opctx).await?) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } #[cfg(test)] @@ -130,8 +186,9 @@ mod tests { use super::*; use crate::db::pub_test_utils::TestDatabase; use assert_matches::assert_matches; + use chrono::TimeDelta; use nexus_db_model::{ - AuditLogActor, AuditLogCompletion, AuditLogEntryInitParams, + AuditLogActor, AuditLogEntryInitParams, AuditLogResultKind, }; use omicron_common::api::external::Error; use omicron_test_utils::dev; @@ -371,4 +428,104 @@ mod tests { db.terminate().await; logctx.cleanup_successful(); } + + fn make_entry_params(request_id: &str) -> AuditLogEntryInitParams { + AuditLogEntryInitParams { + request_id: request_id.to_string(), + operation_id: "project_create".to_string(), + request_uri: "/v1/projects".to_string(), + source_ip: "1.1.1.1".parse().unwrap(), + user_agent: None, + actor: AuditLogActor::Unauthenticated, + auth_method: None, + credential_id: None, + } + } + + /// Force time_started into the past for an audit log entry so we can + /// test the timeout logic without waiting. + async fn set_time_started( + datastore: &DataStore, + id: Uuid, + time_started: DateTime, + ) { + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log; + diesel::update(audit_log::table) + .filter(audit_log::id.eq(id)) + .set(audit_log::time_started.eq(time_started)) + .execute_async( + &*datastore.pool_connection_for_tests().await.unwrap(), + ) + .await + .expect("could not set time_started"); + } + + /// A late completion of an already-timed-out entry should be a no-op. + /// Once an entry is timed out and visible in the audit log, it's part + /// of the immutable record. Overwriting it would change its timestamp + /// and violate the guarantee that the past doesn't change. + #[tokio::test] + async fn test_audit_log_timeout_then_late_completion() { + let logctx = + dev::test_setup_log("test_audit_log_timeout_then_late_completion"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let now = Utc::now(); + let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); + let cutoff = now - TimeDelta::try_hours(1).unwrap(); + + let entry = datastore + .audit_log_entry_init(opctx, make_entry_params("late-req").into()) + .await + .unwrap(); + set_time_started(datastore, entry.id, two_hours_ago).await; + + // Timeout the entry + let timed_out = datastore + .audit_log_timeout_incomplete(opctx, cutoff, 100) + .await + .unwrap(); + assert_eq!(timed_out, 1); + + // Record the timed-out state + let pagparams = DataPageParams { + marker: None, + limit: NonZeroU32::new(100).unwrap(), + direction: dropshot::PaginationOrder::Ascending, + }; + let entries = datastore + .audit_log_list(opctx, &pagparams, two_hours_ago, None) + .await + .unwrap(); + let timed_out_entry = + entries.iter().find(|e| e.id == entry.id).unwrap(); + assert_eq!(timed_out_entry.result_kind, AuditLogResultKind::Timeout); + let timeout_completed_at = timed_out_entry.time_completed; + + // Now a late completion arrives — it should be a no-op because + // the entry is already completed (as timeout) + datastore + .audit_log_entry_complete( + opctx, + &entry, + AuditLogCompletion::Success { http_status_code: 200 }.into(), + ) + .await + .expect("late completion should not error"); + + // The entry should still show as timeout, unchanged + let entries = datastore + .audit_log_list(opctx, &pagparams, two_hours_ago, None) + .await + .unwrap(); + let found = entries.iter().find(|e| e.id == entry.id).unwrap(); + assert_eq!(found.result_kind, AuditLogResultKind::Timeout); + assert_eq!(found.time_completed, timeout_completed_at); + assert!(found.http_status_code.is_none()); + + db.terminate().await; + logctx.cleanup_successful(); + } } diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml index da6cd03e9d8..305015055f2 100644 --- a/nexus/examples/config-second.toml +++ b/nexus/examples/config-second.toml @@ -195,6 +195,9 @@ trust_quorum.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index 693ddacb4d4..d950e39e873 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -179,6 +179,9 @@ trust_quorum.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 1bea553be1f..53067faba15 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -92,6 +92,7 @@ use super::driver::TaskDefinition; use super::tasks::abandoned_vmm_reaper; use super::tasks::alert_dispatcher::AlertDispatcher; use super::tasks::attached_subnets; +use super::tasks::audit_log_timeout_incomplete; use super::tasks::bfd; use super::tasks::blueprint_execution; use super::tasks::blueprint_load; @@ -247,6 +248,7 @@ impl BackgroundTasksInitializer { task_instance_reincarnation: Activator::new(), task_service_firewall_propagation: Activator::new(), task_abandoned_vmm_reaper: Activator::new(), + task_audit_log_timeout_incomplete: Activator::new(), task_vpc_route_manager: Activator::new(), task_saga_recovery: Activator::new(), task_lookup_region_port: Activator::new(), @@ -362,6 +364,7 @@ impl BackgroundTasksInitializer { task_trust_quorum_manager, task_attached_subnet_manager, task_session_cleanup, + task_audit_log_timeout_incomplete, // Add new background tasks here. Be sure to use this binding in a // call to `Driver::register()` below. That's what actually wires // up the Activator to the corresponding background task. @@ -1192,7 +1195,7 @@ impl BackgroundTasksInitializer { absolute timeout", period: config.session_cleanup.period_secs, task_impl: Box::new(session_cleanup::SessionCleanup::new( - datastore, + datastore.clone(), args.console_session_absolute_timeout, config.session_cleanup.max_delete_per_activation, )), @@ -1201,6 +1204,25 @@ impl BackgroundTasksInitializer { activator: task_session_cleanup, }); + driver.register(TaskDefinition { + name: "audit_log_timeout_incomplete", + description: "transitions stale incomplete audit log entries to \ + timeout status so they become visible in the audit log", + period: config.audit_log_timeout_incomplete.period_secs, + task_impl: Box::new( + audit_log_timeout_incomplete::AuditLogTimeoutIncomplete::new( + datastore, + config.audit_log_timeout_incomplete.timeout_secs, + config + .audit_log_timeout_incomplete + .max_timed_out_per_activation, + ), + ), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_audit_log_timeout_incomplete, + }); + driver } } diff --git a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs new file mode 100644 index 00000000000..85fc52073b3 --- /dev/null +++ b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs @@ -0,0 +1,294 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task that transitions stale incomplete audit log entries to +//! `result_kind = timeout`, making them visible through the audit log API. + +use crate::app::background::BackgroundTask; +use chrono::TimeDelta; +use chrono::Utc; +use futures::future::BoxFuture; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::DataStore; +use nexus_types::internal_api::background::AuditLogTimeoutIncompleteStatus; +use serde_json::json; +use slog_error_chain::InlineErrorChain; +use std::sync::Arc; +use std::time::Duration; + +pub struct AuditLogTimeoutIncomplete { + datastore: Arc, + timeout: TimeDelta, + max_timed_out_per_activation: u32, +} + +impl AuditLogTimeoutIncomplete { + pub fn new( + datastore: Arc, + timeout: Duration, + max_timed_out_per_activation: u32, + ) -> Self { + let Ok(timeout) = TimeDelta::from_std(timeout) else { + panic!( + "invalid audit_log_timeout_incomplete.timeout_secs \ + value {timeout:?} \ + (must be representable as a TimeDelta)" + ); + }; + Self { datastore, timeout, max_timed_out_per_activation } + } + + pub(crate) async fn actually_activate( + &mut self, + opctx: &OpContext, + ) -> AuditLogTimeoutIncompleteStatus { + let cutoff = Utc::now() + .checked_sub_signed(self.timeout) + .expect("now - timeout overflowed (timeout_secs is too large)"); + let timed_out = match self + .datastore + .audit_log_timeout_incomplete( + opctx, + cutoff, + self.max_timed_out_per_activation, + ) + .await + { + Ok(count) => count, + Err(err) => { + slog::error!( + &opctx.log, + "audit log timeout incomplete failed"; + &err, + ); + return AuditLogTimeoutIncompleteStatus { + timed_out: 0, + cutoff, + max_timed_out_per_activation: self + .max_timed_out_per_activation, + error: Some(InlineErrorChain::new(&err).to_string()), + }; + } + }; + + if timed_out > 0 { + slog::warn!( + &opctx.log, + "audit log timeout: timed out {timed_out} incomplete \ + entries (may indicate Nexus crashes or bugs)"; + "cutoff" => %cutoff, + "limit" => self.max_timed_out_per_activation, + ); + } else { + slog::debug!( + &opctx.log, + "audit log timeout: no stale entries found"; + "cutoff" => %cutoff, + ); + } + + AuditLogTimeoutIncompleteStatus { + timed_out, + cutoff, + max_timed_out_per_activation: self.max_timed_out_per_activation, + error: None, + } + } +} + +impl BackgroundTask for AuditLogTimeoutIncomplete { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + Box::pin(async { + let status = self.actually_activate(opctx).await; + match serde_json::to_value(status) { + Ok(val) => val, + Err(err) => { + json!({ "error": format!("failed to serialize status: {err}") }) + } + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeDelta; + use chrono::Utc; + use nexus_db_model::{ + AuditLogActor, AuditLogCompletion, AuditLogEntry, + AuditLogEntryInitParams, AuditLogResultKind, + }; + use nexus_db_queries::db::pub_test_utils::TestDatabase; + use omicron_test_utils::dev; + use uuid::Uuid; + + async fn get_completed_entry( + datastore: &DataStore, + id: Uuid, + ) -> AuditLogEntry { + use async_bb8_diesel::AsyncRunQueryDsl; + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log_complete; + audit_log_complete::table + .filter(audit_log_complete::id.eq(id)) + .select(AuditLogEntry::as_select()) + .first_async(&*datastore.pool_connection_for_tests().await.unwrap()) + .await + .expect("could not load audit log entry") + } + + async fn assert_incomplete(datastore: &DataStore, id: Uuid) { + use async_bb8_diesel::AsyncRunQueryDsl; + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log; + let time_completed: Option> = audit_log::table + .filter(audit_log::id.eq(id)) + .select(audit_log::time_completed) + .first_async(&*datastore.pool_connection_for_tests().await.unwrap()) + .await + .expect("could not load audit log entry"); + assert!( + time_completed.is_none(), + "expected entry {id} to be incomplete, \ + but time_completed = {time_completed:?}" + ); + } + + fn make_entry(request_id: &str) -> AuditLogEntryInitParams { + AuditLogEntryInitParams { + request_id: request_id.to_string(), + operation_id: "project_create".to_string(), + request_uri: "/v1/projects".to_string(), + source_ip: "1.1.1.1".parse().unwrap(), + user_agent: None, + actor: AuditLogActor::Unauthenticated, + auth_method: None, + credential_id: None, + } + } + + async fn set_time_started( + datastore: &DataStore, + id: Uuid, + time_started: chrono::DateTime, + ) { + use async_bb8_diesel::AsyncRunQueryDsl; + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log; + diesel::update(audit_log::table) + .filter(audit_log::id.eq(id)) + .set(audit_log::time_started.eq(time_started)) + .execute_async( + &*datastore.pool_connection_for_tests().await.unwrap(), + ) + .await + .expect("could not set time_started"); + } + + async fn assert_timed_out(datastore: &DataStore, id: Uuid) { + let entry = get_completed_entry(datastore, id).await; + assert_eq!(entry.result_kind, AuditLogResultKind::Timeout); + assert!(entry.http_status_code.is_none()); + assert!(entry.error_code.is_none()); + assert!(entry.error_message.is_none()); + } + + #[tokio::test] + async fn test_audit_log_timeout_incomplete_activation() { + let logctx = + dev::test_setup_log("test_audit_log_timeout_incomplete_activation"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let now = Utc::now(); + + // Create 5 stale incomplete entries at different times in the past. + // The query orders by (time_started, id), so they'll be processed + // oldest first: req-0 through req-4. + let mut stale_ids = Vec::new(); + for i in 0..5u32 { + let age = TimeDelta::try_hours(i64::from(6 - i)).unwrap(); + let params = make_entry(&format!("req-{i}")); + let entry = datastore + .audit_log_entry_init(opctx, params.into()) + .await + .unwrap(); + set_time_started(datastore, entry.id, now - age).await; + stale_ids.push(entry.id); + } + + // Create 1 recent incomplete entry (should not be timed out) + let recent = datastore + .audit_log_entry_init(opctx, make_entry("recent-req").into()) + .await + .unwrap(); + + // Create 1 old completed entry (should not be timed out) + let completed = datastore + .audit_log_entry_init(opctx, make_entry("completed-req").into()) + .await + .unwrap(); + let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); + set_time_started(datastore, completed.id, two_hours_ago).await; + datastore + .audit_log_entry_complete( + opctx, + &completed, + AuditLogCompletion::Success { http_status_code: 200 }.into(), + ) + .await + .unwrap(); + + // hold onto completed entry so we can check at the end it hasn't changed + let completed_before = + get_completed_entry(&datastore, completed.id).await; + + // max_timed_out_per_activation = 3, so it takes two activations to + // time out all 5 stale entries + let mut task = AuditLogTimeoutIncomplete::new( + datastore.clone(), + Duration::from_secs(3600), + 3, + ); + + // first activation times out the 3 earliest entries + let status = task.actually_activate(opctx).await; + assert_eq!(status.timed_out, 3); + assert!(status.error.is_none()); + for &id in &stale_ids[..3] { + assert_timed_out(&datastore, id).await; + } + for &id in &stale_ids[3..] { + assert_incomplete(&datastore, id).await; + } + assert_incomplete(&datastore, recent.id).await; + + // second activation times out remaining 2 stale + let status = task.actually_activate(opctx).await; + assert_eq!(status.timed_out, 2); + assert!(status.error.is_none()); + for &id in &stale_ids { + assert_timed_out(&datastore, id).await; + } + assert_incomplete(&datastore, recent.id).await; + + // third activation does nothing + let status = task.actually_activate(opctx).await; + assert_eq!(status.timed_out, 0); + assert!(status.error.is_none()); + + // completed entry is unaffected + assert_eq!( + get_completed_entry(&datastore, completed.id).await, + completed_before + ); + + db.terminate().await; + logctx.cleanup_successful(); + } +} diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index 5973d762bb6..475c1bc99ae 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -7,6 +7,7 @@ pub mod abandoned_vmm_reaper; pub mod alert_dispatcher; pub mod attached_subnets; +pub mod audit_log_timeout_incomplete; pub mod bfd; pub mod blueprint_execution; pub mod blueprint_load; diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index feea5de2c96..d7e20e5fa77 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -213,6 +213,9 @@ trust_quorum.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [multicast] # Enable multicast functionality for tests (disabled by default in production) diff --git a/nexus/tests/integration_tests/audit_log.rs b/nexus/tests/integration_tests/audit_log.rs index 6b91a6ea908..f4d08907870 100644 --- a/nexus/tests/integration_tests/audit_log.rs +++ b/nexus/tests/integration_tests/audit_log.rs @@ -2,10 +2,11 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeDelta, Utc}; use dropshot::{ResultsPage, test_util::ClientTestContext}; use http::{Method, StatusCode, header}; use nexus_db_queries::authn::USER_TEST_PRIVILEGED; +use nexus_db_queries::context::OpContext; use nexus_test_utils::http_testing::{AuthnMode, NexusRequest, RequestBuilder}; use nexus_test_utils::resource_helpers::{ DiskTest, create_console_session, create_default_ip_pools, create_disk, @@ -216,6 +217,61 @@ async fn test_audit_log_list(ctx: &ControlPlaneTestContext) { let log = objects_list_page_authz::(client, &url).await; assert_eq!(log.items.len(), 1); assert_eq!(e2.id, log.items[0].id); + + // Test that timed-out audit log entries surface as Unknown in the API. + // Create an incomplete entry directly via the datastore, backdate it, + // time it out, then verify the API returns result = Unknown. + let nexus = &ctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(ctx.logctx.log.new(o!()), datastore.clone()); + + let params = nexus_db_model::AuditLogEntryInitParams { + request_id: "timeout-test-req".to_string(), + operation_id: "project_create".to_string(), + request_uri: "/v1/projects".to_string(), + source_ip: "1.1.1.1".parse().unwrap(), + user_agent: None, + actor: nexus_db_model::AuditLogActor::Unauthenticated, + auth_method: None, + credential_id: None, + }; + let entry = datastore + .audit_log_entry_init(&opctx, params.into()) + .await + .expect("init audit log entry"); + + let two_hours_ago = Utc::now() - TimeDelta::try_hours(2).unwrap(); + + // Backdate time_started so it's older than the cutoff + { + use async_bb8_diesel::AsyncRunQueryDsl; + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log; + diesel::update(audit_log::table) + .filter(audit_log::id.eq(entry.id)) + .set(audit_log::time_started.eq(two_hours_ago)) + .execute_async( + &*datastore.pool_connection_for_tests().await.unwrap(), + ) + .await + .unwrap(); + } + + let cutoff = Utc::now() - TimeDelta::try_hours(1).unwrap(); + let timed_out = datastore + .audit_log_timeout_incomplete(&opctx, cutoff, 100) + .await + .expect("timeout incomplete entries"); + assert_eq!(timed_out, 1); + + let audit_log = fetch_log(client, two_hours_ago, None).await; + let found = audit_log + .items + .iter() + .find(|e| e.request_id == "timeout-test-req") + .expect("timed-out entry should appear in the API"); + assert_eq!(found.result, AuditLogEntryResult::Unknown); } #[nexus_test] diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index dfe15b5f7b5..a3479ba798e 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -973,6 +973,19 @@ pub struct SledSubnetDetails { pub errors: Vec, } +/// The status of an `audit_log_timeout_incomplete` background task activation. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct AuditLogTimeoutIncompleteStatus { + /// Number of audit log entries timed out in this activation. + pub timed_out: usize, + /// The cutoff time used: entries started before this were eligible. + pub cutoff: DateTime, + /// Configured max rows to time out in this activation. + pub max_timed_out_per_activation: u32, + /// Error encountered during this activation, if any. + pub error: Option, +} + /// The status of a `session_cleanup` background task activation. #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct SessionCleanupStatus { diff --git a/schema/crdb/audit-log-incomplete-timeout/up01.sql b/schema/crdb/audit-log-incomplete-timeout/up01.sql new file mode 100644 index 00000000000..ec1cedcdc4a --- /dev/null +++ b/schema/crdb/audit-log-incomplete-timeout/up01.sql @@ -0,0 +1,4 @@ +-- Supports "find stale incomplete rows ordered by time_started". +CREATE INDEX IF NOT EXISTS audit_log_incomplete_by_time_started + ON omicron.public.audit_log (time_started, id) + WHERE time_completed IS NULL; diff --git a/schema/crdb/audit-log-incomplete-timeout/up01.verify.sql b/schema/crdb/audit-log-incomplete-timeout/up01.verify.sql new file mode 100644 index 00000000000..46b46c75d6b --- /dev/null +++ b/schema/crdb/audit-log-incomplete-timeout/up01.verify.sql @@ -0,0 +1,2 @@ +-- DO NOT EDIT. Generated by test_migration_verification_files. +SELECT CAST(IF((SELECT true WHERE EXISTS (SELECT index_name FROM omicron.crdb_internal.table_indexes WHERE descriptor_name = 'audit_log' AND index_name = 'audit_log_incomplete_by_time_started')),'true','Schema change verification failed: index audit_log_incomplete_by_time_started on table audit_log does not exist') AS BOOL); diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index ae61055b071..eb6d363a1b4 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -6652,6 +6652,11 @@ CREATE UNIQUE INDEX IF NOT EXISTS audit_log_by_time_completed ON omicron.public.audit_log (time_completed, id) WHERE time_completed IS NOT NULL; +-- Supports "find stale incomplete rows ordered by time_started". +CREATE INDEX IF NOT EXISTS audit_log_incomplete_by_time_started + ON omicron.public.audit_log (time_started, id) + WHERE time_completed IS NULL; + -- View of audit log entries that have been "completed". This lets us treat that -- subset of rows as its own table in the data model code. Completing an entry -- means updating the entry after an operation is complete with the result of @@ -8270,7 +8275,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '240.0.0', NULL) + (TRUE, NOW(), NOW(), '241.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index c127f65e51c..bb5af9dfd75 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -123,6 +123,9 @@ trust_quorum.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] # by default, allocate across 3 distinct sleds diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index 18ec33bce39..266cc5787a9 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -123,6 +123,9 @@ multicast_reconciler.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] # by default, allocate without requirement for distinct sleds.