Skip to content

feat(grpc): add server-side stream validators#2602

Open
sauravzg wants to merge 2 commits into
masterfrom
sauravz/server-stream-validators
Open

feat(grpc): add server-side stream validators#2602
sauravzg wants to merge 2 commits into
masterfrom
sauravz/server-stream-validators

Conversation

@sauravzg
Copy link
Copy Markdown
Contributor

Introduces ServerSendStreamValidator and ServerRecvStreamValidator to enforce strict gRPC semantics on server streams. These are intended to wrap raw transport streams before passing them to application-level handlers, ensuring that user-provided service logic cannot violate protocol rules.

  • ServerSendStreamValidator ensures proper response sequencing (Headers -> Messages -> Trailers) and prevents invalid state transitions, while fully supporting trailers-only responses.
  • ServerRecvStreamValidator safely manages terminal states, ensuring that any subsequent polls after stream completion consistently return an error to prevent undefined behavior.
  • Adds comprehensive unit tests to verify state machine correctness and protocol compliance.

This is recreation of #2595 to allow using a stacked PR workflow

@sauravzg sauravzg requested a review from dfawley April 23, 2026 18:21
@sauravzg sauravzg force-pushed the sauravz/handle-trailers branch from 507787e to e2f4bd0 Compare April 23, 2026 18:29
@sauravzg sauravzg force-pushed the sauravz/server-stream-validators branch 2 times, most recently from c5c0e67 to 038f365 Compare April 23, 2026 18:52
@sauravzg sauravzg force-pushed the sauravz/handle-trailers branch from e2f4bd0 to 90ae261 Compare April 27, 2026 15:40
@sauravzg sauravzg force-pushed the sauravz/server-stream-validators branch from 038f365 to 89b8525 Compare April 27, 2026 15:40
Base automatically changed from sauravz/handle-trailers to master April 29, 2026 12:02
@sauravzg sauravzg force-pushed the sauravz/server-stream-validators branch from 89b8525 to 355c9cd Compare April 29, 2026 12:04
Copy link
Copy Markdown
Member

@dfawley dfawley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to figure out how exactly this is going to be used.

Most likely it will be a server-side interceptor? In which case, when an error occurs, I would think we'd want to terminate the RPC with a status that indicates the cause of the problem.

Comment thread grpc/src/server/stream_util.rs Outdated
@sauravzg sauravzg force-pushed the sauravz/server-stream-validators branch from 355c9cd to de7ee29 Compare April 30, 2026 18:34
@sauravzg
Copy link
Copy Markdown
Contributor Author

I'm trying to figure out how exactly this is going to be used.

Most likely it will be a server-side interceptor? In which case, when an error occurs, I would think we'd want to terminate the RPC with a status that indicates the cause of the problem.

Yes on the server side interceptor.

RecvStreamVal.. is somewhat straight forward. This'll be used closer to the protobuf API to ensure that we transition our generic stream request receiver to the protobuf API stream object.

SendStreamVal.. is closer to the transport to validate the outgoing data after all other interceptors.

I somewhat agree with the need for status. We need to update the SendStream API on the server side to return status. This wasn't a problem previously because Trailers were a part of the SendStream and would be used to send trailers and terminate the API.

Now that we are returning trailers instead of sending them, we need more details from SendStream to propagate to trailers . Does it sound okay to add Status to the SendStream API itself, because we initially had some reservations on it , but I guess the situation has changed.

@sauravzg sauravzg removed their assignment Apr 30, 2026
@sauravzg sauravzg requested a review from dfawley April 30, 2026 18:55
@dfawley
Copy link
Copy Markdown
Member

dfawley commented Apr 30, 2026

I somewhat agree with the need for status. We need to update the SendStream API on the server side to return status. This wasn't a problem previously because Trailers were a part of the SendStream and would be used to send trailers and terminate the API.

I was expecting we'd just need an interceptor that can terminate the RPC underneath the handler when either the SendStream or RecvStream has any problem. I.e. we can't do stream validation via something that simply wraps each stream independently.

@sauravzg
Copy link
Copy Markdown
Contributor Author

sauravzg commented May 4, 2026

I somewhat agree with the need for status. We need to update the SendStream API on the server side to return status. This wasn't a problem previously because Trailers were a part of the SendStream and would be used to send trailers and terminate the API.

I was expecting we'd just need an interceptor that can terminate the RPC underneath the handler when either the SendStream or RecvStream has any problem. I.e. we can't do stream validation via something that simply wraps each stream independently.

I mean we are definitely gonna need an interceptor for this. But I was planning to separate it into two responsibilities , one for lifecycle for stream and another for lifecycle of handler.

Something along the lines of RecvStreamValidator and moving the logic of stream termination closer to the protobuf API.
and then
SendStreamValidator and moving the logic of handling errors from the SendStream closer to the transport to handle error from interceptors.

But we might need some status on the SendStream now, because if sendstream terminates, we'll need to return Trailers. I could either terminate the stream with some arbitrary Internal stream broken error, or we could try and "safely" propagate the error to trailers?

@dfawley
Copy link
Copy Markdown
Member

dfawley commented May 7, 2026

But I was planning to separate it into two responsibilities , one for lifecycle for stream and another for lifecycle of handler.

Sure but they will need to be linked, so it would make more sense to me to make them all together.

Something along the lines of RecvStreamValidator and moving the logic of stream termination closer to the protobuf API.
and then SendStreamValidator and moving the logic of handling errors from the SendStream closer to the transport to handle error from interceptors.

So you're thinking two different interceptors might be needed?

or we could try and "safely" propagate the error to trailers?

Yes we should make sure the RPC error includes an explanation of the protocol violation. I expect the interceptor would basically select on the handler future and a receive from a oneshot channel that the validator writes to, and then terminates with whichever status comes first, dropping the other future.

@sauravzg
Copy link
Copy Markdown
Contributor Author

sauravzg commented May 7, 2026

So, we discussed offline. It'd make sense for both stream and handler validation to be a part of the same PR.

I initially thought we'd need different handlers, but based on my progress so far, I believe a single one suffices.

Yes we should make sure the RPC error includes an explanation of the protocol violation. I expect the interceptor would basically select on the handler future and a receive from a oneshot channel that the validator writes to, and then terminates with whichever status comes first, dropping the other future.

Since , we don't expose error status from sendstream or recvstream right now, we cannot propagate anything beyond an arbitrary "internal error: stream broken" . Would you want to tweak our stream APIs to be able to propagate errors Err(Status) or Err(String) instead of Err()

@sauravzg sauravzg force-pushed the sauravz/server-stream-validators branch from de7ee29 to a2a44a2 Compare May 8, 2026 16:28
@sauravzg sauravzg changed the base branch from master to sauravz/server-interceptor-api May 8, 2026 16:29
@sauravzg
Copy link
Copy Markdown
Contributor Author

sauravzg commented May 8, 2026

The validation implementation has been updated to be interceptor based and the stream validation utilities are now private.

There's still a question about if we should introduce "status" or "error messages" into the sendstream and recvstream to be propagated to trailer or will an arbitrary "internal error" suffice.

Also, this now is stacked on top of #2634

@sauravzg sauravzg force-pushed the sauravz/server-stream-validators branch from a2a44a2 to e930953 Compare May 14, 2026 16:08
@sauravzg sauravzg force-pushed the sauravz/server-interceptor-api branch from 56323f1 to 51903a9 Compare May 14, 2026 16:08
Comment thread grpc/src/server/handler_validation.rs Outdated
Comment thread grpc/src/server/handler_validation.rs Outdated

struct ChannelAwareSendStreamValidator<S> {
inner: ServerSendStreamValidator<S>,
error_tx: tokio::sync::mpsc::Sender<()>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would an Option<Oneshot> be preferable here? It might make the access simpler, but I'm not sure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is mpsc because the sender is shared between send and recv streams. So, this is subject to the conversation below, if we can eliminate Chann...Recv...Valida...

}
}

struct ChannelAwareRecvStreamValidator<R> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't need this? It seems OK if the application keeps receiving after getting an error. We want our wrapper to avoid propagating that read down further, since it would be illegal-per-the-API to do so, but I don't think that needs to error the whole call?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help me understand the reason behind the asymmetry in how we treat recvstream errors vs sendstream errors?

Probably worth noting that this could happen on interceptor side , in which case maybe we want to send out an error.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the sending side, if the server application attempts to violate the gRPC protocol by sending the wrong data on the stream (e.g. transmit headers after messages), the stream is corrupted and we can't reasonably proceed. We should end the stream with an error.

On the receiving side, if the server application requests data after we told it there was no more data....well that's wrong, but it doesn't actually cause any problems. The overall call can still function just fine. We just keep telling it "no, you have no more data" over and over if they ask.

So I believe this situation is fine. But if you have reasons why it's more dangerous than that then I'm happy to discuss further.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about other types of error, beyond polling the empty stream again? What if an interceptor sends an error, which doesn't necessarily mean end of stream? I assume we want to terminate the call on this one with some form of internal error instead of letting the application continue.

This'd imply we don't need the validator, but would still the channel error signal which is able to distinguish between repolling and an interceptor error?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think it's not clear to me what the role of this interceptor is. I was assuming it was validating the application's use of the stream. But maybe it's intended for something else? On the client side, it's mainly there to assert the generated code's assumptions in case an interceptor below it did something wrong.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The application's interceptors will wrap this interceptor's send & recv streams before calling the final handler.

That means we can't observe any errors they introduce between them and the application; we can only see their usage of this interceptor's streams. I.e. if they return an error from their recv back to the application after calling our recv, we have no visibility into that.

Aside: you're right those should be the most common patterns, but I could see this kind of thing happening, too:

fn recv(..) -> .. {
  let my_message = wrap(x);
  inner.recv(&my_message)
}

What situation are you still worried about?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We have zero visibility to what they do beyond streams. So, I want to understand that's the expected behavior when an interceptor does

fn recv(x) {
  if num_messages> 5:
     return Err()
  inner.recv(x);
}

Are we supposed to treat it like a regular recvstream failure and propagate the error, or are we supposed to close the handler with internal stream error of some sort?

I am leaning towards the latter.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I want to understand that's the expected behavior when an interceptor does

If this interceptor is inside our channel at the boundary, we don't ever see that error anyway. So we don't need to think about it for this.

If their interceptor is doing that, we'd normally expect that interceptor to also terminate the handler future ASAP and return an error from the intercept method.

The handler itself should also see that error and know the RPC was cancelled or errored, and return from their handler ASAP, but they can't return a very useful status based on this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, you are right. Roughly putting this closer to the transport and expecting to prevent interceptor doing bad things from there is essentially lost cause.

So, I've actually been thinking of the behavior that we want for the last couple of days for corner cases.

  1. Should recvs and send side interceptors receive some guarantees on the state machine from the transport or the application? Likely no, since this seems like a lost cause since a buggy interceptor can create out of order messages for downstream interceptors?
  2. Should the transport be protected from bad sendstream from interceptors? Likely yes, because while hyper provides some safety, it doesn't guarantee the full grpc spec safety.
  3. Should the application handler be protected from bad recvstream from interceptors? Likely yes, this'll probably be somewhere in codegen?
  4. What's the behavior we want on some interceptor's individual recvstream message failure(think throttling interceptor mid streaming request?) to the application, should this end up cancelling the stream or should the interceptor be responsible for handling that and grpc just propagates the error to the application handler? leaning towards, making interceptor's problem.
  5. What's the behavior we want on some interceptor's individual sendstream message failure(think throttling and not a protocol ordering violation ), does this drop handler and cancel stream?

Here's what I am currently leaning towards

  • Some validation on the codegen side for recvstream. Mostly focusing on ensuring terminal states are terminal. That ensures that the API doesn't allow.
  • Some validation on the codegen side for sendstream. This will be focussed on error handling from interceptors, all errors are fallible and terminate the handler with reset stream.
  • Some validation closer to the transport side for sendstream (roughly this PR). This ensure that we send valid grpc to transport.

Does that sound good ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Some validation on the codegen side for recvstream. Mostly focusing on ensuring terminal states are terminal. That ensures that the API doesn't allow.

On the server the recvstream is simple. There are two aspects of it:

  1. No more messages received after end. I actually don't care too much about this. The application/etc should not be reading anymore once they get None. If they do and there is somehow more data there, then "oh well"? I don't think we should worry about this at all.

  2. Unary request vs. streaming request. This needs to be enforced. The generated code should be validating this -- a unary request handler (which includes server-streaming RPCs) should read from the stream twice and confirm the second read returns None. If multiple messages are received, that's an error - the handler should never be called and it should immediately return a cardinality violation error (UNIMPLEMENTED per https://github.com/grpc/grpc/blob/master/doc/statuscodes.md) that the client application will see. This behavior probably doesn't need to be in an interceptor, but if it works out more nicely that way then I'm fine with it.

On the sendstream, it's a little more complex, since it must begin with headers, then only messages (any number):

  • Some validation on the codegen side for sendstream. This will be focussed on error handling from interceptors, all errors are fallible and terminate the handler with reset stream.

The codegen doesn't need to care about any of this, since it never gives the stream out directly -- it only calls the proto-based handler which is type-constrained to send the right number of messages and no headers/trailers. This ensures response stream cardinality without needing a validator.

Some validation closer to the transport side for sendstream (roughly this PR). This ensure that we send valid grpc to transport.

The channel needs to enforce the application (interceptors+handler) is following the gRPC protocol. So the channel needs what you have here for the send side. If the application violates the protocol, this interceptor needs to terminate the RPC with INTERNAL. (So: what you said SGTM.)

tokio::select! {
res = next.handle(headers, options, &mut wrapped_tx, wrapped_rx) => {
if error_rx.try_recv().is_ok() {
Trailers::new(Err(StatusError::new(StatusCodeError::Internal, "Stream validation error")))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the one hand, it's unfortunate that we aren't being any more specific than this. "Server attempted to send multiple headers / messages before headers / etc"

On the other hand, maybe it's for the best, since the client maybe shouldn't get details about server bugs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the current API that's likely the best we can do. The issue may exaggerate when dealing with interceptors because there would be no way to propagate error from an interceptor's stream which may be a painful debugging experience.

But maybe the right answer is that server interceptor implementations can log errors when as needed, and we treat all of it as internal error with a generic error message to the client of some form.

@dfawley dfawley added this to the grpc-next milestone May 14, 2026
sauravzg added 2 commits May 15, 2026 12:37
Enforces strict gRPC stream state transitions and sequence validation on
the server side to prevent malformed responses and improper polling.

Key Changes:

- **Send Stream Validation (`ServerSendStreamValidator`)**: Tracks state
  transitions (`Init` -> `HeadersSent` -> `MessagesSent` -> `Done`) to
  guarantee headers are sent exactly once and always precede messages,
  rejecting invalid operations.
- **Receive Stream Validation (`ServerRecvStreamValidator`)**: Prevents
  erroneous polling by returning stable errors once the receive stream
  reaches EOF or a terminal error state.
- **Preemptive Error Interception (`StreamValidationInterceptor`)**:
  Wraps streams with channel-aware validators to immediately preempt
  handler execution via `tokio::select!` upon detecting any protocol
  violation or underlying transport error, automatically returning an
  `Internal` status.
@sauravzg sauravzg force-pushed the sauravz/server-interceptor-api branch from 51903a9 to e80434b Compare May 15, 2026 12:43
@sauravzg sauravzg force-pushed the sauravz/server-stream-validators branch from e930953 to 986af3c Compare May 15, 2026 12:43
@sauravzg sauravzg requested a review from dfawley May 15, 2026 13:10
@sauravzg sauravzg force-pushed the sauravz/server-interceptor-api branch from e80434b to a00b4b0 Compare May 18, 2026 10:49
Base automatically changed from sauravz/server-interceptor-api to master May 18, 2026 11:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants