Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
05ca00a
Add state upload: StateLogCorrelator, state snapshot uploads alongsid…
snowp Feb 25, 2026
fe31d0d
cleanup
snowp Feb 25, 2026
00d667e
remove fallback reporting
snowp Feb 25, 2026
32fbcbb
remove agents
snowp Feb 25, 2026
daa9a71
remove references to correlator, better range computation
snowp Feb 25, 2026
a7ad81b
use helper
snowp Feb 25, 2026
11e2113
use time helper in more places
snowp Feb 25, 2026
6457e37
document in agents
snowp Feb 25, 2026
be80130
fix oneshot consideration
snowp Feb 25, 2026
9e1e3c3
improve durability
snowp Feb 26, 2026
0ee2d8e
better durability guarantees
snowp Feb 26, 2026
38340f3
improve test coverage
snowp Feb 26, 2026
7e4606f
use enum for error propagation
snowp Feb 26, 2026
66b7107
avoid losing ranges if handle ch is full
snowp Feb 26, 2026
f3df1da
update agents
snowp Feb 26, 2026
bcbaa44
fix tests
snowp Feb 26, 2026
46c7a1d
todo
snowp Feb 26, 2026
0ee1ba4
persistence etc
snowp Feb 26, 2026
59e7346
update continuous buffer snapshot retention
snowp Feb 26, 2026
298b4ac
Merge origin/main
snowp Feb 27, 2026
0182771
dedupe enqueue interface
snowp Feb 27, 2026
ead0152
cleanup
snowp Feb 27, 2026
1a92229
cleanup
snowp Feb 27, 2026
fb979b0
comments
snowp Feb 27, 2026
f7aa7da
clean up tests some more
snowp Feb 27, 2026
aafc827
increase default snapshot delay
snowp Feb 27, 2026
c54e0c2
cleanup
snowp Feb 27, 2026
7f7ab16
fix agents
snowp Feb 27, 2026
b43d7a5
handle raw vs checksummed artifact files
snowp Feb 27, 2026
6d6b7b9
avoid zlib parsing, add more docs
snowp Feb 27, 2026
34462fe
clarify more
snowp Feb 28, 2026
83b30ab
fmt nightly
snowp Feb 28, 2026
8ee0aae
fix rotation return bug, rework deduping repeat rotations and clarify…
snowp Mar 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bd-artifact-upload/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

mod uploader;

pub use uploader::{Client, MockClient, SnappedFeatureFlag, Uploader};
pub use uploader::{Client, EnqueueError, MockClient, SnappedFeatureFlag, UploadSource, Uploader};

#[cfg(test)]
#[ctor::ctor]
Expand Down
220 changes: 174 additions & 46 deletions bd-artifact-upload/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use bd_error_reporter::reporter::handle_unexpected;
use bd_log_primitives::LogFields;
use bd_log_primitives::size::MemorySized;
use bd_proto::protos::client::api::{UploadArtifactIntentRequest, UploadArtifactRequest};
use bd_proto::protos::client::artifact::ArtifactUploadIndex;
use bd_proto::protos::client::artifact::artifact_upload_index::Artifact;
use bd_proto::protos::client::artifact::{ArtifactUploadIndex, StorageFormat};
use bd_proto::protos::client::feature_flag::FeatureFlag;
use bd_proto::protos::logging::payload::Data;
use bd_runtime::runtime::{ConfigLoader, DurationWatch, IntWatch, artifact_upload};
Expand All @@ -41,6 +41,7 @@ use std::sync::{Arc, LazyLock};
#[cfg(test)]
use tests::TestHooks;
use time::OffsetDateTime;
use tokio::sync::oneshot;
use uuid::Uuid;

/// Root directory for all files used for storage and uploading.
Expand All @@ -53,12 +54,14 @@ pub static REPORT_INDEX_FILE: LazyLock<PathBuf> = LazyLock::new(|| "report_index
/// The category of artifact being uploaded. Determines the type id string sent
/// with upload requests and, per the queue-eviction logic elsewhere in this
/// file, how entries are prioritized when the upload index is full.
pub enum ArtifactType {
/// A client report (type id "client_report"); the default artifact type.
#[default]
Report,
/// A state snapshot (type id "state_snapshot"); preferentially retained over
/// reports when the upload queue reaches capacity.
StateSnapshot,
}

impl ArtifactType {
  /// Returns the stable string identifier for this artifact type.
  fn to_type_id(self) -> &'static str {
    if matches!(self, Self::Report) {
      "client_report"
    } else {
      "state_snapshot"
    }
  }
}
Expand Down Expand Up @@ -109,21 +112,34 @@ impl SnappedFeatureFlag {
/// A single pending upload handed from `Client::enqueue_upload` to the
/// uploader task over the bounded channel.
#[derive(Debug)]
struct NewUpload {
/// Unique id assigned at enqueue time; also returned to the caller.
uuid: Uuid,
/// Where the artifact bytes come from (open file handle or on-disk path).
source: UploadSource,
/// Artifact type identifier string (see `ArtifactType::to_type_id`).
type_id: String,
/// Log fields captured alongside the artifact.
state: LogFields,
/// Optional timestamp associated with the artifact.
timestamp: Option<OffsetDateTime>,
/// Session the artifact belongs to.
session_id: String,
/// Feature flag values snapped at capture time.
feature_flags: Vec<SnappedFeatureFlag>,
/// If set, notified with Ok once the artifact is persisted and indexed, or
/// with Err if enqueueing/persisting fails.
persisted_tx: Option<oneshot::Sender<std::result::Result<(), EnqueueError>>>,
}

/// Describes where the bytes for a newly enqueued artifact come from and how
/// they are persisted to the upload directory.
#[derive(Debug)]
pub enum UploadSource {
/// When a file handle is provided the uploader will copy the contents of the file to disk and
/// append a CRC checksum to the end of the file to allow for integrity checking when we later
/// read.
File(std::fs::File),
/// For raw files they are directly moved to the target location without modification. This is
/// intended for use cases where the data format is already self-validating (e.g. crc checksum
/// or zlib compression).
Path(PathBuf),
}

// Used for bounded_buffer logs
impl std::fmt::Display for NewUpload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"NewUpload {{ uuid: {}, file: {:?} }}",
self.uuid, self.file
"NewUpload {{ uuid: {}, source: {:?} }}",
self.uuid, self.source
)
}
}
Expand Down Expand Up @@ -174,17 +190,28 @@ impl Stats {
}
}

/// Errors surfaced when enqueueing an artifact via [`Client::enqueue_upload`],
/// either immediately or later through the optional `persisted_tx` channel.
#[derive(Debug, thiserror::Error)]
pub enum EnqueueError {
/// The bounded upload queue has no room for the new entry.
#[error("upload queue full")]
QueueFull,
/// The receiving side of the upload channel has shut down.
#[error("upload channel closed")]
Closed,
/// Any other failure, e.g. filesystem errors while persisting the artifact.
#[error(transparent)]
Other(#[from] anyhow::Error),
}

#[automock]
pub trait Client: Send + Sync {
/// Dispatches a payload to be uploaded, returning the associated artifact UUID.
///
/// `persisted_tx`, when provided, is notified with Ok once the artifact has
/// been written to disk and recorded in the upload index, or with Err if
/// enqueueing or persisting fails.
fn enqueue_upload(
&self,
source: UploadSource,
type_id: String,
state: LogFields,
timestamp: Option<OffsetDateTime>,
session_id: String,
feature_flags: Vec<SnappedFeatureFlag>,
persisted_tx: Option<oneshot::Sender<std::result::Result<(), EnqueueError>>>,
) -> std::result::Result<Uuid, EnqueueError>;
}

pub struct UploadClient {
Expand All @@ -196,30 +223,35 @@ impl Client for UploadClient {
/// Dispatches a payload to be uploaded, returning the associated artifact UUID.
fn enqueue_upload(
&self,
file: std::fs::File,
source: UploadSource,
type_id: String,
state: LogFields,
timestamp: Option<OffsetDateTime>,
session_id: String,
feature_flags: Vec<SnappedFeatureFlag>,
) -> anyhow::Result<Uuid> {
persisted_tx: Option<oneshot::Sender<std::result::Result<(), EnqueueError>>>,
) -> std::result::Result<Uuid, EnqueueError> {
let uuid = uuid::Uuid::new_v4();

let result = self
.upload_tx
.try_send(NewUpload {
uuid,
file,
source,
type_id,
state,
timestamp,
session_id,
feature_flags,
persisted_tx,
})
.inspect_err(|e| log::warn!("failed to enqueue artifact upload: {e:?}"));

self.counter_stats.record(&result);
result?;
result.map_err(|e| match e {
bd_bounded_buffer::TrySendError::FullSizeOverflow => EnqueueError::QueueFull,
bd_bounded_buffer::TrySendError::Closed => EnqueueError::Closed,
})?;

Ok(uuid)
}
Expand Down Expand Up @@ -373,17 +405,27 @@ impl Uploader {
return Ok(());
};

let Ok(contents) = read_checksummed_data(&contents) else {
log::debug!(
"failed to validate CRC checksum for artifact {}, deleting and removing from index",
next.name
);

self.file_system.delete_file(&file_path).await?;
self.index.pop_front();
self.write_index().await;

return Ok(());
// For client reports we copy the file into a new file with a CRC checksum appended to allow
// for integrity checking. For state snapshots, since they are already zlib encoded, we bypass
// this check. TODO(snowp): Consider consolidating the behavior here, but keeping
// reports the same for now to avoid more changes than necessary.
let contents = if next.storage_format.enum_value_or_default() == StorageFormat::RAW {
// TODO(snowp): Should we consider validating the file here in some way?
contents
} else {
let Ok(contents) = read_checksummed_data(&contents) else {
log::debug!(
"failed to validate CRC checksum for artifact {}, deleting and removing from index",
next.name
);

self.file_system.delete_file(&file_path).await?;
self.index.pop_front();
self.write_index().await;

return Ok(());
};
contents
};
log::debug!("starting file upload for {:?}", next.name);
self.upload_task_handle = Some(tokio::spawn(Self::upload_artifact(
Expand Down Expand Up @@ -416,23 +458,25 @@ impl Uploader {
}
Some(NewUpload {
uuid,
file,
source,
type_id,
state,
timestamp,
session_id,
feature_flags,
persisted_tx,
}) = self.upload_queued_rx.recv() => {
log::debug!("tracking artifact: {uuid} for upload");
self
.track_new_upload(
uuid,
file,
source,
type_id,
state,
session_id,
timestamp,
feature_flags,
persisted_tx,
)
.await;
}
Expand Down Expand Up @@ -606,46 +650,126 @@ impl Uploader {
async fn track_new_upload(
&mut self,
uuid: Uuid,
file: std::fs::File,
source: UploadSource,
type_id: String,
state: LogFields,
session_id: String,
timestamp: Option<OffsetDateTime>,
feature_flags: Vec<SnappedFeatureFlag>,
mut persisted_tx: Option<oneshot::Sender<std::result::Result<(), EnqueueError>>>,
) {
// If we've reached our limit of entries, stop the entry currently being uploaded (the oldest
// one) to make space for the newer one.
// Previously we would always drop the oldest entry when we hit capacity, but for state
// snapshots this would result in us dropping uploads that we know we need to hydrate logs
// that were scheduled for uploads. To mitigate this we treat state snapshots differently
// and avoid dropping them when we hit capacity, which means that in the worst case if we have a
// lot of state snapshots we might fill up this queue and apply backpressure to the state
// snapshot producer, deferring the snapshot limit enforcement to the producer instead of the
// uploader.

// TODO(snowp): Consider also having a bound on the size of the files persisted to disk.
// TODO(snowp): We should consider redoing how backpressure works for crash reports as well as
// there are cases in which we drop reports. For now limit the backpressure mechanism to
// StateSnapshots as we want stronger guarantees than what is currently provided for regular
// crash reports.
if self.index.len() == usize::try_from(*self.max_entries.read()).unwrap_or_default() {
log::debug!("upload queue is full, dropping current upload");

self.stats.dropped.inc();
self.stop_current_upload();
self.index.pop_front();
if let Some(index_to_drop) = self.index.iter().position(|entry| {
entry.type_id.as_deref() != Some(ArtifactType::StateSnapshot.to_type_id())
}) {
Comment on lines +675 to +677
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what happens if there is no network? Don't we have to drop eventually? How does this work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way this ends up working with this PR is that the list of pending artifacts will grow and focus on keeping the state snapshots. Once this fills up we apply backpressure to the state upload and the uploader will stop sending snapshots into bd-artifact-upload. The limit that prevents unbounded growth then becomes the max snapshots that we retain on disk as enforced during the rotation process

log::debug!("upload queue is full, dropping oldest non-state upload");
self.stats.dropped.inc();
if index_to_drop == 0 {
self.stop_current_upload();
}
if let Some(entry) = self.index.remove(index_to_drop) {
let file_path = REPORT_DIRECTORY.join(&entry.name);
if let Err(e) = self.file_system.delete_file(&file_path).await {
log::warn!("failed to delete artifact {:?}: {}", entry.name, e);
}
}
self.write_index().await;
} else {
self.stats.dropped.inc();
if let Some(tx) = persisted_tx.take() {
let _ = tx.send(Err(EnqueueError::QueueFull));
}
return;
}
}

let uuid = uuid.to_string();

let target_file = match self
.file_system
.create_file(&REPORT_DIRECTORY.join(&uuid))
.await
{
Ok(file) => file,
Err(e) => {
log::warn!("failed to create file for artifact: {uuid} on disk: {e}");
let target_path = REPORT_DIRECTORY.join(&uuid);
let (write_result, storage_format) = match source {
UploadSource::File(file) => {
let target_file = match self.file_system.create_file(&target_path).await {
Ok(file) => file,
Err(e) => {
log::warn!("failed to create file for artifact: {uuid} on disk: {e}");
if let Some(tx) = persisted_tx.take() {
let _ = tx.send(Err(EnqueueError::Other(anyhow::anyhow!(
"failed to create file for artifact {uuid}: {e}"
))));
}

#[cfg(test)]
if let Some(hooks) = &self.test_hooks {
hooks.entry_received_tx.send(uuid.clone()).await.unwrap();
}
return;
#[cfg(test)]
if let Some(hooks) = &self.test_hooks {
hooks.entry_received_tx.send(uuid.clone()).await.unwrap();
}
return;
},
};

(
async_write_checksummed_data(tokio::fs::File::from_std(file), target_file).await,
StorageFormat::CHECKSUMMED,
)
},
UploadSource::Path(source_path) => {
let result = if let Err(e) = self
.file_system
.rename_file(&source_path, &target_path)
.await
{
log::debug!("failed to move artifact source, falling back to copy: {e}");
match std::fs::File::open(&source_path) {
Ok(source_file) => match self.file_system.create_file(&target_path).await {
Ok(target_file) => {
let result =
async_write_checksummed_data(tokio::fs::File::from_std(source_file), target_file)
.await;
if result.is_ok()
&& let Err(e) = self.file_system.delete_file(&source_path).await
{
log::debug!(
"failed to delete moved source file {}: {e}",
source_path.display()
);
}
result
},
Err(e) => Err(e),
},
Err(e) => Err(anyhow::anyhow!(
"failed to open file for artifact {} on disk: {}",
source_path.display(),
e
)),
}
} else {
Ok(())
};

(result, StorageFormat::RAW)
},
};

if let Err(e) = async_write_checksummed_data(tokio::fs::File::from_std(file), target_file).await
{
if let Err(e) = write_result {
log::warn!("failed to write artifact to disk: {uuid} to disk: {e}");
if let Some(tx) = persisted_tx.take() {
let _ = tx.send(Err(EnqueueError::Other(anyhow::anyhow!(
"failed to write artifact to disk {uuid}: {e}"
))));
}

#[cfg(test)]
if let Some(hooks) = &self.test_hooks {
Expand Down Expand Up @@ -673,6 +797,7 @@ impl Uploader {
.into_iter()
.map(|(key, value)| (key.into(), value.into_proto()))
.collect(),
storage_format: storage_format.into(),
feature_flags: feature_flags
.into_iter()
.map(
Expand All @@ -692,6 +817,9 @@ impl Uploader {
});

self.write_index().await;
if let Some(tx) = persisted_tx {
let _ = tx.send(Ok(()));
}


#[cfg(test)]
Expand Down
Loading
Loading