diff --git a/api b/api index 1444355a4..4f817b044 160000 --- a/api +++ b/api @@ -1 +1 @@ -Subproject commit 1444355a4a6d942c8b3661d450689b83fc318a2b +Subproject commit 4f817b04404a77f1787fed108558413329b87524 diff --git a/bd-artifact-upload/src/lib.rs b/bd-artifact-upload/src/lib.rs index f5a8bf58b..ea569e8ef 100644 --- a/bd-artifact-upload/src/lib.rs +++ b/bd-artifact-upload/src/lib.rs @@ -16,7 +16,7 @@ mod uploader; -pub use uploader::{Client, MockClient, SnappedFeatureFlag, Uploader}; +pub use uploader::{Client, EnqueueError, MockClient, SnappedFeatureFlag, UploadSource, Uploader}; #[cfg(test)] #[ctor::ctor] diff --git a/bd-artifact-upload/src/uploader.rs b/bd-artifact-upload/src/uploader.rs index c05aa8af6..171f954ec 100644 --- a/bd-artifact-upload/src/uploader.rs +++ b/bd-artifact-upload/src/uploader.rs @@ -27,8 +27,8 @@ use bd_error_reporter::reporter::handle_unexpected; use bd_log_primitives::LogFields; use bd_log_primitives::size::MemorySized; use bd_proto::protos::client::api::{UploadArtifactIntentRequest, UploadArtifactRequest}; -use bd_proto::protos::client::artifact::ArtifactUploadIndex; use bd_proto::protos::client::artifact::artifact_upload_index::Artifact; +use bd_proto::protos::client::artifact::{ArtifactUploadIndex, StorageFormat}; use bd_proto::protos::client::feature_flag::FeatureFlag; use bd_proto::protos::logging::payload::Data; use bd_runtime::runtime::{ConfigLoader, DurationWatch, IntWatch, artifact_upload}; @@ -41,6 +41,7 @@ use std::sync::{Arc, LazyLock}; #[cfg(test)] use tests::TestHooks; use time::OffsetDateTime; +use tokio::sync::oneshot; use uuid::Uuid; /// Root directory for all files used for storage and uploading. 
@@ -53,12 +54,14 @@ pub static REPORT_INDEX_FILE: LazyLock = LazyLock::new(|| "report_index pub enum ArtifactType { #[default] Report, + StateSnapshot, } impl ArtifactType { fn to_type_id(self) -> &'static str { match self { Self::Report => "client_report", + Self::StateSnapshot => "state_snapshot", } } } @@ -109,12 +112,25 @@ impl SnappedFeatureFlag { #[derive(Debug)] struct NewUpload { uuid: Uuid, - file: std::fs::File, + source: UploadSource, type_id: String, state: LogFields, timestamp: Option, session_id: String, feature_flags: Vec, + persisted_tx: Option>>, +} + +#[derive(Debug)] +pub enum UploadSource { + // When a file handle is provided the uploader will copy the contents of the file to disk and + // append a CRC checksum to the end of the file to allow for integrity checking when we later + // read. + File(std::fs::File), + // For raw files they are directly moved to the target location without modification. This is + // intended for use cases where the data format is already self-validating (e.g. crc checksum or + // zlib compression). 
+ Path(PathBuf), } // Used for bounded_buffer logs @@ -122,8 +138,8 @@ impl std::fmt::Display for NewUpload { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "NewUpload {{ uuid: {}, file: {:?} }}", - self.uuid, self.file + "NewUpload {{ uuid: {}, source: {:?} }}", + self.uuid, self.source ) } } @@ -174,17 +190,28 @@ impl Stats { } } +#[derive(Debug, thiserror::Error)] +pub enum EnqueueError { + #[error("upload queue full")] + QueueFull, + #[error("upload channel closed")] + Closed, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + #[automock] pub trait Client: Send + Sync { fn enqueue_upload( &self, - file: std::fs::File, + source: UploadSource, type_id: String, state: LogFields, timestamp: Option, session_id: String, feature_flags: Vec, - ) -> anyhow::Result; + persisted_tx: Option>>, + ) -> std::result::Result; } pub struct UploadClient { @@ -196,30 +223,35 @@ impl Client for UploadClient { /// Dispatches a payload to be uploaded, returning the associated artifact UUID. 
fn enqueue_upload( &self, - file: std::fs::File, + source: UploadSource, type_id: String, state: LogFields, timestamp: Option, session_id: String, feature_flags: Vec, - ) -> anyhow::Result { + persisted_tx: Option>>, + ) -> std::result::Result { let uuid = uuid::Uuid::new_v4(); let result = self .upload_tx .try_send(NewUpload { uuid, - file, + source, type_id, state, timestamp, session_id, feature_flags, + persisted_tx, }) .inspect_err(|e| log::warn!("failed to enqueue artifact upload: {e:?}")); self.counter_stats.record(&result); - result?; + result.map_err(|e| match e { + bd_bounded_buffer::TrySendError::FullSizeOverflow => EnqueueError::QueueFull, + bd_bounded_buffer::TrySendError::Closed => EnqueueError::Closed, + })?; Ok(uuid) } @@ -373,17 +405,27 @@ impl Uploader { return Ok(()); }; - let Ok(contents) = read_checksummed_data(&contents) else { - log::debug!( - "failed to validate CRC checksum for artifact {}, deleting and removing from index", - next.name - ); - - self.file_system.delete_file(&file_path).await?; - self.index.pop_front(); - self.write_index().await; - - return Ok(()); + // For client reports we copy the file into a new file with a CRC checksum appended to allow + // for integrity checking. For state snapshot since they are already zlib encoded we bypass + // this check. TODO(snowp): Consider consolidating the behavior here, but keeping + // reports the same for now to avoid more changes than necessary. + let contents = if next.storage_format.enum_value_or_default() == StorageFormat::RAW { + // TODO(snowp): Should we consider validating the file here in some way? 
+ contents + } else { + let Ok(contents) = read_checksummed_data(&contents) else { + log::debug!( + "failed to validate CRC checksum for artifact {}, deleting and removing from index", + next.name + ); + + self.file_system.delete_file(&file_path).await?; + self.index.pop_front(); + self.write_index().await; + + return Ok(()); + }; + contents }; log::debug!("starting file upload for {:?}", next.name); self.upload_task_handle = Some(tokio::spawn(Self::upload_artifact( @@ -416,23 +458,25 @@ impl Uploader { } Some(NewUpload { uuid, - file, + source, type_id, state, timestamp, session_id, feature_flags, + persisted_tx, }) = self.upload_queued_rx.recv() => { log::debug!("tracking artifact: {uuid} for upload"); self .track_new_upload( uuid, - file, + source, type_id, state, session_id, timestamp, feature_flags, + persisted_tx, ) .await; } @@ -606,46 +650,126 @@ impl Uploader { async fn track_new_upload( &mut self, uuid: Uuid, - file: std::fs::File, + source: UploadSource, type_id: String, state: LogFields, session_id: String, timestamp: Option, feature_flags: Vec, + mut persisted_tx: Option>>, ) { - // If we've reached our limit of entries, stop the entry currently being uploaded (the oldest - // one) to make space for the newer one. + // Previously we would always drop the oldest entry when we hit capacity, but for state + // snapshots this would result in us dropping uploads that we know we need to hydrate logs + // that were scheduled for uploads. To mitigate this we treat state snapshots differently + // and avoid dropping them when we hit capacity, which means that in the worst case if we have a + // lot of state snapshots we might fill up this queue and apply backpressure to the state + // snapshot producer, deferring the snapshot limit enforcement to the producer instead of the + // uploader. + // TODO(snowp): Consider also having a bound on the size of the files persisted to disk. 
+ // TODO(snowp): We should consider redoing how backpressure works for crash reports as well as + // there are cases in which we drop reports. For now limit the backpressure mechanism to + // StateSnapshots as we want stronger guarantees than what is currently provided for regular + // crash reports. if self.index.len() == usize::try_from(*self.max_entries.read()).unwrap_or_default() { - log::debug!("upload queue is full, dropping current upload"); - - self.stats.dropped.inc(); - self.stop_current_upload(); - self.index.pop_front(); + if let Some(index_to_drop) = self.index.iter().position(|entry| { + entry.type_id.as_deref() != Some(ArtifactType::StateSnapshot.to_type_id()) + }) { + log::debug!("upload queue is full, dropping oldest non-state upload"); + self.stats.dropped.inc(); + if index_to_drop == 0 { + self.stop_current_upload(); + } + if let Some(entry) = self.index.remove(index_to_drop) { + let file_path = REPORT_DIRECTORY.join(&entry.name); + if let Err(e) = self.file_system.delete_file(&file_path).await { + log::warn!("failed to delete artifact {:?}: {}", entry.name, e); + } + } + self.write_index().await; + } else { + self.stats.dropped.inc(); + if let Some(tx) = persisted_tx.take() { + let _ = tx.send(Err(EnqueueError::QueueFull)); + } + return; + } } let uuid = uuid.to_string(); - let target_file = match self - .file_system - .create_file(&REPORT_DIRECTORY.join(&uuid)) - .await - { - Ok(file) => file, - Err(e) => { - log::warn!("failed to create file for artifact: {uuid} on disk: {e}"); + let target_path = REPORT_DIRECTORY.join(&uuid); + let (write_result, storage_format) = match source { + UploadSource::File(file) => { + let target_file = match self.file_system.create_file(&target_path).await { + Ok(file) => file, + Err(e) => { + log::warn!("failed to create file for artifact: {uuid} on disk: {e}"); + if let Some(tx) = persisted_tx.take() { + let _ = tx.send(Err(EnqueueError::Other(anyhow::anyhow!( + "failed to create file for artifact {uuid}: {e}" + 
)))); + } - #[cfg(test)] - if let Some(hooks) = &self.test_hooks { - hooks.entry_received_tx.send(uuid.clone()).await.unwrap(); - } - return; + #[cfg(test)] + if let Some(hooks) = &self.test_hooks { + hooks.entry_received_tx.send(uuid.clone()).await.unwrap(); + } + return; + }, + }; + + ( + async_write_checksummed_data(tokio::fs::File::from_std(file), target_file).await, + StorageFormat::CHECKSUMMED, + ) + }, + UploadSource::Path(source_path) => { + let result = if let Err(e) = self + .file_system + .rename_file(&source_path, &target_path) + .await + { + log::debug!("failed to move artifact source, falling back to copy: {e}"); + match std::fs::File::open(&source_path) { + Ok(source_file) => match self.file_system.create_file(&target_path).await { + Ok(target_file) => { + let result = + async_write_checksummed_data(tokio::fs::File::from_std(source_file), target_file) + .await; + if result.is_ok() + && let Err(e) = self.file_system.delete_file(&source_path).await + { + log::debug!( + "failed to delete moved source file {}: {e}", + source_path.display() + ); + } + result + }, + Err(e) => Err(e), + }, + Err(e) => Err(anyhow::anyhow!( + "failed to open file for artifact {} on disk: {}", + source_path.display(), + e + )), + } + } else { + Ok(()) + }; + + (result, StorageFormat::RAW) }, }; - if let Err(e) = async_write_checksummed_data(tokio::fs::File::from_std(file), target_file).await - { + if let Err(e) = write_result { log::warn!("failed to write artifact to disk: {uuid} to disk: {e}"); + if let Some(tx) = persisted_tx.take() { + let _ = tx.send(Err(EnqueueError::Other(anyhow::anyhow!( + "failed to write artifact to disk {uuid}: {e}" + )))); + } #[cfg(test)] if let Some(hooks) = &self.test_hooks { @@ -673,6 +797,7 @@ impl Uploader { .into_iter() .map(|(key, value)| (key.into(), value.into_proto())) .collect(), + storage_format: storage_format.into(), feature_flags: feature_flags .into_iter() .map( @@ -692,6 +817,9 @@ impl Uploader { }); self.write_index().await; + if 
let Some(tx) = persisted_tx { + let _ = tx.send(Ok(())); + } #[cfg(test)] diff --git a/bd-artifact-upload/src/uploader_test.rs b/bd-artifact-upload/src/uploader_test.rs index 7ca5b6549..add6e9d8e 100644 --- a/bd-artifact-upload/src/uploader_test.rs +++ b/bd-artifact-upload/src/uploader_test.rs @@ -6,7 +6,14 @@ // https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt use super::UploadClient; -use crate::uploader::{Client, REPORT_DIRECTORY, REPORT_INDEX_FILE, SnappedFeatureFlag}; +use crate::uploader::{ + Client, + EnqueueError, + REPORT_DIRECTORY, + REPORT_INDEX_FILE, + SnappedFeatureFlag, + UploadSource, +}; use assert_matches::assert_matches; use bd_api::DataUpload; use bd_api::upload::{IntentResponse, UploadResponse}; @@ -160,12 +167,13 @@ async fn basic_flow() { let id = setup .client .enqueue_upload( - setup.make_file(b"abc"), + UploadSource::File(setup.make_file(b"abc")), "client_report".to_string(), [("foo".into(), "bar".into())].into(), Some(timestamp), "session_id".to_string(), vec![], + None, ) .unwrap(); @@ -219,7 +227,7 @@ async fn feature_flags() { let id = setup .client .enqueue_upload( - setup.make_file(b"abc"), + UploadSource::File(setup.make_file(b"abc")), "client_report".to_string(), [("foo".into(), "bar".into())].into(), Some(timestamp), @@ -232,6 +240,7 @@ async fn feature_flags() { ), SnappedFeatureFlag::new("key2".to_string(), None, timestamp - 2.std_seconds()), ], + None, ) .unwrap(); @@ -295,12 +304,13 @@ async fn pending_upload_limit() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -311,12 +321,13 @@ async fn pending_upload_limit() { let id2 = setup .client .enqueue_upload( - setup.make_file(b"2"), + UploadSource::File(setup.make_file(b"2")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) 
.unwrap(); assert_eq!( @@ -326,12 +337,13 @@ async fn pending_upload_limit() { let id3 = setup .client .enqueue_upload( - setup.make_file(b"3"), + UploadSource::File(setup.make_file(b"3")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -394,12 +406,13 @@ async fn inconsistent_state_missing_file() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -409,12 +422,13 @@ async fn inconsistent_state_missing_file() { let id2 = setup .client .enqueue_upload( - setup.make_file(b"2"), + UploadSource::File(setup.make_file(b"2")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -452,12 +466,13 @@ async fn inconsistent_state_extra_file() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -523,12 +538,13 @@ async fn disk_persistence() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -570,12 +586,13 @@ async fn inconsistent_state_missing_index() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -594,12 +611,13 @@ async fn inconsistent_state_missing_index() { let id2 = setup .client .enqueue_upload( - setup.make_file(b"2"), + UploadSource::File(setup.make_file(b"2")), "client_report".to_string(), [].into(), None, "session_id".to_string(), 
vec![], + None, ) .unwrap(); assert_eq!( @@ -640,12 +658,13 @@ async fn new_entry_disk_full() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -669,12 +688,13 @@ async fn new_entry_disk_full_after_received() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -710,12 +730,13 @@ async fn intent_retries() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -745,12 +766,13 @@ async fn intent_drop() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -782,12 +804,13 @@ async fn upload_retries() { let id1 = setup .client .enqueue_upload( - setup.make_file(b"1"), + UploadSource::File(setup.make_file(b"1")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -834,12 +857,13 @@ async fn normalize_type_id_on_load() { let id = setup .client .enqueue_upload( - setup.make_file(b"abc"), + UploadSource::File(setup.make_file(b"abc")), "client_report".to_string(), [].into(), None, "session_id".to_string(), vec![], + None, ) .unwrap(); assert_eq!( @@ -883,3 +907,106 @@ async fn normalize_type_id_on_load() { assert_eq!(intent.payload.type_id, "client_report"); }); } + +#[tokio::test] +async fn enqueue_upload_acknowledges_after_disk_persist() { + let mut setup = Setup::new(2).await; + + let (persisted_tx, persisted_rx) = 
tokio::sync::oneshot::channel(); + let id = setup + .client + .enqueue_upload( + UploadSource::File(setup.make_file(b"snapshot")), + "state_snapshot".to_string(), + [].into(), + None, + "session_id".to_string(), + vec![], + Some(persisted_tx), + ) + .unwrap(); + + assert_eq!( + setup.entry_received_rx.recv().await.unwrap(), + id.to_string() + ); + persisted_rx.await.unwrap().unwrap(); +} + +#[tokio::test] +async fn enqueue_upload_from_path_acknowledges_after_disk_persist_and_removes_source() { + let mut setup = Setup::new(2).await; + + let source_path = std::path::PathBuf::from("source_snapshot.zz"); + setup + .filesystem + .write_file(&source_path, b"snapshot") + .await + .unwrap(); + + let (persisted_tx, persisted_rx) = tokio::sync::oneshot::channel(); + let id = setup + .client + .enqueue_upload( + UploadSource::Path(source_path.clone()), + "state_snapshot".to_string(), + [].into(), + None, + "session_id".to_string(), + vec![], + Some(persisted_tx), + ) + .unwrap(); + + assert_eq!( + setup.entry_received_rx.recv().await.unwrap(), + id.to_string() + ); + persisted_rx.await.unwrap().unwrap(); + assert!(!setup.filesystem.exists(&source_path).await.unwrap()); +} + +#[tokio::test] +async fn queue_full_with_only_state_snapshots_rejects_new_state_snapshot() { + let mut setup = Setup::new(1).await; + + let (persisted_tx1, persisted_rx1) = tokio::sync::oneshot::channel(); + let id1 = setup + .client + .enqueue_upload( + UploadSource::File(setup.make_file(b"state-1")), + "state_snapshot".to_string(), + [].into(), + None, + "session_id".to_string(), + vec![], + Some(persisted_tx1), + ) + .unwrap(); + assert_eq!( + setup.entry_received_rx.recv().await.unwrap(), + id1.to_string() + ); + persisted_rx1.await.unwrap().unwrap(); + + let (persisted_tx2, persisted_rx2) = tokio::sync::oneshot::channel(); + let _id2 = setup + .client + .enqueue_upload( + UploadSource::File(setup.make_file(b"state-2")), + "state_snapshot".to_string(), + [].into(), + None, + "session_id".to_string(), 
+ vec![], + Some(persisted_tx2), + ) + .unwrap(); + + assert_matches!(persisted_rx2.await.unwrap(), Err(EnqueueError::QueueFull)); + assert!( + timeout(100.std_milliseconds(), setup.entry_received_rx.recv()) + .await + .is_err() + ); +} diff --git a/bd-client-common/src/file_system.rs b/bd-client-common/src/file_system.rs index 5f94ab880..bac094e07 100644 --- a/bd-client-common/src/file_system.rs +++ b/bd-client-common/src/file_system.rs @@ -29,6 +29,9 @@ pub trait FileSystem: Send + Sync { /// Deletes the file if it exists. async fn delete_file(&self, path: &Path) -> anyhow::Result<()>; + /// Renames/moves a file relative to the SDK root. + async fn rename_file(&self, from: &Path, to: &Path) -> anyhow::Result<()>; + /// Deletes the directory if it exists. async fn remove_dir(&self, path: &Path) -> anyhow::Result<()>; @@ -98,6 +101,10 @@ impl FileSystem for RealFileSystem { } } + async fn rename_file(&self, from: &Path, to: &Path) -> anyhow::Result<()> { + Ok(tokio::fs::rename(self.directory.join(from), self.directory.join(to)).await?) 
+ } + async fn remove_dir(&self, path: &Path) -> anyhow::Result<()> { match tokio::fs::remove_dir_all(self.directory.join(path)).await { Ok(()) => Ok(()), diff --git a/bd-client-common/src/test.rs b/bd-client-common/src/test.rs index c961fe95c..5b50854e9 100644 --- a/bd-client-common/src/test.rs +++ b/bd-client-common/src/test.rs @@ -85,6 +85,22 @@ impl FileSystem for TestFileSystem { Ok(()) } + async fn rename_file(&self, from: &Path, to: &Path) -> anyhow::Result<()> { + let from_path = self.directory.path().join(from); + let to_path = self.directory.path().join(to); + + tokio::fs::rename(&from_path, &to_path).await.map_err(|e| { + anyhow::anyhow!( + "failed to rename file {} to {}: {}", + from_path.display(), + to_path.display(), + e + ) + })?; + + Ok(()) + } + async fn remove_dir(&self, path: &Path) -> anyhow::Result<()> { let dir_path = self.directory.path().join(path); if !dir_path.exists() { diff --git a/bd-crash-handler/src/lib.rs b/bd-crash-handler/src/lib.rs index e1d62591b..5f89331da 100644 --- a/bd-crash-handler/src/lib.rs +++ b/bd-crash-handler/src/lib.rs @@ -22,7 +22,7 @@ pub mod config_writer; mod file_watcher; pub mod global_state; -use bd_artifact_upload::SnappedFeatureFlag; +use bd_artifact_upload::{SnappedFeatureFlag, UploadSource}; use bd_client_common::debug_check_lifecycle_less_than; use bd_client_common::init_lifecycle::{InitLifecycle, InitLifecycleState}; use bd_error_reporter::reporter::handle_unexpected; @@ -482,12 +482,13 @@ impl Monitor { log::debug!("uploading report out of band"); let Ok(artifact_id) = self.artifact_client.enqueue_upload( - file, + UploadSource::File(file), "client_report".to_string(), state_fields.clone(), timestamp, session_id.clone(), reporting_feature_flags.clone(), + None, ) else { log::warn!( "Failed to enqueue issue report for upload: {}", diff --git a/bd-crash-handler/src/monitor_test.rs b/bd-crash-handler/src/monitor_test.rs index d75668c4b..ee3d5b05b 100644 --- a/bd-crash-handler/src/monitor_test.rs +++ 
b/bd-crash-handler/src/monitor_test.rs @@ -6,6 +6,7 @@ // https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt use crate::{Monitor, global_state}; +use bd_artifact_upload::UploadSource; use bd_client_common::init_lifecycle::InitLifecycleState; use bd_log_primitives::{AnnotatedLogFields, LogFields}; use bd_proto::flatbuffers::report::bitdrift_public::fbs::issue_reporting::v_1::{ @@ -374,7 +375,11 @@ impl Setup { make_mut(&mut self.upload_client) .expect_enqueue_upload() .withf( - move |mut file, ftype_id, fstate, ftimestamp, fsession_id, feature_flags| { + move |source, ftype_id, fstate, ftimestamp, fsession_id, feature_flags, _persisted_tx| { + let UploadSource::File(file) = source else { + return false; + }; + let mut file = file.try_clone().unwrap(); let mut output = vec![]; file.read_to_end(&mut output).unwrap(); let content_match = output == content; @@ -405,7 +410,7 @@ impl Setup { && flags_match }, ) - .returning(move |_, _, _, _, _, _| Ok(uuid)); + .returning(move |_, _, _, _, _, _, _| Ok(uuid)); } } @@ -786,12 +791,12 @@ async fn file_watcher_processes_multiple_reports() { .expect_enqueue_upload() .times(1) .in_sequence(&mut seq) - .returning(move |_, _, _, _, _, _| Ok(uuid1)); + .returning(move |_, _, _, _, _, _, _| Ok(uuid1)); make_mut(&mut setup.upload_client) .expect_enqueue_upload() .times(1) .in_sequence(&mut seq) - .returning(move |_, _, _, _, _, _| Ok(uuid2)); + .returning(move |_, _, _, _, _, _, _| Ok(uuid2)); // Create two crash reports let data1 = CrashReportBuilder::new("Crash1").reason("error1").build(); diff --git a/bd-log/src/rate_limit_log.rs b/bd-log/src/rate_limit_log.rs index f6840d7a5..fc17a3f3b 100644 --- a/bd-log/src/rate_limit_log.rs +++ b/bd-log/src/rate_limit_log.rs @@ -89,4 +89,7 @@ macro_rules! 
log_every { }; } -pub use {error_every, warn_every, warn_every_debug_assert, warn_every_debug_panic}; +pub use error_every; +pub use warn_every; +pub use warn_every_debug_assert; +pub use warn_every_debug_panic; diff --git a/bd-logger/AGENTS.md b/bd-logger/AGENTS.md new file mode 100644 index 000000000..ec026d579 --- /dev/null +++ b/bd-logger/AGENTS.md @@ -0,0 +1,123 @@ +# bd-logger - Agent Guidelines + +This document covers design decisions and behavioral invariants for `bd-logger` that are not +obvious from reading the code alone. + +## State Snapshot Uploads + +### Why State Snapshots Exist + +A snapshot file contains N state entries, each carrying its own original write timestamp. The +snapshot's filename timestamp is the *rotation* timestamp — the moment the journal was compacted +— which is always **after** the log timestamps it covers. The server reconstructs which state +was active at log time T by replaying the per-entry timestamps within the snapshot (entries with +write timestamp ≤ T), not by comparing T against the snapshot's rotation timestamp. Logs and +state travel separately — the logger just needs to ensure the relevant snapshot files are +uploaded so the server has them available when it processes those logs. + +### Architecture: Handle + Worker + +State upload coordination is split into two types: + +- **`StateUploadHandle`** — a cheap, `Arc`-cloneable coalescing handle. Each buffer uploader holds one. When + a batch is about to be flushed, the uploader calls + `handle.notify_upload_needed(batch_oldest_micros, batch_newest_micros)` in a fire-and-forget + manner. The call merges the range into shared pending state protected by a mutex, then + best-effort nudges the worker via a bounded wake channel; it never blocks the log upload path. + +- **`StateUploadWorker`** — a single background task that owns all snapshot creation and upload + logic. Because exactly one task processes requests, deduplication and cooldown enforcement are + centralized. 
On each wakeup (or retry tick), the worker drains/coalesces shared pending state, + then processes the widest pending range before deciding whether to act. + +The handle and worker are created together via `StateUploadHandle::new`. + +### Upload Decision Logic + +When the worker receives a batch's timestamp range `[oldest, newest]`, it evaluates in order: + +1. **Snapshot files exist** in `{state_store_path}/snapshots/` → upload snapshots whose filename + timestamp is within the current pending log range `[oldest, newest]` (oldest-first). +2. **No in-range snapshots found** → decide whether to create one on-demand via + `state_store.rotate_journal()`, subject to cooldown and the in-process + `last_change_at_rotation` optimization (skip if no changes since last worker-triggered + rotation). + +Snapshot discovery runs before on-demand creation checks, so persisted pending coverage can match +existing on-disk snapshots across restarts. + +### Snapshot Cooldown + +Typically snapshots are created in response to the state journal filling up due to state updates, +but when logs are streamed we may need to periodically create snapshots in order to upload state changes. +The state uploader may trigger manual snapshot creation by calling into the state store. + +Creating a snapshot on every batch flush during high-volume streaming is wasteful. The worker +tracks `last_snapshot_creation_micros` and will not create a new snapshot if one was created +within `snapshot_creation_interval_micros` (a runtime-configurable value). During cooldown, the +worker defers on-demand creation and keeps pending work for retry. + +### Snapshot Move Semantics + +State snapshot uploads are enqueued via `enqueue_upload(UploadSource::Path(...))`: the snapshot +file is moved +(renamed) from `state/snapshots/` into `bd-artifact-upload`'s `report_uploads/` directory. This +means: + +- No re-copy/re-checksum pass is required for snapshot files (they are already zlib compressed). 
+- Once the enqueue ack succeeds, the file has left `state/snapshots/`, so the worker will not + re-upload it. +- If enqueue fails, the file remains in `state/snapshots/`, so the next retry still sees it. + +Upload selection is range-based over file presence; there is no separate uploaded watermark state. + +### Pending Range Durability + +The worker persists pending coverage to key-value storage (`state_upload.pending_range.1`) whenever +it drains/merges producer requests, and clears it after successful processing. On startup, it reads +this key and immediately processes recovered pending work before entering the normal wake loop. + +During successful upload progress, the worker tightens pending coverage by advancing +`pending_range.oldest_micros` after each snapshot enqueue persistence ack. This narrowed coverage is +persisted immediately so restart resumes with the same tighter lower bound. + +### Retention Ownership + +Retention is split by responsibility: + +- **Buffer consumer retention handles** are the source of truth for logs that may still be uploaded + in the future. +- **State upload worker retention handle** protects only the uploader's current pending coverage. + +The worker sets its retention handle from `pending_range.oldest_micros` while pending work exists +and uses `RETENTION_NONE` when pending work is empty. + +### BatchBuilder Timestamp Tracking + +`BatchBuilder` (in `consumer.rs`) tracks `oldest_micros` and `newest_micros` incrementally as +logs are added via `add_log`. This avoids a second scan of the batch at flush time. Both fields +are reset to `None` by `take()` when the batch is consumed. Callers must read `timestamp_range()` +*before* calling `take()` — `take()` resets the fields. 
+ +The three flush paths that interact with state uploads are: +- `ContinuousBufferUploader::flush_current_batch` +- `StreamedBufferUpload::start` +- `CompleteBufferUpload::flush_batch` + +All three follow the same pattern: read `timestamp_range()`, call `notify_upload_needed` if a +range is available, then call `take()` to produce the log batch. + +### Wake Channel Backpressure + +The wake channel has capacity 1. If wake signaling is saturated, +`notify_upload_needed` still records the requested range in shared pending state and returns. A missed wake does not lose coverage; the worker will observe pending state on the next wake/timer cycle, and version tracking forces immediate reprocessing when producers update pending state while +the worker is active. + +### Key Invariants + +- Snapshot uploads are considered confirmed once they are successfully enqueued to the + `bd-artifact-upload` queue (which persists them to disk and retries the network upload). If + enqueue fails, the source file is still present in `state/snapshots/` and the next batch will + retry. +- Snapshot creation and upload progress logic run in the single worker task. Producer-side range + coalescing is concurrent but synchronized via a mutex-backed accumulator. 
diff --git a/bd-logger/Cargo.toml b/bd-logger/Cargo.toml index 942967b5d..42c6fa228 100644 --- a/bd-logger/Cargo.toml +++ b/bd-logger/Cargo.toml @@ -60,6 +60,7 @@ tokio.workspace = true tower.workspace = true tracing.workspace = true unwrap-infallible.workspace = true +uuid.workspace = true [dev-dependencies] assert_matches.workspace = true diff --git a/bd-logger/src/builder.rs b/bd-logger/src/builder.rs index a84c884fc..89bcf101a 100644 --- a/bd-logger/src/builder.rs +++ b/bd-logger/src/builder.rs @@ -13,6 +13,7 @@ use crate::internal::InternalLogger; use crate::log_replay::LoggerReplay; use crate::logger::Logger; use crate::logging_state::UninitializedLoggingContext; +use crate::state_upload::StateUploadHandle; use crate::{InitParams, LogAttributesOverrides}; use bd_api::{ AggregatedNetworkQualityProvider, @@ -325,11 +326,40 @@ impl LoggerBuilder { &collector_clone, shutdown_handle.make_shutdown(), ); + let artifact_client: Arc = Arc::new(artifact_client); + + // Create state upload handle for uploading state snapshots alongside logs. + // Gated by the `state.upload_enabled` runtime flag, which defaults to false as a + // safe rollout mechanism. 
+ let state_upload_enabled = + *bd_runtime::runtime::state::StateUploadEnabled::register(&runtime_loader) + .into_inner() + .borrow(); + let (state_upload_handle, state_upload_worker) = if state_upload_enabled { + let snapshot_creation_interval_ms = + *bd_runtime::runtime::state::SnapshotCreationIntervalMs::register(&runtime_loader) + .into_inner() + .borrow(); + let (handle, worker) = StateUploadHandle::new( + Some(state_directory.clone()), + self.params.store.clone(), + Some(retention_registry.clone()), + Some(Arc::new(state_store.clone())), + snapshot_creation_interval_ms, + time_provider.clone(), + artifact_client.clone(), + &scope, + ) + .await; + (Some(Arc::new(handle)), Some(worker)) + } else { + (None, None) + }; let crash_monitor = Monitor::new( &self.params.sdk_directory, self.params.store.clone(), - Arc::new(artifact_client), + artifact_client.clone(), self.params.session_strategy.clone(), &init_lifecycle, state_store.clone(), @@ -365,6 +395,7 @@ impl LoggerBuilder { trigger_upload_rx, &scope, log.clone(), + state_upload_handle, ); let updater = Arc::new(client_config::Config::new( @@ -429,6 +460,12 @@ impl LoggerBuilder { async move { artifact_uploader.run().await; Ok(()) + }, + async move { + if let Some(worker) = state_upload_worker { + worker.run().await; + } + Ok(()) } ) .map(|_| ()) diff --git a/bd-logger/src/consumer.rs b/bd-logger/src/consumer.rs index 0b998a3b5..b9f63bb81 100644 --- a/bd-logger/src/consumer.rs +++ b/bd-logger/src/consumer.rs @@ -10,8 +10,9 @@ mod consumer_test; use crate::service::{self, UploadRequest}; +use crate::state_upload::StateUploadHandle; +use bd_api::TriggerUpload; use bd_api::upload::LogBatch; -use bd_api::{DataUpload, TriggerUpload}; use bd_buffer::{AbslCode, Buffer, BufferEvent, BufferEventWithResponse, Consumer, Error}; use bd_client_common::error::InvariantError; use bd_client_common::maybe_await; @@ -21,6 +22,7 @@ use bd_log_primitives::EncodableLog; use bd_resilient_kv::RetentionHandle; use 
bd_runtime::runtime::{ConfigLoader, DurationWatch, IntWatch, Watch}; use bd_shutdown::{ComponentShutdown, ComponentShutdownTrigger}; +use bd_time::OffsetDateTimeExt; use futures_util::future::try_join_all; use std::collections::{HashMap, HashSet}; use std::pin::Pin; @@ -93,17 +95,21 @@ pub struct BufferUploadManager { stream_buffer_shutdown_trigger: Option, old_logs_dropped: Counter, + + // State upload handle for uploading state snapshots before logs. + state_upload_handle: Option>, } impl BufferUploadManager { pub(crate) fn new( - data_upload_tx: Sender, + data_upload_tx: tokio::sync::mpsc::Sender, runtime_loader: &Arc, shutdown: ComponentShutdown, buffer_event_rx: Receiver, trigger_upload_rx: Receiver, stats: &Scope, logging: Arc, + state_upload_handle: Option>, ) -> Self { Self { log_upload_service: service::new(data_upload_tx, shutdown.clone(), runtime_loader, stats), @@ -123,6 +129,7 @@ impl BufferUploadManager { logging, stream_buffer_shutdown_trigger: None, old_logs_dropped: stats.counter("old_logs_dropped"), + state_upload_handle, } } @@ -164,7 +171,6 @@ impl BufferUploadManager { ) -> anyhow::Result<()> { log::debug!("received trigger upload request"); - let mut buffer_upload_completions = vec![]; for buffer_id in trigger_upload.buffer_ids { @@ -359,6 +365,7 @@ impl BufferUploadManager { self.feature_flags.clone(), shutdown_trigger.make_shutdown(), buffer_name.to_string(), + self.state_upload_handle.clone(), ), shutdown_trigger, )) @@ -377,6 +384,7 @@ impl BufferUploadManager { self.log_upload_service.clone(), buffer_name.to_string(), self.old_logs_dropped.clone(), + self.state_upload_handle.clone(), )) } } @@ -391,6 +399,8 @@ struct BatchBuilder { flags: Flags, total_bytes: usize, logs: Vec>, + oldest_micros: Option, + newest_micros: Option, } impl BatchBuilder { @@ -399,10 +409,17 @@ impl BatchBuilder { flags, total_bytes: 0, logs: Vec::new(), + oldest_micros: None, + newest_micros: None, } } fn add_log(&mut self, data: Vec) { + if let Some(ts) = 
EncodableLog::extract_timestamp(&data) { + let ts_micros = ts.unix_timestamp_micros().cast_unsigned(); + self.oldest_micros = Some(self.oldest_micros.map_or(ts_micros, |o| o.min(ts_micros))); + self.newest_micros = Some(self.newest_micros.map_or(ts_micros, |n| n.max(ts_micros))); + } self.total_bytes += data.len(); self.logs.push(data); } @@ -418,9 +435,17 @@ impl BatchBuilder { max_batch_size_bytes <= self.total_bytes || max_batch_size_logs <= self.logs.len() } + /// Returns the timestamp range (oldest, newest) of logs added to the current batch, + /// or `None` if no logs with extractable timestamps have been added. + fn timestamp_range(&self) -> Option<(u64, u64)> { + self.oldest_micros.zip(self.newest_micros) + } + /// Consumes the current batch, resetting all accounting. fn take(&mut self) -> Vec> { self.total_bytes = 0; + self.oldest_micros = None; + self.newest_micros = None; self.logs.drain(..).collect() } } @@ -449,6 +474,8 @@ struct ContinuousBufferUploader { buffer_id: String, + // State upload handle for uploading state snapshots before logs. + state_upload_handle: Option>, retention_handle: RetentionHandle, } @@ -460,6 +487,7 @@ impl ContinuousBufferUploader { feature_flags: Flags, shutdown: ComponentShutdown, buffer_id: String, + state_upload_handle: Option>, ) -> Self { Self { consumer, @@ -469,6 +497,7 @@ impl ContinuousBufferUploader { batch_builder: BatchBuilder::new(feature_flags.clone()), feature_flags, buffer_id, + state_upload_handle, retention_handle, } } @@ -518,11 +547,15 @@ impl ContinuousBufferUploader { // Disarm the deadline which forces a partial flush to fire. 
self.flush_batch_sleep = None; + let timestamp_range = self.batch_builder.timestamp_range(); let logs = self.batch_builder.take(); let logs_len = logs.len(); - log::debug!("flushing {logs_len} logs"); + if let (Some(handle), Some((oldest, newest))) = (&self.state_upload_handle, timestamp_range) { + handle.notify_upload_needed(oldest, newest); + } + // Attempt to perform an upload of these buffers, with retries ++. See logger/service.rs for // details about retry policies etc. let upload_future = async { @@ -639,6 +672,8 @@ impl StreamedBufferUpload { } } + // TODO(snowp): Handle streaming state updates. + let upload_future = async { self .log_upload_service @@ -685,6 +720,9 @@ struct CompleteBufferUpload { lookback_window: Option, old_logs_dropped: Counter, + + // State upload handle for uploading state snapshots before logs. + state_upload_handle: Option>, } impl CompleteBufferUpload { @@ -694,6 +732,7 @@ impl CompleteBufferUpload { log_upload_service: service::Upload, buffer_id: String, old_logs_dropped: Counter, + state_upload_handle: Option>, ) -> Self { let lookback_window_limit = *runtime_flags.upload_lookback_window_feature_flag.read(); @@ -710,6 +749,7 @@ impl CompleteBufferUpload { buffer_id, lookback_window, old_logs_dropped, + state_upload_handle, } } @@ -760,10 +800,15 @@ impl CompleteBufferUpload { } async fn flush_batch(&mut self) -> anyhow::Result<()> { + let timestamp_range = self.batch_builder.timestamp_range(); let logs = self.batch_builder.take(); - log::debug!("flushing {} logs", logs.len()); + // Upload state snapshot if needed before uploading logs + if let (Some(handle), Some((oldest, newest))) = (&self.state_upload_handle, timestamp_range) { + handle.notify_upload_needed(oldest, newest); + } + // Attempt to perform an upload of these buffers, with retries ++. See logger/service.rs for // details about retry policies etc. 
let result = self diff --git a/bd-logger/src/consumer_test.rs b/bd-logger/src/consumer_test.rs index 5281e7b32..91b188bc8 100644 --- a/bd-logger/src/consumer_test.rs +++ b/bd-logger/src/consumer_test.rs @@ -91,6 +91,7 @@ impl SetupSingleConsumer { make_flags(&runtime_loader), shutdown_trigger.make_shutdown(), "buffer".to_string(), + None, ); tokio::spawn(async move { uploader.consume_continuous_logs().await }); @@ -359,6 +360,7 @@ async fn continuous_buffer_sets_retention_none_when_batch_drains_buffer() { make_flags(&runtime_loader), shutdown_trigger.make_shutdown(), "buffer".to_string(), + None, ); tokio::spawn(async move { uploader.consume_continuous_logs().await.unwrap() }); @@ -712,6 +714,7 @@ impl SetupMultiConsumer { trigger_upload_rx, &collector_clone.scope("consumer"), bd_internal_logging::NoopLogger::new(), + None, ) .run() .await @@ -980,7 +983,6 @@ async fn log_streaming() { let runtime_loader = ConfigLoader::new(&PathBuf::from(".")); let (log_upload_tx, mut log_upload_rx) = tokio::sync::mpsc::channel(1); - let upload_service = service::new( log_upload_tx, shutdown_trigger.make_shutdown(), @@ -1092,7 +1094,6 @@ async fn log_streaming_shutdown() { let runtime_loader = ConfigLoader::new(&PathBuf::from(".")); let (log_upload_tx, mut log_upload_rx) = tokio::sync::mpsc::channel(1); - let upload_service = service::new( log_upload_tx, global_shutdown_trigger.make_shutdown(), diff --git a/bd-logger/src/lib.rs b/bd-logger/src/lib.rs index 445a321f7..0fbb88487 100644 --- a/bd-logger/src/lib.rs +++ b/bd-logger/src/lib.rs @@ -32,6 +32,9 @@ mod network; mod ordered_receiver; mod pre_config_buffer; mod service; +mod state_upload; + +pub use state_upload::{SnapshotRef, StateUploadHandle}; #[cfg(test)] mod test; diff --git a/bd-logger/src/state_upload.rs b/bd-logger/src/state_upload.rs new file mode 100644 index 000000000..b2b44b9ab --- /dev/null +++ b/bd-logger/src/state_upload.rs @@ -0,0 +1,633 @@ +// shared-core - bitdrift's common client/server libraries +// 
Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +//! State snapshot upload coordination for log uploads. +//! +//! This module provides the [`StateUploadHandle`], which coordinates log uploads with state +//! snapshot uploads. The server associates logs with state by timestamp - logs at time T use the +//! most recent state snapshot uploaded before time T. +//! +//! The [`StateUploadHandle`] ensures that: +//! - State snapshots are marked for upload when logs that depend on them are flushed +//! - Duplicate snapshot uploads are avoided across multiple buffers +//! - Snapshot coverage is tracked across process restarts via persistence +//! - Snapshot selection is scoped to the pending log timestamp range, avoiding unnecessary uploads +//! +//! ## Architecture +//! +//! Upload coordination is split into two parts: +//! +//! - [`StateUploadHandle`] — a cheap handle shared (via `Arc`) by the buffer uploaders. Callers +//! fire-and-forget upload requests via [`StateUploadHandle::notify_upload_needed`], which +//! coalesces ranges in shared state and emits best-effort wake signals without blocking. +//! +//! - [`StateUploadWorker`] — a single background task that owns all snapshot creation and upload +//! logic. Because only one task processes requests, deduplication and cooldown enforcement happen +//! in one place; callers only contend on the brief pending-range accumulator lock. +//! +//! The worker persists pending upload range coverage in key-value storage and recovers it on +//! startup. Snapshot artifacts are enqueued with `enqueue_upload(UploadSource::Path(...))`, which +//! preserves move semantics from `state/snapshots/` into artifact upload storage.
+ +#[cfg(test)] +#[path = "./state_upload_test.rs"] +mod tests; + +use bd_artifact_upload::{Client as ArtifactClient, EnqueueError, UploadSource}; +use bd_client_stats_store::{Counter, Scope}; +use bd_log_primitives::LogFields; +use bd_proto::protos::client::key_value::StateSnapshotRange; +use bd_resilient_kv::SnapshotFilename; +use bd_state::{RetentionHandle, RetentionRegistry}; +use bd_time::{OffsetDateTimeExt, TimeProvider}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use time::OffsetDateTime; +use tokio::sync::mpsc; +use tokio::time::{Duration, sleep}; + +const BACKPRESSURE_RETRY_INTERVAL: Duration = Duration::from_secs(30); +static PENDING_UPLOAD_RANGE_KEY: bd_key_value::Key = + bd_key_value::Key::new("state_upload.pending_range.1"); + + +/// A reference to a state snapshot that should be uploaded. +#[derive(Debug, Clone)] +pub struct SnapshotRef { + /// The timestamp of the snapshot (microseconds since epoch). + pub timestamp_micros: u64, + /// Path to the snapshot file. + pub path: PathBuf, +} + +#[derive(Clone, Copy)] +struct PendingRange { + oldest_micros: u64, + newest_micros: u64, +} + +impl PendingRange { + fn merge(&mut self, other: Self) { + self.oldest_micros = self.oldest_micros.min(other.oldest_micros); + self.newest_micros = self.newest_micros.max(other.newest_micros); + } +} + +#[derive(Default)] +struct PendingAccumulator { + range: Option, + version: u64, + wake_queued: bool, +} + +struct Stats { + snapshots_uploaded: Counter, + snapshots_skipped: Counter, + upload_failures: Counter, + backpressure_pauses: Counter, +} + +impl Stats { + fn new(scope: &Scope) -> Self { + Self { + snapshots_uploaded: scope.counter("snapshots_uploaded"), + snapshots_skipped: scope.counter("snapshots_skipped"), + upload_failures: scope.counter("upload_failures"), + backpressure_pauses: scope.counter("backpressure_pauses"), + } + } +} + +/// Coordinates state snapshot uploads before log uploads. 
+/// +/// This is a lightweight, cloneable coalescing handle. Buffer uploaders call +/// [`notify_upload_needed`][Self::notify_upload_needed] in a fire-and-forget manner — +/// the call is non-blocking and never waits for the snapshot to be created or uploaded. +/// +/// All actual snapshot creation and upload logic is handled by the companion +/// [`StateUploadWorker`], which runs as a single background task. +pub struct StateUploadHandle { + /// Best-effort wake channel for nudging the background worker. + wake_tx: mpsc::Sender<()>, + /// Shared pending-range accumulator. + pending_accumulator: Arc>, +} + +impl StateUploadHandle { + /// Creates a new handle and its companion worker. + /// + /// The returned [`StateUploadWorker`] must be spawned (e.g. via `tokio::spawn` or included in a + /// `try_join!`) for snapshot uploads to be processed. The handle can be cloned and + /// shared across multiple buffer uploaders. + pub async fn new( + state_store_path: Option, + store: Arc, + retention_registry: Option>, + state_store: Option>, + snapshot_creation_interval_ms: u32, + time_provider: Arc, + artifact_client: Arc, + stats_scope: &Scope, + ) -> (Self, StateUploadWorker) { + let stats = Stats::new(&stats_scope.scope("state_upload")); + + let (wake_tx, wake_rx) = mpsc::channel(1); + let pending_accumulator = Arc::new(parking_lot::Mutex::new(PendingAccumulator::default())); + let retention_handle = match &retention_registry { + Some(registry) => Some(registry.create_handle().await), + None => None, + }; + + let handle = Self { + wake_tx, + pending_accumulator: pending_accumulator.clone(), + }; + + let worker = StateUploadWorker { + last_snapshot_creation_micros: AtomicU64::new(0), + snapshot_creation_interval_micros: u64::from(snapshot_creation_interval_ms) * 1000, + last_change_at_rotation: None, + state_store_path, + store, + retention_handle, + state_store, + time_provider, + artifact_client, + wake_rx, + pending_accumulator, + pending_version_seen: 0, + 
pending_range: None, + stats, + }; + + (handle, worker) + } + + /// Notifies the uploader that a state snapshot upload may be needed for a log batch. + /// + /// This is non-blocking. The range is first merged into a shared accumulator, then the worker is + /// nudged via a best-effort wake channel. + pub fn notify_upload_needed(&self, batch_oldest_micros: u64, batch_newest_micros: u64) { + let should_wake = { + let mut pending = self.pending_accumulator.lock(); + let incoming = PendingRange { + oldest_micros: batch_oldest_micros, + newest_micros: batch_newest_micros, + }; + if let Some(existing) = &mut pending.range { + existing.merge(incoming); + } else { + pending.range = Some(incoming); + } + pending.version = pending.version.wrapping_add(1); + if pending.wake_queued { + false + } else { + pending.wake_queued = true; + true + } + }; + + if should_wake { + // If this fails there is already a pending wake in the channel so we don't have to worry + // about nudging the worker later - it will process the updated pending range when it wakes + // up. + let _ = self.wake_tx.try_send(()); + } + } +} + + + +// +// StateUploadWorker +// + +/// Background task that processes state snapshot upload requests. +/// +/// There is exactly one worker per logger instance. Because all upload logic runs in a single +/// task, deduplication and cooldown enforcement require no synchronization. +/// +/// Obtain via [`StateUploadHandle::new`] and spawn with `tokio::spawn` or `try_join!`. +pub struct StateUploadWorker { + /// Timestamp of the last snapshot creation (microseconds since epoch). + last_snapshot_creation_micros: AtomicU64, + /// Minimum interval between snapshot creations (microseconds). + snapshot_creation_interval_micros: u64, + /// The value of `last_change_micros` at the time of the most recent worker-initiated rotation. + /// Used to avoid redundant rotations when nothing has changed since the last one. 
`None` means + /// the worker has not yet performed a rotation, so the first attempt always proceeds. + last_change_at_rotation: Option, + + state_store_path: Option, + store: Arc, + retention_handle: Option, + state_store: Option>, + time_provider: Arc, + artifact_client: Arc, + + /// Used to coordinate updates to the pending range and best-effort wake signals from the handle. + wake_rx: mpsc::Receiver<()>, + pending_accumulator: Arc>, + pending_version_seen: u64, + pending_range: Option, + + stats: Stats, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ProcessResult { + Progress, + Backpressure, + DeferredCooldown, + Skipped, + Error, +} + +enum UploadPreflight { + Skipped, + DeferredCooldown, + Ready(Vec), +} + + +impl StateUploadWorker { + /// Returns the path to the state store directory, if configured. + #[must_use] + pub fn state_store_path(&self) -> Option<&Path> { + self.state_store_path.as_deref() + } + + /// Runs the worker event loop, processing upload requests until the channel is closed. + pub async fn run(mut self) { + log::debug!("state upload worker started"); + self.pending_range = self.read_persisted_pending_range(); + self.refresh_retention_handle(); + if self.pending_range.is_some() { + self.process_pending().await; + } + + loop { + tokio::select! 
{ + Some(()) = self.wake_rx.recv() => { + self.drain_pending_accumulator(); + self.process_pending().await; + while self.pending_version_changed() { + self.drain_pending_accumulator(); + self.process_pending().await; + } + } + () = sleep(BACKPRESSURE_RETRY_INTERVAL), if self.pending_range.is_some() => { + self.drain_pending_accumulator(); + self.process_pending().await; + } + else => break, + } + } + } + + fn drain_pending_accumulator(&mut self) { + let mut pending = self.pending_accumulator.lock(); + if let Some(incoming) = pending.range.take() { + if let Some(existing) = &mut self.pending_range { + existing.merge(incoming); + } else { + self.pending_range = Some(incoming); + } + } + self.pending_version_seen = pending.version; + pending.wake_queued = false; + self.persist_pending_range(); + } + + fn pending_version_changed(&self) -> bool { + let pending = self.pending_accumulator.lock(); + pending.version != self.pending_version_seen + } + + async fn process_pending(&mut self) { + let Some(pending) = self.pending_range else { + return; + }; + let result = self + .process_upload(pending.oldest_micros, pending.newest_micros) + .await; + if matches!( + result, + ProcessResult::Backpressure | ProcessResult::DeferredCooldown + ) { + self.stats.backpressure_pauses.inc(); + } + + if matches!(result, ProcessResult::Progress | ProcessResult::Skipped) { + self.pending_range = None; + } + self.persist_pending_range(); + self.refresh_retention_handle(); + } + + // State upload flow: + // 1) Build an upload plan in `plan_upload_attempt` by checking coverage/last-change state, + // finding in-range snapshots, and deciding whether on-demand snapshot creation is needed. + // 2) Handle preflight outcomes: + // - `Skipped`: no work required for current coverage. + // - `DeferredCooldown`: uncovered changes exist but snapshot creation is rate-limited. + // - `Ready`: concrete snapshots should be uploaded now. + // 3) For each ready snapshot, enqueue and wait for persistence ack. 
+ // 4) On success, count the snapshot upload and continue; on failure, keep pending work for retry. + async fn process_upload( + &mut self, + batch_oldest_micros: u64, + batch_newest_micros: u64, + ) -> ProcessResult { + let last_change = self + .state_store + .as_ref() + .map_or(0, |s| s.last_change_micros()); + + let snapshots = match self + .plan_upload_attempt(batch_oldest_micros, batch_newest_micros, last_change) + .await + { + UploadPreflight::Skipped => return ProcessResult::Skipped, + UploadPreflight::DeferredCooldown => return ProcessResult::DeferredCooldown, + UploadPreflight::Ready(snapshots) => snapshots, + }; + self.refresh_retention_handle(); + + // Upload each snapshot in order, advancing the watermark after each confirmed upload. + for snapshot_ref in snapshots { + log::debug!( + "uploading state snapshot {} for log batch (oldest={}, newest={})", + snapshot_ref.timestamp_micros, + batch_oldest_micros, + batch_newest_micros + ); + + let timestamp = OffsetDateTime::from_unix_timestamp_micros( + snapshot_ref.timestamp_micros.try_into().unwrap_or_default(), + ) + .ok(); + + let (persisted_tx, persisted_rx) = tokio::sync::oneshot::channel(); + match self.artifact_client.enqueue_upload( + UploadSource::Path(snapshot_ref.path.clone()), + "state_snapshot".to_string(), + LogFields::new(), + timestamp, + "state_snapshot".to_string(), + vec![], + Some(persisted_tx), + ) { + Ok(_uuid) => match persisted_rx.await { + Ok(Ok(())) => { + log::debug!( + "state snapshot persisted to artifact queue for timestamp {}", + snapshot_ref.timestamp_micros + ); + self.stats.snapshots_uploaded.inc(); + self.advance_pending_oldest_micros(snapshot_ref.timestamp_micros); + }, + Ok(Err(e)) => { + log::warn!("failed to persist state snapshot upload entry: {e}"); + self.stats.upload_failures.inc(); + if matches!(e, EnqueueError::QueueFull) { + return ProcessResult::Backpressure; + } + return ProcessResult::Error; + }, + Err(e) => { + log::warn!("state snapshot persistence ack 
channel dropped: {e}"); + self.stats.upload_failures.inc(); + return ProcessResult::Error; + }, + }, + Err(e) => { + log::warn!("failed to enqueue state snapshot upload: {e}"); + self.stats.upload_failures.inc(); + if matches!(e, EnqueueError::QueueFull) { + return ProcessResult::Backpressure; + } + return ProcessResult::Error; + }, + } + } + ProcessResult::Progress + } + + async fn plan_upload_attempt( + &mut self, + batch_oldest_micros: u64, + batch_newest_micros: u64, + last_change: u64, + ) -> UploadPreflight { + // Check for existing snapshots first — on restart, persisted pending ranges may reference + // snapshots created by the previous process that are still on disk. + let snapshots = self.find_snapshots_in_range(batch_oldest_micros, batch_newest_micros); + if !snapshots.is_empty() { + return UploadPreflight::Ready(snapshots); + } + + // Skip rotation if we've already rotated and nothing has changed since. This is purely an + // in-process optimization — `last_change_at_rotation` is not persisted across restarts. + if self.last_change_at_rotation == Some(last_change) { + log::debug!( + "state upload: no changes since last rotation (last_change={last_change}), skipping" + ); + return UploadPreflight::Skipped; + } + + let now_micros = self + .time_provider + .now() + .unix_timestamp_micros() + .cast_unsigned(); + if self.snapshot_creation_on_cooldown(now_micros) { + self.stats.snapshots_skipped.inc(); + log::debug!( + "deferring snapshot creation due to cooldown (last={}, now={now_micros}, interval={})", + self.last_snapshot_creation_micros.load(Ordering::Relaxed), + self.snapshot_creation_interval_micros + ); + return UploadPreflight::DeferredCooldown; + } + + let Some(snapshot) = self.create_snapshot_if_needed(last_change).await else { + // No snapshot was created (no state store, or rotation produced nothing). Record the attempt + // so we don't retry until the state store records a new change. 
+ self.last_change_at_rotation = Some(last_change); + return UploadPreflight::Skipped; + }; + + // Record that we've rotated at this last_change value, so we don't redundantly rotate again + // until the state store records a new change. + self.last_change_at_rotation = Some(last_change); + + UploadPreflight::Ready(vec![snapshot]) + } + + fn find_snapshots_in_range( + &self, + batch_oldest_micros: u64, + batch_newest_micros: u64, + ) -> Vec { + self + .find_all_snapshots() + .into_iter() + .filter(|snapshot| { + snapshot.timestamp_micros >= batch_oldest_micros + && snapshot.timestamp_micros <= batch_newest_micros + }) + .collect() + } + + /// Finds all snapshot files, sorted oldest first. + pub(crate) fn find_all_snapshots(&self) -> Vec { + let Some(state_path) = self.state_store_path.as_ref() else { + return vec![]; + }; + let snapshots_dir = state_path.join("snapshots"); + + let Ok(entries) = std::fs::read_dir(&snapshots_dir) else { + return vec![]; + }; + + let mut found: Vec = entries + .flatten() + .filter_map(|entry| { + let path = entry.path(); + let filename = path.file_name().and_then(|f| f.to_str())?.to_owned(); + let parsed = SnapshotFilename::parse(&filename)?; + Some(SnapshotRef { + timestamp_micros: parsed.timestamp_micros, + path, + }) + }) + .collect(); + + found.sort_by_key(|s| s.timestamp_micros); + found + } + + /// Creates a new snapshot for uncovered state changes, if needed. + /// + /// Implements cooldown logic to prevent excessive snapshot creation during high-volume log + /// streaming. If a snapshot was created recently (within + /// `snapshot_creation_interval_micros`), returns `None` to defer creation for a later retry. + pub(crate) async fn create_snapshot_if_needed( + &self, + min_uncovered_micros: u64, + ) -> Option { + let state_store = self.state_store.as_ref()?; + if let Some(handle) = &self.retention_handle { + // Ensure cleanup during rotation doesn't remove the newly created snapshot before it can be + // enqueued for upload. 
+ handle.update_retention_micros(min_uncovered_micros); + } + + let now_micros = { + let now = self.time_provider.now(); + now.unix_timestamp_micros().cast_unsigned() + }; + if self.snapshot_creation_on_cooldown(now_micros) { + log::debug!( + "skipping snapshot creation due to cooldown (last={}, now={now_micros}, interval={})", + self.last_snapshot_creation_micros.load(Ordering::Relaxed), + self.snapshot_creation_interval_micros + ); + self.stats.snapshots_skipped.inc(); + return None; + } + + log::debug!( + "creating snapshot for uncovered state changes (min_uncovered_micros={min_uncovered_micros})" + ); + + let Some(snapshot_path) = state_store.rotate_journal().await else { + log::debug!("snapshot creation failed or not supported"); + self.stats.upload_failures.inc(); + return None; + }; + + self + .last_snapshot_creation_micros + .store(now_micros, Ordering::Relaxed); + + let filename = snapshot_path.file_name().and_then(|f| f.to_str())?; + let Some(parsed) = SnapshotFilename::parse(filename) else { + log::debug!("failed to parse snapshot filename: {filename}"); + self.stats.upload_failures.inc(); + return None; + }; + + Some(SnapshotRef { + timestamp_micros: parsed.timestamp_micros, + path: snapshot_path, + }) + } + + fn snapshot_creation_on_cooldown(&self, now_micros: u64) -> bool { + let last_creation = self.last_snapshot_creation_micros.load(Ordering::Relaxed); + last_creation > 0 + && now_micros.saturating_sub(last_creation) < self.snapshot_creation_interval_micros + } + + fn persist_pending_range(&self) { + match self.pending_range { + Some(range) => self + .store + .set(&PENDING_UPLOAD_RANGE_KEY, &pending_range_to_proto(range)), + None => self + .store + .set(&PENDING_UPLOAD_RANGE_KEY, &StateSnapshotRange::default()), + } + } + + fn read_persisted_pending_range(&self) -> Option { + self + .store + .get(&PENDING_UPLOAD_RANGE_KEY) + .and_then(|proto| pending_range_from_proto(&proto)) + } + + fn refresh_retention_handle(&self) { + let Some(handle) = 
&self.retention_handle else { + return; + }; + match self.pending_range { + Some(range) => handle.update_retention_micros(range.oldest_micros), + None => handle.update_retention_micros(RetentionHandle::RETENTION_NONE), + } + } + + fn advance_pending_oldest_micros(&mut self, uploaded_snapshot_micros: u64) { + let Some(range) = &mut self.pending_range else { + return; + }; + range.oldest_micros = range.oldest_micros.max(uploaded_snapshot_micros); + self.persist_pending_range(); + self.refresh_retention_handle(); + } +} + +fn pending_range_to_proto(range: PendingRange) -> StateSnapshotRange { + let mut proto = StateSnapshotRange::new(); + proto.oldest_micros = range.oldest_micros; + proto.newest_micros = range.newest_micros; + proto +} + +fn pending_range_from_proto(proto: &StateSnapshotRange) -> Option { + if proto.oldest_micros == 0 && proto.newest_micros == 0 { + return None; + } + Some(PendingRange { + oldest_micros: proto.oldest_micros, + newest_micros: proto.newest_micros, + }) +} diff --git a/bd-logger/src/state_upload_test.rs b/bd-logger/src/state_upload_test.rs new file mode 100644 index 000000000..2832c2515 --- /dev/null +++ b/bd-logger/src/state_upload_test.rs @@ -0,0 +1,695 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +#![allow(clippy::unwrap_used)] + +use super::*; +use bd_runtime::runtime::{ConfigLoader, FeatureFlag as _, IntWatch}; +use bd_test_helpers::session::in_memory_store; +use bd_time::{SystemTimeProvider, TestTimeProvider}; +use time::OffsetDateTime; +use uuid::Uuid; + +/// Creates a persistent `bd_state::Store` backed by the given directory with snapshotting enabled. +/// Inserts a dummy entry so that `rotate_journal` produces a non-empty snapshot file. 
+async fn make_state_store( + dir: &std::path::Path, +) -> ( + Arc, + Arc, + bd_state::RetentionHandle, + Arc, +) { + let runtime_loader = ConfigLoader::new(dir); + runtime_loader + .update_snapshot(bd_proto::protos::client::api::RuntimeUpdate { + version_nonce: "test".to_string(), + runtime: Some(bd_proto::protos::client::runtime::Runtime { + values: std::iter::once(( + bd_runtime::runtime::state::MaxSnapshotCount::path().to_string(), + bd_proto::protos::client::runtime::runtime::Value { + type_: Some(bd_proto::protos::client::runtime::runtime::value::Type::UintValue(10)), + ..Default::default() + }, + )) + .collect(), + ..Default::default() + }) + .into(), + ..Default::default() + }) + .await + .unwrap(); + let stats = bd_client_stats_store::Collector::default().scope("test"); + let time_provider = Arc::new(TestTimeProvider::new( + OffsetDateTime::from_unix_timestamp(1).unwrap(), + )); + let result = bd_state::Store::persistent( + dir, + bd_state::PersistentStoreConfig::default(), + time_provider.clone(), + &runtime_loader, + &stats, + ) + .await + .unwrap(); + // Insert a value so journal rotation produces a snapshot file. 
+ result + .store + .insert( + bd_state::Scope::GlobalState, + "test_key".to_string(), + bd_state::string_value("test_value"), + ) + .await + .unwrap(); + let retention_handle = result.retention_registry.create_handle().await; + retention_handle.update_retention_micros(0); + ( + Arc::new(result.store), + result.retention_registry, + retention_handle, + time_provider, + ) +} + +async fn insert_state_change(state_store: &bd_state::Store, key: &str) { + state_store + .insert( + bd_state::Scope::GlobalState, + key.to_string(), + bd_state::string_value("test_value"), + ) + .await + .unwrap(); +} + +struct Setup { + _temp_dir: tempfile::TempDir, + state_dir: std::path::PathBuf, + snapshots_dir: std::path::PathBuf, + store: Arc, + state_store: Arc, + retention_registry: Arc, + _state_retention_handle: bd_state::RetentionHandle, + time_provider: Arc, +} + +impl Setup { + async fn new() -> Self { + let temp_dir = tempfile::tempdir().unwrap(); + let state_dir = temp_dir.path().join("state"); + let snapshots_dir = state_dir.join("snapshots"); + let store = in_memory_store(); + let (state_store, retention_registry, state_retention_handle, time_provider) = + make_state_store(&state_dir).await; + Self { + _temp_dir: temp_dir, + state_dir, + snapshots_dir, + store, + state_store, + retention_registry, + _state_retention_handle: state_retention_handle, + time_provider, + } + } + + async fn worker_with_client( + &self, + cooldown_micros: u32, + client: Arc, + ) -> StateUploadWorker { + let stats = bd_client_stats_store::Collector::default().scope("test"); + let (_handle, worker) = StateUploadHandle::new( + Some(self.state_dir.clone()), + self.store.clone(), + Some(self.retention_registry.clone()), + Some(self.state_store.clone()), + cooldown_micros, + self.time_provider.clone(), + client, + &stats, + ) + .await; + worker + } + + fn now_micros(&self) -> u64 { + self + .time_provider + .now() + .unix_timestamp_micros() + .cast_unsigned() + } + + async fn 
create_snapshot_after_state_change(&self, key: &str) -> u64 { + self.time_provider.advance(time::Duration::seconds(1)); + insert_state_change(&self.state_store, key).await; + self.create_rotated_snapshot().await + } + + async fn create_rotated_snapshot(&self) -> u64 { + let path = self.state_store.rotate_journal().await.unwrap(); + let filename = path.file_name().unwrap().to_str().unwrap(); + bd_resilient_kv::SnapshotFilename::parse(filename) + .unwrap() + .timestamp_micros + } + + fn clear_snapshot_files(&self) { + if let Ok(entries) = std::fs::read_dir(&self.snapshots_dir) { + for entry in entries.flatten() { + std::fs::remove_file(entry.path()).unwrap(); + } + } + } + + fn count_snapshot_files(&self) -> usize { + std::fs::read_dir(&self.snapshots_dir).map_or(0, Iterator::count) + } +} + +#[tokio::test] +async fn no_snapshot_files_skips_upload() { + let store = in_memory_store(); + let stats = bd_client_stats_store::Collector::default().scope("test"); + let mut mock_client = bd_artifact_upload::MockClient::new(); + mock_client.expect_enqueue_upload().times(0); + let (_handle, mut worker) = StateUploadHandle::new( + None, + store, + None, + None, + 0, + Arc::new(SystemTimeProvider {}), + Arc::new(mock_client), + &stats, + ) + .await; + + worker.pending_range = Some(PendingRange { + oldest_micros: 1, + newest_micros: 100, + }); + worker.process_pending().await; + assert!(worker.pending_range.is_none()); +} + +#[tokio::test] +async fn cooldown_allows_snapshot_after_interval() { + let setup = Setup::new().await; + let worker = setup + .worker_with_client(1, Arc::new(bd_artifact_upload::MockClient::new())) + .await; + + let batch_ts = setup.now_micros(); + + let snapshot1 = worker.create_snapshot_if_needed(batch_ts).await; + assert!(snapshot1.is_some()); + + let file_count_after_first = setup.count_snapshot_files(); + + // Advance time past cooldown. 
+ setup.time_provider.advance(time::Duration::milliseconds(2)); + + insert_state_change(&setup.state_store, "test_key_2").await; + + let future_batch_ts = batch_ts + 100; + let snapshot2 = worker.create_snapshot_if_needed(future_batch_ts).await; + assert!(snapshot2.is_some()); + assert_eq!( + setup.count_snapshot_files(), + file_count_after_first + 1, + "should create new snapshot after cooldown expires" + ); +} + +#[tokio::test] +async fn find_all_snapshots_returns_ordered_snapshots() { + let setup = Setup::new().await; + let worker = setup + .worker_with_client(0, Arc::new(bd_artifact_upload::MockClient::new())) + .await; + + let old_snapshot_ts = setup.create_rotated_snapshot().await; + setup.time_provider.advance(time::Duration::milliseconds(1)); + insert_state_change(&setup.state_store, "test_key_2").await; + let newer_snapshot_ts = setup.create_rotated_snapshot().await; + // find_all_snapshots returns snapshots sorted oldest first. + let snapshots = worker.find_all_snapshots(); + assert_eq!(snapshots.len(), 2); + assert_eq!( + snapshots[0].timestamp_micros, old_snapshot_ts, + "oldest snapshot first" + ); + assert_eq!(snapshots[1].timestamp_micros, newer_snapshot_ts); +} + +#[test] +fn pending_range_merge_widens_bounds() { + let mut pending = PendingRange { + oldest_micros: 100, + newest_micros: 200, + }; + pending.merge(PendingRange { + oldest_micros: 50, + newest_micros: 250, + }); + assert_eq!(pending.oldest_micros, 50); + assert_eq!(pending.newest_micros, 250); +} + +#[tokio::test] +async fn refresh_retention_handle_uses_pending_range_oldest() { + let store = in_memory_store(); + let stats = bd_client_stats_store::Collector::default().scope("test"); + let retention_registry = Arc::new(bd_state::RetentionRegistry::new(IntWatch::new_for_testing( + 10, + ))); + let (_handle, mut worker) = StateUploadHandle::new( + None, + store, + Some(retention_registry.clone()), + None, + 0, + Arc::new(SystemTimeProvider {}), + 
Arc::new(bd_artifact_upload::MockClient::new()), + &stats, + ) + .await; + + worker.pending_range = Some(PendingRange { + oldest_micros: 123, + newest_micros: 999, + }); + worker.refresh_retention_handle(); + assert_eq!( + retention_registry.min_retention_timestamp().await, + Some(123) + ); + + worker.pending_range = None; + worker.refresh_retention_handle(); + assert_eq!(retention_registry.min_retention_timestamp().await, None); +} + +#[tokio::test] +async fn process_upload_advances_pending_oldest_across_multiple_snapshots() { + let setup = Setup::new().await; + let first_snapshot_ts = setup.create_snapshot_after_state_change("test_key_1").await; + let second_snapshot_ts = setup.create_snapshot_after_state_change("test_key_2").await; + + let mut mock_client = bd_artifact_upload::MockClient::new(); + mock_client + .expect_enqueue_upload() + .times(2) + .returning(|_, _, _, _, _, _, persisted_tx| { + if let Some(tx) = persisted_tx { + let _ = tx.send(Ok(())); + } + Ok(Uuid::new_v4()) + }); + + let mut worker = setup.worker_with_client(0, Arc::new(mock_client)).await; + worker.pending_range = Some(PendingRange { + oldest_micros: first_snapshot_ts, + newest_micros: second_snapshot_ts, + }); + + let result = worker + .process_upload(first_snapshot_ts, second_snapshot_ts) + .await; + assert_eq!(result, ProcessResult::Progress); + assert_eq!( + worker.pending_range.map(|r| r.oldest_micros), + Some(second_snapshot_ts) + ); +} + +#[tokio::test] +async fn older_incoming_range_reexpands_pending_after_progress() { + let setup = Setup::new().await; + let first_snapshot_ts = setup.create_snapshot_after_state_change("test_key_1").await; + let second_snapshot_ts = setup.create_snapshot_after_state_change("test_key_2").await; + + let mut mock_client = bd_artifact_upload::MockClient::new(); + mock_client + .expect_enqueue_upload() + .times(2) + .returning(|_, _, _, _, _, _, persisted_tx| { + if let Some(tx) = persisted_tx { + let _ = tx.send(Ok(())); + } + Ok(Uuid::new_v4()) + }); 
+ + let stats = bd_client_stats_store::Collector::default().scope("test"); + let (handle, mut worker) = StateUploadHandle::new( + Some(setup.state_dir.clone()), + setup.store.clone(), + Some(setup.retention_registry.clone()), + Some(setup.state_store.clone()), + 0, + setup.time_provider.clone(), + Arc::new(mock_client), + &stats, + ) + .await; + worker.pending_range = Some(PendingRange { + oldest_micros: first_snapshot_ts, + newest_micros: second_snapshot_ts, + }); + let result = worker + .process_upload(first_snapshot_ts, second_snapshot_ts) + .await; + assert_eq!(result, ProcessResult::Progress); + assert_eq!( + worker.pending_range.map(|r| r.oldest_micros), + Some(second_snapshot_ts) + ); + + handle.notify_upload_needed(first_snapshot_ts, second_snapshot_ts); + worker.drain_pending_accumulator(); + assert_eq!( + worker.pending_range.map(|r| r.oldest_micros), + Some(first_snapshot_ts) + ); +} + +#[tokio::test] +async fn notify_upload_needed_keeps_range_when_wake_channel_is_full() { + let store = in_memory_store(); + let stats = bd_client_stats_store::Collector::default().scope("test"); + let (handle, _worker) = StateUploadHandle::new( + None, + store, + None, + None, + 0, + Arc::new(SystemTimeProvider {}), + Arc::new(bd_artifact_upload::MockClient::new()), + &stats, + ) + .await; + + handle.wake_tx.try_send(()).unwrap(); + + handle.notify_upload_needed(100, 200); + let pending = handle.pending_accumulator.lock(); + let range = pending.range.unwrap(); + assert_eq!(range.oldest_micros, 100); + assert_eq!(range.newest_micros, 200); +} + +#[tokio::test] +async fn cooldown_defer_keeps_pending_for_retry() { + let setup = Setup::new().await; + let mut worker = setup + .worker_with_client(1000, Arc::new(bd_artifact_upload::MockClient::new())) + .await; + + // Set recent snapshot creation time to force a cooldown defer path. 
+ let _created = worker.create_snapshot_if_needed(100).await.unwrap(); + setup.clear_snapshot_files(); + setup.time_provider.advance(time::Duration::milliseconds(1)); + insert_state_change(&setup.state_store, "test_key_2").await; + + worker.pending_range = Some(PendingRange { + oldest_micros: 1, + newest_micros: 2_000_000, + }); + worker.process_pending().await; + + assert!( + worker.pending_range.is_some(), + "cooldown defer should keep pending range for retry" + ); +} + + +#[tokio::test] +async fn enqueue_backpressure_keeps_pending_range() { + let setup = Setup::new().await; + let snapshot_ts = setup.create_rotated_snapshot().await; + + let mut mock_client = bd_artifact_upload::MockClient::new(); + mock_client + .expect_enqueue_upload() + .times(1) + .returning(|_, _, _, _, _, _, _| Err(bd_artifact_upload::EnqueueError::QueueFull)); + let mut worker = setup.worker_with_client(0, Arc::new(mock_client)).await; + + worker.pending_range = Some(PendingRange { + oldest_micros: 1, + newest_micros: snapshot_ts, + }); + worker.process_pending().await; + + assert_eq!(worker.pending_range.map(|r| r.oldest_micros), Some(1)); + assert_eq!( + worker.pending_range.map(|r| r.newest_micros), + Some(snapshot_ts) + ); +} + +#[tokio::test] +async fn persisted_ack_error_keeps_pending_range() { + let setup = Setup::new().await; + let snapshot_ts = setup.create_rotated_snapshot().await; + + let mut mock_client = bd_artifact_upload::MockClient::new(); + mock_client + .expect_enqueue_upload() + .times(1) + .returning(|_, _, _, _, _, _, persisted_tx| { + if let Some(tx) = persisted_tx { + let _ = tx.send(Err(bd_artifact_upload::EnqueueError::Closed)); + } + Ok(Uuid::new_v4()) + }); + + let mut worker = setup.worker_with_client(0, Arc::new(mock_client)).await; + + let result = worker.process_upload(1, snapshot_ts).await; + assert_eq!(result, ProcessResult::Error); +} + +#[tokio::test] +async fn persisted_ack_channel_drop_keeps_pending_range() { + let setup = Setup::new().await; + let 
snapshot_ts = setup.create_rotated_snapshot().await; + + let mut mock_client = bd_artifact_upload::MockClient::new(); + mock_client + .expect_enqueue_upload() + .times(1) + .returning(|_, _, _, _, _, _, _| Ok(Uuid::new_v4())); + + let mut worker = setup.worker_with_client(0, Arc::new(mock_client)).await; + + let result = worker.process_upload(1, snapshot_ts).await; + assert_eq!(result, ProcessResult::Error); +} + +#[tokio::test] +async fn successful_enqueue_ack_clears_pending() { + let setup = Setup::new().await; + let snapshot_ts = setup.create_rotated_snapshot().await; + + let mut mock_client = bd_artifact_upload::MockClient::new(); + mock_client + .expect_enqueue_upload() + .times(1) + .returning(|_, _, _, _, _, _, persisted_tx| { + if let Some(tx) = persisted_tx { + let _ = tx.send(Ok(())); + } + Ok(Uuid::new_v4()) + }); + + let mut worker = setup.worker_with_client(0, Arc::new(mock_client)).await; + + worker.pending_range = Some(PendingRange { + oldest_micros: 1, + newest_micros: snapshot_ts, + }); + worker.process_pending().await; + + assert!(worker.pending_range.is_none()); +} + +#[tokio::test] +async fn plan_upload_attempt_skips_when_no_state_changes_or_no_store() { + let store = in_memory_store(); + let stats = bd_client_stats_store::Collector::default().scope("test"); + let (_handle, mut worker) = StateUploadHandle::new( + None, + store, + None, + None, + 0, + Arc::new(SystemTimeProvider {}), + Arc::new(bd_artifact_upload::MockClient::new()), + &stats, + ) + .await; + + // First call with last_change=0: no prior rotation recorded, so we attempt rotation. No state + // store configured, so create_snapshot_if_needed returns None → Skipped. The attempt records + // last_change_at_rotation = Some(0). + let result = worker.plan_upload_attempt(0, 20, 0).await; + assert!(matches!(result, UploadPreflight::Skipped)); + + // Second call with last_change=0: we've already rotated at 0, nothing changed → Skipped. 
+ let result = worker.plan_upload_attempt(0, 20, 0).await; + assert!(matches!(result, UploadPreflight::Skipped)); + + // Calls with different last_change values: no snapshots found, no state store → + // create_snapshot_if_needed returns None → Skipped. + let result = worker.plan_upload_attempt(0, 20, 15).await; + assert!(matches!(result, UploadPreflight::Skipped)); + + let result = worker.plan_upload_attempt(0, 20, 9).await; + assert!(matches!(result, UploadPreflight::Skipped)); +} + +#[tokio::test] +async fn skipped_with_no_state_changes_clears_pending() { + let store = in_memory_store(); + let stats = bd_client_stats_store::Collector::default().scope("test"); + let (_handle, mut worker) = StateUploadHandle::new( + None, + store, + None, + None, + 0, + Arc::new(SystemTimeProvider {}), + Arc::new(bd_artifact_upload::MockClient::new()), + &stats, + ) + .await; + + worker.pending_range = Some(PendingRange { + oldest_micros: 0, + newest_micros: 100, + }); + worker.process_pending().await; + + assert!(worker.pending_range.is_none()); +} + +#[tokio::test] +async fn plan_upload_attempt_returns_ready_for_in_range_snapshots() { + let setup = Setup::new().await; + let _first_snapshot_ts = setup.create_snapshot_after_state_change("test_key_1").await; + let second_snapshot_ts = setup.create_snapshot_after_state_change("test_key_2").await; + let mut worker = setup + .worker_with_client(0, Arc::new(bd_artifact_upload::MockClient::new())) + .await; + + match worker + .plan_upload_attempt(second_snapshot_ts, second_snapshot_ts, second_snapshot_ts) + .await + { + UploadPreflight::Ready(snapshots) => { + assert!(!snapshots.is_empty()); + assert_eq!( + snapshots.last().unwrap().timestamp_micros, + second_snapshot_ts + ); + }, + _ => panic!("expected ready preflight"), + } +} + +#[tokio::test] +async fn plan_upload_attempt_filters_snapshots_to_pending_range() { + let setup = Setup::new().await; + let first_snapshot_ts = setup.create_snapshot_after_state_change("test_key_1").await; + 
let second_snapshot_ts = setup.create_snapshot_after_state_change("test_key_2").await; + let mut worker = setup + .worker_with_client(0, Arc::new(bd_artifact_upload::MockClient::new())) + .await; + + match worker + .plan_upload_attempt(second_snapshot_ts, second_snapshot_ts, second_snapshot_ts) + .await + { + UploadPreflight::Ready(snapshots) => { + assert_eq!(snapshots.len(), 1); + assert_eq!(snapshots[0].timestamp_micros, second_snapshot_ts); + assert_ne!(snapshots[0].timestamp_micros, first_snapshot_ts); + }, + _ => panic!("expected ready preflight"), + } +} + +#[tokio::test] +async fn restart_with_zero_last_change_uploads_existing_snapshot() { + // Simulate pre-restart: create a snapshot and persist a pending range. + let setup = Setup::new().await; + let snapshot_ts = setup + .create_snapshot_after_state_change("pre_restart") + .await; + setup.store.set( + &PENDING_UPLOAD_RANGE_KEY, + &pending_range_to_proto(PendingRange { + oldest_micros: snapshot_ts, + newest_micros: snapshot_ts, + }), + ); + + // Create a fresh in-memory state store to simulate a restart where last_change_micros is 0. + // In production this happens when the previous process only wrote to the System scope (not + // cleared on restart) and ephemeral scopes were already empty. + let stats = bd_client_stats_store::Collector::default().scope("test"); + let restart_store = Arc::new(bd_state::Store::in_memory( + setup.time_provider.clone(), + None, + &ConfigLoader::new(setup.state_dir.as_path()), + &stats, + )); + assert_eq!(restart_store.last_change_micros(), 0); + + // The worker should find the snapshot on disk and upload it despite last_change being 0. 
+ let mut mock_client = bd_artifact_upload::MockClient::new(); + mock_client + .expect_enqueue_upload() + .times(1) + .returning(|_, _, _, _, _, _, persisted_tx| { + if let Some(tx) = persisted_tx { + let _ = tx.send(Ok(())); + } + Ok(Uuid::new_v4()) + }); + + let (handle, worker) = StateUploadHandle::new( + Some(setup.state_dir.clone()), + setup.store.clone(), + Some(setup.retention_registry.clone()), + Some(restart_store), + 0, + setup.time_provider.clone(), + Arc::new(mock_client), + &stats, + ) + .await; + + drop(handle); + worker.run().await; + + // Pending range should be cleared after successful upload. + assert!( + setup + .store + .get(&PENDING_UPLOAD_RANGE_KEY) + .and_then(|proto| pending_range_from_proto(&proto)) + .is_none(), + "pending range should be cleared after restart upload" + ); +} diff --git a/bd-logger/src/test/mod.rs b/bd-logger/src/test/mod.rs index c5f9cafac..2d3d22ba1 100644 --- a/bd-logger/src/test/mod.rs +++ b/bd-logger/src/test/mod.rs @@ -10,3 +10,4 @@ mod directory_lock_integration; mod embedded_logger_integration; mod logger_integration; mod setup; +mod state_upload_integration; diff --git a/bd-logger/src/test/setup.rs b/bd-logger/src/test/setup.rs index bafda23f5..abbe9e454 100644 --- a/bd-logger/src/test/setup.rs +++ b/bd-logger/src/test/setup.rs @@ -141,6 +141,24 @@ impl Setup { }) } + /// Creates a Setup with runtime values pre-cached to disk. + /// + /// This starts a temporary logger to cache the runtime, then restarts with the cached config. + /// Use this when runtime values must be present at logger initialization time (e.g., + /// `state.use_persistent_storage` which determines state store type at startup). 
+  pub fn new_with_cached_runtime(options: SetupOptions) -> Self {
+    {
+      let _primer = Self::new_with_options(SetupOptions {
+        sdk_directory: options.sdk_directory.clone(),
+        disk_storage: options.disk_storage,
+        extra_runtime_values: options.extra_runtime_values.clone(),
+        ..Default::default()
+      });
+      std::thread::sleep(std::time::Duration::from_millis(100));
+    }
+    Self::new_with_options(options)
+  }
+
   pub fn new_with_options(options: SetupOptions) -> Self {
     let mut server = bd_test_helpers::test_api_server::start_server(false, None);
     let shutdown = ComponentShutdownTrigger::default();
diff --git a/bd-logger/src/test/state_upload_integration.rs b/bd-logger/src/test/state_upload_integration.rs
new file mode 100644
index 000000000..dd180f576
--- /dev/null
+++ b/bd-logger/src/test/state_upload_integration.rs
@@ -0,0 +1,731 @@
+// shared-core - bitdrift's common client/server libraries
+// Copyright Bitdrift, Inc. All rights reserved.
+//
+// Use of this source code is governed by a source available license that can be found in the
+// LICENSE file or at:
+// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt
+
+#![allow(clippy::unwrap_used)]
+
+use super::setup::{Setup, SetupOptions};
+use crate::log_level;
+use bd_log_matcher::builder::message_equals;
+use bd_proto::protos::bdtail::bdtail_config::{BdTailConfigurations, BdTailStream};
+use bd_proto::protos::client::api::configuration_update::StateOfTheWorld;
+use bd_proto::protos::config::v1::config::{BufferConfigList, buffer_config};
+use bd_proto::protos::logging::payload::LogType;
+use bd_runtime::runtime::FeatureFlag as _;
+use bd_test_helpers::config_helper::{
+  ConfigurationUpdateParts,
+  configuration_update,
+  configuration_update_from_parts,
+  default_buffer_config,
+  make_buffer_matcher_matching_everything,
+  make_workflow_config_flushing_buffer,
+};
+use bd_test_helpers::runtime::ValueKind;
+use std::sync::Arc;
+use tempfile::TempDir;
+
+// Logs into a continuous buffer and expects a non-empty state snapshot artifact to be uploaded.
+#[test]
+fn continuous_buffer_creates_and_uploads_state_snapshot() {
+  let mut setup = Setup::new_with_cached_runtime(SetupOptions {
+    sdk_directory: Arc::new(TempDir::with_prefix("sdk").unwrap()),
+    disk_storage: true,
+    extra_runtime_values: vec![
+      (
+        bd_runtime::runtime::state::UsePersistentStorage::path(),
+        ValueKind::Bool(true),
+      ),
+      (
+        bd_runtime::runtime::log_upload::BatchSizeFlag::path(),
+        ValueKind::Int(1),
+      ),
+      (
+        bd_runtime::runtime::state::SnapshotCreationIntervalMs::path(),
+        ValueKind::Int(0),
+      ),
+      (
+        bd_runtime::runtime::state::MaxSnapshotCount::path(),
+        ValueKind::Int(10),
+      ),
+      (
+        bd_runtime::runtime::state::StateUploadEnabled::path(),
+        ValueKind::Bool(true),
+      ),
+    ],
+    ..Default::default()
+  });
+
+  setup.send_configuration_update(configuration_update(
+    "",
+    StateOfTheWorld {
+      buffer_config_list: Some(BufferConfigList {
+        buffer_config: vec![default_buffer_config(
+          buffer_config::Type::CONTINUOUS,
+          make_buffer_matcher_matching_everything().into(),
+        )],
+        ..Default::default()
+      })
+      .into(),
+      ..Default::default()
+    },
+  ));
+
+  setup
+    .logger_handle
+    .set_feature_flag_exposure("test_flag".to_string(), Some("variant_a".to_string()));
+  setup
+    .logger_handle
+    .set_feature_flag_exposure("another_flag".to_string(), Some("variant_b".to_string()));
+
+  setup.log(
+    log_level::INFO,
+    LogType::NORMAL,
+    "test message".into(),
+    [].into(),
+    [].into(),
+    None,
+  );
+
+  let log_upload = setup.server.blocking_next_log_upload();
+  assert!(log_upload.is_some(), "expected log upload");
+
+  let timeout = std::time::Duration::from_secs(2);
+  let start = std::time::Instant::now();
+
+  while start.elapsed() < timeout {
+    if let Some(upload) = setup.server.blocking_next_artifact_upload() {
+      assert!(
+        !upload.contents.is_empty(),
+        "state snapshot should have content"
+      );
+
+      return;
+    }
+    std::thread::sleep(std::time::Duration::from_millis(100));
+  }
+
+  // No artifact arrived before the deadline: fail rather than letting the test pass vacuously.
+  panic!("expected state snapshot upload within timeout");
+}
+
+#[test]
+fn trigger_buffer_flush_creates_snapshot() {
+  let mut setup =
Setup::new_with_cached_runtime(SetupOptions { + sdk_directory: Arc::new(TempDir::with_prefix("sdk").unwrap()), + disk_storage: true, + extra_runtime_values: vec![ + ( + bd_runtime::runtime::state::UsePersistentStorage::path(), + ValueKind::Bool(true), + ), + ( + bd_runtime::runtime::state::SnapshotCreationIntervalMs::path(), + ValueKind::Int(0), + ), + ( + bd_runtime::runtime::state::MaxSnapshotCount::path(), + ValueKind::Int(10), + ), + ( + bd_runtime::runtime::state::StateUploadEnabled::path(), + ValueKind::Bool(true), + ), + ], + ..Default::default() + }); + + setup.send_configuration_update(configuration_update_from_parts( + "", + ConfigurationUpdateParts { + buffer_config: vec![default_buffer_config( + buffer_config::Type::TRIGGER, + make_buffer_matcher_matching_everything().into(), + )], + workflows: make_workflow_config_flushing_buffer("default", message_equals("flush")), + ..Default::default() + }, + )); + + setup + .logger_handle + .set_feature_flag_exposure("trigger_flag".to_string(), Some("enabled".to_string())); + + for i in 0 .. 
3 { + setup.log( + log_level::INFO, + LogType::NORMAL, + format!("trigger log {i}").into(), + [].into(), + [].into(), + None, + ); + } + + setup.log( + log_level::INFO, + LogType::NORMAL, + "flush".into(), + [].into(), + [].into(), + None, + ); + + let log_upload = setup.server.blocking_next_log_upload(); + assert!( + log_upload.is_some(), + "expected log upload from trigger buffer flush" + ); + + let upload = log_upload.unwrap(); + let logs = upload.logs(); + assert!(!logs.is_empty(), "expected logs in upload"); + + let timeout = std::time::Duration::from_secs(2); + let start = std::time::Instant::now(); + + while start.elapsed() < timeout { + if let Some(artifact) = setup.server.blocking_next_artifact_upload() { + assert!( + !artifact.contents.is_empty(), + "state snapshot should have content" + ); + + return; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + panic!("expected state snapshot upload within timeout"); +} + +#[test] +fn trigger_buffer_with_multiple_flushes_uploads_state_once() { + let sdk_directory = Arc::new(TempDir::with_prefix("sdk").unwrap()); + + let mut setup = Setup::new_with_cached_runtime(SetupOptions { + sdk_directory, + disk_storage: true, + extra_runtime_values: vec![ + ( + bd_runtime::runtime::state::UsePersistentStorage::path(), + ValueKind::Bool(true), + ), + ( + bd_runtime::runtime::state::SnapshotCreationIntervalMs::path(), + ValueKind::Int(0), + ), + ( + bd_runtime::runtime::state::StateUploadEnabled::path(), + ValueKind::Bool(true), + ), + ], + ..Default::default() + }); + + setup.send_configuration_update(configuration_update_from_parts( + "", + ConfigurationUpdateParts { + buffer_config: vec![default_buffer_config( + buffer_config::Type::TRIGGER, + make_buffer_matcher_matching_everything().into(), + )], + workflows: make_workflow_config_flushing_buffer("default", message_equals("flush")), + ..Default::default() + }, + )); + + setup + .logger_handle + 
.set_feature_flag_exposure("multi_flush_flag".to_string(), Some("value".to_string())); + + setup.log( + log_level::INFO, + LogType::NORMAL, + "first batch log".into(), + [].into(), + [].into(), + None, + ); + setup.log( + log_level::INFO, + LogType::NORMAL, + "flush".into(), + [].into(), + [].into(), + None, + ); + + let _ = setup.server.blocking_next_log_upload(); + + let mut first_flush_artifacts = 0; + let timeout = std::time::Duration::from_millis(500); + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if setup.server.blocking_next_artifact_upload().is_some() { + first_flush_artifacts += 1; + } else { + break; + } + } + + setup.log( + log_level::INFO, + LogType::NORMAL, + "second batch log".into(), + [].into(), + [].into(), + None, + ); + setup.log( + log_level::INFO, + LogType::NORMAL, + "flush".into(), + [].into(), + [].into(), + None, + ); + + let _ = setup.server.blocking_next_log_upload(); + + let mut second_flush_artifacts = 0; + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if setup.server.blocking_next_artifact_upload().is_some() { + second_flush_artifacts += 1; + } else { + break; + } + } + + assert!( + second_flush_artifacts <= first_flush_artifacts, + "second flush ({second_flush_artifacts}) should not upload more state than first \ + ({first_flush_artifacts})" + ); +} + +#[test] +fn state_correlator_prevents_duplicate_uploads() { + let sdk_directory = Arc::new(TempDir::with_prefix("sdk").unwrap()); + + let mut setup = Setup::new_with_cached_runtime(SetupOptions { + sdk_directory, + disk_storage: true, + extra_runtime_values: vec![ + ( + bd_runtime::runtime::state::UsePersistentStorage::path(), + ValueKind::Bool(true), + ), + ( + bd_runtime::runtime::log_upload::BatchSizeFlag::path(), + ValueKind::Int(1), + ), + ( + bd_runtime::runtime::state::SnapshotCreationIntervalMs::path(), + ValueKind::Int(0), + ), + ( + bd_runtime::runtime::state::StateUploadEnabled::path(), + ValueKind::Bool(true), + 
), + ], + ..Default::default() + }); + + setup.configure_stream_all_logs(); + + setup + .logger_handle + .set_feature_flag_exposure("dup_test_flag".to_string(), Some("value".to_string())); + + setup.log( + log_level::INFO, + LogType::NORMAL, + "first log".into(), + [].into(), + [].into(), + None, + ); + + let _ = setup.server.blocking_next_log_upload(); + + let mut first_batch_artifacts = 0; + let timeout = std::time::Duration::from_millis(500); + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if setup.server.blocking_next_artifact_upload().is_some() { + first_batch_artifacts += 1; + } else { + break; + } + } + + setup.log( + log_level::INFO, + LogType::NORMAL, + "second log".into(), + [].into(), + [].into(), + None, + ); + + let _ = setup.server.blocking_next_log_upload(); + + let mut second_batch_artifacts = 0; + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if setup.server.blocking_next_artifact_upload().is_some() { + second_batch_artifacts += 1; + } else { + break; + } + } + + assert!( + second_batch_artifacts <= first_batch_artifacts, + "second batch ({second_batch_artifacts}) should not upload more state than first batch \ + ({first_batch_artifacts})" + ); +} + +#[test] +fn new_state_changes_trigger_new_snapshot() { + let sdk_directory = Arc::new(TempDir::with_prefix("sdk").unwrap()); + + let mut setup = Setup::new_with_cached_runtime(SetupOptions { + sdk_directory, + disk_storage: true, + extra_runtime_values: vec![ + ( + bd_runtime::runtime::state::UsePersistentStorage::path(), + ValueKind::Bool(true), + ), + ( + bd_runtime::runtime::log_upload::BatchSizeFlag::path(), + ValueKind::Int(1), + ), + ( + bd_runtime::runtime::state::SnapshotCreationIntervalMs::path(), + ValueKind::Int(0), + ), + ( + bd_runtime::runtime::state::StateUploadEnabled::path(), + ValueKind::Bool(true), + ), + ], + ..Default::default() + }); + + setup.configure_stream_all_logs(); + + setup + .logger_handle + 
.set_feature_flag_exposure("flag_v1".to_string(), Some("value1".to_string())); + + setup.log( + log_level::INFO, + LogType::NORMAL, + "log after first state".into(), + [].into(), + [].into(), + None, + ); + + let _ = setup.server.blocking_next_log_upload(); + + let timeout = std::time::Duration::from_millis(500); + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if setup.server.blocking_next_artifact_upload().is_none() { + break; + } + } + + setup + .logger_handle + .set_feature_flag_exposure("flag_v2".to_string(), Some("value2".to_string())); + + setup.log( + log_level::INFO, + LogType::NORMAL, + "log after second state".into(), + [].into(), + [].into(), + None, + ); + + let _ = setup.server.blocking_next_log_upload(); +} + +#[test] +fn stream_only_buffer_does_not_upload_state_snapshot() { + let sdk_directory = Arc::new(TempDir::with_prefix("sdk").unwrap()); + + let mut setup = Setup::new_with_cached_runtime(SetupOptions { + sdk_directory, + disk_storage: true, + extra_runtime_values: vec![ + ( + bd_runtime::runtime::state::UsePersistentStorage::path(), + ValueKind::Bool(true), + ), + ( + bd_runtime::runtime::log_upload::BatchSizeFlag::path(), + ValueKind::Int(2), + ), + ( + bd_runtime::runtime::state::SnapshotCreationIntervalMs::path(), + ValueKind::Int(0), + ), + ( + bd_runtime::runtime::state::MaxSnapshotCount::path(), + ValueKind::Int(10), + ), + ( + bd_runtime::runtime::state::StateUploadEnabled::path(), + ValueKind::Bool(true), + ), + ], + ..Default::default() + }); + + setup.send_configuration_update(configuration_update( + "", + StateOfTheWorld { + bdtail_configuration: Some(BdTailConfigurations { + active_streams: vec![BdTailStream { + stream_id: "all".into(), + matcher: None.into(), + ..Default::default() + }], + ..Default::default() + }) + .into(), + ..Default::default() + }, + )); + + setup + .logger_handle + .set_feature_flag_exposure("streaming_flag".to_string(), Some("active".to_string())); + + setup.log( + 
log_level::INFO, + LogType::NORMAL, + "streaming log 1".into(), + [].into(), + [].into(), + None, + ); + setup.log( + log_level::INFO, + LogType::NORMAL, + "streaming log 2".into(), + [].into(), + [].into(), + None, + ); + + let log_upload = setup.server.blocking_next_log_upload(); + assert!(log_upload.is_some(), "expected streamed log upload"); + + let timeout = std::time::Duration::from_secs(2); + let start = std::time::Instant::now(); + let mut found_artifact = false; + + while start.elapsed() < timeout { + if setup.server.blocking_next_artifact_upload().is_some() { + found_artifact = true; + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + assert!( + !found_artifact, + "stream-only buffers should not trigger state snapshot artifact upload" + ); +} + +#[test] +fn continuous_buffer_uploads_state_with_first_batch() { + let sdk_directory = Arc::new(TempDir::with_prefix("sdk").unwrap()); + + let mut setup = Setup::new_with_cached_runtime(SetupOptions { + sdk_directory, + disk_storage: true, + extra_runtime_values: vec![ + ( + bd_runtime::runtime::state::UsePersistentStorage::path(), + ValueKind::Bool(true), + ), + ( + bd_runtime::runtime::log_upload::BatchSizeFlag::path(), + ValueKind::Int(2), + ), + ( + bd_runtime::runtime::state::SnapshotCreationIntervalMs::path(), + ValueKind::Int(0), + ), + ( + bd_runtime::runtime::state::MaxSnapshotCount::path(), + ValueKind::Int(10), + ), + ( + bd_runtime::runtime::state::StateUploadEnabled::path(), + ValueKind::Bool(true), + ), + ], + ..Default::default() + }); + + setup.configure_stream_all_logs(); + setup + .logger_handle + .set_feature_flag_exposure("continuous_flag".to_string(), Some("active".to_string())); + + setup.log( + log_level::INFO, + LogType::NORMAL, + "continuous log 1".into(), + [].into(), + [].into(), + None, + ); + setup.log( + log_level::INFO, + LogType::NORMAL, + "continuous log 2".into(), + [].into(), + [].into(), + None, + ); + + let log_upload = 
setup.server.blocking_next_log_upload(); + assert!(log_upload.is_some(), "expected continuous log upload"); + + let timeout = std::time::Duration::from_secs(2); + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if let Some(artifact) = setup.server.blocking_next_artifact_upload() { + assert!( + !artifact.contents.is_empty(), + "continuous buffers should trigger state snapshot artifact upload" + ); + return; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + panic!("expected artifact upload for state snapshot with continuous buffers"); +} + +#[test] +fn continuous_streaming_multiple_batches_single_state_upload() { + let sdk_directory = Arc::new(TempDir::with_prefix("sdk").unwrap()); + + let mut setup = Setup::new_with_cached_runtime(SetupOptions { + sdk_directory, + disk_storage: true, + extra_runtime_values: vec![ + ( + bd_runtime::runtime::state::UsePersistentStorage::path(), + ValueKind::Bool(true), + ), + ( + bd_runtime::runtime::log_upload::BatchSizeFlag::path(), + ValueKind::Int(1), + ), + ( + bd_runtime::runtime::state::SnapshotCreationIntervalMs::path(), + ValueKind::Int(0), + ), + ( + bd_runtime::runtime::state::StateUploadEnabled::path(), + ValueKind::Bool(true), + ), + ], + ..Default::default() + }); + + setup.configure_stream_all_logs(); + + setup + .logger_handle + .set_feature_flag_exposure("batch_flag".to_string(), Some("test".to_string())); + + setup.log( + log_level::INFO, + LogType::NORMAL, + "batch 1 log".into(), + [].into(), + [].into(), + None, + ); + + let _ = setup.server.blocking_next_log_upload(); + + let mut first_batch_artifacts = 0; + let timeout = std::time::Duration::from_millis(500); + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if setup.server.blocking_next_artifact_upload().is_some() { + first_batch_artifacts += 1; + } else { + break; + } + } + + setup.log( + log_level::INFO, + LogType::NORMAL, + "batch 2 log".into(), + [].into(), + [].into(), + None, 
+ ); + + let _ = setup.server.blocking_next_log_upload(); + + let mut second_batch_artifacts = 0; + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if setup.server.blocking_next_artifact_upload().is_some() { + second_batch_artifacts += 1; + } else { + break; + } + } + + setup.log( + log_level::INFO, + LogType::NORMAL, + "batch 3 log".into(), + [].into(), + [].into(), + None, + ); + + let _ = setup.server.blocking_next_log_upload(); + + let mut third_batch_artifacts = 0; + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if setup.server.blocking_next_artifact_upload().is_some() { + third_batch_artifacts += 1; + } else { + break; + } + } + + assert!( + second_batch_artifacts <= first_batch_artifacts + && third_batch_artifacts <= first_batch_artifacts, + "subsequent batches should not upload more state than first batch \ + (first={first_batch_artifacts}, second={second_batch_artifacts}, \ + third={third_batch_artifacts})" + ); +} diff --git a/bd-proto/src/protos/client/artifact.rs b/bd-proto/src/protos/client/artifact.rs index 4a0f9dd64..6d1a36e46 100644 --- a/bd-proto/src/protos/client/artifact.rs +++ b/bd-proto/src/protos/client/artifact.rs @@ -174,6 +174,8 @@ pub mod artifact_upload_index { pub feature_flags: ::std::vec::Vec, // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.ArtifactUploadIndex.Artifact.type_id) pub type_id: ::std::option::Option<::std::string::String>, + // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.ArtifactUploadIndex.Artifact.storage_format) + pub storage_format: ::protobuf::EnumOrUnknown, // special fields // @@protoc_insertion_point(special_field:bitdrift_public.protobuf.client.v1.ArtifactUploadIndex.Artifact.special_fields) pub special_fields: ::protobuf::SpecialFields, @@ -191,7 +193,7 @@ pub mod artifact_upload_index { } pub(in super) fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { - let mut 
fields = ::std::vec::Vec::with_capacity(7); + let mut fields = ::std::vec::Vec::with_capacity(8); let mut oneofs = ::std::vec::Vec::with_capacity(0); fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( "name", @@ -228,6 +230,11 @@ pub mod artifact_upload_index { |m: &Artifact| { &m.type_id }, |m: &mut Artifact| { &mut m.type_id }, )); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "storage_format", + |m: &Artifact| { &m.storage_format }, + |m: &mut Artifact| { &mut m.storage_format }, + )); ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( "ArtifactUploadIndex.Artifact", fields, @@ -279,6 +286,9 @@ pub mod artifact_upload_index { 66 => { self.type_id = ::std::option::Option::Some(is.read_string()?); }, + 72 => { + self.storage_format = is.read_enum_or_unknown()?; + }, tag => { ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; }, @@ -318,6 +328,9 @@ pub mod artifact_upload_index { if let Some(v) = self.type_id.as_ref() { my_size += ::protobuf::rt::string_size(8, &v); } + if self.storage_format != ::protobuf::EnumOrUnknown::new(super::StorageFormat::CHECKSUMMED) { + my_size += ::protobuf::rt::int32_size(9, self.storage_format.value()); + } my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); self.special_fields.cached_size().set(my_size as u32); my_size @@ -352,6 +365,9 @@ pub mod artifact_upload_index { if let Some(v) = self.type_id.as_ref() { os.write_string(8, v)?; } + if self.storage_format != ::protobuf::EnumOrUnknown::new(super::StorageFormat::CHECKSUMMED) { + os.write_enum(9, ::protobuf::EnumOrUnknown::value(&self.storage_format))?; + } os.write_unknown_fields(self.special_fields.unknown_fields())?; ::std::result::Result::Ok(()) } @@ -376,6 +392,7 @@ pub mod artifact_upload_index { self.session_id.clear(); self.feature_flags.clear(); self.type_id = ::std::option::Option::None; + self.storage_format = 
::protobuf::EnumOrUnknown::new(super::StorageFormat::CHECKSUMMED); self.special_fields.clear(); } @@ -403,13 +420,75 @@ pub mod artifact_upload_index { } } +#[derive(Clone,Copy,PartialEq,Eq,Debug,Hash)] +// @@protoc_insertion_point(enum:bitdrift_public.protobuf.client.v1.StorageFormat) +pub enum StorageFormat { + // @@protoc_insertion_point(enum_value:bitdrift_public.protobuf.client.v1.StorageFormat.CHECKSUMMED) + CHECKSUMMED = 0, + // @@protoc_insertion_point(enum_value:bitdrift_public.protobuf.client.v1.StorageFormat.RAW) + RAW = 1, +} + +impl ::protobuf::Enum for StorageFormat { + const NAME: &'static str = "StorageFormat"; + + fn value(&self) -> i32 { + *self as i32 + } + + fn from_i32(value: i32) -> ::std::option::Option { + match value { + 0 => ::std::option::Option::Some(StorageFormat::CHECKSUMMED), + 1 => ::std::option::Option::Some(StorageFormat::RAW), + _ => ::std::option::Option::None + } + } + + fn from_str(str: &str) -> ::std::option::Option { + match str { + "CHECKSUMMED" => ::std::option::Option::Some(StorageFormat::CHECKSUMMED), + "RAW" => ::std::option::Option::Some(StorageFormat::RAW), + _ => ::std::option::Option::None + } + } + + const VALUES: &'static [StorageFormat] = &[ + StorageFormat::CHECKSUMMED, + StorageFormat::RAW, + ]; +} + +impl ::protobuf::EnumFull for StorageFormat { + fn enum_descriptor() -> ::protobuf::reflect::EnumDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| file_descriptor().enum_by_package_relative_name("StorageFormat").unwrap()).clone() + } + + fn descriptor(&self) -> ::protobuf::reflect::EnumValueDescriptor { + let index = *self as usize; + Self::enum_descriptor().value_by_index(index) + } +} + +impl ::std::default::Default for StorageFormat { + fn default() -> Self { + StorageFormat::CHECKSUMMED + } +} + +impl StorageFormat { + fn generated_enum_descriptor_data() -> ::protobuf::reflect::GeneratedEnumDescriptorData { + 
::protobuf::reflect::GeneratedEnumDescriptorData::new::("StorageFormat") + } +} + static file_descriptor_proto_data: &'static [u8] = b"\ \n1bitdrift_public/protobuf/client/v1/artifact.proto\x12\"bitdrift_publi\ c.protobuf.client.v1\x1a5bitdrift_public/protobuf/client/v1/feature_flag\ .proto\x1a1bitdrift_public/protobuf/logging/v1/payload.proto\x1a\x1fgoog\ - le/protobuf/timestamp.proto\"\xf5\x04\n\x13ArtifactUploadIndex\x12\\\n\ + le/protobuf/timestamp.proto\"\xcf\x05\n\x13ArtifactUploadIndex\x12\\\n\ \x08artifact\x18\x01\x20\x03(\x0b2@.bitdrift_public.protobuf.client.v1.A\ - rtifactUploadIndex.ArtifactR\x08artifact\x1a\xff\x03\n\x08Artifact\x12\ + rtifactUploadIndex.ArtifactR\x08artifact\x1a\xd9\x04\n\x08Artifact\x12\ \x12\n\x04name\x18\x01\x20\x01(\tR\x04name\x12.\n\x04time\x18\x02\x20\ \x01(\x0b2\x1a.google.protobuf.TimestampR\x04time\x12<\n\x1apending_inte\ nt_negotiation\x18\x03\x20\x01(\x08R\x18pendingIntentNegotiation\x12j\n\ @@ -418,9 +497,12 @@ static file_descriptor_proto_data: &'static [u8] = b"\ n_id\x18\x06\x20\x01(\tR\tsessionId\x12T\n\rfeature_flags\x18\x07\x20\ \x03(\x0b2/.bitdrift_public.protobuf.client.v1.FeatureFlagR\x0cfeatureFl\ ags\x12\x1c\n\x07type_id\x18\x08\x20\x01(\tH\0R\x06typeId\x88\x01\x01\ - \x1af\n\rMetadataEntry\x12\x10\n\x03key\x18\x01\x20\x01(\tR\x03key\x12?\ - \n\x05value\x18\x02\x20\x01(\x0b2).bitdrift_public.protobuf.logging.v1.D\ - ataR\x05value:\x028\x01B\n\n\x08_type_idb\x06proto3\ + \x12X\n\x0estorage_format\x18\t\x20\x01(\x0e21.bitdrift_public.protobuf.\ + client.v1.StorageFormatR\rstorageFormat\x1af\n\rMetadataEntry\x12\x10\n\ + \x03key\x18\x01\x20\x01(\tR\x03key\x12?\n\x05value\x18\x02\x20\x01(\x0b2\ + ).bitdrift_public.protobuf.logging.v1.DataR\x05value:\x028\x01B\n\n\x08_\ + type_id*)\n\rStorageFormat\x12\x0f\n\x0bCHECKSUMMED\x10\0\x12\x07\n\x03R\ + AW\x10\x01b\x06proto3\ "; /// `FileDescriptorProto` object which was a source for this generated file @@ -444,7 +526,8 @@ pub fn file_descriptor() -> &'static 
::protobuf::reflect::FileDescriptor { let mut messages = ::std::vec::Vec::with_capacity(2); messages.push(ArtifactUploadIndex::generated_message_descriptor_data()); messages.push(artifact_upload_index::Artifact::generated_message_descriptor_data()); - let mut enums = ::std::vec::Vec::with_capacity(0); + let mut enums = ::std::vec::Vec::with_capacity(1); + enums.push(StorageFormat::generated_enum_descriptor_data()); ::protobuf::reflect::GeneratedFileDescriptor::new_generated( file_descriptor_proto(), deps, diff --git a/bd-proto/src/protos/client/key_value.rs b/bd-proto/src/protos/client/key_value.rs index 95066227e..8dfece71e 100644 --- a/bd-proto/src/protos/client/key_value.rs +++ b/bd-proto/src/protos/client/key_value.rs @@ -686,6 +686,146 @@ pub mod app_version { } } +// @@protoc_insertion_point(message:bitdrift_public.protobuf.client.v1.StateSnapshotRange) +#[derive(PartialEq,Clone,Default,Debug)] +pub struct StateSnapshotRange { + // message fields + // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.StateSnapshotRange.oldest_micros) + pub oldest_micros: u64, + // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.StateSnapshotRange.newest_micros) + pub newest_micros: u64, + // special fields + // @@protoc_insertion_point(special_field:bitdrift_public.protobuf.client.v1.StateSnapshotRange.special_fields) + pub special_fields: ::protobuf::SpecialFields, +} + +impl<'a> ::std::default::Default for &'a StateSnapshotRange { + fn default() -> &'a StateSnapshotRange { + ::default_instance() + } +} + +impl StateSnapshotRange { + pub fn new() -> StateSnapshotRange { + ::std::default::Default::default() + } + + fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { + let mut fields = ::std::vec::Vec::with_capacity(2); + let mut oneofs = ::std::vec::Vec::with_capacity(0); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "oldest_micros", + |m: &StateSnapshotRange| { 
&m.oldest_micros }, + |m: &mut StateSnapshotRange| { &mut m.oldest_micros }, + )); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "newest_micros", + |m: &StateSnapshotRange| { &m.newest_micros }, + |m: &mut StateSnapshotRange| { &mut m.newest_micros }, + )); + ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( + "StateSnapshotRange", + fields, + oneofs, + ) + } +} + +impl ::protobuf::Message for StateSnapshotRange { + const NAME: &'static str = "StateSnapshotRange"; + + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::Result<()> { + while let Some(tag) = is.read_raw_tag_or_eof()? { + match tag { + 8 => { + self.oldest_micros = is.read_uint64()?; + }, + 16 => { + self.newest_micros = is.read_uint64()?; + }, + tag => { + ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u64 { + let mut my_size = 0; + if self.oldest_micros != 0 { + my_size += ::protobuf::rt::uint64_size(1, self.oldest_micros); + } + if self.newest_micros != 0 { + my_size += ::protobuf::rt::uint64_size(2, self.newest_micros); + } + my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); + self.special_fields.cached_size().set(my_size as u32); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::Result<()> { + if self.oldest_micros != 0 { + os.write_uint64(1, self.oldest_micros)?; + } + if self.newest_micros != 0 { + os.write_uint64(2, self.newest_micros)?; + } + os.write_unknown_fields(self.special_fields.unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn special_fields(&self) -> &::protobuf::SpecialFields { + &self.special_fields + } + + fn mut_special_fields(&mut self) -> &mut 
::protobuf::SpecialFields { + &mut self.special_fields + } + + fn new() -> StateSnapshotRange { + StateSnapshotRange::new() + } + + fn clear(&mut self) { + self.oldest_micros = 0; + self.newest_micros = 0; + self.special_fields.clear(); + } + + fn default_instance() -> &'static StateSnapshotRange { + static instance: StateSnapshotRange = StateSnapshotRange { + oldest_micros: 0, + newest_micros: 0, + special_fields: ::protobuf::SpecialFields::new(), + }; + &instance + } +} + +impl ::protobuf::MessageFull for StateSnapshotRange { + fn descriptor() -> ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| file_descriptor().message_by_package_relative_name("StateSnapshotRange").unwrap()).clone() + } +} + +impl ::std::fmt::Display for StateSnapshotRange { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for StateSnapshotRange { + type RuntimeType = ::protobuf::reflect::rt::RuntimeTypeMessage; +} + static file_descriptor_proto_data: &'static [u8] = b"\ \n2bitdrift_public/protobuf/client/v1/key_value.proto\x12\"bitdrift_publ\ ic.protobuf.client.v1\x1a1bitdrift_public/protobuf/logging/v1/payload.pr\ @@ -698,7 +838,9 @@ static file_descriptor_proto_data: &'static [u8] = b"\ uf.logging.v1.Log.FieldR\x06fields\"\x80\x01\n\nAppVersion\x12\x18\n\x07\ version\x18\x01\x20\x01(\tR\x07version\x12*\n\x10app_version_code\x18\ \x02\x20\x01(\x03H\0R\x0eappVersionCode\x12#\n\x0cbuild_number\x18\x03\ - \x20\x01(\tH\0R\x0bbuildNumberB\x07\n\x05extrab\x06proto3\ + \x20\x01(\tH\0R\x0bbuildNumberB\x07\n\x05extra\"^\n\x12StateSnapshotRang\ + e\x12#\n\roldest_micros\x18\x01\x20\x01(\x04R\x0coldestMicros\x12#\n\rne\ + west_micros\x18\x02\x20\x01(\x04R\x0cnewestMicrosb\x06proto3\ "; /// `FileDescriptorProto` object which was a source for this generated file @@ 
-718,11 +860,12 @@ pub fn file_descriptor() -> &'static ::protobuf::reflect::FileDescriptor { let mut deps = ::std::vec::Vec::with_capacity(2); deps.push(super::payload::file_descriptor().clone()); deps.push(::protobuf::well_known_types::timestamp::file_descriptor().clone()); - let mut messages = ::std::vec::Vec::with_capacity(4); + let mut messages = ::std::vec::Vec::with_capacity(5); messages.push(FixedSessionStrategyState::generated_message_descriptor_data()); messages.push(ActivitySessionStrategyState::generated_message_descriptor_data()); messages.push(CrashGlobalState::generated_message_descriptor_data()); messages.push(AppVersion::generated_message_descriptor_data()); + messages.push(StateSnapshotRange::generated_message_descriptor_data()); let mut enums = ::std::vec::Vec::with_capacity(0); ::protobuf::reflect::GeneratedFileDescriptor::new_generated( file_descriptor_proto(), diff --git a/bd-resilient-kv/VERSIONED_FORMAT.md b/bd-resilient-kv/VERSIONED_FORMAT.md index aa684ad49..9373fbcee 100644 --- a/bd-resilient-kv/VERSIONED_FORMAT.md +++ b/bd-resilient-kv/VERSIONED_FORMAT.md @@ -255,11 +255,23 @@ required by downstream consumers. - Snapshot creation is gated by the **minimum retention timestamp**. If no handles are registered, snapshots are skipped. +- A journal rotation may therefore complete without producing an archived snapshot file when + retention does not require one. - Snapshots older than the minimum retention timestamp are eligible for deletion. - While any handle is pending, cleanup keeps all snapshots (minimum retention treated as `0`). - A **max snapshot count** safety cap bounds the number of retained snapshots unless retention requires keeping everything. 
+### Rotation Result Contract + +Rotation always creates a new active journal generation, but snapshot output is conditional: + +- if retention requires a snapshot, rotation returns a snapshot path and the archived `.zz` file is + created; +- if retention does not require a snapshot, rotation returns no snapshot path. + +Callers must treat snapshot production as optional and handle the no-snapshot case explicitly. + ### Runtime Controls - `state.max_snapshot_count=0` disables snapshotting entirely. This is intended as a diff --git a/bd-resilient-kv/src/tests/versioned_kv_store_test.rs b/bd-resilient-kv/src/tests/versioned_kv_store_test.rs index 836c264bc..1150e34c8 100644 --- a/bd-resilient-kv/src/tests/versioned_kv_store_test.rs +++ b/bd-resilient-kv/src/tests/versioned_kv_store_test.rs @@ -785,7 +785,7 @@ async fn test_manual_rotation() -> anyhow::Result<()> { let rotation = setup.store.rotate_journal().await?; // Verify archived file exists (compressed) - assert!(rotation.snapshot_path.exists()); + assert!(rotation.snapshot_path.as_ref().unwrap().exists()); // Verify active journal still works let (ts3, _) = setup @@ -815,7 +815,7 @@ async fn test_manual_rotation() -> anyhow::Result<()> { // Decompress the archive and load it as a Store to verify that it contains the old state. 
let snapshot_store = setup - .make_store_from_snapshot_file(&rotation.snapshot_path) + .make_store_from_snapshot_file(rotation.snapshot_path.as_ref().unwrap()) .await?; assert_eq!( snapshot_store.get(Scope::FeatureFlagExposure, "key1"), @@ -993,7 +993,7 @@ async fn test_multiple_rotations() -> anyhow::Result<()> { .insert(Scope::FeatureFlagExposure, key, value) .await?; let rotation = setup.store.rotate_journal().await?; - snapshot_paths.push(rotation.snapshot_path.clone()); + snapshot_paths.push(rotation.snapshot_path.clone().unwrap()); } // Verify all compressed archives exist @@ -1049,11 +1049,10 @@ async fn test_rotation_with_retention_registry() -> anyhow::Result<()> { // Rotate WITHOUT any retention handles - snapshot should NOT be created let rotation1 = store.rotate_journal().await?; - let snapshot_path1 = rotation1.snapshot_path; - // Snapshot file should NOT exist because no handles require it + // Snapshot should NOT be returned because no handles require it assert!( - !snapshot_path1.exists(), + rotation1.snapshot_path.is_none(), "Snapshot should not be created when no retention handles exist" ); @@ -1076,7 +1075,7 @@ async fn test_rotation_with_retention_registry() -> anyhow::Result<()> { time_provider.advance(1.seconds()); let rotation2 = store.rotate_journal().await?; - let snapshot_path2 = rotation2.snapshot_path; + let snapshot_path2 = rotation2.snapshot_path.unwrap(); // Snapshot file SHOULD exist because handle requires retention assert!( @@ -1102,11 +1101,10 @@ async fn test_rotation_with_retention_registry() -> anyhow::Result<()> { time_provider.advance(1.seconds()); let rotation3 = store.rotate_journal().await?; - let snapshot_path3 = rotation3.snapshot_path; // After handle is dropped, snapshot should not be created assert!( - !snapshot_path3.exists(), + rotation3.snapshot_path.is_none(), "Snapshot should not be created after handle is dropped" ); @@ -1150,38 +1148,26 @@ async fn test_multiple_rotations_with_same_timestamp() -> 
anyhow::Result<()> { // Perform first rotation let rotation1 = store.rotate_journal().await?; - assert!( - rotation1.snapshot_path.exists(), - "First rotation should create snapshot" - ); + let snapshot_path1 = rotation1.snapshot_path.unwrap(); // Perform second rotation WITHOUT inserting new data // This means both rotations will have the same max timestamp let rotation2 = store.rotate_journal().await?; - assert!( - rotation2.snapshot_path.exists(), - "Second rotation should create snapshot" - ); + let snapshot_path2 = rotation2.snapshot_path.unwrap(); // Verify both snapshots exist with different filenames (due to different generations) - assert!( - rotation1.snapshot_path.exists(), - "First snapshot should still exist" - ); - assert!( - rotation2.snapshot_path.exists(), - "Second snapshot should exist" - ); + assert!(snapshot_path1.exists(), "First snapshot should still exist"); + assert!(snapshot_path2.exists(), "Second snapshot should exist"); // Verify the paths are different (different generations prevent collision) assert_ne!( - rotation1.snapshot_path, rotation2.snapshot_path, + snapshot_path1, snapshot_path2, "Snapshots should have different paths despite same timestamp" ); // Verify we can read both snapshots (they should be different files) - let snapshot1_data = std::fs::read(&rotation1.snapshot_path)?; - let snapshot2_data = std::fs::read(&rotation2.snapshot_path)?; + let snapshot1_data = std::fs::read(&snapshot_path1)?; + let snapshot2_data = std::fs::read(&snapshot_path2)?; // The files should exist and be valid assert!( diff --git a/bd-resilient-kv/src/tests/versioned_recovery_error_test.rs b/bd-resilient-kv/src/tests/versioned_recovery_error_test.rs index 2c217503c..62558251d 100644 --- a/bd-resilient-kv/src/tests/versioned_recovery_error_test.rs +++ b/bd-resilient-kv/src/tests/versioned_recovery_error_test.rs @@ -138,7 +138,7 @@ async fn test_recovery_with_deletions() -> anyhow::Result<()> { let rotation = store.rotate_journal().await?; // Read 
the snapshot - let compressed_data = std::fs::read(&rotation.snapshot_path)?; + let compressed_data = std::fs::read(rotation.snapshot_path.as_ref().unwrap())?; let decompressed_data = decompress_zlib(&compressed_data)?; // Use u64::MAX as snapshot timestamp since we're only checking the latest state diff --git a/bd-resilient-kv/src/versioned_kv_journal/store.rs b/bd-resilient-kv/src/versioned_kv_journal/store.rs index 356c569b4..de6cab215 100644 --- a/bd-resilient-kv/src/versioned_kv_journal/store.rs +++ b/bd-resilient-kv/src/versioned_kv_journal/store.rs @@ -114,7 +114,9 @@ impl From for DataLoss { pub struct Rotation { pub new_journal_path: PathBuf, pub old_journal_path: PathBuf, - pub snapshot_path: PathBuf, + /// Path to the snapshot file, if one was created during rotation. `None` when snapshotting is + /// disabled or no retention handle requires the snapshot. + pub snapshot_path: Option, } /// Result of opening a journal file, containing the journal, initial state, and data loss info. @@ -837,7 +839,11 @@ impl PersistentStore { Ok(Rotation { new_journal_path, old_journal_path, - snapshot_path: archived_path, + snapshot_path: if should_create_snapshot { + Some(archived_path) + } else { + None + }, }) } } diff --git a/bd-runtime/src/runtime.rs b/bd-runtime/src/runtime.rs index 7ad9742b0..a641ed7ad 100644 --- a/bd-runtime/src/runtime.rs +++ b/bd-runtime/src/runtime.rs @@ -940,8 +940,8 @@ pub mod state { // When set to false, the state store operates in-memory only and does not persist // feature flags or global state to disk. When set to true, the state store will // attempt to use persistent storage, falling back to in-memory if initialization fails. - // Defaults to false for safety during crash loops. - bool_feature_flag!(UsePersistentStorage, "state.use_persistent_storage", false); + // Defaults to true; in-memory mode is only used as a fallback when persistent storage fails. 
+ bool_feature_flag!(UsePersistentStorage, "state.use_persistent_storage", true); // Controls the initial buffer size for the persistent state store in bytes. // This determines the starting size of the memory-mapped file. The buffer will grow @@ -957,6 +957,19 @@ pub mod state { // this bounds the memory usage of the state store. int_feature_flag!(MaxCapacity, "state.max_capacity_bytes", 1024 * 1024); + // Minimum interval between state snapshot creations in milliseconds. This batching is primarily + // necessary for log streaming configurations where all logs are streamed rapidly and there are a + // lot of state changes. Defaults to 5 minutes. + int_feature_flag!( + SnapshotCreationIntervalMs, + "state.snapshot_creation_interval_ms", + 5 * 60 * 1000 + ); + + // Controls whether state snapshot uploads are enabled. When disabled, state snapshots are + // not uploaded alongside log uploads. Defaults to false as a safe rollout mechanism. + bool_feature_flag!(StateUploadEnabled, "state.upload_enabled", false); + // Controls the maximum number of snapshots to retain for persistent state. // Snapshots will generally be cleaned up based on the retention window dictated by the active // retention handles, but this flag places an upper limit on the number of snapshots retained. The diff --git a/bd-state/README.md b/bd-state/README.md new file mode 100644 index 000000000..382822443 --- /dev/null +++ b/bd-state/README.md @@ -0,0 +1,157 @@ +# bd-state + +`bd-state` is the runtime state layer used by the logger pipeline. It wraps +`bd-resilient-kv::VersionedKVStore` and adds: + +- process-lifecycle semantics (capture previous process state, then clear selected scopes), +- change tracking (`last_change_micros`) for upload decisions, +- a simple async API for reads/writes, +- explicit snapshot rotation entry points used by `bd-logger`. + +This document focuses on how state is recorded, persisted, and uploaded with logs. 
+ +## Related references + +- Journal/file format: `bd-resilient-kv/VERSIONED_FORMAT.md` +- Logger upload invariants: `bd-logger/AGENTS.md` +- Upload coordinator implementation: `bd-logger/src/state_upload.rs` + +## 1) State model and process lifecycle + +State is namespaced by `Scope` (`FeatureFlagExposure`, `GlobalState`, `System`) and keyed by +string. Values are protobuf `StateValue`s. + +On persistent startup (`Store::persistent`): + +1. `VersionedKVStore` is opened and replayed into an in-memory `ScopedMaps` cache. +2. That cache is cloned into `previous_state` (for crash/context reporting). +3. `FeatureFlagExposure` and `GlobalState` are cleared for the new process. +4. `System` scope is not blanket-cleared by this startup path. + +If persistent initialization fails, `persistent_or_fallback` falls back to in-memory storage and +returns `fallback_occurred = true`. + +## 2) How writes are recorded + +`Store` delegates to `VersionedKVStore` and tracks effective state mutations: + +- `insert(scope, key, value)`: + - no-op if new value equals existing value, + - otherwise appends a journal entry and updates cache. +- `remove(scope, key)`: + - appends a tombstone entry (empty `StateValue`) if key exists. +- `extend(scope, entries)`: + - batch write path in underlying store; currently treated as changed when non-empty. +- `clear(scope)`: + - iterates keys in that scope and removes each one. + +For real mutations, `Store` updates `last_change_micros` using `fetch_max`, so the timestamp is +monotonic non-decreasing even with concurrent writers. + +## 3) On-disk persistence format and layout + +When persistent mode is enabled, files live under the configured state directory. + +Active journal: + +- `state.jrn.` (e.g. 
`state.jrn.0`) +- memory-mapped for fast append and replay +- not compressed + +Archived snapshots (on rotation): + +- `snapshots/state.jrn.g.t.zz` +- zlib-compressed archived journal of the old generation + +Journal entry framing and semantics are defined in `VERSIONED_FORMAT.md`: + +- each entry stores scope + key + timestamp + protobuf payload + CRC, +- timestamp is per-entry write time (microseconds), +- tombstones represent deletions, +- compacted journals preserve original entry timestamps. + +Important timestamp nuance: snapshot filename timestamp is the **rotation time marker** for that +archived file, while each entry inside the file keeps its own original write timestamp. + +## 4) Rotation, compaction, and retention + +Rotation can happen automatically (high-water mark/capacity pressure) or manually +(`Store::rotate_journal` from `bd-state`): + +1. create a new active generation, +2. rewrite compacted live state into the new journal (preserving entry timestamps), +3. archive+compress the old generation to `snapshots/*.zz`, +4. run snapshot cleanup policy, +5. delete the old uncompressed generation file. + +Snapshot creation is retention-aware: + +- controlled by `RetentionRegistry` and `state.max_snapshot_count`, +- snapshotting is disabled if max snapshot count is `0`, +- if no retention handle requests data, rotation may skip snapshot creation, +- cleanup removes snapshots older than required retention and enforces max count safety cap. + +`Store::rotate_journal()` returns `Some(snapshot_path)` when rotation produces an archived snapshot +file and returns `None` when rotation completes without archived snapshot output. + +## 5) How snapshots are uploaded with logs + +State snapshots and logs are separate artifact streams; coordination lives in `bd-logger`. + +### Producer side (log batch flush paths) + +`BatchBuilder` tracks `(oldest_micros, newest_micros)` incrementally while logs are added. 
+Before consuming a batch (`take()`), uploader paths call: + +`StateUploadHandle::notify_upload_needed(oldest, newest)` + +This currently happens in: + +- continuous flush (`flush_current_batch`), +- trigger/complete flush (`flush_batch`). + +`StreamedBufferUpload::start` currently has a TODO for state upload integration. + +`notify_upload_needed` is non-blocking: + +- merges ranges in a shared accumulator, +- best-effort wakes a single worker via a capacity-1 channel, +- does not block log upload path. + +### Worker side (`StateUploadWorker`) + +Single background worker owns all decisions and retries: + +1. drains/coalesces pending range, persists it to key-value key `state_upload.pending_range.1`, +2. computes preflight decision: + - if snapshots exist in requested range: upload them oldest-first, + - else create an on-demand snapshot via `state_store.rotate_journal()` (subject to cooldown and + in-process duplicate-rotation checks), +3. enqueues each snapshot through `bd-artifact-upload` as `UploadSource::Path(...)`, +4. waits for persistence ack from artifact queue. + +Retention ownership is split: + +- buffer consumer retention handles represent logs that may still be uploaded in the future, +- uploader retention handle represents only uploader pending coverage. + +The uploader updates its retention from `pending_range.oldest_micros` while pending work exists and +sets `RETENTION_NONE` when pending work is empty. As uploads succeed, it tightens +`pending_range.oldest_micros` and persists the updated range so restart resumes with the same +coverage. + +Queue semantics are move-based: + +- success: snapshot file is moved out of `{state_store_path}/snapshots/` into artifact upload + storage, +- enqueue failure: source file remains in `{state_store_path}/snapshots/`, so later retries can + re-attempt. 
+ +The worker keeps pending coverage on backpressure/errors and retries periodically; pending range is +recovered on startup so upload intent survives process restart. + +## 6) Why this matches server-side state hydration + +The server reconstructs active state for log time `T` from per-entry timestamps in snapshots +(entries with write timestamp `<= T`), not by snapshot filename time alone. The client therefore +only needs to ensure relevant snapshot files are eventually uploaded along with log traffic. diff --git a/bd-state/src/lib.rs b/bd-state/src/lib.rs index b603eaf72..9d6e68996 100644 --- a/bd-state/src/lib.rs +++ b/bd-state/src/lib.rs @@ -22,8 +22,15 @@ pub mod test; pub use self::InitStrategy::{InMemoryOnly, PersistentWithFallback}; use ahash::AHashMap; -use bd_resilient_kv::{DataLoss, RetentionRegistry, ScopedMaps, StateValue}; -pub use bd_resilient_kv::{PersistentStoreConfig, Scope, StateValue as Value, Value_type}; +use bd_resilient_kv::{DataLoss, ScopedMaps, StateValue}; +pub use bd_resilient_kv::{ + PersistentStoreConfig, + RetentionHandle, + RetentionRegistry, + Scope, + StateValue as Value, + Value_type, +}; use bd_runtime::runtime::ConfigLoader; use bd_time::{OffsetDateTimeExt, TimeProvider}; use itertools::Itertools as _; @@ -684,6 +691,28 @@ impl Store { pub async fn read(&self) -> impl StateReader + '_ { self.inner.read().await } + + /// Triggers a journal rotation to create a snapshot. + /// + /// This is primarily used to create a state snapshot before uploading logs, ensuring the server + /// has the state context needed to hydrate those logs. The rotation creates a compressed `.zz` + /// snapshot file in the `state/snapshots/` directory. + /// + /// Returns the path to the created snapshot file, or `None` if: + /// - The store is in-memory only (no persistence) + /// - The rotation failed, or it completed without producing an archived snapshot file + /// + /// Note: For in-memory stores, this is a no-op that returns `None`.
+ pub async fn rotate_journal(&self) -> Option { + let mut locked = self.inner.write().await; + match locked.rotate_journal().await { + Ok(rotation) => rotation.snapshot_path, + Err(e) => { + log::debug!("Failed to rotate journal for snapshot: {e}"); + None + }, + } + } } impl StateReader for tokio::sync::RwLockReadGuard<'_, bd_resilient_kv::VersionedKVStore> { diff --git a/bd-test-helpers/src/filter.rs b/bd-test-helpers/src/filter.rs index 50483ef39..e4e912706 100644 --- a/bd-test-helpers/src/filter.rs +++ b/bd-test-helpers/src/filter.rs @@ -79,7 +79,11 @@ pub mod macros { }; } - pub use {capture_field, field_value, regex_match_and_substitute_field, remove_field, set_field}; + pub use capture_field; + pub use field_value; + pub use regex_match_and_substitute_field; + pub use remove_field; + pub use set_field; } #[must_use] diff --git a/bd-test-helpers/src/runtime.rs b/bd-test-helpers/src/runtime.rs index 573bc2769..9f641d6a9 100644 --- a/bd-test-helpers/src/runtime.rs +++ b/bd-test-helpers/src/runtime.rs @@ -11,6 +11,7 @@ use bd_proto::protos::client::runtime::runtime::{Value, value}; /// A simple representation of a runtime value. This is used to provide better ergonomics than the /// protobuf enums. +#[derive(Clone)] pub enum ValueKind { Bool(bool), Int(u32),