@@ -411,11 +411,14 @@ async fn command_exec_streaming_does_not_buffer_output() -> Result<()> {
411411 } )
412412 . await ?;
413413
414- let delta = read_command_exec_delta ( & mut mcp) . await ?;
415- assert_eq ! ( delta. process_id, process_id. as_str( ) ) ;
416- assert_eq ! ( delta. stream, CommandExecOutputStream :: Stdout ) ;
417- assert_eq ! ( STANDARD . decode( & delta. delta_base64) ?, b"abcde" ) ;
418- assert ! ( delta. cap_reached) ;
414+ let output = collect_command_exec_output_until (
415+ CommandExecDeltaReader :: Mcp ( & mut mcp) ,
416+ process_id. as_str ( ) ,
417+ "capped stdout" ,
418+ |_output, delta| delta. stream == CommandExecOutputStream :: Stdout && delta. cap_reached ,
419+ )
420+ . await ?;
421+ assert_eq ! ( output. stdout, "abcde" ) ;
419422 let terminate_request_id = mcp
420423 . send_command_exec_terminate_request ( CommandExecTerminateParams {
421424 process_id : process_id. clone ( ) ,
@@ -471,21 +474,13 @@ async fn command_exec_pipe_streams_output_and_accepts_write() -> Result<()> {
471474 } )
472475 . await ?;
473476
474- let first_stdout = read_command_exec_delta ( & mut mcp) . await ?;
475- let first_stderr = read_command_exec_delta ( & mut mcp) . await ?;
476- let seen = [ first_stdout, first_stderr] ;
477- assert ! (
478- seen. iter( )
479- . all( |delta| delta. process_id == process_id. as_str( ) )
480- ) ;
481- assert ! ( seen. iter( ) . any( |delta| {
482- delta. stream == CommandExecOutputStream :: Stdout
483- && delta. delta_base64 == STANDARD . encode( "out-start\n " )
484- } ) ) ;
485- assert ! ( seen. iter( ) . any( |delta| {
486- delta. stream == CommandExecOutputStream :: Stderr
487- && delta. delta_base64 == STANDARD . encode( "err-start\n " )
488- } ) ) ;
477+ wait_for_command_exec_outputs_contains (
478+ & mut mcp,
479+ process_id. as_str ( ) ,
480+ "out-start\n " ,
481+ "err-start\n " ,
482+ )
483+ . await ?;
489484
490485 let write_request_id = mcp
491486 . send_command_exec_write_request ( CommandExecWriteParams {
@@ -499,21 +494,13 @@ async fn command_exec_pipe_streams_output_and_accepts_write() -> Result<()> {
499494 . await ?;
500495 assert_eq ! ( write_response. result, serde_json:: json!( { } ) ) ;
501496
502- let next_delta = read_command_exec_delta ( & mut mcp) . await ?;
503- let final_delta = read_command_exec_delta ( & mut mcp) . await ?;
504- let seen = [ next_delta, final_delta] ;
505- assert ! (
506- seen. iter( )
507- . all( |delta| delta. process_id == process_id. as_str( ) )
508- ) ;
509- assert ! ( seen. iter( ) . any( |delta| {
510- delta. stream == CommandExecOutputStream :: Stdout
511- && delta. delta_base64 == STANDARD . encode( "out:hello\n " )
512- } ) ) ;
513- assert ! ( seen. iter( ) . any( |delta| {
514- delta. stream == CommandExecOutputStream :: Stderr
515- && delta. delta_base64 == STANDARD . encode( "err:hello\n " )
516- } ) ) ;
497+ wait_for_command_exec_outputs_contains (
498+ & mut mcp,
499+ process_id. as_str ( ) ,
500+ "out:hello\n " ,
501+ "err:hello\n " ,
502+ )
503+ . await ?;
517504
518505 let response = mcp
519506 . read_stream_until_response_message ( RequestId :: Integer ( command_request_id) )
@@ -562,17 +549,13 @@ async fn command_exec_tty_implies_streaming_and_reports_pty_output() -> Result<(
562549 } )
563550 . await ?;
564551
565- let started_text = read_command_exec_output_until_contains (
552+ wait_for_command_exec_output_contains (
566553 & mut mcp,
567554 process_id. as_str ( ) ,
568555 CommandExecOutputStream :: Stdout ,
569556 "tty\n " ,
570557 )
571558 . await ?;
572- assert ! (
573- started_text. contains( "tty\n " ) ,
574- "expected TTY startup output, got {started_text:?}"
575- ) ;
576559
577560 let write_request_id = mcp
578561 . send_command_exec_write_request ( CommandExecWriteParams {
@@ -586,17 +569,13 @@ async fn command_exec_tty_implies_streaming_and_reports_pty_output() -> Result<(
586569 . await ?;
587570 assert_eq ! ( write_response. result, serde_json:: json!( { } ) ) ;
588571
589- let echoed_text = read_command_exec_output_until_contains (
572+ wait_for_command_exec_output_contains (
590573 & mut mcp,
591574 process_id. as_str ( ) ,
592575 CommandExecOutputStream :: Stdout ,
593576 "echo:world\n " ,
594577 )
595578 . await ?;
596- assert ! (
597- echoed_text. contains( "echo:world\n " ) ,
598- "expected TTY echo output, got {echoed_text:?}"
599- ) ;
600579
601580 let response = mcp
602581 . read_stream_until_response_message ( RequestId :: Integer ( command_request_id) )
@@ -643,17 +622,13 @@ async fn command_exec_tty_supports_initial_size_and_resize() -> Result<()> {
643622 } )
644623 . await ?;
645624
646- let started_text = read_command_exec_output_until_contains (
625+ wait_for_command_exec_output_contains (
647626 & mut mcp,
648627 process_id. as_str ( ) ,
649628 CommandExecOutputStream :: Stdout ,
650629 "start:31 101\n " ,
651630 )
652631 . await ?;
653- assert ! (
654- started_text. contains( "start:31 101\n " ) ,
655- "unexpected initial size output: {started_text:?}"
656- ) ;
657632
658633 let resize_request_id = mcp
659634 . send_command_exec_resize_request ( CommandExecResizeParams {
@@ -681,17 +656,13 @@ async fn command_exec_tty_supports_initial_size_and_resize() -> Result<()> {
681656 . await ?;
682657 assert_eq ! ( write_response. result, serde_json:: json!( { } ) ) ;
683658
684- let resized_text = read_command_exec_output_until_contains (
659+ wait_for_command_exec_output_contains (
685660 & mut mcp,
686661 process_id. as_str ( ) ,
687662 CommandExecOutputStream :: Stdout ,
688663 "after:45 132\n " ,
689664 )
690665 . await ?;
691- assert ! (
692- resized_text. contains( "after:45 132\n " ) ,
693- "unexpected resized output: {resized_text:?}"
694- ) ;
695666
696667 let response = mcp
697668 . read_stream_until_response_message ( RequestId :: Integer ( command_request_id) )
@@ -744,11 +715,13 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
744715 )
745716 . await ?;
746717
747- let delta = read_command_exec_delta_ws ( & mut ws1) . await ?;
748- assert_eq ! ( delta. process_id, "shared-process" ) ;
749- assert_eq ! ( delta. stream, CommandExecOutputStream :: Stdout ) ;
750- let delta_text = String :: from_utf8 ( STANDARD . decode ( & delta. delta_base64 ) ?) ?;
751- assert ! ( delta_text. contains( "ready" ) ) ;
718+ collect_command_exec_output_until (
719+ CommandExecDeltaReader :: Websocket ( & mut ws1) ,
720+ "shared-process" ,
721+ "websocket ready output" ,
722+ |output, _delta| output. stdout . contains ( "ready\n " ) ,
723+ )
724+ . await ?;
752725 wait_for_process_marker ( & marker, /*should_exist*/ true ) . await ?;
753726
754727 send_request (
@@ -796,31 +769,98 @@ async fn read_command_exec_delta(
796769 decode_delta_notification ( notification)
797770}
798771
799- async fn read_command_exec_output_until_contains (
772+ async fn wait_for_command_exec_output_contains (
800773 mcp : & mut McpProcess ,
801774 process_id : & str ,
802775 stream : CommandExecOutputStream ,
803776 expected : & str ,
804- ) -> Result < String > {
777+ ) -> Result < ( ) > {
778+ let stream_name = match stream {
779+ CommandExecOutputStream :: Stdout => "stdout" ,
780+ CommandExecOutputStream :: Stderr => "stderr" ,
781+ } ;
782+ collect_command_exec_output_until (
783+ CommandExecDeltaReader :: Mcp ( mcp) ,
784+ process_id,
785+ format ! ( "{stream_name} containing {expected:?}" ) ,
786+ |output, _delta| match stream {
787+ CommandExecOutputStream :: Stdout => output. stdout . contains ( expected) ,
788+ CommandExecOutputStream :: Stderr => output. stderr . contains ( expected) ,
789+ } ,
790+ )
791+ . await ?;
792+ Ok ( ( ) )
793+ }
794+
795+ async fn wait_for_command_exec_outputs_contains (
796+ mcp : & mut McpProcess ,
797+ process_id : & str ,
798+ stdout_expected : & str ,
799+ stderr_expected : & str ,
800+ ) -> Result < ( ) > {
801+ collect_command_exec_output_until (
802+ CommandExecDeltaReader :: Mcp ( mcp) ,
803+ process_id,
804+ format ! ( "stdout containing {stdout_expected:?} and stderr containing {stderr_expected:?}" ) ,
805+ |output, _delta| {
806+ output. stdout . contains ( stdout_expected) && output. stderr . contains ( stderr_expected)
807+ } ,
808+ )
809+ . await ?;
810+ Ok ( ( ) )
811+ }
812+
813+ enum CommandExecDeltaReader < ' a > {
814+ Mcp ( & ' a mut McpProcess ) ,
815+ Websocket ( & ' a mut super :: connection_handling_websocket:: WsClient ) ,
816+ }
817+
818+ #[ derive( Default ) ]
819+ struct CollectedCommandExecOutput {
820+ stdout : String ,
821+ stderr : String ,
822+ }
823+
824+ async fn collect_command_exec_output_until (
825+ mut reader : CommandExecDeltaReader < ' _ > ,
826+ process_id : & str ,
827+ waiting_for : impl Into < String > ,
828+ mut should_stop : impl FnMut (
829+ & CollectedCommandExecOutput ,
830+ & CommandExecOutputDeltaNotification ,
831+ ) -> bool ,
832+ ) -> Result < CollectedCommandExecOutput > {
833+ let waiting_for = waiting_for. into ( ) ;
805834 let deadline = Instant :: now ( ) + DEFAULT_READ_TIMEOUT ;
806- let mut collected = String :: new ( ) ;
835+ let mut output = CollectedCommandExecOutput :: default ( ) ;
807836
808837 loop {
809838 let remaining = deadline. saturating_duration_since ( Instant :: now ( ) ) ;
810- let delta = timeout ( remaining, read_command_exec_delta ( mcp) )
811- . await
812- . with_context ( || {
813- format ! (
814- "timed out waiting for {expected:?} in command/exec output for {process_id}; collected {collected:?}"
815- )
816- } ) ??;
839+ let delta = timeout ( remaining, async {
840+ match & mut reader {
841+ CommandExecDeltaReader :: Mcp ( mcp) => read_command_exec_delta ( mcp) . await ,
842+ CommandExecDeltaReader :: Websocket ( stream) => {
843+ read_command_exec_delta_ws ( stream) . await
844+ }
845+ }
846+ } )
847+ . await
848+ . with_context ( || {
849+ format ! (
850+ "timed out waiting for {waiting_for} in command/exec output for {process_id}; collected stdout={:?}, stderr={:?}" ,
851+ output. stdout, output. stderr
852+ )
853+ } ) ??;
817854 assert_eq ! ( delta. process_id, process_id) ;
818- assert_eq ! ( delta. stream, stream) ;
819855
820856 let delta_text = String :: from_utf8 ( STANDARD . decode ( & delta. delta_base64 ) ?) ?;
821- collected. push_str ( & delta_text. replace ( '\r' , "" ) ) ;
822- if collected. contains ( expected) {
823- return Ok ( collected) ;
857+ let delta_text = delta_text. replace ( '\r' , "" ) ;
858+ match delta. stream {
859+ CommandExecOutputStream :: Stdout => output. stdout . push_str ( & delta_text) ,
860+ CommandExecOutputStream :: Stderr => output. stderr . push_str ( & delta_text) ,
861+ }
862+ if should_stop ( & output, & delta) {
863+ return Ok ( output) ;
824864 }
825865 }
826866}
0 commit comments