Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions score/mw/com/impl/rust/com-api/com-api-concept/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ rust_library(
],
deps = [
"@score_baselibs_rust//src/containers",
"@score_communication_crate_index//:futures",
"@score_communication_crate_index//:thiserror",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use containers::fixed_capacity::FixedCapacityQueue;
use core::fmt::Debug;
use core::future::Future;
use core::ops::{Deref, DerefMut};
use futures::stream::Stream;
use std::path::Path;

/// Result type alias with `std::result::Result` using `com_api::Error` as error type
Expand Down Expand Up @@ -850,6 +851,39 @@ pub trait Subscription<T: CommData + Debug, R: Runtime + ?Sized> {
new_samples: usize,
max_samples: usize,
) -> impl Future<Output = Result<SampleContainer<Self::Sample<'a>>>> + 'a;

/// Returns a stream that continuously yields samples as they become available.
///
/// The stream creates and maintains an internal buffer of up to `max_samples`
/// samples during its lifetime, automatically collecting samples from the
/// communication buffer. The buffer is created when the stream is created and
/// persists until the subscription is dropped.
///
/// **Polling Behavior:**
/// When the user polls the stream, it checks whether the internal buffer
/// contains at least one sample. If so, it yields samples to the user;
/// otherwise it returns `Pending` and waits for samples to arrive.
///
/// **Buffer Management:**
/// The internal buffer has a maximum capacity of `max_samples`. When the
/// buffer is full and new samples arrive, the oldest samples are automatically
/// dropped to make room. This allows continuous sample flow without requiring
/// repeated `receive()` calls.
///
/// The stream manages its own internal buffer, so it does not take a
/// `SampleContainer` as a parameter. Instead, it yields the collected samples
/// each time it is polled.
///
/// # Parameters
/// * `max_samples` - Maximum capacity of the internal buffer
///
/// # Returns
/// A stream that yields at least one `Sample` per successful poll
///
/// # Errors
/// The stream yields an error item if a problem occurs during sample reception
fn to_stream<'a>(
// TODO: We may take `self` by value, consume it in the stream, and recover it
// before the stream is dropped. Based on implementation complexity, we can
// decide whether to take `self` by value or by reference.
&'a self,
max_samples: usize,
) -> impl Stream<Item = Result<Self::Sample<'a>>> + 'a;
}

/// A trait for types that can be default-constructed in place, skipping intermediate moves.
Expand Down
10 changes: 10 additions & 0 deletions score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use core::mem::ManuallyDrop;
use core::ops::{Deref, DerefMut};
use core::panic;
use core::ptr::NonNull;
use futures::stream::{self, Stream};
use futures::task::{AtomicWaker, Context, Poll};
use std::cmp::Ordering;
use std::pin::Pin;
Expand Down Expand Up @@ -574,6 +575,15 @@ where
.await
}
}

/// Returns a stream of samples for this subscription.
///
/// TODO: Implementation of `to_stream` for LoLa will be done once the API is
/// approved. Until then this is a placeholder that yields no items and ends
/// immediately; `_max_samples` is accepted but ignored.
fn to_stream<'a>(
    &'a self,
    _max_samples: usize,
) -> impl Stream<Item = Result<Self::Sample<'a>>> + 'a {
    stream::empty()
}
}

// The ReceiveFuture struct encapsulates the state and logic for asynchronously receiving samples
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ops::{Deref, DerefMut};
use core::sync::atomic::AtomicUsize;
use futures::stream::{self, Stream};
use std::collections::VecDeque;
use std::path::Path;

Expand Down Expand Up @@ -336,6 +337,14 @@ where
) -> impl Future<Output = Result<SampleContainer<Self::Sample<'a>>>> + 'a {
async { todo!() }
}

/// Returns a stream of samples for this mock subscription.
///
/// Placeholder: the mock yields no items and ends immediately, mirroring the
/// not-yet-implemented runtime behavior; `_max_samples` is accepted but
/// ignored.
fn to_stream<'a>(
    &'a self,
    _max_samples: usize,
) -> impl Stream<Item = Result<Self::Sample<'a>>> + 'a {
    stream::empty()
}
}

pub struct Publisher<T> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
@startuml com_api_stream_api_lola_runtime

' Copyright (c) 2026 Contributors to the Eclipse Foundation
'
' See the NOTICE file(s) distributed with this work for additional
' information regarding copyright ownership.
'
' This program and the accompanying materials are made available under the
' terms of the Apache License Version 2.0 which is available at
' <https://www.apache.org/licenses/LICENSE-2.0>
'
' SPDX-License-Identifier: Apache-2.0

title Async Stream API in Lola Runtime

actor User
participant "Subscription" as Subscription
participant "SampleContainer" as SampleContainer
participant "Lola" as Lola

' Matches the actual trait API: to_stream(max_samples) — there is no
' new_samples parameter; the stream yields as soon as one sample is available.
User -> Subscription: to_stream(max_samples)
Subscription --> SampleContainer: Create with max_samples capacity

Note over Lola: Backend asynchronously\npushes samples into the SampleContainer

loop User polls the stream
User -> Subscription: poll_next().await
activate Subscription
Subscription -> SampleContainer: Check samples >= 1

alt Samples available
Subscription --> User: Ready(Samples)
else Waiting for Samples
Subscription --> User: Pending
end
deactivate Subscription
end

Note over SampleContainer: Oldest samples dropped\nwhen buffer full

@enduml