Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions nexus/db-model/src/saga_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ impl From<steno::SagaCachedState> for SagaState {
}
}

impl From<SagaState> for nexus_types::inventory::SagaState {
fn from(value: SagaState) -> Self {
match value {
SagaState::Running => Self::Running,
SagaState::Unwinding => Self::Unwinding,
SagaState::Done => Self::Done,
SagaState::Abandoned => Self::Abandoned,
}
}
}

/// Represents a row in the "Saga" table
#[derive(Queryable, Insertable, Clone, Debug, Selectable, PartialEq)]
#[diesel(table_name = saga)]
Expand Down
9 changes: 9 additions & 0 deletions nexus/db-queries/src/db/datastore/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,13 @@ impl DataStore {
.collect::<Result<Vec<_>, _>>()
.map_err(|e| Error::internal_error(&e.to_string()))?;

// TODO-K: Make nicer and actually the same as the others
let inv_stale_sagas = &collection.stale_sagas;
println!("SAGAS!");
for saga in inv_stale_sagas {
println!("{saga:?}");
}

// This implementation inserts all records associated with the
// collection in one transaction. This is primarily for simplicity. It
// means we don't have to worry about other readers seeing a
Expand Down Expand Up @@ -4416,6 +4423,8 @@ impl DataStore {
cockroach_status,
ntp_timesync,
internal_dns_generation_status,
// TODO-K: Fill in once there is something in the DB
stale_sagas: vec![],
})
}

Expand Down
135 changes: 135 additions & 0 deletions nexus/db-queries/src/db/datastore/saga.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,36 @@ impl DataStore {
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

pub async fn saga_list_by_states_batched(
&self,
opctx: &OpContext,
states: Vec<SagaState>,
) -> Result<Vec<db::saga_types::Saga>, Error> {
let mut sagas = vec![];
let mut paginator = Paginator::new(
SQL_BATCH_SIZE,
dropshot::PaginationOrder::Ascending,
);
let conn = self.pool_connection_authorized(opctx).await?;
while let Some(p) = paginator.next() {
use nexus_db_schema::schema::saga::dsl;

let mut batch =
paginated(dsl::saga, dsl::id, &p.current_pagparams())
.filter(dsl::saga_state.eq_any(states.clone()))
.select(db::saga_types::Saga::as_select())
.load_async(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(e, ErrorHandler::Server)
})?;

paginator = p.found_batch(&batch, &|row| row.id);
sagas.append(&mut batch);
}
Ok(sagas)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -563,6 +593,28 @@ mod test {
db::model::saga_types::Saga::new(self.sec_id, params)
}

fn new_unwinding_db_saga(&self) -> db::model::saga_types::Saga {
let params = steno::SagaCreateParams {
id: self.saga_id,
name: steno::SagaName::new("test saga"),
dag: serde_json::value::Value::Null,
state: steno::SagaCachedState::Unwinding,
};

db::model::saga_types::Saga::new(self.sec_id, params)
}

fn new_done_db_saga(&self) -> db::model::saga_types::Saga {
let params = steno::SagaCreateParams {
id: self.saga_id,
name: steno::SagaName::new("test saga"),
dag: serde_json::value::Value::Null,
state: steno::SagaCachedState::Done,
};

db::model::saga_types::Saga::new(self.sec_id, params)
}

fn new_abandoned_db_saga(&self) -> db::model::saga_types::Saga {
let params = steno::SagaCreateParams {
id: self.saga_id,
Expand Down Expand Up @@ -741,4 +793,87 @@ mod test {
db.terminate().await;
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_list_sagas_by_states() {
// Test setup
let logctx = dev::test_setup_log("test_list_sagas_by_states");
let db = TestDatabase::new_with_datastore(&logctx.log).await;
let (opctx, datastore) = (db.opctx(), db.datastore());
let sec_id = db::SecId(uuid::Uuid::new_v4());
let sec_id2 = db::SecId(uuid::Uuid::new_v4());

// Insert one saga in each state, plus an additional running saga.
let running = SagaTestContext::new(sec_id).new_running_db_saga();
let running2 = SagaTestContext::new(sec_id2).new_running_db_saga();
let unwinding = SagaTestContext::new(sec_id).new_unwinding_db_saga();
let done = SagaTestContext::new(sec_id).new_done_db_saga();
let abandoned = SagaTestContext::new(sec_id).new_abandoned_db_saga();

let conn = datastore.pool_connection_for_tests().await.unwrap();
diesel::insert_into(nexus_db_schema::schema::saga::dsl::saga)
.values(vec![
running.clone(),
running2.clone(),
unwinding.clone(),
done.clone(),
abandoned.clone(),
])
.execute_async(&*conn)
.await
.expect("Failed to insert test setup data");

let sanitize_timestamps = |sagas: &mut Vec<db::saga_types::Saga>| {
for saga in sagas {
saga.time_created = chrono::DateTime::UNIX_EPOCH;
saga.adopt_time = chrono::DateTime::UNIX_EPOCH;
}
};

// Querying for Running and Unwinding should return exactly those three.
let mut observed_sagas = datastore
.saga_list_by_states_batched(
&opctx,
vec![SagaState::Running, SagaState::Unwinding],
)
.await
.expect("Failed to list sagas by states");

let mut expected_sagas =
vec![running.clone(), running2.clone(), unwinding.clone()];
expected_sagas.sort_by(|a, b| a.id.cmp(&b.id));
sanitize_timestamps(&mut observed_sagas);
sanitize_timestamps(&mut expected_sagas);

assert_eq!(
observed_sagas, expected_sagas,
"Should return the Running and Unwinding sagas"
);

// Querying for Done should return only the Done saga.
let mut observed_done = datastore
.saga_list_by_states_batched(&opctx, vec![SagaState::Done])
.await
.expect("Failed to list Done sagas");

let mut expected_done = vec![done.clone()];
sanitize_timestamps(&mut observed_done);
sanitize_timestamps(&mut expected_done);

assert_eq!(observed_done, expected_done, "Should return the Done saga");

// Querying for an empty state list should return nothing.
let observed_empty = datastore
.saga_list_by_states_batched(&opctx, vec![])
.await
.expect("Failed to list sagas with empty state filter");
assert!(
observed_empty.is_empty(),
"Empty state filter should return no sagas"
);

// Test cleanup
db.terminate().await;
logctx.cleanup_successful();
}
}
6 changes: 6 additions & 0 deletions nexus/inventory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ workspace = true

[dependencies]
anyhow.workspace = true
async-bb8-diesel.workspace = true
base64.workspace = true
camino.workspace = true
chrono.workspace = true
clickhouse-admin-keeper-client.workspace = true
diesel.workspace = true
dns-service-client.workspace = true
dropshot.workspace = true
clickhouse-admin-server-client.workspace = true
clickhouse-admin-types.workspace = true
cockroach-admin-types.workspace = true
Expand All @@ -25,6 +28,9 @@ iddqd.workspace = true
illumos-utils.workspace = true
itertools.workspace = true
sled-agent-types-versions.workspace = true
nexus-db-queries.workspace = true
nexus-db-model.workspace = true
nexus-db-schema.workspace = true
nexus-types.workspace = true
ntp-admin-client.workspace = true
omicron-common.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions nexus/inventory/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use nexus_types::inventory::Collection;
use nexus_types::inventory::HostPhase1ActiveSlot;
use nexus_types::inventory::HostPhase1FlashHash;
use nexus_types::inventory::InternalDnsGenerationStatus;
use nexus_types::inventory::InventorySaga;
use nexus_types::inventory::RotPage;
use nexus_types::inventory::RotPageFound;
use nexus_types::inventory::RotPageWhich;
Expand Down Expand Up @@ -131,6 +132,7 @@ pub struct CollectionBuilder {
cockroach_status: BTreeMap<InternalNodeId, CockroachStatus>,
ntp_timesync: IdOrdMap<TimeSync>,
internal_dns_generation_status: IdOrdMap<InternalDnsGenerationStatus>,
stale_sagas: Vec<InventorySaga>,
// CollectionBuilderRng is taken by value, rather than passed in as a
// mutable ref, to encourage a tree-like structure where each RNG is
// generally independent.
Expand Down Expand Up @@ -165,6 +167,7 @@ impl CollectionBuilder {
cockroach_status: BTreeMap::new(),
ntp_timesync: IdOrdMap::new(),
internal_dns_generation_status: IdOrdMap::new(),
stale_sagas: vec![],
rng: CollectionBuilderRng::from_entropy(),
}
}
Expand Down Expand Up @@ -192,6 +195,7 @@ impl CollectionBuilder {
cockroach_status: self.cockroach_status,
ntp_timesync: self.ntp_timesync,
internal_dns_generation_status: self.internal_dns_generation_status,
stale_sagas: self.stale_sagas,
}
}

Expand Down Expand Up @@ -696,6 +700,12 @@ impl CollectionBuilder {
self.clickhouse_keeper_cluster_membership.insert(membership);
}

/// Record information about long running sagas
// TODO-K: Will probably have to remove mut
pub fn found_stale_sagas(&mut self, mut sagas: Vec<InventorySaga>) {
self.stale_sagas.append(&mut sagas);
}

/// Record information about timesync
pub fn found_ntp_timesync(
&mut self,
Expand Down
Loading
Loading