From 06286180b34631752a84facd871164627d997254 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Wed, 15 Apr 2026 10:14:00 +0200 Subject: [PATCH] Rust::com Async Stream Receive API proposal * Proposal for async stream receive api with basic sequence diagram --- .../impl/rust/com-api/com-api-concept/BUILD | 1 + .../com-api-concept/com_api_concept.rs | 34 +++++++++++++++ .../com-api/com-api-runtime-lola/consumer.rs | 10 +++++ .../com-api/com-api-runtime-mock/runtime.rs | 9 ++++ ...com_api_async_stream_api_lola_runtime.puml | 41 +++++++++++++++++++ 5 files changed, 95 insertions(+) create mode 100644 score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/BUILD b/score/mw/com/impl/rust/com-api/com-api-concept/BUILD index 76f9b0d18..5a468415e 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/BUILD +++ b/score/mw/com/impl/rust/com-api/com-api-concept/BUILD @@ -31,6 +31,7 @@ rust_library( ], deps = [ "@score_baselibs_rust//src/containers", + "@score_communication_crate_index//:futures", "@score_communication_crate_index//:thiserror", ], ) diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index ac1862c46..f9aee5021 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -55,6 +55,7 @@ use containers::fixed_capacity::FixedCapacityQueue; use core::fmt::Debug; use core::future::Future; use core::ops::{Deref, DerefMut}; +use futures::stream::Stream; use std::path::Path; /// Result type alias with `std::result::Result` using `com_api::Error` as error type @@ -850,6 +851,39 @@ pub trait Subscription { new_samples: usize, max_samples: usize, ) -> impl Future>>> + 'a; + + /// Returns a stream that continuously yields `SampleContainer` whenever at least `new_samples` are available. + /// + /// The stream creates and maintains an internal `SampleContainer` buffer during its lifetime, + /// automatically collecting samples from the communication buffer. The buffer is created when + /// the stream is created and persists until the subscription is dropped. + /// + /// **Polling Behavior:** + /// When user poll the stream, it checks if the internal buffer contains at least `1` sample. + /// If yes, it yields those samples to user. If not, it returns pending and waits for samples. + /// + /// **Buffer Management:** + /// The internal buffer has a maximum capacity of `max_samples`. When the buffer is full and + /// new samples arrive, the oldest samples are automatically dropped to make room. This allows + /// continuous sample flow without requiring repeated `receive()` calls. + /// + /// The stream manages its own internal `SampleContainer`, so it does not take one as a parameter. + /// Instead, it yields the updated container each time you poll for new samples. + /// + /// # Parameters + /// * `max_samples` - Maximum capacity of the internal buffer + /// + /// # Returns + /// A stream that yields `Sample` with at least `1` event each poll + /// + /// # Errors + /// Returns an error if a problem occurs during sample reception + fn to_stream<'a>( + //TODO: We may take self by value and consume it in the stream and can be recover before dropping the stream. + //Based on implementation complexity, we can decide whether to take self by value or by reference. + &'a self, + max_samples: usize, + ) -> impl Stream>> + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 73c8f90f5..acd86e6b0 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -36,6 +36,7 @@ use core::mem::ManuallyDrop; use core::ops::{Deref, DerefMut}; use core::panic; use core::ptr::NonNull; +use futures::stream::{self, Stream}; use futures::task::{AtomicWaker, Context, Poll}; use std::cmp::Ordering; use std::pin::Pin; @@ -574,6 +575,15 @@ where .await } } + + #[allow(clippy::manual_async_fn)] + fn to_stream<'a>( + &'a self, + _max_samples: usize, + ) -> impl Stream>> + 'a { + //TODO: Implementation of to_stream for LoLa will be be done once API approved. + stream::empty() + } } // The ReceiveFuture struct encapsulates the state and logic for asynchronously receiving samples diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index d099256f3..07f44c6c9 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -31,6 +31,7 @@ use core::marker::PhantomData; use core::mem::MaybeUninit; use core::ops::{Deref, DerefMut}; use core::sync::atomic::AtomicUsize; +use futures::stream::{self, Stream}; use std::collections::VecDeque; use std::path::Path; @@ -336,6 +337,14 @@ where ) -> impl Future>>> + 'a { async { todo!() } } + + #[allow(clippy::manual_async_fn)] + fn to_stream<'a>( + &'a self, + _max_samples: usize, + ) -> impl Stream>> + 'a { + stream::empty() + } } pub struct Publisher { diff --git a/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml b/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml new file mode 100644 index 000000000..c3f84134a --- /dev/null +++ b/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml @@ -0,0 +1,41 @@ +@startuml com_api_stream_api_lola_runtime + +' Copyright (c) 2026 Contributors to the Eclipse Foundation +' +' See the NOTICE file(s) distributed with this work for additional +' information regarding copyright ownership. +' +' This program and the accompanying materials are made available under the +' terms of the Apache License Version 2.0 which is available at +' +' +' SPDX-License-Identifier: Apache-2.0 + +title Async Stream API in Lola Runtime + +actor User +participant "Subscription" as Subscription +participant "SampleContainer" as SampleContainer +participant "Lola" as Lola + +User -> Subscription: stream(new_samples, max_samples) +Subscription --> SampleContainer: Create with max_samples capacity + +Note over Lola: Backend asynchronously\npushes samples in the SampleContainer + +loop User polls the stream + User -> Subscription: poll_next().await + activate Subscription + Subscription -> SampleContainer: Check samples >= 1 + + alt Samples available + Subscription --> User: Ready(Samples) + else Waiting for Samples + Subscription --> User: Pending + end + deactivate Subscription +end + +Note over SampleContainer: Oldest samples dropped\nwhen buffer full + +@enduml