From 7993413132689a14d9f0650ac3cd8ef8a98065a3 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Tue, 11 Nov 2025 13:22:43 -0500 Subject: [PATCH 01/15] partitioned stream --- .../azure_storage_blob/src/streams/mod.rs | 1 + .../src/streams/partitioned_stream.rs | 187 ++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 sdk/storage/azure_storage_blob/src/streams/mod.rs create mode 100644 sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs diff --git a/sdk/storage/azure_storage_blob/src/streams/mod.rs b/sdk/storage/azure_storage_blob/src/streams/mod.rs new file mode 100644 index 0000000000..083f902096 --- /dev/null +++ b/sdk/storage/azure_storage_blob/src/streams/mod.rs @@ -0,0 +1 @@ +pub(crate) mod partitioned_stream; diff --git a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs new file mode 100644 index 0000000000..60f316bb01 --- /dev/null +++ b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs @@ -0,0 +1,187 @@ +use std::{ + mem, + pin::{pin, Pin}, + task::Poll, +}; + +use azure_core::stream::SeekableStream; +use bytes::Bytes; +use futures::{ready, stream::FusedStream, AsyncRead, Stream}; + +type AzureResult = azure_core::Result; + +pub(crate) struct PartitionedStream { + inner: Box, + buf: Vec, + partition_len: usize, + buf_offset: usize, + total_read: usize, + inner_complete: bool, +} + +impl PartitionedStream { + pub(crate) fn new(inner: Box, partition_len: usize) -> Self { + assert!(partition_len > 0); + Self { + buf: vec![0u8; std::cmp::min(partition_len, inner.len())], + inner, + partition_len, + buf_offset: 0, + total_read: 0, + inner_complete: false, + } + } + + fn take(&mut self) -> Vec { + let mut ret = mem::replace( + &mut self.buf, + vec![0u8; std::cmp::min(self.partition_len, self.inner.len() - self.total_read)], + ); + ret.truncate(self.buf_offset); + self.buf_offset = 0; + ret + } +} + +impl Stream for PartitionedStream { + type Item = AzureResult; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + loop { + if this.inner_complete || this.buf_offset >= this.buf.len() { + let ret = this.take(); + return if ret.is_empty() { + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(Bytes::from(ret)))) + }; + } else { + match ready!(pin!(&mut this.inner).poll_read(cx, &mut this.buf[this.buf_offset..])) + { + Ok(bytes_read) => { + this.buf_offset += bytes_read; + this.total_read += bytes_read; + this.inner_complete = bytes_read == 0; + } + Err(e) => { + return Poll::Ready(Some(Err(e.into()))); + } + } + } + } + } +} + +impl FusedStream for PartitionedStream { + fn is_terminated(&self) -> bool { + self.inner_complete && self.buf.is_empty() + } +} + +#[cfg(test)] +mod tests { + use azure_core::stream::BytesStream; + use futures::TryStreamExt; + + use super::*; + + fn get_random_data(len: usize) -> Vec { + let mut data: Vec = vec![0; len]; + rand::fill(&mut data[..]); + data + } + + #[tokio::test] + async fn partitions_exact_multiple() -> AzureResult<()> { + for part_count in [2usize, 3, 11, 16] { + for part_len in [1024usize, 1000, 9999, 1] { + let data = get_random_data(part_len * part_count); + let stream = + PartitionedStream::new(Box::new(BytesStream::new(data.clone())), part_len); + + let parts: Vec<_> = stream.try_collect().await?; + + assert_eq!(parts.len(), part_count); + for (i, bytes) in parts.iter().enumerate() { + assert_eq!(bytes.len(), part_len); + assert_eq!(bytes[..], data[i * part_len..i * part_len + part_len]); + } + } + } + Ok(()) + } + + #[tokio::test] + async fn partitions_leftover_multiple() -> AzureResult<()> { + for part_count in [2usize, 3, 11, 16] { + for part_len in [1024usize, 1000, 9999] { + for dangling_len in [part_len / 2, 100, 128, 99] { + let data = get_random_data(part_len * (part_count - 1) + dangling_len); + let stream = + PartitionedStream::new(Box::new(BytesStream::new(data.clone())), part_len); + + let parts: Vec<_> = stream.try_collect().await?; + + assert_eq!(parts.len(), part_count); + for (i, bytes) in parts[..parts.len()].iter().enumerate() { + if i == parts.len() - 1 { + assert_eq!(bytes.len(), dangling_len); + assert_eq!(bytes[..], data[i * part_len..]); + } else { + assert_eq!(bytes.len(), part_len); + assert_eq!(bytes[..], data[i * part_len..i * part_len + part_len]); + } + } + } + } + } + Ok(()) + } + + #[tokio::test] + async fn partitions_exactly_one() -> AzureResult<()> { + for len in [1024usize, 1000, 9999, 1] { + let data = get_random_data(len); + let mut stream = PartitionedStream::new(Box::new(BytesStream::new(data.clone())), len); + + let single_partition = stream.try_next().await?.unwrap(); + + assert!(stream.try_next().await?.is_none()); + assert_eq!(single_partition[..], data[..]); + } + Ok(()) + } + + #[tokio::test] + async fn partitions_less_than_one() -> AzureResult<()> { + let part_len = 99999usize; + for len in [1024usize, 1000, 9999, 1] { + let data = get_random_data(len); + let mut stream = + PartitionedStream::new(Box::new(BytesStream::new(data.clone())), part_len); + + let single_partition = stream.try_next().await?.unwrap(); + + assert!(stream.try_next().await?.is_none()); + assert_eq!(single_partition[..], data[..]); + } + Ok(()) + } + + #[tokio::test] + async fn partitions_none() -> AzureResult<()> { + for part_len in [1024usize, 1000, 9999, 1] { + let data = get_random_data(0); + let mut stream = + PartitionedStream::new(Box::new(BytesStream::new(data.clone())), part_len); + + assert!(stream.try_next().await?.is_none()); + } + Ok(()) + } +} From 51117f4e19dbfe9719a046828a87cca5338702e5 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Wed, 12 Nov 2025 14:48:48 -0500 Subject: [PATCH 02/15] partitioned transfer core (upload/copy) --- .../src/partitioned_transfer/mod.rs | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs diff --git a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs new file mode 100644 index 0000000000..1beee9cf8f --- /dev/null +++ b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs @@ -0,0 +1,118 @@ +use std::{future::Future, num::NonZero}; + +use futures::{ + future::{self}, + Stream, TryStreamExt, +}; + +type AzureResult = azure_core::Result; + +async fn run_all_with_concurrency_limit( + mut ops_queue: impl Stream TFut, TErr>> + Unpin, + parallel: NonZero, +) -> Result<(), TErr> +where + TFut: Future>, +{ + let parallel = parallel.get(); + + let first_op = ops_queue.try_next().await?.ok_or_else(|| todo!())?; + + let mut get_next_completed_op_future = future::select_all(vec![Box::pin(first_op())]); + let mut get_next_queue_op_future = ops_queue.try_next(); + loop { + // while max parallel running ops, focus on just running ops + let mut running_ops = get_next_completed_op_future.into_inner(); + while running_ops.len() >= parallel { + let result; + (result, _, running_ops) = future::select_all(running_ops).await; + result? + } + get_next_completed_op_future = future::select_all(running_ops); + + match future::select(get_next_queue_op_future, get_next_completed_op_future).await { + future::Either::Left((Err(e), _)) => return Err(e), + future::Either::Right(((Err(e), _, _), _)) => return Err(e), + + // next op in the queue arrived first + future::Either::Left((Ok(next_op_in_queue), running_ops_fut)) => { + get_next_queue_op_future = ops_queue.try_next(); + get_next_completed_op_future = running_ops_fut; + + match next_op_in_queue { + Some(op) => { + running_ops = get_next_completed_op_future.into_inner(); + running_ops.push(Box::pin(op())); + get_next_completed_op_future = future::select_all(running_ops); + } + // queue was finished, race is over + None => break, + } + } + // a running op completed first + future::Either::Right(((Ok(_res), _, remaining_running_ops), next_op_fut)) => { + get_next_queue_op_future = next_op_fut; + get_next_completed_op_future = if remaining_running_ops.is_empty() { + todo!("handle no remaining ops but op queue hasn't completed") + } else { + future::select_all(remaining_running_ops) + }; + } + } + } + + let mut running_ops = get_next_completed_op_future.into_inner(); + while !running_ops.is_empty() { + let result; + (result, _, running_ops) = future::select_all(running_ops).await; + result?; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{sync::mpsc::channel, time::Duration}; + + #[tokio::test] + async fn limit_ops() -> AzureResult<()> { + let parallel = 4usize; + let num_ops = parallel + 1; + let wait_time_millis = 10u64; + let op_time_millis = wait_time_millis + 50; + + let (sender, receiver) = channel(); + + let ops = (0..num_ops).map(|i| { + let s = sender.clone(); + Ok(async move || { + s.send(i).unwrap(); + tokio::time::sleep(Duration::from_millis(op_time_millis)).await; + AzureResult::<()>::Ok(()) + }) + }); + + let race = future::select( + Box::pin(run_all_with_concurrency_limit( + futures::stream::iter(ops), + NonZero::new(parallel).unwrap(), + )), + Box::pin(tokio::time::sleep(Duration::from_millis(wait_time_millis))), + ) + .await; + match race { + future::Either::Left(_) => panic!("Wrong future won the race."), + future::Either::Right((_, run_all_fut)) => { + let mut nums: Vec<_> = receiver.try_iter().collect(); + nums.sort(); + assert_eq!(nums, (0..parallel).collect::>()); + + run_all_fut.await?; + assert_eq!(receiver.try_iter().collect::>().len(), 1); + } + } + + Ok(()) + } +} From 071fbe8a5c6c099f03eb2bd6d9ede3a914fa8755 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Wed, 12 Nov 2025 14:51:23 -0500 Subject: [PATCH 03/15] hookup moduled --- Cargo.lock | 2 ++ sdk/storage/azure_storage_blob/Cargo.toml | 3 +++ sdk/storage/azure_storage_blob/src/lib.rs | 2 ++ 3 files changed, 7 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 09b0c144b7..915a96f751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -509,7 +509,9 @@ dependencies = [ "azure_core_test", "azure_identity", "azure_storage_blob_test", + "bytes", "futures", + "rand 0.9.2", "serde", "serde_json", "tokio", diff --git a/sdk/storage/azure_storage_blob/Cargo.toml b/sdk/storage/azure_storage_blob/Cargo.toml index 56a9b27b0b..3646901ba3 100644 --- a/sdk/storage/azure_storage_blob/Cargo.toml +++ b/sdk/storage/azure_storage_blob/Cargo.toml @@ -19,6 +19,8 @@ default = ["azure_core/default"] [dependencies] async-trait.workspace = true azure_core = { workspace = true, features = ["xml"] } +bytes.workspace = true +futures.workspace = true serde.workspace = true serde_json.workspace = true typespec_client_core = { workspace = true, features = ["derive"] } @@ -35,6 +37,7 @@ azure_core_test = { workspace = true, features = [ azure_identity.workspace = true azure_storage_blob_test.path = "../azure_storage_blob_test" futures.workspace = true +rand.workspace = true tokio = { workspace = true, features = ["macros"] } tracing.workspace = true diff --git a/sdk/storage/azure_storage_blob/src/lib.rs b/sdk/storage/azure_storage_blob/src/lib.rs index 4cd17cd46a..abf4db548b 100644 --- a/sdk/storage/azure_storage_blob/src/lib.rs +++ b/sdk/storage/azure_storage_blob/src/lib.rs @@ -11,7 +11,9 @@ pub mod clients; #[allow(unused_imports)] mod generated; mod parsers; +mod partitioned_transfer; mod pipeline; +mod streams; pub use clients::*; pub use parsers::*; pub mod models; From 30ab03e96601c496081e4720bc206cc318544049 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Thu, 13 Nov 2025 13:20:25 -0500 Subject: [PATCH 04/15] pass spellcheck --- .../azure_storage_blob/src/partitioned_transfer/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs index 1beee9cf8f..34d6975b49 100644 --- a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs +++ b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs @@ -84,6 +84,8 @@ mod tests { let (sender, receiver) = channel(); + // setup a series of operations that send a unique number to a channel + // we can then assert the expected numbers made it to the channel at expected times let ops = (0..num_ops).map(|i| { let s = sender.clone(); Ok(async move || { @@ -104,9 +106,9 @@ mod tests { match race { future::Either::Left(_) => panic!("Wrong future won the race."), future::Either::Right((_, run_all_fut)) => { - let mut nums: Vec<_> = receiver.try_iter().collect(); - nums.sort(); - assert_eq!(nums, (0..parallel).collect::>()); + let mut items: Vec<_> = receiver.try_iter().collect(); + items.sort(); + assert_eq!(items, (0..parallel).collect::>()); run_all_fut.await?; assert_eq!(receiver.try_iter().collect::>().len(), 1); From b505b8293532d432020acc98cab7daa873265c06 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Fri, 14 Nov 2025 14:48:01 -0500 Subject: [PATCH 05/15] caught some todos --- .../src/partitioned_transfer/mod.rs | 92 +++++++++++++++++-- 1 file changed, 84 insertions(+), 8 deletions(-) diff --git a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs index 34d6975b49..0a6029a6b5 100644 --- a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs +++ b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs @@ -16,7 +16,10 @@ where { let parallel = parallel.get(); - let first_op = ops_queue.try_next().await?.ok_or_else(|| todo!())?; + let first_op = match ops_queue.try_next().await? { + Some(item) => item, + None => return Ok(()), + }; let mut get_next_completed_op_future = future::select_all(vec![Box::pin(first_op())]); let mut get_next_queue_op_future = ops_queue.try_next(); @@ -50,13 +53,20 @@ where } } // a running op completed first - future::Either::Right(((Ok(_res), _, remaining_running_ops), next_op_fut)) => { - get_next_queue_op_future = next_op_fut; - get_next_completed_op_future = if remaining_running_ops.is_empty() { - todo!("handle no remaining ops but op queue hasn't completed") + future::Either::Right(((Ok(_), _, remaining_running_ops), next_op_fut)) => { + // select panics on empty iter, so we can't race in this case. + // forcibly wait for next op in queue and handle it before continuing. + if remaining_running_ops.is_empty() { + let next_op = match next_op_fut.await? { + Some(item) => item, + None => return Ok(()), + }; + get_next_queue_op_future = ops_queue.try_next(); + get_next_completed_op_future = future::select_all(vec![Box::pin(next_op())]); } else { - future::select_all(remaining_running_ops) - }; + get_next_queue_op_future = next_op_fut; + get_next_completed_op_future = future::select_all(remaining_running_ops); + } } } } @@ -72,8 +82,10 @@ where #[cfg(test)] mod tests { + use futures::{ready, FutureExt}; + use super::*; - use std::{sync::mpsc::channel, time::Duration}; + use std::{pin::Pin, sync::mpsc::channel, task::Poll, time::Duration}; #[tokio::test] async fn limit_ops() -> AzureResult<()> { @@ -117,4 +129,68 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn slow_stream() -> AzureResult<()> { + let parallel = 10; + let num_ops = 5; + let op_time_millis = 10; + let stream_time_millis = op_time_millis + 10; + // setup a series of operations that send a unique number to a channel + // we can then assert the expected numbers made it to the channel at expected times + let ops = (0..num_ops).map(|_| { + Ok(async move || { + tokio::time::sleep(Duration::from_millis(op_time_millis)).await; + AzureResult::<()>::Ok(()) + }) + }); + + run_all_with_concurrency_limit( + SlowStream::new(ops, Duration::from_millis(stream_time_millis)), + NonZero::new(parallel).unwrap(), + ) + .await + } + + #[tokio::test] + async fn empty_ops() -> AzureResult<()> { + let parallel = 4usize; + + // not possible to manually type what we need + // make a vec with a concrete element and then remove it to get the desired typing + let op = || future::ready::>(Ok(())); + let mut ops = vec![Ok(op)]; + ops.pop(); + + run_all_with_concurrency_limit(futures::stream::iter(ops), NonZero::new(parallel).unwrap()) + .await + } + + struct SlowStream { + sleep: Pin>, + interval: Duration, + iter: Iter, + } + impl SlowStream { + fn new(iter: Iter, interval: Duration) -> Self { + Self { + sleep: Box::pin(tokio::time::sleep(interval)), + interval, + iter, + } + } + } + impl Stream for SlowStream { + type Item = Iter::Item; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + ready!(this.sleep.poll_unpin(cx)); + this.sleep = Box::pin(tokio::time::sleep(this.interval)); + Poll::Ready(this.iter.next()) + } + } } From ef2833b167519bf6464b30fe7f1374b74e2f2db8 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Wed, 19 Nov 2025 22:21:47 -0500 Subject: [PATCH 06/15] broke out some functions | comments --- .../src/partitioned_transfer/mod.rs | 73 +++++++++++++------ 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs index 0a6029a6b5..bc185d8435 100644 --- a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs +++ b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs @@ -1,4 +1,4 @@ -use std::{future::Future, num::NonZero}; +use std::{cmp::max, future::Future, num::NonZero, pin::Pin}; use futures::{ future::{self}, @@ -7,15 +7,24 @@ use futures::{ type AzureResult = azure_core::Result; -async fn run_all_with_concurrency_limit( - mut ops_queue: impl Stream TFut, TErr>> + Unpin, +async fn run_all_with_concurrency_limit( + mut ops_queue: impl Stream Fut, Err>> + Unpin, parallel: NonZero, -) -> Result<(), TErr> +) -> Result<(), Err> where - TFut: Future>, + Fut: Future>, { let parallel = parallel.get(); + // if no real parallelism, take the simple option of executing ops sequentially. + // The "true" implementation can't handle parallel < 2. + if parallel == 1 { + while let Some(op) = ops_queue.try_next().await? { + op().await?; + } + return Ok(()); + } + let first_op = match ops_queue.try_next().await? { Some(item) => item, None => return Ok(()), @@ -25,35 +34,29 @@ where let mut get_next_queue_op_future = ops_queue.try_next(); loop { // while max parallel running ops, focus on just running ops - let mut running_ops = get_next_completed_op_future.into_inner(); - while running_ops.len() >= parallel { - let result; - (result, _, running_ops) = future::select_all(running_ops).await; - result? - } - get_next_completed_op_future = future::select_all(running_ops); + get_next_completed_op_future = run_down(get_next_completed_op_future, parallel - 1).await?; match future::select(get_next_queue_op_future, get_next_completed_op_future).await { future::Either::Left((Err(e), _)) => return Err(e), future::Either::Right(((Err(e), _, _), _)) => return Err(e), - // next op in the queue arrived first + // Next op in the queue arrived first. Add it to existing running ops. future::Either::Left((Ok(next_op_in_queue), running_ops_fut)) => { get_next_queue_op_future = ops_queue.try_next(); get_next_completed_op_future = running_ops_fut; match next_op_in_queue { Some(op) => { - running_ops = get_next_completed_op_future.into_inner(); - running_ops.push(Box::pin(op())); - get_next_completed_op_future = future::select_all(running_ops); + get_next_completed_op_future = + combine_select_all(get_next_completed_op_future, Box::pin(op())); } // queue was finished, race is over None => break, } } - // a running op completed first + // A running op completed first. Start another select_all with remaining running ops. future::Either::Right(((Ok(_), _, remaining_running_ops), next_op_fut)) => { + // remaining_running_ops could be empty now. // select panics on empty iter, so we can't race in this case. // forcibly wait for next op in queue and handle it before continuing. if remaining_running_ops.is_empty() { @@ -71,13 +74,41 @@ where } } - let mut running_ops = get_next_completed_op_future.into_inner(); - while !running_ops.is_empty() { + let _ = future::try_join_all(get_next_completed_op_future.into_inner()).await?; + Ok(()) +} + +/// Loops `future::select_all()` with the existing `SelectAll`` until the target remaining +/// inner futures is reached. Will always leave at least one inner future remaining, for +/// type simplicity (select_all panics on len == 0); +async fn run_down( + select_fut: future::SelectAll>>, + target_remaining: usize, +) -> Result>>, Err> +where + Fut: Future>, +{ + let target_remaining = max(target_remaining, 1); + let mut select_vec = select_fut.into_inner(); + while select_vec.len() > target_remaining { let result; - (result, _, running_ops) = future::select_all(running_ops).await; + (result, _, select_vec) = future::select_all(select_vec).await; result?; } - Ok(()) + Ok(future::select_all(select_vec)) +} + +/// Adds a pin-boxed future to an existing SelectAll of pin-boxed futures. +fn combine_select_all( + select_fut: future::SelectAll>>, + new_fut: Pin>, +) -> future::SelectAll>> +where + Fut: Future, +{ + let mut futures = select_fut.into_inner(); + futures.push(new_fut); + future::select_all(futures) } #[cfg(test)] From 7cb2cfb0807bf98342acc7e775824c4a59701301 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Wed, 19 Nov 2025 22:31:29 -0500 Subject: [PATCH 07/15] replace assertion with NonZero usage --- .../src/streams/partitioned_stream.rs | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs index 60f316bb01..1ad3098233 100644 --- a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs +++ b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs @@ -1,5 +1,6 @@ use std::{ mem, + num::NonZero, pin::{pin, Pin}, task::Poll, }; @@ -20,8 +21,8 @@ pub(crate) struct PartitionedStream { } impl PartitionedStream { - pub(crate) fn new(inner: Box, partition_len: usize) -> Self { - assert!(partition_len > 0); + pub(crate) fn new(inner: Box, partition_len: NonZero) -> Self { + let partition_len = partition_len.get(); Self { buf: vec![0u8; std::cmp::min(partition_len, inner.len())], inner, @@ -101,8 +102,10 @@ mod tests { for part_count in [2usize, 3, 11, 16] { for part_len in [1024usize, 1000, 9999, 1] { let data = get_random_data(part_len * part_count); - let stream = - PartitionedStream::new(Box::new(BytesStream::new(data.clone())), part_len); + let stream = PartitionedStream::new( + Box::new(BytesStream::new(data.clone())), + NonZero::new(part_len).unwrap(), + ); let parts: Vec<_> = stream.try_collect().await?; @@ -122,8 +125,10 @@ mod tests { for part_len in [1024usize, 1000, 9999] { for dangling_len in [part_len / 2, 100, 128, 99] { let data = get_random_data(part_len * (part_count - 1) + dangling_len); - let stream = - PartitionedStream::new(Box::new(BytesStream::new(data.clone())), part_len); + let stream = PartitionedStream::new( + Box::new(BytesStream::new(data.clone())), + NonZero::new(part_len).unwrap(), + ); let parts: Vec<_> = stream.try_collect().await?; @@ -147,7 +152,10 @@ mod tests { async fn partitions_exactly_one() -> AzureResult<()> { for len in [1024usize, 1000, 9999, 1] { let data = get_random_data(len); - let mut stream = PartitionedStream::new(Box::new(BytesStream::new(data.clone())), len); + let mut stream = PartitionedStream::new( + Box::new(BytesStream::new(data.clone())), + NonZero::new(len).unwrap(), + ); let single_partition = stream.try_next().await?.unwrap(); @@ -162,8 +170,10 @@ mod tests { let part_len = 99999usize; for len in [1024usize, 1000, 9999, 1] { let data = get_random_data(len); - let mut stream = - PartitionedStream::new(Box::new(BytesStream::new(data.clone())), part_len); + let mut stream = PartitionedStream::new( + Box::new(BytesStream::new(data.clone())), + NonZero::new(part_len).unwrap(), + ); let single_partition = stream.try_next().await?.unwrap(); @@ -177,8 +187,10 @@ mod tests { async fn partitions_none() -> AzureResult<()> { for part_len in [1024usize, 1000, 9999, 1] { let data = get_random_data(0); - let mut stream = - PartitionedStream::new(Box::new(BytesStream::new(data.clone())), part_len); + let mut stream = PartitionedStream::new( + Box::new(BytesStream::new(data.clone())), + NonZero::new(part_len).unwrap(), + ); assert!(stream.try_next().await?.is_none()); } From 134c514a48888fd0d6b1b25ffd32b55e30a813aa Mon Sep 17 00:00:00 2001 From: Jocelyn <41338290+jaschrep-msft@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:32:02 -0500 Subject: [PATCH 08/15] Apply suggestions from code review Accept useful generated comments. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../src/partitioned_transfer/mod.rs | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs index bc185d8435..89d7d6bb26 100644 --- a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs +++ b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs @@ -7,6 +7,44 @@ use futures::{ type AzureResult = azure_core::Result; +/// Executes async operations from a queue with a concurrency limit. +/// +/// This function consumes a stream (`ops_queue`) of async operation factories (closures returning futures), +/// and runs up to `parallel` operations concurrently. As operations complete, new ones are started from the queue, +/// maintaining the concurrency limit. If any operation or queue item returns an error, the function returns early +/// with that error. When all operations and queue items are complete, returns `Ok(())`. +/// +/// # Parameters +/// - `ops_queue`: A stream yielding `Result TFut, TErr>`. Each item is either a closure producing a future, +/// or an error. The stream must be `Unpin`. +/// - `parallel`: The maximum number of operations to run concurrently. Must be non-zero. +/// +/// # Behavior +/// - Operations are scheduled as soon as possible, up to the concurrency limit. +/// - If an error is encountered in the queue or in any operation, the function returns that error immediately. +/// - When the queue is exhausted, waits for all running operations to complete before returning. +/// +/// # Example +/// ```rust +/// use futures::{stream, StreamExt}; +/// use std::num::NonZeroUsize; +/// +/// async fn example() { +/// let ops = vec![ +/// Ok(|| async { Ok(()) }), +/// Ok(|| async { Ok(()) }), +/// ]; +/// let ops_stream = stream::iter(ops); +/// run_all_with_concurrency_limit(ops_stream, NonZeroUsize::new(2).unwrap()).await.unwrap(); +/// } +/// ``` +/// +/// # Errors +/// Returns the first error encountered from the queue or any operation. +/// +/// # Type Parameters +/// - `TFut`: Future type returned by each operation. +/// - `TErr`: Error type for queue or operation failures. async fn run_all_with_concurrency_limit( mut ops_queue: impl Stream Fut, Err>> + Unpin, parallel: NonZero, From e4f2492aa17f847244715667f5038e206d1e3434 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Mon, 24 Nov 2025 14:39:16 -0500 Subject: [PATCH 09/15] rename tests --- .../azure_storage_blob/src/partitioned_transfer/mod.rs | 6 +++--- .../src/streams/partitioned_stream.rs | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs index 89d7d6bb26..4dcf2c8436 100644 --- a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs +++ b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs @@ -157,7 +157,7 @@ mod tests { use std::{pin::Pin, sync::mpsc::channel, task::Poll, time::Duration}; #[tokio::test] - async fn limit_ops() -> AzureResult<()> { + async fn enforce_concurrency_limit() -> AzureResult<()> { let parallel = 4usize; let num_ops = parallel + 1; let wait_time_millis = 10u64; @@ -200,7 +200,7 @@ mod tests { } #[tokio::test] - async fn slow_stream() -> AzureResult<()> { + async fn handles_slow_stream() -> AzureResult<()> { let parallel = 10; let num_ops = 5; let op_time_millis = 10; @@ -222,7 +222,7 @@ mod tests { } #[tokio::test] - async fn empty_ops() -> AzureResult<()> { + async fn success_when_no_ops() -> AzureResult<()> { let parallel = 4usize; // not possible to manually type what we need diff --git a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs index 1ad3098233..d7c5a1acee 100644 --- a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs +++ b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs @@ -98,7 +98,7 @@ mod tests { } #[tokio::test] - async fn partitions_exact_multiple() -> AzureResult<()> { + async fn partitions_exact_len() -> AzureResult<()> { for part_count in [2usize, 3, 11, 16] { for part_len in [1024usize, 1000, 9999, 1] { let data = get_random_data(part_len * part_count); @@ -120,7 +120,7 @@ mod tests { } #[tokio::test] - async fn partitions_leftover_multiple() -> AzureResult<()> { + async fn partitions_with_remainder() -> AzureResult<()> { for part_count in [2usize, 3, 11, 16] { for part_len in [1024usize, 1000, 9999] { for dangling_len in [part_len / 2, 100, 128, 99] { @@ -149,7 +149,7 @@ mod tests { } #[tokio::test] - async fn partitions_exactly_one() -> AzureResult<()> { + async fn exactly_one_partition() -> AzureResult<()> { for len in [1024usize, 1000, 9999, 1] { let data = get_random_data(len); let mut stream = PartitionedStream::new( @@ -166,7 +166,7 @@ mod tests { } #[tokio::test] - async fn partitions_less_than_one() -> AzureResult<()> { + async fn less_than_one_partition() -> AzureResult<()> { let part_len = 99999usize; for len in [1024usize, 1000, 9999, 1] { let data = get_random_data(len); @@ -184,7 +184,7 @@ mod tests { } #[tokio::test] - async fn partitions_none() -> AzureResult<()> { + async fn successful_empty_stream_when_empty_source_stream() -> AzureResult<()> { for part_len in [1024usize, 1000, 9999, 1] { let data = get_random_data(0); let mut stream = PartitionedStream::new( From ec5df02283ff7a76e8dd219816068a305c808e7f Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Mon, 24 Nov 2025 15:29:04 -0500 Subject: [PATCH 10/15] removed doctest generated docs tried to write a doctest for a non-public function. --- .../src/partitioned_transfer/mod.rs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs index 4dcf2c8436..8580716619 100644 --- a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs +++ b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs @@ -24,21 +24,6 @@ type AzureResult = azure_core::Result; /// - If an error is encountered in the queue or in any operation, the function returns that error immediately. /// - When the queue is exhausted, waits for all running operations to complete before returning. /// -/// # Example -/// ```rust -/// use futures::{stream, StreamExt}; -/// use std::num::NonZeroUsize; -/// -/// async fn example() { -/// let ops = vec![ -/// Ok(|| async { Ok(()) }), -/// Ok(|| async { Ok(()) }), -/// ]; -/// let ops_stream = stream::iter(ops); -/// run_all_with_concurrency_limit(ops_stream, NonZeroUsize::new(2).unwrap()).await.unwrap(); -/// } -/// ``` -/// /// # Errors /// Returns the first error encountered from the queue or any operation. /// From 1670b1b64a2df0ab047a5999f383397bced48bcd Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Wed, 3 Dec 2025 13:55:58 -0500 Subject: [PATCH 11/15] use BytesMut and pin_project --- Cargo.lock | 1 + sdk/storage/azure_storage_blob/Cargo.toml | 1 + .../src/streams/partitioned_stream.rs | 71 ++++++++++--------- 3 files changed, 41 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 915a96f751..fa7a9e36a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,6 +511,7 @@ dependencies = [ "azure_storage_blob_test", "bytes", "futures", + "pin-project", "rand 0.9.2", "serde", "serde_json", diff --git a/sdk/storage/azure_storage_blob/Cargo.toml b/sdk/storage/azure_storage_blob/Cargo.toml index 3646901ba3..10574dad54 100644 --- a/sdk/storage/azure_storage_blob/Cargo.toml +++ b/sdk/storage/azure_storage_blob/Cargo.toml @@ -21,6 +21,7 @@ async-trait.workspace = true azure_core = { workspace = true, features = ["xml"] } bytes.workspace = true futures.workspace = true +pin-project.workspace = true serde.workspace = true serde_json.workspace = true typespec_client_core = { workspace = true, features = ["derive"] } diff --git a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs index d7c5a1acee..a67e5c18db 100644 --- a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs +++ b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs @@ -1,21 +1,23 @@ +use pin_project::pin_project; use std::{ - mem, + mem::{self, MaybeUninit}, num::NonZero, pin::{pin, Pin}, task::Poll, }; use azure_core::stream::SeekableStream; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::{ready, stream::FusedStream, AsyncRead, Stream}; type AzureResult = azure_core::Result; +#[pin_project] pub(crate) struct PartitionedStream { + #[pin] inner: Box, - buf: Vec, + buf: BytesMut, partition_len: usize, - buf_offset: usize, total_read: usize, inner_complete: bool, } @@ -24,24 +26,13 @@ impl PartitionedStream { pub(crate) fn new(inner: Box, partition_len: NonZero) -> Self { let partition_len = partition_len.get(); Self { - buf: vec![0u8; std::cmp::min(partition_len, inner.len())], + buf: BytesMut::with_capacity(std::cmp::min(partition_len, inner.len())), inner, partition_len, - buf_offset: 0, total_read: 0, inner_complete: false, } } - - fn take(&mut self) -> Vec { - let mut ret = mem::replace( - &mut self.buf, - vec![0u8; std::cmp::min(self.partition_len, self.inner.len() - self.total_read)], - ); - ret.truncate(self.buf_offset); - self.buf_offset = 0; - ret - } } impl Stream for PartitionedStream { @@ -51,27 +42,43 @@ impl Stream for PartitionedStream { self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let this = self.get_mut(); + let mut this = self.project(); loop { - if this.inner_complete || this.buf_offset >= this.buf.len() { - let ret = this.take(); + if *this.inner_complete || this.buf.len() >= *this.partition_len { + let ret = mem::replace( + this.buf, + BytesMut::with_capacity(std::cmp::min( + *this.partition_len, + this.inner.len() - *this.total_read, + )), + ); return if ret.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(Bytes::from(ret)))) + Poll::Ready(Some(Ok(ret.freeze()))) }; - } else { - match ready!(pin!(&mut this.inner).poll_read(cx, &mut this.buf[this.buf_offset..])) - { - Ok(bytes_read) => { - this.buf_offset += bytes_read; - this.total_read += bytes_read; - this.inner_complete = bytes_read == 0; - } - Err(e) => { - return Poll::Ready(Some(Err(e.into()))); - } + } + let read_buffer; + unsafe { + // original slice comes from the known remaining capacity of BytesMut. + // Those bytes are valid reserved memory but have had no values written + // to them. Those are the exact bytes we want to read into. + read_buffer = mem::transmute::<&mut [MaybeUninit], &mut [u8]>( + this.buf.spare_capacity_mut(), + ); + } + match ready!(this.inner.as_mut().poll_read(cx, read_buffer)) { + Ok(bytes_read) => { + // poll_read() wrote these bytes_read-many bytes into + // the spare capacity, so we can mark those new bytes + // as part of the length + unsafe { this.buf.set_len(this.buf.len() + bytes_read) }; + *this.total_read += bytes_read; + *this.inner_complete = bytes_read == 0; + } + Err(e) => { + return Poll::Ready(Some(Err(e.into()))); } } } @@ -159,7 +166,7 @@ mod tests { let single_partition = stream.try_next().await?.unwrap(); - assert!(stream.try_next().await?.is_none()); + assert_eq!(stream.try_next().await?, None); assert_eq!(single_partition[..], data[..]); } Ok(()) From ffa6131d98ad44483b65bd2aeeda34a7353075fb Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Wed, 3 Dec 2025 14:14:29 -0500 Subject: [PATCH 12/15] restrict unsafe scope --- .../src/streams/partitioned_stream.rs | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs index a67e5c18db..8bab591a5a 100644 --- a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs +++ b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs @@ -59,20 +59,18 @@ impl Stream for PartitionedStream { Poll::Ready(Some(Ok(ret.freeze()))) }; } - let read_buffer; - unsafe { - // original slice comes from the known remaining capacity of BytesMut. + match ready!(this.inner.as_mut().poll_read(cx, unsafe { + // spare_capacity_mut() gives us the known remaining capacity of BytesMut. // Those bytes are valid reserved memory but have had no values written - // to them. Those are the exact bytes we want to read into. - read_buffer = mem::transmute::<&mut [MaybeUninit], &mut [u8]>( - this.buf.spare_capacity_mut(), - ); - } - match ready!(this.inner.as_mut().poll_read(cx, read_buffer)) { + // to them. Those are the exact bytes we want to write into. + // This transmuted data is not saved to a variable, leaving it inaccessible + // to anything but poll_read(). + mem::transmute::<&mut [MaybeUninit], &mut [u8]>(this.buf.spare_capacity_mut()) + })) { Ok(bytes_read) => { - // poll_read() wrote these bytes_read-many bytes into - // the spare capacity, so we can mark those new bytes - // as part of the length + // poll_read() wrote bytes_read-many bytes into the spare capacity. + // those values are therefore initialized and we can add them to + // the existing buffer length unsafe { this.buf.set_len(this.buf.len() + bytes_read) }; *this.total_read += bytes_read; *this.inner_complete = bytes_read == 0; From 4a2143e5da60b0a4518705fd6c7f114176aa0ac3 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Wed, 3 Dec 2025 16:08:20 -0500 Subject: [PATCH 13/15] unused macro --- .../azure_storage_blob/src/streams/partitioned_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs index 8bab591a5a..55fffac3df 100644 --- a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs +++ b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs @@ -2,7 +2,7 @@ use pin_project::pin_project; use std::{ mem::{self, MaybeUninit}, num::NonZero, - pin::{pin, Pin}, + pin::Pin, task::Poll, }; From 1d4e817d1c1f99cc489d2b3069ad7acb2d413763 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Wed, 3 Dec 2025 16:50:32 -0500 Subject: [PATCH 14/15] add type from rust std to dictionary --- eng/dict/rust-custom.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/eng/dict/rust-custom.txt b/eng/dict/rust-custom.txt index 174812db94..0e096afd20 100644 --- a/eng/dict/rust-custom.txt +++ b/eng/dict/rust-custom.txt @@ -12,6 +12,7 @@ rustflags rustls rustsec turbofish +uninit dylib cdylib staticlib From cc031968f9f1aae9395adaadc0710be918d44f07 Mon Sep 17 00:00:00 2001 From: Jocelyn Date: Fri, 5 Dec 2025 14:43:32 -0500 Subject: [PATCH 15/15] transmute -> slice::from_raw_parts --- .../src/streams/partitioned_stream.rs | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs index 55fffac3df..8f7fbc7488 100644 --- a/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs +++ b/sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs @@ -1,10 +1,5 @@ use pin_project::pin_project; -use std::{ - mem::{self, MaybeUninit}, - num::NonZero, - pin::Pin, - task::Poll, -}; +use std::{mem, num::NonZero, pin::Pin, slice, task::Poll}; use azure_core::stream::SeekableStream; use bytes::{Bytes, BytesMut}; @@ -59,14 +54,21 @@ impl Stream for PartitionedStream { Poll::Ready(Some(Ok(ret.freeze()))) }; } - match ready!(this.inner.as_mut().poll_read(cx, unsafe { + + let spare_capacity = this.buf.spare_capacity_mut(); + let spare_capacity = unsafe { // spare_capacity_mut() gives us the known remaining capacity of BytesMut. // Those bytes are valid reserved memory but have had no values written // to them. Those are the exact bytes we want to write into. - // This transmuted data is not saved to a variable, leaving it inaccessible - // to anything but poll_read(). - mem::transmute::<&mut [MaybeUninit], &mut [u8]>(this.buf.spare_capacity_mut()) - })) { + // MaybeUninit can be safely cast into u8, and so this pointer cast + // is safe. Since the spare capacity length is safely known, we can + // provide those to from_raw_parts without worry. + slice::from_raw_parts_mut( + spare_capacity.as_mut_ptr() as *mut u8, + spare_capacity.len(), + ) + }; + match ready!(this.inner.as_mut().poll_read(cx, spare_capacity)) { Ok(bytes_read) => { // poll_read() wrote bytes_read-many bytes into the spare capacity. // those values are therefore initialized and we can add them to