@@ -669,8 +669,9 @@ async fn stream_anthropic_sse(
669669 // subsequent `data:` line. Some gateways omit the `"type"` field from
670670 // the JSON payload, so we fall back to the SSE event name.
671671 let mut current_event = String :: new ( ) ;
672+ let mut message_stop_received = false ;
672673
673- loop {
674+ ' outer : loop {
674675 tokio:: select! {
675676 _ = cancel_token. cancelled( ) => {
676677 return Err ( AppError :: Cancelled ) ;
@@ -688,7 +689,8 @@ async fn stream_anthropic_sse(
688689 }
689690
690691 if parse_anthropic_sse_line( & line, & mut current_event, on_token, & mut output) ? {
691- return Ok ( output) ;
692+ message_stop_received = true ;
693+ break ' outer;
692694 }
693695 }
694696 }
@@ -699,6 +701,12 @@ async fn stream_anthropic_sse(
699701 }
700702 }
701703
704+ if !message_stop_received {
705+ return Err ( AppError :: Http (
706+ "Stream ended without completion signal — connection may have been interrupted. Please retry." . to_string ( ) ,
707+ ) ) ;
708+ }
709+
702710 Ok ( output)
703711}
704712
@@ -734,10 +742,7 @@ fn parse_anthropic_sse_line(
734742 } ;
735743 let payload = payload. trim ( ) ;
736744
737- let value: Value = match serde_json:: from_str ( payload) {
738- Ok ( v) => v,
739- Err ( _) => return Ok ( false ) ,
740- } ;
745+ let value: Value = serde_json:: from_str ( payload) ?;
741746
742747 // Prefer `"type"` from the JSON payload; fall back to the preceding
743748 // `event:` line when the gateway strips it.
0 commit comments