Skip to content

Rust::com Async Stream Receive API#349

Open
bharatGoswami8 wants to merge 1 commit intoeclipse-score:mainfrom
bharatGoswami8:async_stream_api
Open

Rust::com Async Stream Receive API#349
bharatGoswami8 wants to merge 1 commit intoeclipse-score:mainfrom
bharatGoswami8:async_stream_api

Conversation

@bharatGoswami8
Copy link
Copy Markdown
Contributor

@bharatGoswami8 bharatGoswami8 commented Apr 23, 2026

@bharatGoswami8 bharatGoswami8 self-assigned this Apr 23, 2026
@bharatGoswami8 bharatGoswami8 added enhancement New feature or request rust-api and removed enhancement New feature or request labels Apr 23, 2026
* Proposal for async stream receive api with basic sequence diagram
@bharatGoswami8 bharatGoswami8 changed the title Rust::com Async Stream Receive API proposal Rust::com Async Stream Receive API Apr 23, 2026
@bharatGoswami8
Copy link
Copy Markdown
Contributor Author

Hi @artemsheinacn
We have created Proposal for Stream API, please share your opinion on this.

@artemsheinacn
Copy link
Copy Markdown
Contributor

Hi @bharatGoswami8, there is no way to detect if events were dropped when the last receive call yielded, right? This kind of behavior is masking potential issue when consumer is not fast enough to go through all the events. It may be out of scope of events API, but it's what I need in FEO signalling as we can't just dismiss and drop the signals.

@bharatGoswami8
Copy link
Copy Markdown
Contributor Author

bharatGoswami8 commented Apr 28, 2026

Hi @bharatGoswami8, there is no way to detect if events were dropped when the last receive call yielded, right? This kind of behavior is masking potential issue when consumer is not fast enough to go through all the events. It may be out of scope of events API, but it's what I need in FEO signalling as we can't just dismiss and drop the signals.

@artemsheinacn , Yes, we don't have any way to detect if sample is dropped before consumer read, but in that case what i think consumer app should have a mechanism to read the sample as soon as event received and for that may be read in separate thread and maintain a large queue so no processing load to read thread.

@artemsheinacn
Copy link
Copy Markdown
Contributor

@bharatGoswami8 But why would I implement it using an additional thread and more memory consumption if I can just keep using my implementation of streaming API which provides proper error-state detection?

@bharatGoswami8
Copy link
Copy Markdown
Contributor Author

But why would I implement it using an additional thread and more memory consumption if I can just keep using my implementation of streaming API which provides proper error-state detection?

@artemsheinacn , let's discussion this at #371

@artemsheinacn
Copy link
Copy Markdown
Contributor

So, in short this API is not compatible with the FEO signalling implementation. In FEO events are not discardable and with the API from this PR we can't detect if an event was discarded which is an error-state and should lead to panic.
I would propose to define a strategy parameter to be able to switch between different behaviors concerning sample container overflow, for example like so:

enum OverflowStrategy {
   Replace, // drop the oldest events to free space for the new ones
   Panic, // terminate
   Error, // emit an error
   Resize, // allocate and resize to fit
   ...
}

fn to_stream<'a>(
        &'a self,
        max_samples: usize,
        container_overflow_strategy: OverflowStrategy,
    ) -> impl Stream<Item = Result<Self::Sample<'a>>> + 'a;

@bharatGoswami8
Copy link
Copy Markdown
Contributor Author

bharatGoswami8 commented Apr 29, 2026

So, in short this API is not compatible with the FEO signalling implementation. In FEO events are not discardable and with the API from this PR we can't detect if an event was discarded which is an error-state and should lead to panic. I would propose to define a strategy parameter to be able to switch between different behaviors concerning sample container overflow, for example like so:

enum OverflowStrategy {
   Replace, // drop the oldest events to free space for the new ones
   Panic, // terminate
   Error, // emit an error
   Resize, // allocate and resize to fit
   ...
}

fn to_stream<'a>(
        &'a self,
        max_samples: usize,
        container_overflow_strategy: OverflowStrategy,
    ) -> impl Stream<Item = Result<Self::Sample<'a>>> + 'a;

We can have control parameter with default Replace but I’m concerned that runtime resizing might not be the best approach.

@darkwisebear , @pawelrutkaq , your opinion on this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants