From 8f06de6b402e5b41495628adda6284f353782df1 Mon Sep 17 00:00:00 2001 From: David Crespo Date: Tue, 10 Mar 2026 10:19:13 -0500 Subject: [PATCH 1/8] add-background-task claude skill, fix flaky omdb test --- .claude/skills/add-background-task/SKILL.md | 95 +++++++++++++++++++++ dev-tools/omdb/src/bin/omdb/nexus.rs | 13 +++ dev-tools/omdb/tests/successes.out | 4 +- dev-tools/omdb/tests/test_all_output.rs | 4 + 4 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 .claude/skills/add-background-task/SKILL.md diff --git a/.claude/skills/add-background-task/SKILL.md b/.claude/skills/add-background-task/SKILL.md new file mode 100644 index 00000000000..423e0b18318 --- /dev/null +++ b/.claude/skills/add-background-task/SKILL.md @@ -0,0 +1,95 @@ +--- +name: add-background-task +description: Add a new Nexus background task. Use when the user wants to create a periodic background task in Nexus that runs on a timer. +--- + +# Add a Nexus background task + +All background tasks live in Nexus. A task implements the `BackgroundTask` trait (`nexus/src/app/background/mod.rs`), runs on a configurable period, and reports status as `serde_json::Value`. + +## General approach + +There are many existing background tasks in `nexus/src/app/background/tasks/`. Before writing anything, read a few tasks that are similar in shape to the one you're adding (e.g., a simple periodic cleanup vs. a task that watches a channel). Use those as models for structure, naming, logging, error handling, and status reporting. The goal is to conform to the patterns already in use, not to invent new ones. + +## Checklist + +These are the touch points for adding a new background task. Follow them in order. + +### 1. Status type (`nexus/types/src/internal_api/background.rs`) + +Define a struct for the task's activation status. Derive `Clone, Debug, Deserialize, Serialize, PartialEq, Eq`. 
For errors, use `Option<String>` if the task can only fail in one way per activation, or `Vec<String>` if it accumulates multiple independent errors. Match what similar tasks do. + +### 2. Task implementation (`nexus/src/app/background/tasks/<task_name>.rs`) + +Create the task module. The struct holds whatever state it needs (typically `Arc<DataStore>` plus config). Implement `BackgroundTask::activate` by delegating to an `async fn actually_activate(&mut self, opctx) -> YourStatus` method, then serialize the status to `serde_json::Value`. The `actually_activate` pattern makes unit testing easy without going through the trait. + +Logging conventions: `debug` when there's nothing to do, `info` when routine work was done, `warn` when the work done indicates something is wrong (e.g., cleaning up after a crash), `error` on failure. + +Include unit tests in the same file using `TestDatabase::new_with_datastore`. Tests call `actually_activate` directly. + +### 3. Register the module (`nexus/src/app/background/tasks/mod.rs`) + +Add `pub mod <task_name>;` in alphabetical order. + +### 4. Activator (`nexus/background-task-interface/src/init.rs`) + +Add `pub task_<task_name>: Activator` to the `BackgroundTasks` struct, maintaining alphabetical order among the task fields. + +### 5. Config (`nexus-config/src/nexus_config.rs`) + +Add a config struct (e.g., `YourTaskConfig`) with at minimum `period_secs: Duration` (using `#[serde_as(as = "DurationSeconds")]`). If the task does bounded work per activation, name the limit field `max_<verb>_per_activation` (e.g., `max_delete_per_activation`, `max_update_per_activation`) to match existing conventions. Add the field to `BackgroundTaskConfig`. Update the test config literal and expected parse output at the bottom of the file. + +### 6. Config files + +Add the new config fields to all of these: +- `nexus/examples/config.toml` +- `nexus/examples/config-second.toml` +- `nexus/tests/config.test.toml` +- `smf/nexus/single-sled/config-partial.toml` +- `smf/nexus/multi-sled/config-partial.toml` + +### 7. 
Wire up in `nexus/src/app/background/init.rs` + +- Import the task module. +- Add `Activator::new()` in the `BackgroundTasks` constructor. +- Destructure it in the `start` method. +- Call `driver.register(TaskDefinition { ... })` with the task. The last task registered should pass `datastore` by move (not `.clone()`), so adjust the previous last task if needed. +- If extra data is needed from `BackgroundTasksData`, add the field there and plumb it from `nexus/src/app/mod.rs`. + +### 8. Schema migration (if needed) + +If the task needs a new index or schema change to support its query, add a migration under `schema/crdb/`. See `schema/crdb/README.adoc` for the procedure. Also update `dbinit.sql` and bump the version in `nexus/db-model/src/schema_versions.rs`. + +### 9. Datastore method (if needed) + +If the task needs a new query, add it in the appropriate `nexus/db-queries/src/db/datastore/` file. Add a test in the same file or in `nexus/db-queries/src/db/datastore/mod.rs`. + +If the task modifies rows that other code paths also modify, think about races: what happens if both run concurrently on the same row? Both paths should typically guard their writes so only one succeeds. + +### 10. omdb output (`dev-tools/omdb/src/bin/omdb/nexus.rs`) + +Add a `print_task_<task_name>` function and wire it into the match in `print_task_details` (alphabetical order). Import the status type. 
Use the `const_max_len` + `WIDTH` pattern to align columns: + +```rust +const LABEL: &str = "label:"; +const WIDTH: usize = const_max_len(&[LABEL, ...]) + 1; +println!("    {LABEL:<WIDTH$}{}", status.value); +``` diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ ... @@ "read_only_region_replacement_start" => { print_task_read_only_region_replacement_start(details); } + "reconfigurator_config_watcher" => { + print_task_reconfigurator_config_watcher(details); + } "region_replacement" => { print_task_region_replacement(details); } @@ -2318,6 +2321,16 @@ fn print_task_read_only_region_replacement_start(details: &serde_json::Value) { } } +fn print_task_reconfigurator_config_watcher(details: &serde_json::Value) { + match details.get("config_updated").and_then(|v| v.as_bool()) { + Some(updated) => println!(" config updated: {updated}"), + None => eprintln!( + "warning: failed to interpret task details: {:?}", + details + ), + } +} + fn print_task_region_replacement(details: &serde_json::Value) { match serde_json::from_value::<RegionReplacementStatus>(details.clone()) { Err(error) => eprintln!( diff --git a/dev-tools/omdb/tests/successes.out index e434e5e2b89..d42b0a7c975 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -771,7 +771,7 @@ task: "reconfigurator_config_watcher" configured period: every s last completed activation: , triggered by started at (s ago) and ran for ms -warning: unknown background task: "reconfigurator_config_watcher" (don't know how to interpret details: Object {"config_updated": Bool(false)}) + config updated: task: "region_replacement" configured period: every days h m s @@ -1366,7 +1366,7 @@ task: "reconfigurator_config_watcher" configured period: every s last completed activation: , triggered by started at (s ago) and ran for ms -warning: unknown background task: "reconfigurator_config_watcher" (don't know how to interpret details: Object {"config_updated": Bool(false)}) + config updated: task: "region_replacement" configured period: every days h m s diff --git a/dev-tools/omdb/tests/test_all_output.rs 
b/dev-tools/omdb/tests/test_all_output.rs index cd1e4e15fd1..624bd71f794 100644 --- a/dev-tools/omdb/tests/test_all_output.rs +++ b/dev-tools/omdb/tests/test_all_output.rs @@ -333,6 +333,10 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { redactor.extra_variable_length("cockroachdb_version", &crdb_version); } + // The `reconfigurator_config_watcher` task's output depends on + // whether it has had time to complete an activation. + redactor.field("config updated:", r"\w+"); + // The `tuf_artifact_replication` task's output depends on how // many sleds happened to register with Nexus before its first // execution. These redactions work around the issue described in From b4bf111a89f2bbee58ade94e2bd19759d0e221de Mon Sep 17 00:00:00 2001 From: David Crespo Date: Tue, 10 Mar 2026 12:43:42 -0500 Subject: [PATCH 2/8] background task to time out incomplete audit logs --- dev-tools/omdb/src/bin/omdb/nexus.rs | 36 +++ dev-tools/omdb/tests/env.out | 15 + dev-tools/omdb/tests/successes.out | 21 ++ nexus-config/src/nexus_config.rs | 29 ++ nexus/background-task-interface/src/init.rs | 1 + nexus/db-model/src/audit_log.rs | 10 +- nexus/db-model/src/schema_versions.rs | 3 +- .../db-queries/src/db/datastore/audit_log.rs | 302 +++++++++++++++++- nexus/examples/config-second.toml | 3 + nexus/examples/config.toml | 3 + nexus/src/app/background/init.rs | 24 +- .../tasks/audit_log_timeout_incomplete.rs | 253 +++++++++++++++ nexus/src/app/background/tasks/mod.rs | 1 + nexus/tests/config.test.toml | 3 + nexus/tests/integration_tests/audit_log.rs | 58 +++- nexus/types/src/internal_api/background.rs | 13 + .../audit-log-incomplete-timeout/up01.sql | 4 + .../up01.verify.sql | 2 + schema/crdb/dbinit.sql | 7 +- smf/nexus/multi-sled/config-partial.toml | 3 + smf/nexus/single-sled/config-partial.toml | 3 + 21 files changed, 782 insertions(+), 12 deletions(-) create mode 100644 nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs create mode 100644 
schema/crdb/audit-log-incomplete-timeout/up01.sql create mode 100644 schema/crdb/audit-log-incomplete-timeout/up01.verify.sql diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index 33765cb4683..9914734ab6e 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -52,6 +52,7 @@ use nexus_types::deployment::OximeterReadPolicy; use nexus_types::fm; use nexus_types::internal_api::background::AbandonedVmmReaperStatus; use nexus_types::internal_api::background::AttachedSubnetManagerStatus; +use nexus_types::internal_api::background::AuditLogTimeoutIncompleteStatus; use nexus_types::internal_api::background::BlueprintPlannerStatus; use nexus_types::internal_api::background::BlueprintRendezvousStats; use nexus_types::internal_api::background::BlueprintRendezvousStatus; @@ -1220,6 +1221,9 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { "attached_subnet_manager" => { print_task_attached_subnet_manager_status(details); } + "audit_log_timeout_incomplete" => { + print_task_audit_log_timeout_incomplete(details); + } "blueprint_planner" => { print_task_blueprint_planner(details); } @@ -2679,6 +2683,38 @@ fn print_task_saga_recovery(details: &serde_json::Value) { } } +fn print_task_audit_log_timeout_incomplete(details: &serde_json::Value) { + match serde_json::from_value::<AuditLogTimeoutIncompleteStatus>( + details.clone(), + ) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), + Ok(status) => { + const TIMED_OUT: &str = "timed_out:"; + const CUTOFF: &str = "cutoff:"; + const MAX_UPDATE: &str = "max_update_per_activation:"; + const ERROR: &str = "error:"; + const WIDTH: usize = + const_max_len(&[TIMED_OUT, CUTOFF, MAX_UPDATE, ERROR]) + 1; + + println!("    {TIMED_OUT:<WIDTH$}{}", status.timed_out); + println!("    {CUTOFF:<WIDTH$}{}", status.cutoff); + println!("    {MAX_UPDATE:<WIDTH$}{}", status.max_update_per_activation); + if let Some(error) = &status.error { + println!("    {ERROR:<WIDTH$}{error}"); + } + } + } +} + fn print_task_…(details: &serde_json::Value) { match serde_json::from_value::<…>(details.clone()) { Err(error) => eprintln!( diff --git a/dev-tools/omdb/tests/env.out index eaeca47b768..0c4cd7deff9 100644 --- 
a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -38,6 +38,11 @@ task: "attached_subnet_manager" distributes attached subnets to sleds and switch +task: "audit_log_timeout_incomplete" + transitions stale incomplete audit log entries to timeout status so they + become visible in the audit log + + task: "bfd_manager" Manages bidirectional fowarding detection (BFD) configuration on rack switches @@ -283,6 +288,11 @@ task: "attached_subnet_manager" distributes attached subnets to sleds and switch +task: "audit_log_timeout_incomplete" + transitions stale incomplete audit log entries to timeout status so they + become visible in the audit log + + task: "bfd_manager" Manages bidirectional fowarding detection (BFD) configuration on rack switches @@ -515,6 +525,11 @@ task: "attached_subnet_manager" distributes attached subnets to sleds and switch +task: "audit_log_timeout_incomplete" + transitions stale incomplete audit log entries to timeout status so they + become visible in the audit log + + task: "bfd_manager" Manages bidirectional fowarding detection (BFD) configuration on rack switches diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index d42b0a7c975..c5c8a66980b 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -273,6 +273,11 @@ task: "attached_subnet_manager" distributes attached subnets to sleds and switch +task: "audit_log_timeout_incomplete" + transitions stale incomplete audit log entries to timeout status so they + become visible in the audit log + + task: "bfd_manager" Manages bidirectional fowarding detection (BFD) configuration on rack switches @@ -588,6 +593,14 @@ task: "attached_subnet_manager" no dendrite instances found no sleds found +task: "audit_log_timeout_incomplete" + configured period: every m + last completed activation: , triggered by + started at (s ago) and ran for ms + timed_out: 0 + cutoff: + max_update_per_activation: 1000 + task: 
"bfd_manager" configured period: every s last completed activation: , triggered by @@ -1183,6 +1196,14 @@ task: "attached_subnet_manager" no dendrite instances found no sleds found +task: "audit_log_timeout_incomplete" + configured period: every m + last completed activation: , triggered by + started at (s ago) and ran for ms + timed_out: 0 + cutoff: + max_update_per_activation: 1000 + task: "bfd_manager" configured period: every s last completed activation: , triggered by diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index eb6a9fd2458..bccd74ad1fb 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -437,6 +437,8 @@ pub struct BackgroundTaskConfig { pub attached_subnet_manager: AttachedSubnetManagerConfig, /// configuration for console session cleanup task pub session_cleanup: SessionCleanupConfig, + /// configuration for audit log incomplete timeout task + pub audit_log_timeout_incomplete: AuditLogTimeoutIncompleteConfig, } #[serde_as] @@ -450,6 +452,21 @@ pub struct SessionCleanupConfig { pub max_delete_per_activation: u32, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct AuditLogTimeoutIncompleteConfig { + /// period (in seconds) for periodic activations of this task + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, + + /// how old an incomplete entry must be before it is timed out + #[serde_as(as = "DurationSeconds")] + pub timeout_secs: Duration, + + /// max rows per SQL statement + pub max_update_per_activation: u32, +} + #[serde_as] #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct DnsTasksConfig { @@ -1267,6 +1284,9 @@ mod test { attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 + audit_log_timeout_incomplete.period_secs = 600 + audit_log_timeout_incomplete.timeout_secs = 14400 + 
audit_log_timeout_incomplete.max_update_per_activation = 1000 [default_region_allocation_strategy] type = "random" seed = 0 @@ -1534,6 +1554,12 @@ mod test { period_secs: Duration::from_secs(300), max_delete_per_activation: 10_000, }, + audit_log_timeout_incomplete: + AuditLogTimeoutIncompleteConfig { + period_secs: Duration::from_secs(600), + timeout_secs: Duration::from_secs(14400), + max_update_per_activation: 1000, + }, }, multicast: MulticastConfig { enabled: false }, default_region_allocation_strategy: @@ -1641,6 +1667,9 @@ mod test { attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 + audit_log_timeout_incomplete.period_secs = 600 + audit_log_timeout_incomplete.timeout_secs = 14400 + audit_log_timeout_incomplete.max_update_per_activation = 1000 [default_region_allocation_strategy] type = "random" diff --git a/nexus/background-task-interface/src/init.rs b/nexus/background-task-interface/src/init.rs index e3a0e4f5343..deb702a414f 100644 --- a/nexus/background-task-interface/src/init.rs +++ b/nexus/background-task-interface/src/init.rs @@ -37,6 +37,7 @@ pub struct BackgroundTasks { pub task_instance_reincarnation: Activator, pub task_service_firewall_propagation: Activator, pub task_abandoned_vmm_reaper: Activator, + pub task_audit_log_timeout_incomplete: Activator, pub task_vpc_route_manager: Activator, pub task_saga_recovery: Activator, pub task_lookup_region_port: Activator, diff --git a/nexus/db-model/src/audit_log.rs b/nexus/db-model/src/audit_log.rs index f8a794ba12d..b914db1c705 100644 --- a/nexus/db-model/src/audit_log.rs +++ b/nexus/db-model/src/audit_log.rs @@ -276,15 +276,15 @@ pub enum AuditLogCompletion { /// error, and I don't think we even have API timeouts) but rather that the /// attempts to complete the log entry failed (or were never even attempted /// because, e.g., Nexus crashed during the operation), and this entry had - /// to be cleaned up later by a background 
job (which doesn't exist yet) - /// after a timeout. Note we represent this result status as "Unknown" in - /// the external API because timeout is an implementation detail and makes - /// it sound like the operation timed out. + /// to be cleaned up later by a background job after a timeout. Note we + /// represent this result status as "Unknown" in the external API because + /// timeout is an implementation detail and makes it sound like the + /// operation timed out. Timeout, } #[derive(AsChangeset, Clone)] -#[diesel(table_name = audit_log)] +#[diesel(table_name = audit_log, treat_none_as_null = true)] pub struct AuditLogCompletionUpdate { pub time_completed: DateTime<Utc>, pub result_kind: AuditLogResultKind, diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index 28d548d9ca2..f03e6405a93 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -16,7 +16,7 @@ use std::{collections::BTreeMap, sync::LazyLock}; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. -pub const SCHEMA_VERSION: Version = Version::new(237, 0, 0); +pub const SCHEMA_VERSION: Version = Version::new(238, 0, 0); /// List of all past database schema versions, in *reverse* order /// @@ -28,6 +28,7 @@ static KNOWN_VERSIONS: LazyLock<Vec<KnownVersion>> = LazyLock::new(|| { // | leaving the first copy as an example for the next person. 
// v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(238, "audit-log-incomplete-timeout"), KnownVersion::new(237, "switch-slot-enum"), KnownVersion::new( 236, diff --git a/nexus/db-queries/src/db/datastore/audit_log.rs b/nexus/db-queries/src/db/datastore/audit_log.rs index 5d983e8f178..a6fd2ef85e4 100644 --- a/nexus/db-queries/src/db/datastore/audit_log.rs +++ b/nexus/db-queries/src/db/datastore/audit_log.rs @@ -6,6 +6,7 @@ use super::DataStore; use crate::authz; use crate::context::OpContext; use crate::db; +use crate::db::model::AuditLogCompletion; use crate::db::model::AuditLogCompletionUpdate; use crate::db::model::AuditLogEntry; use crate::db::model::AuditLogEntryInit; @@ -106,7 +107,14 @@ impl DataStore { }) } - // set duration and result on an existing entry + /// Complete an audit log entry with the request's outcome. + /// + /// The `time_completed IS NULL` filter is critical: once an entry has been + /// completed — whether by the normal request path or by the background + /// timeout task — it is part of the immutable audit log. Overwriting it + /// would change its `time_completed` timestamp, shifting its position in + /// the `(time_completed, id)` pagination order and violating the guarantee + /// that past query results are stable. pub async fn audit_log_entry_complete( &self, opctx: &OpContext, @@ -115,14 +123,63 @@ impl DataStore { ) -> UpdateResult<()> { use nexus_db_schema::schema::audit_log; opctx.authorize(authz::Action::CreateChild, &authz::AUDIT_LOG).await?; - diesel::update(audit_log::table) + let rows = diesel::update(audit_log::table) .filter(audit_log::id.eq(entry.id)) + .filter(audit_log::time_completed.is_null()) .set(completion) .execute_async(&*self.pool_connection_authorized(opctx).await?) 
.await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + if rows == 0 { + warn!( + opctx.log, + "audit log entry already completed \ + (likely timed out by background task)"; + "entry_id" => %entry.id, + ); + } Ok(()) } + + /// Transition stale incomplete audit log rows to `result_kind = timeout`. + /// + /// Finds up to `limit` rows where `time_completed IS NULL` and + /// `time_started < cutoff`, atomically sets them to the timeout + /// completion state, and returns the number of rows updated. + pub async fn audit_log_timeout_incomplete( + &self, + opctx: &OpContext, + cutoff: DateTime<Utc>, + limit: u32, + ) -> Result<usize, Error> { + use nexus_db_schema::schema::audit_log; + opctx.authorize(authz::Action::CreateChild, &authz::AUDIT_LOG).await?; + + // Diesel's ValidSubselect doesn't allow a subquery from the same + // table being updated, so we alias audit_log for the subquery. + let audit_log_sub = + diesel::alias!(nexus_db_schema::schema::audit_log as audit_log_sub); + + let subquery = audit_log_sub + .filter(audit_log_sub.field(audit_log::time_completed).is_null()) + .filter(audit_log_sub.field(audit_log::time_started).lt(cutoff)) + .order(( + audit_log_sub.field(audit_log::time_started), + audit_log_sub.field(audit_log::id), + )) + .limit(i64::from(limit)) + .select(audit_log_sub.field(audit_log::id)); + + let completion: AuditLogCompletionUpdate = + AuditLogCompletion::Timeout.into(); + + diesel::update(audit_log::table) + .filter(audit_log::id.eq_any(subquery)) + .set(completion) + .execute_async(&*self.pool_connection_authorized(opctx).await?) 
+ .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } #[cfg(test)] @@ -130,8 +187,9 @@ mod tests { use super::*; use crate::db::pub_test_utils::TestDatabase; use assert_matches::assert_matches; + use chrono::TimeDelta; use nexus_db_model::{ - AuditLogActor, AuditLogCompletion, AuditLogEntryInitParams, + AuditLogActor, AuditLogEntryInitParams, AuditLogResultKind, }; use omicron_common::api::external::Error; use omicron_test_utils::dev; @@ -371,4 +429,242 @@ mod tests { db.terminate().await; logctx.cleanup_successful(); } + + fn make_entry_params(request_id: &str) -> AuditLogEntryInitParams { + AuditLogEntryInitParams { + request_id: request_id.to_string(), + operation_id: "project_create".to_string(), + request_uri: "/v1/projects".to_string(), + source_ip: "1.1.1.1".parse().unwrap(), + user_agent: None, + actor: AuditLogActor::Unauthenticated, + auth_method: None, + credential_id: None, + } + } + + /// Force time_started into the past for an audit log entry so we can + /// test the timeout logic without waiting. 
+ async fn set_time_started( + datastore: &DataStore, + id: Uuid, + time_started: DateTime<Utc>, + ) { + use diesel::sql_types; + diesel::sql_query( + "UPDATE omicron.public.audit_log \ + SET time_started = $1 WHERE id = $2", + ) + .bind::<sql_types::Timestamptz, _>(time_started) + .bind::<sql_types::Uuid, _>(id) + .execute_async(&*datastore.pool_connection_for_tests().await.unwrap()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_audit_log_timeout_incomplete() { + let logctx = dev::test_setup_log("test_audit_log_timeout_incomplete"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let now = Utc::now(); + let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); + let cutoff = now - TimeDelta::try_hours(1).unwrap(); + + // Create an incomplete entry with time_started in the past + let old_entry = datastore + .audit_log_entry_init(opctx, make_entry_params("old-req").into()) + .await + .unwrap(); + set_time_started(datastore, old_entry.id, two_hours_ago).await; + + // Create an incomplete entry that's recent (should not be timed out) + let _recent_entry = datastore + .audit_log_entry_init(opctx, make_entry_params("recent-req").into()) + .await + .unwrap(); + + // Create an old entry that's already completed (should not be touched) + let completed_entry = datastore + .audit_log_entry_init( + opctx, + make_entry_params("completed-req").into(), + ) + .await + .unwrap(); + set_time_started(datastore, completed_entry.id, two_hours_ago).await; + datastore + .audit_log_entry_complete( + opctx, + &completed_entry, + AuditLogCompletion::Success { http_status_code: 200 }.into(), + ) + .await + .unwrap(); + + // Run the timeout + let timed_out = datastore + .audit_log_timeout_incomplete(opctx, cutoff, 100) + .await + .unwrap(); + + // Only the old incomplete entry should be affected + assert_eq!(timed_out, 1); + + // The timed-out entry should now appear in the audit log with + // result_kind = timeout + let pagparams = DataPageParams { 
+ marker: None, + limit: NonZeroU32::new(100).unwrap(), + direction: dropshot::PaginationOrder::Ascending, + }; + let entries = datastore + .audit_log_list(opctx, &pagparams, two_hours_ago, None) + .await + .unwrap(); + + let timed_out_entry = + entries.iter().find(|e| e.id == old_entry.id).unwrap(); + assert_eq!(timed_out_entry.result_kind, AuditLogResultKind::Timeout); + assert!(timed_out_entry.http_status_code.is_none()); + assert!(timed_out_entry.error_code.is_none()); + assert!(timed_out_entry.error_message.is_none()); + + // The completed entry should still be success + let completed = + entries.iter().find(|e| e.id == completed_entry.id).unwrap(); + assert_eq!(completed.result_kind, AuditLogResultKind::Success); + + // Running again should find nothing (idempotent) + let timed_out = datastore + .audit_log_timeout_incomplete(opctx, cutoff, 100) + .await + .unwrap(); + assert_eq!(timed_out, 0); + + db.terminate().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_audit_log_timeout_respects_limit() { + let logctx = + dev::test_setup_log("test_audit_log_timeout_respects_limit"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let now = Utc::now(); + let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); + let cutoff = now - TimeDelta::try_hours(1).unwrap(); + + // Create 5 stale incomplete entries + for i in 0..5 { + let entry = datastore + .audit_log_entry_init( + opctx, + make_entry_params(&format!("req-{i}")).into(), + ) + .await + .unwrap(); + set_time_started(datastore, entry.id, two_hours_ago).await; + } + + // Timeout with limit of 2 + let batch1 = datastore + .audit_log_timeout_incomplete(opctx, cutoff, 2) + .await + .unwrap(); + assert_eq!(batch1, 2); + + let batch2 = datastore + .audit_log_timeout_incomplete(opctx, cutoff, 2) + .await + .unwrap(); + assert_eq!(batch2, 2); + + let batch3 = datastore + .audit_log_timeout_incomplete(opctx, cutoff, 
2) + .await + .unwrap(); + assert_eq!(batch3, 1); + + // All gone + let batch4 = datastore + .audit_log_timeout_incomplete(opctx, cutoff, 2) + .await + .unwrap(); + assert_eq!(batch4, 0); + + db.terminate().await; + logctx.cleanup_successful(); + } + + /// A late completion of an already-timed-out entry should be a no-op. + /// Once an entry is timed out and visible in the audit log, it's part + /// of the immutable record. Overwriting it would change its timestamp + /// and violate the guarantee that the past doesn't change. + #[tokio::test] + async fn test_audit_log_timeout_then_late_completion() { + let logctx = + dev::test_setup_log("test_audit_log_timeout_then_late_completion"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let now = Utc::now(); + let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); + let cutoff = now - TimeDelta::try_hours(1).unwrap(); + + let entry = datastore + .audit_log_entry_init(opctx, make_entry_params("late-req").into()) + .await + .unwrap(); + set_time_started(datastore, entry.id, two_hours_ago).await; + + // Timeout the entry + let timed_out = datastore + .audit_log_timeout_incomplete(opctx, cutoff, 100) + .await + .unwrap(); + assert_eq!(timed_out, 1); + + // Record the timed-out state + let pagparams = DataPageParams { + marker: None, + limit: NonZeroU32::new(100).unwrap(), + direction: dropshot::PaginationOrder::Ascending, + }; + let entries = datastore + .audit_log_list(opctx, &pagparams, two_hours_ago, None) + .await + .unwrap(); + let timed_out_entry = + entries.iter().find(|e| e.id == entry.id).unwrap(); + assert_eq!(timed_out_entry.result_kind, AuditLogResultKind::Timeout); + let timeout_completed_at = timed_out_entry.time_completed; + + // Now a late completion arrives — it should be a no-op because + // the entry is already completed (as timeout) + datastore + .audit_log_entry_complete( + opctx, + &entry, + AuditLogCompletion::Success { 
http_status_code: 200 }.into(), + ) + .await + .expect("late completion should not error"); + + // The entry should still show as timeout, unchanged + let entries = datastore + .audit_log_list(opctx, &pagparams, two_hours_ago, None) + .await + .unwrap(); + let found = entries.iter().find(|e| e.id == entry.id).unwrap(); + assert_eq!(found.result_kind, AuditLogResultKind::Timeout); + assert_eq!(found.time_completed, timeout_completed_at); + assert!(found.http_status_code.is_none()); + + db.terminate().await; + logctx.cleanup_successful(); + } } diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml index 447a74a4d3d..7a66c8fd616 100644 --- a/nexus/examples/config-second.toml +++ b/nexus/examples/config-second.toml @@ -191,6 +191,9 @@ trust_quorum.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_update_per_activation = 1000 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index ff102e670b1..8071192d3c1 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -175,6 +175,9 @@ trust_quorum.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_update_per_activation = 1000 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. 
diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index a444a2f5eed..4dd7bb9804b 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -92,6 +92,7 @@ use super::driver::TaskDefinition; use super::tasks::abandoned_vmm_reaper; use super::tasks::alert_dispatcher::AlertDispatcher; use super::tasks::attached_subnets; +use super::tasks::audit_log_timeout_incomplete; use super::tasks::bfd; use super::tasks::blueprint_execution; use super::tasks::blueprint_load; @@ -246,6 +247,7 @@ impl BackgroundTasksInitializer { task_instance_reincarnation: Activator::new(), task_service_firewall_propagation: Activator::new(), task_abandoned_vmm_reaper: Activator::new(), + task_audit_log_timeout_incomplete: Activator::new(), task_vpc_route_manager: Activator::new(), task_saga_recovery: Activator::new(), task_lookup_region_port: Activator::new(), @@ -359,6 +361,7 @@ impl BackgroundTasksInitializer { task_trust_quorum_manager, task_attached_subnet_manager, task_session_cleanup, + task_audit_log_timeout_incomplete, // Add new background tasks here. Be sure to use this binding in a // call to `Driver::register()` below. That's what actually wires // up the Activator to the corresponding background task. 
@@ -1177,7 +1180,7 @@ impl BackgroundTasksInitializer { absolute timeout", period: config.session_cleanup.period_secs, task_impl: Box::new(session_cleanup::SessionCleanup::new( - datastore, + datastore.clone(), args.console_session_absolute_timeout, config.session_cleanup.max_delete_per_activation, )), @@ -1186,6 +1189,25 @@ impl BackgroundTasksInitializer { activator: task_session_cleanup, }); + driver.register(TaskDefinition { + name: "audit_log_timeout_incomplete", + description: "transitions stale incomplete audit log entries to \ + timeout status so they become visible in the audit log", + period: config.audit_log_timeout_incomplete.period_secs, + task_impl: Box::new( + audit_log_timeout_incomplete::AuditLogTimeoutIncomplete::new( + datastore, + config.audit_log_timeout_incomplete.timeout_secs, + config + .audit_log_timeout_incomplete + .max_update_per_activation, + ), + ), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_audit_log_timeout_incomplete, + }); + driver } } diff --git a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs new file mode 100644 index 00000000000..b279a2a6ac9 --- /dev/null +++ b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs @@ -0,0 +1,253 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task that transitions stale incomplete audit log entries to +//! `result_kind = timeout`, making them visible through the audit log API. 
+ +use crate::app::background::BackgroundTask; +use chrono::TimeDelta; +use chrono::Utc; +use futures::future::BoxFuture; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::DataStore; +use nexus_types::internal_api::background::AuditLogTimeoutIncompleteStatus; +use serde_json::json; +use std::sync::Arc; +use std::time::Duration; + +pub struct AuditLogTimeoutIncomplete { + datastore: Arc<DataStore>, + timeout: Duration, + max_update_per_activation: u32, +} + +impl AuditLogTimeoutIncomplete { + pub fn new( + datastore: Arc<DataStore>, + timeout: Duration, + max_update_per_activation: u32, + ) -> Self { + Self { datastore, timeout, max_update_per_activation } + } + + pub(crate) async fn actually_activate( + &mut self, + opctx: &OpContext, + ) -> AuditLogTimeoutIncompleteStatus { + let timeout_delta = match TimeDelta::from_std(self.timeout) { + Ok(d) => d, + Err(e) => { + let msg = format!("invalid timeout duration: {e:#}"); + slog::error!(&opctx.log, "{msg}"); + return AuditLogTimeoutIncompleteStatus { + timed_out: 0, + cutoff: Utc::now(), + max_update_per_activation: self.max_update_per_activation, + error: Some(msg), + }; + } + }; + + let cutoff = Utc::now() - timeout_delta; + let timed_out = match self + .datastore + .audit_log_timeout_incomplete( + opctx, + cutoff, + self.max_update_per_activation, + ) + .await + { + Ok(count) => count, + Err(err) => { + let msg = + format!("audit log timeout incomplete failed: {err:#}"); + slog::error!(&opctx.log, "{msg}"); + return AuditLogTimeoutIncompleteStatus { + timed_out: 0, + cutoff, + max_update_per_activation: self.max_update_per_activation, + error: Some(msg), + }; + } + }; + + if timed_out > 0 { + slog::warn!( + &opctx.log, + "audit log timeout: timed out {timed_out} incomplete \ + entries (may indicate Nexus crashes or bugs)"; + "cutoff" => %cutoff, + "limit" => self.max_update_per_activation, + ); + } else { + slog::debug!( + &opctx.log, + "audit log timeout: no stale entries found"; + "cutoff" => %cutoff, + ); + } + 
AuditLogTimeoutIncompleteStatus { + timed_out, + cutoff, + max_update_per_activation: self.max_update_per_activation, + error: None, + } + } +} + +impl BackgroundTask for AuditLogTimeoutIncomplete { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + Box::pin(async { + let status = self.actually_activate(opctx).await; + match serde_json::to_value(status) { + Ok(val) => val, + Err(err) => { + json!({ "error": format!("failed to serialize status: {err}") }) + } + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeDelta; + use chrono::Utc; + use diesel::sql_types; + use nexus_db_model::{AuditLogActor, AuditLogEntryInitParams}; + use nexus_db_queries::db::pub_test_utils::TestDatabase; + use omicron_test_utils::dev; + use uuid::Uuid; + + fn make_entry_params(request_id: &str) -> AuditLogEntryInitParams { + AuditLogEntryInitParams { + request_id: request_id.to_string(), + operation_id: "project_create".to_string(), + request_uri: "/v1/projects".to_string(), + source_ip: "1.1.1.1".parse().unwrap(), + user_agent: None, + actor: AuditLogActor::Unauthenticated, + auth_method: None, + credential_id: None, + } + } + + async fn set_time_started( + datastore: &DataStore, + id: Uuid, + time_started: chrono::DateTime, + ) { + use async_bb8_diesel::AsyncRunQueryDsl; + diesel::sql_query( + "UPDATE omicron.public.audit_log \ + SET time_started = $1 WHERE id = $2", + ) + .bind::(time_started) + .bind::(id) + .execute_async(&*datastore.pool_connection_for_tests().await.unwrap()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_audit_log_timeout_incomplete_activation() { + let logctx = + dev::test_setup_log("test_audit_log_timeout_incomplete_activation"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let now = Utc::now(); + let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); + + // Create 3 stale incomplete 
entries + for i in 0..3 { + let entry = datastore + .audit_log_entry_init( + opctx, + make_entry_params(&format!("req-{i}")).into(), + ) + .await + .unwrap(); + set_time_started(datastore, entry.id, two_hours_ago).await; + } + + // Create 1 recent incomplete entry (should not be timed out) + datastore + .audit_log_entry_init(opctx, make_entry_params("recent-req").into()) + .await + .unwrap(); + + // timeout = 1 hour, max_update_per_activation = 100 + let mut task = AuditLogTimeoutIncomplete::new( + datastore.clone(), + Duration::from_secs(3600), + 100, + ); + + let status = task.actually_activate(opctx).await; + assert_eq!(status.timed_out, 3); + assert!(status.error.is_none()); + + // Second activation should find nothing + let status = task.actually_activate(opctx).await; + assert_eq!(status.timed_out, 0); + assert!(status.error.is_none()); + + db.terminate().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_audit_log_timeout_incomplete_respects_max_update_per_activation() + { + let logctx = dev::test_setup_log( + "test_audit_log_timeout_incomplete_respects_max_update_per_activation", + ); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let now = Utc::now(); + let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); + + // Create 5 stale entries + for i in 0..5 { + let entry = datastore + .audit_log_entry_init( + opctx, + make_entry_params(&format!("req-{i}")).into(), + ) + .await + .unwrap(); + set_time_started(datastore, entry.id, two_hours_ago).await; + } + + // max_update_per_activation = 2 — should process 2 per activation. 
+ let mut task = AuditLogTimeoutIncomplete::new( + datastore.clone(), + Duration::from_secs(3600), + 2, + ); + + let status = task.actually_activate(opctx).await; + assert_eq!(status.timed_out, 2); + assert!(status.error.is_none()); + + let status = task.actually_activate(opctx).await; + assert_eq!(status.timed_out, 2); + assert!(status.error.is_none()); + + let status = task.actually_activate(opctx).await; + assert_eq!(status.timed_out, 1); + assert!(status.error.is_none()); + + db.terminate().await; + logctx.cleanup_successful(); + } +} diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index 1ff9a53f257..2aeb2017b43 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -7,6 +7,7 @@ pub mod abandoned_vmm_reaper; pub mod alert_dispatcher; pub mod attached_subnets; +pub mod audit_log_timeout_incomplete; pub mod bfd; pub mod blueprint_execution; pub mod blueprint_load; diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index 4be3336ddba..db9f7d69227 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -209,6 +209,9 @@ trust_quorum.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_update_per_activation = 1000 [multicast] # Enable multicast functionality for tests (disabled by default in production) diff --git a/nexus/tests/integration_tests/audit_log.rs b/nexus/tests/integration_tests/audit_log.rs index 6b91a6ea908..f4d08907870 100644 --- a/nexus/tests/integration_tests/audit_log.rs +++ b/nexus/tests/integration_tests/audit_log.rs @@ -2,10 +2,11 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. 
-use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeDelta, Utc}; use dropshot::{ResultsPage, test_util::ClientTestContext}; use http::{Method, StatusCode, header}; use nexus_db_queries::authn::USER_TEST_PRIVILEGED; +use nexus_db_queries::context::OpContext; use nexus_test_utils::http_testing::{AuthnMode, NexusRequest, RequestBuilder}; use nexus_test_utils::resource_helpers::{ DiskTest, create_console_session, create_default_ip_pools, create_disk, @@ -216,6 +217,61 @@ async fn test_audit_log_list(ctx: &ControlPlaneTestContext) { let log = objects_list_page_authz::(client, &url).await; assert_eq!(log.items.len(), 1); assert_eq!(e2.id, log.items[0].id); + + // Test that timed-out audit log entries surface as Unknown in the API. + // Create an incomplete entry directly via the datastore, backdate it, + // time it out, then verify the API returns result = Unknown. + let nexus = &ctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(ctx.logctx.log.new(o!()), datastore.clone()); + + let params = nexus_db_model::AuditLogEntryInitParams { + request_id: "timeout-test-req".to_string(), + operation_id: "project_create".to_string(), + request_uri: "/v1/projects".to_string(), + source_ip: "1.1.1.1".parse().unwrap(), + user_agent: None, + actor: nexus_db_model::AuditLogActor::Unauthenticated, + auth_method: None, + credential_id: None, + }; + let entry = datastore + .audit_log_entry_init(&opctx, params.into()) + .await + .expect("init audit log entry"); + + let two_hours_ago = Utc::now() - TimeDelta::try_hours(2).unwrap(); + + // Backdate time_started so it's older than the cutoff + { + use async_bb8_diesel::AsyncRunQueryDsl; + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log; + diesel::update(audit_log::table) + .filter(audit_log::id.eq(entry.id)) + .set(audit_log::time_started.eq(two_hours_ago)) + .execute_async( + &*datastore.pool_connection_for_tests().await.unwrap(), + ) + .await + .unwrap(); + } + 
+ let cutoff = Utc::now() - TimeDelta::try_hours(1).unwrap(); + let timed_out = datastore + .audit_log_timeout_incomplete(&opctx, cutoff, 100) + .await + .expect("timeout incomplete entries"); + assert_eq!(timed_out, 1); + + let audit_log = fetch_log(client, two_hours_ago, None).await; + let found = audit_log + .items + .iter() + .find(|e| e.request_id == "timeout-test-req") + .expect("timed-out entry should appear in the API"); + assert_eq!(found.result, AuditLogEntryResult::Unknown); } #[nexus_test] diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index 6525e46d761..ed62d70c2c9 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -973,6 +973,19 @@ pub struct SledSubnetDetails { pub errors: Vec, } +/// The status of an `audit_log_timeout_incomplete` background task activation. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct AuditLogTimeoutIncompleteStatus { + /// Number of audit log entries timed out in this activation. + pub timed_out: usize, + /// The cutoff time used: entries started before this were eligible. + pub cutoff: DateTime, + /// Configured max rows to time out in this activation. + pub max_update_per_activation: u32, + /// Error encountered during this activation, if any. + pub error: Option, +} + /// The status of a `session_cleanup` background task activation. #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct SessionCleanupStatus { diff --git a/schema/crdb/audit-log-incomplete-timeout/up01.sql b/schema/crdb/audit-log-incomplete-timeout/up01.sql new file mode 100644 index 00000000000..ec1cedcdc4a --- /dev/null +++ b/schema/crdb/audit-log-incomplete-timeout/up01.sql @@ -0,0 +1,4 @@ +-- Supports "find stale incomplete rows ordered by time_started". 
+CREATE INDEX IF NOT EXISTS audit_log_incomplete_by_time_started + ON omicron.public.audit_log (time_started, id) + WHERE time_completed IS NULL; diff --git a/schema/crdb/audit-log-incomplete-timeout/up01.verify.sql b/schema/crdb/audit-log-incomplete-timeout/up01.verify.sql new file mode 100644 index 00000000000..46b46c75d6b --- /dev/null +++ b/schema/crdb/audit-log-incomplete-timeout/up01.verify.sql @@ -0,0 +1,2 @@ +-- DO NOT EDIT. Generated by test_migration_verification_files. +SELECT CAST(IF((SELECT true WHERE EXISTS (SELECT index_name FROM omicron.crdb_internal.table_indexes WHERE descriptor_name = 'audit_log' AND index_name = 'audit_log_incomplete_by_time_started')),'true','Schema change verification failed: index audit_log_incomplete_by_time_started on table audit_log does not exist') AS BOOL); diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 7c04d3e8522..cff21854e5f 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -6652,6 +6652,11 @@ CREATE UNIQUE INDEX IF NOT EXISTS audit_log_by_time_completed ON omicron.public.audit_log (time_completed, id) WHERE time_completed IS NOT NULL; +-- Supports "find stale incomplete rows ordered by time_started". +CREATE INDEX IF NOT EXISTS audit_log_incomplete_by_time_started + ON omicron.public.audit_log (time_started, id) + WHERE time_completed IS NULL; + -- View of audit log entries that have been "completed". This lets us treat that -- subset of rows as its own table in the data model code. 
Completing an entry -- means updating the entry after an operation is complete with the result of @@ -8237,7 +8242,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '237.0.0', NULL) + (TRUE, NOW(), NOW(), '238.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index 3473610508f..cd95f93f137 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -119,6 +119,9 @@ trust_quorum.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_update_per_activation = 1000 [default_region_allocation_strategy] # by default, allocate across 3 distinct sleds diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index b5f52c0ba34..41f153595d6 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -119,6 +119,9 @@ multicast_reconciler.period_secs = 60 attached_subnet_manager.period_secs = 60 session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 +audit_log_timeout_incomplete.period_secs = 600 +audit_log_timeout_incomplete.timeout_secs = 14400 +audit_log_timeout_incomplete.max_update_per_activation = 1000 [default_region_allocation_strategy] # by default, allocate without requirement for distinct sleds. 
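[Note on the limit semantics exercised by the tests in this patch: with 5 stale entries and a per-activation cap of 3, draining takes two activations (3, then 2), and a third finds nothing. This bounded-batch pattern can be sketched as a std-only Rust simulation. The names here (`Entry`, `timeout_stale`) are hypothetical illustrations, not the actual datastore API.]

```rust
// Hypothetical sketch of the "bounded work per activation" pattern:
// each activation updates at most `limit` stale, not-yet-timed-out
// entries, so draining N entries takes ceil(N / limit) activations.
#[derive(Debug)]
struct Entry {
    stale: bool,
    timed_out: bool,
}

/// One "activation": time out up to `limit` eligible entries,
/// returning how many were updated (0 means nothing left to do).
fn timeout_stale(entries: &mut [Entry], limit: usize) -> usize {
    let mut updated = 0;
    for e in entries.iter_mut() {
        if updated == limit {
            break;
        }
        if e.stale && !e.timed_out {
            e.timed_out = true;
            updated += 1;
        }
    }
    updated
}

fn main() {
    // 5 stale entries, limit 3: batches of 3, 2, then 0,
    // mirroring the unit tests in this patch.
    let mut entries: Vec<Entry> =
        (0..5).map(|_| Entry { stale: true, timed_out: false }).collect();
    assert_eq!(timeout_stale(&mut entries, 3), 3);
    assert_eq!(timeout_stale(&mut entries, 3), 2);
    assert_eq!(timeout_stale(&mut entries, 3), 0);
    println!("drained in bounded batches");
}
```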
From c5b7f89dc0293c57793c3f25661267b57a4cc36a Mon Sep 17 00:00:00 2001 From: David Crespo Date: Thu, 12 Mar 2026 22:25:51 -0500 Subject: [PATCH 3/8] preemptive eliza fixes based on #10042 --- .claude/skills/add-background-task/SKILL.md | 6 +- dev-tools/omdb/src/bin/omdb/nexus.rs | 4 +- dev-tools/omdb/tests/successes.out | 12 +- nexus-config/src/nexus_config.rs | 8 +- nexus/db-model/src/audit_log.rs | 2 +- .../db-queries/src/db/datastore/audit_log.rs | 154 +------------- nexus/examples/config-second.toml | 2 +- nexus/examples/config.toml | 2 +- nexus/src/app/background/init.rs | 2 +- .../tasks/audit_log_timeout_incomplete.rs | 192 +++++++++++------- nexus/tests/config.test.toml | 2 +- nexus/types/src/internal_api/background.rs | 2 +- smf/nexus/multi-sled/config-partial.toml | 2 +- smf/nexus/single-sled/config-partial.toml | 2 +- 14 files changed, 149 insertions(+), 243 deletions(-) diff --git a/.claude/skills/add-background-task/SKILL.md b/.claude/skills/add-background-task/SKILL.md index 423e0b18318..a3ed334dcfc 100644 --- a/.claude/skills/add-background-task/SKILL.md +++ b/.claude/skills/add-background-task/SKILL.md @@ -25,7 +25,7 @@ Create the task module. The struct holds whatever state it needs (typically `Arc Logging conventions: `debug` when there's nothing to do, `info` when routine work was done, `warn` when the work done indicates something is wrong (e.g., cleaning up after a crash), `error` on failure. -Include unit tests in the same file using `TestDatabase::new_with_datastore`. Tests call `actually_activate` directly. +Include a unit test in the same file using `TestDatabase::new_with_datastore` that calls `actually_activate` directly. If the task has a datastore method, a single test exercising the task end-to-end (including the limit/batching behavior) is sufficient — don't add a redundant test for the datastore method separately unless it has complex logic worth testing in isolation. ### 3. 
Register the module (`nexus/src/app/background/tasks/mod.rs`) @@ -37,7 +37,7 @@ Add `pub task_: Activator` to the `BackgroundTasks` struct, maintaining al ### 5. Config (`nexus-config/src/nexus_config.rs`) -Add a config struct (e.g., `YourTaskConfig`) with at minimum `period_secs: Duration` (using `#[serde_as(as = "DurationSeconds")]`). If the task does bounded work per activation, name the limit field `max__per_activation` (e.g., `max_delete_per_activation`, `max_update_per_activation`) to match existing conventions. Add the field to `BackgroundTaskConfig`. Update the test config literal and expected parse output at the bottom of the file. +Add a config struct (e.g., `YourTaskConfig`) with at minimum `period_secs: Duration` (using `#[serde_as(as = "DurationSeconds")]`). If the task does bounded work per activation, name the limit field `max__per_activation` (e.g., `max_deleted_per_activation`, `max_timed_out_per_activation`) to match existing conventions. Add the field to `BackgroundTaskConfig`. Update the test config literal and expected parse output at the bottom of the file. ### 6. Config files @@ -62,7 +62,7 @@ If the task needs a new index or schema change to support its query, add a migra ### 9. Datastore method (if needed) -If the task needs a new query, add it in the appropriate `nexus/db-queries/src/db/datastore/` file. Add a test in the same file or in `nexus/db-queries/src/db/datastore/mod.rs`. +If the task needs a new query, add it in the appropriate `nexus/db-queries/src/db/datastore/` file. Prefer the Diesel typed DSL over raw SQL (`diesel::sql_query`) for queries and test helpers. Only fall back to raw SQL when the DSL genuinely can't express the query. If the task modifies rows that other code paths also modify, think about races: what happens if both run concurrently on the same row? Both paths should typically guard their writes so only one succeeds. 
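[The race-guard rule the SKILL.md text above describes — two code paths (request completion and the timeout task) may race on the same row, and each should guard its write so only one succeeds — can be sketched in std-only Rust. This is an in-memory analogue of a SQL `UPDATE ... WHERE time_completed IS NULL`; the names (`Outcome`, `complete_if_incomplete`) are hypothetical, not the real datastore API.]

```rust
// Hypothetical sketch of the guarded-write rule: both racing paths
// only write if the entry is still incomplete, so exactly one wins
// and the loser's write is a no-op.
#[derive(Debug, Clone, PartialEq)]
enum Outcome {
    Success(u16), // HTTP status code
    Timeout,
}

#[derive(Debug, Default)]
struct Entry {
    outcome: Option<Outcome>, // None = still incomplete
}

/// Guarded write: succeeds only if no outcome has been recorded yet,
/// mirroring a conditional `WHERE time_completed IS NULL` update.
fn complete_if_incomplete(e: &mut Entry, outcome: Outcome) -> bool {
    if e.outcome.is_none() {
        e.outcome = Some(outcome);
        true
    } else {
        false
    }
}

fn main() {
    let mut e = Entry::default();
    // The timeout task wins the race...
    assert!(complete_if_incomplete(&mut e, Outcome::Timeout));
    // ...so a late completion is a no-op and the outcome is unchanged,
    // as the "late completion" integration test in this patch verifies.
    assert!(!complete_if_incomplete(&mut e, Outcome::Success(200)));
    assert_eq!(e.outcome, Some(Outcome::Timeout));
    println!("exactly one writer succeeded");
}
```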
diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index 9914734ab6e..1e20072beb5 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -2694,7 +2694,7 @@ fn print_task_audit_log_timeout_incomplete(details: &serde_json::Value) { Ok(status) => { const TIMED_OUT: &str = "timed_out:"; const CUTOFF: &str = "cutoff:"; - const MAX_UPDATE: &str = "max_update_per_activation:"; + const MAX_UPDATE: &str = "max_timed_out_per_activation:"; const ERROR: &str = "error:"; const WIDTH: usize = const_max_len(&[TIMED_OUT, CUTOFF, MAX_UPDATE, ERROR]) + 1; @@ -2706,7 +2706,7 @@ fn print_task_audit_log_timeout_incomplete(details: &serde_json::Value) { ); println!( " {MAX_UPDATE:m last completed activation: , triggered by started at (s ago) and ran for ms - timed_out: 0 - cutoff: - max_update_per_activation: 1000 + timed_out: 0 + cutoff: + max_timed_out_per_activation: 1000 task: "bfd_manager" configured period: every s @@ -1200,9 +1200,9 @@ task: "audit_log_timeout_incomplete" configured period: every m last completed activation: , triggered by started at (s ago) and ran for ms - timed_out: 0 - cutoff: - max_update_per_activation: 1000 + timed_out: 0 + cutoff: + max_timed_out_per_activation: 1000 task: "bfd_manager" configured period: every s diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index bccd74ad1fb..57bca6edc13 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -464,7 +464,7 @@ pub struct AuditLogTimeoutIncompleteConfig { pub timeout_secs: Duration, /// max rows per SQL statement - pub max_update_per_activation: u32, + pub max_timed_out_per_activation: u32, } #[serde_as] @@ -1286,7 +1286,7 @@ mod test { session_cleanup.max_delete_per_activation = 10000 audit_log_timeout_incomplete.period_secs = 600 audit_log_timeout_incomplete.timeout_secs = 14400 - audit_log_timeout_incomplete.max_update_per_activation = 1000 + 
audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] type = "random" seed = 0 @@ -1558,7 +1558,7 @@ mod test { AuditLogTimeoutIncompleteConfig { period_secs: Duration::from_secs(600), timeout_secs: Duration::from_secs(14400), - max_update_per_activation: 1000, + max_timed_out_per_activation: 1000, }, }, multicast: MulticastConfig { enabled: false }, @@ -1669,7 +1669,7 @@ mod test { session_cleanup.max_delete_per_activation = 10000 audit_log_timeout_incomplete.period_secs = 600 audit_log_timeout_incomplete.timeout_secs = 14400 - audit_log_timeout_incomplete.max_update_per_activation = 1000 + audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] type = "random" diff --git a/nexus/db-model/src/audit_log.rs b/nexus/db-model/src/audit_log.rs index b914db1c705..7db6486e1ef 100644 --- a/nexus/db-model/src/audit_log.rs +++ b/nexus/db-model/src/audit_log.rs @@ -224,7 +224,7 @@ impl From for AuditLogEntryInit { /// `audit_log_complete` is a view on `audit_log` filtering for rows with /// non-null `time_completed`, not its own table. 
-#[derive(Queryable, Selectable, Clone, Debug)] +#[derive(Queryable, Selectable, Clone, Debug, PartialEq)] #[diesel(table_name = audit_log_complete)] pub struct AuditLogEntry { pub id: Uuid, diff --git a/nexus/db-queries/src/db/datastore/audit_log.rs b/nexus/db-queries/src/db/datastore/audit_log.rs index a6fd2ef85e4..5108b56bcc0 100644 --- a/nexus/db-queries/src/db/datastore/audit_log.rs +++ b/nexus/db-queries/src/db/datastore/audit_log.rs @@ -450,154 +450,16 @@ mod tests { id: Uuid, time_started: DateTime, ) { - use diesel::sql_types; - diesel::sql_query( - "UPDATE omicron.public.audit_log \ - SET time_started = $1 WHERE id = $2", - ) - .bind::(time_started) - .bind::(id) - .execute_async(&*datastore.pool_connection_for_tests().await.unwrap()) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_audit_log_timeout_incomplete() { - let logctx = dev::test_setup_log("test_audit_log_timeout_incomplete"); - let db = TestDatabase::new_with_datastore(&logctx.log).await; - let (opctx, datastore) = (db.opctx(), db.datastore()); - - let now = Utc::now(); - let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); - let cutoff = now - TimeDelta::try_hours(1).unwrap(); - - // Create an incomplete entry with time_started in the past - let old_entry = datastore - .audit_log_entry_init(opctx, make_entry_params("old-req").into()) - .await - .unwrap(); - set_time_started(datastore, old_entry.id, two_hours_ago).await; - - // Create an incomplete entry that's recent (should not be timed out) - let _recent_entry = datastore - .audit_log_entry_init(opctx, make_entry_params("recent-req").into()) - .await - .unwrap(); - - // Create an old entry that's already completed (should not be touched) - let completed_entry = datastore - .audit_log_entry_init( - opctx, - make_entry_params("completed-req").into(), - ) - .await - .unwrap(); - set_time_started(datastore, completed_entry.id, two_hours_ago).await; - datastore - .audit_log_entry_complete( - opctx, - &completed_entry, - 
AuditLogCompletion::Success { http_status_code: 200 }.into(), + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log; + diesel::update(audit_log::table) + .filter(audit_log::id.eq(id)) + .set(audit_log::time_started.eq(time_started)) + .execute_async( + &*datastore.pool_connection_for_tests().await.unwrap(), ) .await - .unwrap(); - - // Run the timeout - let timed_out = datastore - .audit_log_timeout_incomplete(opctx, cutoff, 100) - .await - .unwrap(); - - // Only the old incomplete entry should be affected - assert_eq!(timed_out, 1); - - // The timed-out entry should now appear in the audit log with - // result_kind = timeout - let pagparams = DataPageParams { - marker: None, - limit: NonZeroU32::new(100).unwrap(), - direction: dropshot::PaginationOrder::Ascending, - }; - let entries = datastore - .audit_log_list(opctx, &pagparams, two_hours_ago, None) - .await - .unwrap(); - - let timed_out_entry = - entries.iter().find(|e| e.id == old_entry.id).unwrap(); - assert_eq!(timed_out_entry.result_kind, AuditLogResultKind::Timeout); - assert!(timed_out_entry.http_status_code.is_none()); - assert!(timed_out_entry.error_code.is_none()); - assert!(timed_out_entry.error_message.is_none()); - - // The completed entry should still be success - let completed = - entries.iter().find(|e| e.id == completed_entry.id).unwrap(); - assert_eq!(completed.result_kind, AuditLogResultKind::Success); - - // Running again should find nothing (idempotent) - let timed_out = datastore - .audit_log_timeout_incomplete(opctx, cutoff, 100) - .await - .unwrap(); - assert_eq!(timed_out, 0); - - db.terminate().await; - logctx.cleanup_successful(); - } - - #[tokio::test] - async fn test_audit_log_timeout_respects_limit() { - let logctx = - dev::test_setup_log("test_audit_log_timeout_respects_limit"); - let db = TestDatabase::new_with_datastore(&logctx.log).await; - let (opctx, datastore) = (db.opctx(), db.datastore()); - - let now = Utc::now(); - let two_hours_ago = now - 
TimeDelta::try_hours(2).unwrap(); - let cutoff = now - TimeDelta::try_hours(1).unwrap(); - - // Create 5 stale incomplete entries - for i in 0..5 { - let entry = datastore - .audit_log_entry_init( - opctx, - make_entry_params(&format!("req-{i}")).into(), - ) - .await - .unwrap(); - set_time_started(datastore, entry.id, two_hours_ago).await; - } - - // Timeout with limit of 2 - let batch1 = datastore - .audit_log_timeout_incomplete(opctx, cutoff, 2) - .await - .unwrap(); - assert_eq!(batch1, 2); - - let batch2 = datastore - .audit_log_timeout_incomplete(opctx, cutoff, 2) - .await - .unwrap(); - assert_eq!(batch2, 2); - - let batch3 = datastore - .audit_log_timeout_incomplete(opctx, cutoff, 2) - .await - .unwrap(); - assert_eq!(batch3, 1); - - // All gone - let batch4 = datastore - .audit_log_timeout_incomplete(opctx, cutoff, 2) - .await - .unwrap(); - assert_eq!(batch4, 0); - - db.terminate().await; - logctx.cleanup_successful(); + .expect("could not set time_started"); } /// A late completion of an already-timed-out entry should be a no-op. diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml index 7a66c8fd616..5b540e02eea 100644 --- a/nexus/examples/config-second.toml +++ b/nexus/examples/config-second.toml @@ -193,7 +193,7 @@ session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 audit_log_timeout_incomplete.period_secs = 600 audit_log_timeout_incomplete.timeout_secs = 14400 -audit_log_timeout_incomplete.max_update_per_activation = 1000 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. 
diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index 8071192d3c1..59930e86c32 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -177,7 +177,7 @@ session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 audit_log_timeout_incomplete.period_secs = 600 audit_log_timeout_incomplete.timeout_secs = 14400 -audit_log_timeout_incomplete.max_update_per_activation = 1000 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 4dd7bb9804b..412950b98e0 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -1200,7 +1200,7 @@ impl BackgroundTasksInitializer { config.audit_log_timeout_incomplete.timeout_secs, config .audit_log_timeout_incomplete - .max_update_per_activation, + .max_timed_out_per_activation, ), ), opctx: opctx.child(BTreeMap::new()), diff --git a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs index b279a2a6ac9..3103a87e965 100644 --- a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs +++ b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs @@ -19,16 +19,16 @@ use std::time::Duration; pub struct AuditLogTimeoutIncomplete { datastore: Arc, timeout: Duration, - max_update_per_activation: u32, + max_timed_out_per_activation: u32, } impl AuditLogTimeoutIncomplete { pub fn new( datastore: Arc, timeout: Duration, - max_update_per_activation: u32, + max_timed_out_per_activation: u32, ) -> Self { - Self { datastore, timeout, max_update_per_activation } + Self { datastore, timeout, max_timed_out_per_activation } } pub(crate) async fn actually_activate( @@ -43,7 +43,8 @@ impl AuditLogTimeoutIncomplete { return AuditLogTimeoutIncompleteStatus { timed_out: 0, 
cutoff: Utc::now(), - max_update_per_activation: self.max_update_per_activation, + max_timed_out_per_activation: self + .max_timed_out_per_activation, error: Some(msg), }; } @@ -55,7 +56,7 @@ impl AuditLogTimeoutIncomplete { .audit_log_timeout_incomplete( opctx, cutoff, - self.max_update_per_activation, + self.max_timed_out_per_activation, ) .await { @@ -67,7 +68,8 @@ impl AuditLogTimeoutIncomplete { return AuditLogTimeoutIncompleteStatus { timed_out: 0, cutoff, - max_update_per_activation: self.max_update_per_activation, + max_timed_out_per_activation: self + .max_timed_out_per_activation, error: Some(msg), }; } @@ -79,7 +81,7 @@ impl AuditLogTimeoutIncomplete { "audit log timeout: timed out {timed_out} incomplete \ entries (may indicate Nexus crashes or bugs)"; "cutoff" => %cutoff, - "limit" => self.max_update_per_activation, + "limit" => self.max_timed_out_per_activation, ); } else { slog::debug!( @@ -92,7 +94,7 @@ impl AuditLogTimeoutIncomplete { AuditLogTimeoutIncompleteStatus { timed_out, cutoff, - max_update_per_activation: self.max_update_per_activation, + max_timed_out_per_activation: self.max_timed_out_per_activation, error: None, } } @@ -120,13 +122,47 @@ mod tests { use super::*; use chrono::TimeDelta; use chrono::Utc; - use diesel::sql_types; - use nexus_db_model::{AuditLogActor, AuditLogEntryInitParams}; + use nexus_db_model::{ + AuditLogActor, AuditLogCompletion, AuditLogEntry, + AuditLogEntryInitParams, AuditLogResultKind, + }; use nexus_db_queries::db::pub_test_utils::TestDatabase; use omicron_test_utils::dev; use uuid::Uuid; - fn make_entry_params(request_id: &str) -> AuditLogEntryInitParams { + async fn get_completed_entry( + datastore: &DataStore, + id: Uuid, + ) -> AuditLogEntry { + use async_bb8_diesel::AsyncRunQueryDsl; + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log_complete; + audit_log_complete::table + .filter(audit_log_complete::id.eq(id)) + .select(AuditLogEntry::as_select()) + 
.first_async(&*datastore.pool_connection_for_tests().await.unwrap()) + .await + .expect("could not load audit log entry") + } + + async fn assert_incomplete(datastore: &DataStore, id: Uuid) { + use async_bb8_diesel::AsyncRunQueryDsl; + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log; + let time_completed: Option<chrono::DateTime<Utc>> = audit_log::table + .filter(audit_log::id.eq(id)) + .select(audit_log::time_completed) + .first_async(&*datastore.pool_connection_for_tests().await.unwrap()) + .await + .expect("could not load audit log entry"); + assert!( + time_completed.is_none(), + "expected entry {id} to be incomplete, \ + but time_completed = {time_completed:?}" + ); + } + + fn make_entry(request_id: &str) -> AuditLogEntryInitParams { AuditLogEntryInitParams { request_id: request_id.to_string(), operation_id: "project_create".to_string(), @@ -145,15 +181,24 @@ mod tests { time_started: chrono::DateTime<Utc>, ) { use async_bb8_diesel::AsyncRunQueryDsl; - diesel::sql_query( - "UPDATE omicron.public.audit_log \ - SET time_started = $1 WHERE id = $2", - ) - .bind::<sql_types::Timestamptz, _>(time_started) - .bind::<sql_types::Uuid, _>(id) - .execute_async(&*datastore.pool_connection_for_tests().await.unwrap()) - .await - .unwrap(); + use diesel::prelude::*; + use nexus_db_schema::schema::audit_log; + diesel::update(audit_log::table) + .filter(audit_log::id.eq(id)) + .set(audit_log::time_started.eq(time_started)) + .execute_async( + &*datastore.pool_connection_for_tests().await.unwrap(), + ) + .await + .expect("could not set time_started"); + } + + async fn assert_timed_out(datastore: &DataStore, id: Uuid) { + let entry = get_completed_entry(datastore, id).await; + assert_eq!(entry.result_kind, AuditLogResultKind::Timeout); + assert!(entry.http_status_code.is_none()); + assert!(entry.error_code.is_none()); + assert!(entry.error_message.is_none()); } #[tokio::test] @@ -164,88 +209,87 @@ mod tests { let (opctx, datastore) = (db.opctx(), db.datastore()); let now = Utc::now(); - let two_hours_ago = now -
TimeDelta::try_hours(2).unwrap(); - // Create 3 stale incomplete entries - for i in 0..3 { + // Create 5 stale incomplete entries at different times in the past. + // The query orders by (time_started, id), so they'll be processed + // oldest first: req-0 through req-4. + let mut stale_ids = Vec::new(); + for i in 0..5u32 { + let age = TimeDelta::try_hours(i64::from(6 - i)).unwrap(); + let params = make_entry(&format!("req-{i}")); let entry = datastore - .audit_log_entry_init( - opctx, - make_entry_params(&format!("req-{i}")).into(), - ) + .audit_log_entry_init(opctx, params.into()) .await .unwrap(); - set_time_started(datastore, entry.id, two_hours_ago).await; + set_time_started(datastore, entry.id, now - age).await; + stale_ids.push(entry.id); } // Create 1 recent incomplete entry (should not be timed out) + let recent = datastore + .audit_log_entry_init(opctx, make_entry("recent-req").into()) + .await + .unwrap(); + + // Create 1 old completed entry (should not be timed out) + let completed = datastore + .audit_log_entry_init(opctx, make_entry("completed-req").into()) + .await + .unwrap(); + let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); + set_time_started(datastore, completed.id, two_hours_ago).await; datastore - .audit_log_entry_init(opctx, make_entry_params("recent-req").into()) + .audit_log_entry_complete( + opctx, + &completed, + AuditLogCompletion::Success { http_status_code: 200 }.into(), + ) .await .unwrap(); - // timeout = 1 hour, max_update_per_activation = 100 + // hold onto completed entry so we can check at the end it hasn't changed + let completed_before = + get_completed_entry(&datastore, completed.id).await; + + // max_timed_out_per_activation = 3, so it takes two activations to + // time out all 5 stale entries let mut task = AuditLogTimeoutIncomplete::new( datastore.clone(), Duration::from_secs(3600), - 100, + 3, ); + // first activation times out the 3 earliest entries let status = task.actually_activate(opctx).await; 
assert_eq!(status.timed_out, 3); assert!(status.error.is_none()); - - // Second activation should find nothing - let status = task.actually_activate(opctx).await; - assert_eq!(status.timed_out, 0); - assert!(status.error.is_none()); - - db.terminate().await; - logctx.cleanup_successful(); - } - - #[tokio::test] - async fn test_audit_log_timeout_incomplete_respects_max_update_per_activation() - { - let logctx = dev::test_setup_log( - "test_audit_log_timeout_incomplete_respects_max_update_per_activation", - ); - let db = TestDatabase::new_with_datastore(&logctx.log).await; - let (opctx, datastore) = (db.opctx(), db.datastore()); - - let now = Utc::now(); - let two_hours_ago = now - TimeDelta::try_hours(2).unwrap(); - - // Create 5 stale entries - for i in 0..5 { - let entry = datastore - .audit_log_entry_init( - opctx, - make_entry_params(&format!("req-{i}")).into(), - ) - .await - .unwrap(); - set_time_started(datastore, entry.id, two_hours_ago).await; + for &id in &stale_ids[..3] { + assert_timed_out(&datastore, id).await; } + for &id in &stale_ids[3..] { + assert_incomplete(&datastore, id).await; + } + assert_incomplete(&datastore, recent.id).await; - // max_update_per_activation = 2 — should process 2 per activation. 
- let mut task = AuditLogTimeoutIncomplete::new( - datastore.clone(), - Duration::from_secs(3600), - 2, - ); - + // second activation times out remaining 2 stale let status = task.actually_activate(opctx).await; assert_eq!(status.timed_out, 2); assert!(status.error.is_none()); + for &id in &stale_ids { + assert_timed_out(&datastore, id).await; + } + assert_incomplete(&datastore, recent.id).await; + // third activation does nothing let status = task.actually_activate(opctx).await; - assert_eq!(status.timed_out, 2); + assert_eq!(status.timed_out, 0); assert!(status.error.is_none()); - let status = task.actually_activate(opctx).await; - assert_eq!(status.timed_out, 1); - assert!(status.error.is_none()); + // completed entry is unaffected + assert_eq!( + get_completed_entry(&datastore, completed.id).await, + completed_before + ); db.terminate().await; logctx.cleanup_successful(); diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index db9f7d69227..cf17c0fd5df 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -211,7 +211,7 @@ session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 audit_log_timeout_incomplete.period_secs = 600 audit_log_timeout_incomplete.timeout_secs = 14400 -audit_log_timeout_incomplete.max_update_per_activation = 1000 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [multicast] # Enable multicast functionality for tests (disabled by default in production) diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index ed62d70c2c9..ff46ac326d2 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -981,7 +981,7 @@ pub struct AuditLogTimeoutIncompleteStatus { /// The cutoff time used: entries started before this were eligible. pub cutoff: DateTime<Utc>, /// Configured max rows to time out in this activation.
- pub max_update_per_activation: u32, + pub max_timed_out_per_activation: u32, /// Error encountered during this activation, if any. pub error: Option<String>, } diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index cd95f93f137..90b4d2b6971 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -121,7 +121,7 @@ session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 audit_log_timeout_incomplete.period_secs = 600 audit_log_timeout_incomplete.timeout_secs = 14400 -audit_log_timeout_incomplete.max_update_per_activation = 1000 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] # by default, allocate across 3 distinct sleds diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index 41f153595d6..a8c9f85b627 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -121,7 +121,7 @@ session_cleanup.period_secs = 300 session_cleanup.max_delete_per_activation = 10000 audit_log_timeout_incomplete.period_secs = 600 audit_log_timeout_incomplete.timeout_secs = 14400 -audit_log_timeout_incomplete.max_update_per_activation = 1000 +audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 [default_region_allocation_strategy] # by default, allocate without requirement for distinct sleds.
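Editor's note: the rename and the two-activation test in the patch above hinge on one batching rule: each activation times out at most `max_timed_out_per_activation` incomplete entries, oldest first by `(time_started, id)`, among those started before the cutoff. A minimal self-contained sketch of that selection rule (plain Rust, no nexus or diesel dependencies; the `(id, started, completed)` tuple encoding and the `select_stale` name are illustrative, not the real schema):

```rust
/// Return the ids of at most `max` stale entries: incomplete (`!completed`)
/// and started strictly before `cutoff`, ordered oldest first with ties
/// broken by id. This mirrors the diesel query's filter/order/limit shape.
fn select_stale(
    entries: &[(u32, u64, bool)], // (id, time_started, completed)
    cutoff: u64,
    max: usize,
) -> Vec<u32> {
    let mut stale: Vec<&(u32, u64, bool)> = entries
        .iter()
        .filter(|&&(_, started, completed)| !completed && started < cutoff)
        .collect();
    // Order by (time_started, id), like the `(time_started, id)` sort key.
    stale.sort_by_key(|&&(id, started, _)| (started, id));
    stale.into_iter().take(max).map(|&(id, _, _)| id).collect()
}

fn main() {
    // Five stale incomplete entries, one recent incomplete entry (started at
    // 90, after the cutoff), and one old but completed entry.
    let entries = [
        (0, 5, false),
        (1, 4, false),
        (2, 3, false),
        (3, 2, false),
        (4, 1, false),
        (5, 90, false), // recent: not past the cutoff
        (6, 2, true),   // completed: never touched again
    ];
    // One "activation" with limit 3 picks only the three oldest stale ids.
    assert_eq!(select_stale(&entries, 10, 3), vec![4, 3, 2]);
    println!("ok");
}
```

Applied repeatedly with the already-processed entries marked completed, this drains the backlog oldest first, which is exactly the 3-then-2 split the test in the patch asserts.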
From d37eed5db8122c887ecac1ae9a52aed76cecc22f Mon Sep 17 00:00:00 2001 From: David Crespo Date: Wed, 18 Mar 2026 12:28:01 -0500 Subject: [PATCH 4/8] move validation of timeout to config new --- .../tasks/audit_log_timeout_incomplete.rs | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs index 3103a87e965..7ce5282e353 100644 --- a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs +++ b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs @@ -18,7 +18,7 @@ pub struct AuditLogTimeoutIncomplete { datastore: Arc<DataStore>, - timeout: Duration, + timeout: TimeDelta, max_timed_out_per_activation: u32, } @@ -28,6 +28,8 @@ impl AuditLogTimeoutIncomplete { timeout: Duration, max_timed_out_per_activation: u32, ) -> Self { + let timeout = TimeDelta::from_std(timeout) + .expect("timeout must be representable as a TimeDelta"); Self { datastore, timeout, max_timed_out_per_activation } } @@ -35,22 +37,7 @@ &mut self, opctx: &OpContext, ) -> AuditLogTimeoutIncompleteStatus { - let timeout_delta = match TimeDelta::from_std(self.timeout) { - Ok(d) => d, - Err(e) => { - let msg = format!("invalid timeout duration: {e:#}"); - slog::error!(&opctx.log, "{msg}"); - return AuditLogTimeoutIncompleteStatus { - timed_out: 0, - cutoff: Utc::now(), - max_timed_out_per_activation: self - .max_timed_out_per_activation, - error: Some(msg), - }; - } - }; - - let cutoff = Utc::now() - timeout_delta; + let cutoff = Utc::now() - self.timeout; let timed_out = match self .datastore .audit_log_timeout_incomplete( From a6c7e63403f0eeef7b77bbd2e7887c830302320a Mon Sep 17 00:00:00 2001 From: David Crespo Date: Wed, 18 Mar 2026 14:26:45 -0500 Subject: [PATCH 5/8] error logging tweaks --- .../tasks/audit_log_timeout_incomplete.rs | 19 +++++++++++++------ 1 file changed, 13
insertions(+), 6 deletions(-) diff --git a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs index 7ce5282e353..2c5a23a7bb7 100644 --- a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs +++ b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs @@ -13,6 +13,7 @@ use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; use nexus_types::internal_api::background::AuditLogTimeoutIncompleteStatus; use serde_json::json; +use slog_error_chain::InlineErrorChain; use std::sync::Arc; use std::time::Duration; @@ -28,8 +29,12 @@ impl AuditLogTimeoutIncomplete { timeout: Duration, max_timed_out_per_activation: u32, ) -> Self { - let timeout = TimeDelta::from_std(timeout) - .expect("timeout must be representable as a TimeDelta"); + let Ok(timeout) = TimeDelta::from_std(timeout) else { + panic!( + "invalid timeout {timeout:?} \ + (must be representable as a TimeDelta)" + ); + }; Self { datastore, timeout, max_timed_out_per_activation } } @@ -49,15 +54,17 @@ impl AuditLogTimeoutIncomplete { { Ok(count) => count, Err(err) => { - let msg = - format!("audit log timeout incomplete failed: {err:#}"); - slog::error!(&opctx.log, "{msg}"); + slog::error!( + &opctx.log, + "audit log timeout incomplete failed"; + &err, + ); return AuditLogTimeoutIncompleteStatus { timed_out: 0, cutoff, max_timed_out_per_activation: self .max_timed_out_per_activation, - error: Some(msg), + error: Some(InlineErrorChain::new(&err).to_string()), }; } }; From 99eab7bd1ae54ced95cbe9477519117251341835 Mon Sep 17 00:00:00 2001 From: David Crespo Date: Wed, 18 Mar 2026 16:13:44 -0500 Subject: [PATCH 6/8] update skill based on last few commits on both branches --- .claude/skills/add-background-task/SKILL.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.claude/skills/add-background-task/SKILL.md b/.claude/skills/add-background-task/SKILL.md index a3ed334dcfc..35e22ecbdca 
100644 --- a/.claude/skills/add-background-task/SKILL.md +++ b/.claude/skills/add-background-task/SKILL.md @@ -23,7 +23,9 @@ Define a struct for the task's activation status. Derive `Clone, Debug, Deserial Create the task module. The struct holds whatever state it needs (typically `Arc` plus config). Implement `BackgroundTask::activate` by delegating to an `async fn actually_activate(&mut self, opctx) -> YourStatus` method, then serialize the status to `serde_json::Value`. The `actually_activate` pattern makes unit testing easy without going through the trait. -Logging conventions: `debug` when there's nothing to do, `info` when routine work was done, `warn` when the work done indicates something is wrong (e.g., cleaning up after a crash), `error` on failure. +Logging conventions: `debug` when there's nothing to do, `info` when routine work was done, `warn` when the work done indicates something is wrong (e.g., cleaning up after a crash), `error` on failure. Log errors as structured fields with the `; &err` slog syntax (which uses the `SlogInlineError` trait), not by interpolating into the message string. For the error string in the status struct, use `InlineErrorChain::new(&err).to_string()` (from `slog_error_chain`) to capture the full cause chain. Status error strings should not repeat the task name — omdb already shows which task you're looking at. + +If the task takes config values that need conversion or validation (e.g., converting a `Duration` to `TimeDelta`, or checking a numeric range), do it once in `new()` and store the validated form. Don't re-validate on every activation — if the config is invalid, panic in `new()` with a message that includes the invalid value. Include a unit test in the same file using `TestDatabase::new_with_datastore` that calls `actually_activate` directly. 
If the task has a datastore method, a single test exercising the task end-to-end (including the limit/batching behavior) is sufficient — don't add a redundant test for the datastore method separately unless it has complex logic worth testing in isolation. From a247d5423aeee47245f2b56e55e79391768fa855 Mon Sep 17 00:00:00 2001 From: David Crespo Date: Wed, 18 Mar 2026 21:23:56 -0500 Subject: [PATCH 7/8] eliza comments --- nexus/db-queries/src/db/datastore/audit_log.rs | 17 ++++++++--------- .../tasks/audit_log_timeout_incomplete.rs | 7 +++++-- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/audit_log.rs b/nexus/db-queries/src/db/datastore/audit_log.rs index 5108b56bcc0..b244f93b16f 100644 --- a/nexus/db-queries/src/db/datastore/audit_log.rs +++ b/nexus/db-queries/src/db/datastore/audit_log.rs @@ -108,13 +108,6 @@ impl DataStore { } /// Complete an audit log entry with the request's outcome. - /// - /// The `time_completed IS NULL` filter is critical: once an entry has been - /// completed — whether by the normal request path or by the background - /// timeout task — it is part of the immutable audit log. Overwriting it - /// would change its `time_completed` timestamp, shifting its position in - /// the `(time_completed, id)` pagination order and violating the guarantee - /// that past query results are stable. pub async fn audit_log_entry_complete( &self, opctx: &OpContext, @@ -125,6 +118,12 @@ impl DataStore { opctx.authorize(authz::Action::CreateChild, &authz::AUDIT_LOG).await?; let rows = diesel::update(audit_log::table) .filter(audit_log::id.eq(entry.id)) + // This filter is very important: once an entry has been completed + // — whether by the normal request path or by the background + // timeout task — it is part of the immutable audit log. 
Overwriting + // it would change its `time_completed` timestamp, shifting its + // position in the `(time_completed, id)` pagination order and + // violating the guarantee that past query results are stable. .filter(audit_log::time_completed.is_null()) .set(completion) .execute_async(&*self.pool_connection_authorized(opctx).await?) @@ -160,7 +159,7 @@ impl DataStore { let audit_log_sub = diesel::alias!(nexus_db_schema::schema::audit_log as audit_log_sub); - let subquery = audit_log_sub + let stale_ids = audit_log_sub .filter(audit_log_sub.field(audit_log::time_completed).is_null()) .filter(audit_log_sub.field(audit_log::time_started).lt(cutoff)) .order(( @@ -174,7 +173,7 @@ impl DataStore { AuditLogCompletion::Timeout.into(); diesel::update(audit_log::table) - .filter(audit_log::id.eq_any(subquery)) + .filter(audit_log::id.eq_any(stale_ids)) .set(completion) .execute_async(&*self.pool_connection_authorized(opctx).await?) .await diff --git a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs index 2c5a23a7bb7..85fc52073b3 100644 --- a/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs +++ b/nexus/src/app/background/tasks/audit_log_timeout_incomplete.rs @@ -31,7 +31,8 @@ impl AuditLogTimeoutIncomplete { ) -> Self { let Ok(timeout) = TimeDelta::from_std(timeout) else { panic!( - "invalid timeout {timeout:?} \ + "invalid audit_log_timeout_incomplete.timeout_secs \ + value {timeout:?} \ (must be representable as a TimeDelta)" ); }; @@ -42,7 +43,9 @@ impl AuditLogTimeoutIncomplete { &mut self, opctx: &OpContext, ) -> AuditLogTimeoutIncompleteStatus { - let cutoff = Utc::now() - self.timeout; + let cutoff = Utc::now() + .checked_sub_signed(self.timeout) + .expect("now - timeout overflowed (timeout_secs is too large)"); let timed_out = match self .datastore .audit_log_timeout_incomplete( From e9cafb93a966bfa2d1b31c06bdc77a5cb7ae1bf2 Mon Sep 17 00:00:00 2001 From: David Crespo Date: 
Wed, 18 Mar 2026 22:45:35 -0500 Subject: [PATCH 8/8] add note to skill about passing in mutable status object --- .claude/skills/add-background-task/SKILL.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.claude/skills/add-background-task/SKILL.md b/.claude/skills/add-background-task/SKILL.md index 35e22ecbdca..34b1d569561 100644 --- a/.claude/skills/add-background-task/SKILL.md +++ b/.claude/skills/add-background-task/SKILL.md @@ -21,7 +21,9 @@ Define a struct for the task's activation status. Derive `Clone, Debug, Deserial ### 2. Task implementation (`nexus/src/app/background/tasks/.rs`) -Create the task module. The struct holds whatever state it needs (typically `Arc` plus config). Implement `BackgroundTask::activate` by delegating to an `async fn actually_activate(&mut self, opctx) -> YourStatus` method, then serialize the status to `serde_json::Value`. The `actually_activate` pattern makes unit testing easy without going through the trait. +Create the task module. The struct holds whatever state it needs (typically `Arc` plus config). Implement `BackgroundTask::activate` by delegating to an `actually_activate` helper, then serialize the status to `serde_json::Value`. The `actually_activate` pattern makes unit testing easy without going through the trait. + +`actually_activate` can either build and return the status (`async fn actually_activate(&mut self, opctx) -> YourStatus`), or take a mutable reference to one (`async fn actually_activate(&mut self, opctx, status: &mut YourStatus) -> Result<(), Error>`). The first is simpler and works well when the task either fully succeeds or fully fails. 
The second is better when the task can partially complete (e.g., it loops over work items): `activate` creates the status struct up front, passes it in, and serializes it afterward regardless of `Ok`/`Err`, so any progress already recorded in `status` (items processed, partial counts, earlier errors) is preserved even if the method bails out with `?` later. Logging conventions: `debug` when there's nothing to do, `info` when routine work was done, `warn` when the work done indicates something is wrong (e.g., cleaning up after a crash), `error` on failure. Log errors as structured fields with the `; &err` slog syntax (which uses the `SlogInlineError` trait), not by interpolating into the message string. For the error string in the status struct, use `InlineErrorChain::new(&err).to_string()` (from `slog_error_chain`) to capture the full cause chain. Status error strings should not repeat the task name — omdb already shows which task you're looking at.
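Editor's note: the "mutable status" variant described in this last SKILL.md paragraph can be sketched in isolation. This is a minimal self-contained model (plain Rust, no nexus types; `Status`, `activate`, and the item loop are illustrative stand-ins for the real task), showing why progress recorded in the status survives an early `return Err`:

```rust
#[derive(Debug, Default, PartialEq)]
struct Status {
    items_processed: usize,
    error: Option<String>,
}

// The worker: records progress into `status` as it goes, so if it bails out
// mid-loop the caller still sees the partial counts.
fn actually_activate(items: &[i64], status: &mut Status) -> Result<(), String> {
    for &item in items {
        if item < 0 {
            return Err(format!("bad item: {item}"));
        }
        status.items_processed += 1;
    }
    Ok(())
}

// The caller (the trait's `activate` in a real task): creates the status up
// front, passes it in, and records the error regardless of Ok/Err before
// serializing it.
fn activate(items: &[i64]) -> Status {
    let mut status = Status::default();
    if let Err(e) = actually_activate(items, &mut status) {
        status.error = Some(e);
    }
    status
}

fn main() {
    let status = activate(&[1, 2, -3, 4]);
    // Two items were processed before the failure; that progress is preserved
    // alongside the error, which is the whole point of this variant.
    assert_eq!(status.items_processed, 2);
    assert!(status.error.is_some());
    println!("ok");
}
```

With the return-a-status variant, an early failure would have to rebuild the partial counts by hand in every error arm; passing `&mut Status` makes `?`-style early returns safe because the progress already lives in the struct.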