@@ -111,10 +111,24 @@ impl State {
111111 ) -> Result < Option < MaybeDelayed < ' a > > , Error > {
112112 match self . maybe_launch_process ( driver, operation, ctx. rela_path ) ? {
113113 Some ( Process :: SingleFile { mut child, command } ) => {
114- std:: io:: copy ( src, & mut child. stdin . take ( ) . expect ( "configured" ) ) ?;
114+ // To avoid deadlock when the filter immediately echoes input to output (like `cat`),
115+ // we need to write to stdin and read from stdout concurrently. If we write all data
116+ // to stdin before reading from stdout, and the pipe buffer fills up, both processes
117+ // will block: the filter blocks writing to stdout (buffer full), and we block writing
118+ // to stdin (waiting for the filter to consume data).
119+ //
120+ // Solution: Read all data into a buffer, then spawn a thread to write it to stdin
121+ // while we can immediately read from stdout.
122+ let mut input_data = Vec :: new ( ) ;
123+ std:: io:: copy ( src, & mut input_data) ?;
124+
125+ let stdin = child. stdin . take ( ) . expect ( "configured" ) ;
126+ let write_thread = WriterThread :: spawn ( input_data, stdin) ?;
127+
115128 Ok ( Some ( MaybeDelayed :: Immediate ( Box :: new ( ReadFilterOutput {
116129 inner : child. stdout . take ( ) ,
117130 child : driver. required . then_some ( ( child, command) ) ,
131+ write_thread : Some ( write_thread) ,
118132 } ) ) ) )
119133 }
120134 Some ( Process :: MultiFile { client, key } ) => {
@@ -202,11 +216,67 @@ pub enum MaybeDelayed<'a> {
202216 Immediate ( Box < dyn std:: io:: Read + ' a > ) ,
203217}
204218
219+ /// A helper to manage writing to stdin on a separate thread to avoid deadlock.
220+ struct WriterThread {
221+ handle : Option < std:: thread:: JoinHandle < std:: io:: Result < ( ) > > > ,
222+ }
223+
224+ impl WriterThread {
225+ /// Spawn a thread that will write all data from `data` to `stdin`.
226+ fn spawn ( data : Vec < u8 > , mut stdin : std:: process:: ChildStdin ) -> std:: io:: Result < Self > {
227+ let handle = std:: thread:: Builder :: new ( )
228+ . name ( "gix-filter-stdin-writer" . into ( ) )
229+ . spawn ( move || {
230+ use std:: io:: Write ;
231+ stdin. write_all ( & data) ?;
232+ // Explicitly drop stdin to close the pipe and signal EOF to the child
233+ drop ( stdin) ;
234+ Ok ( ( ) )
235+ } ) ?;
236+
237+ Ok ( Self { handle : Some ( handle) } )
238+ }
239+
240+ /// Wait for the writer thread to complete and return any error that occurred.
241+ fn join ( & mut self ) -> std:: io:: Result < ( ) > {
242+ if let Some ( handle) = self . handle . take ( ) {
243+ match handle. join ( ) {
244+ Ok ( result) => result,
245+ Err ( panic_info) => {
246+ let msg = if let Some ( s) = panic_info. downcast_ref :: < String > ( ) {
247+ format ! ( "Writer thread panicked: {s}" )
248+ } else if let Some ( s) = panic_info. downcast_ref :: < & str > ( ) {
249+ format ! ( "Writer thread panicked: {s}" )
250+ } else {
251+ "Writer thread panicked while writing to filter stdin" . to_string ( )
252+ } ;
253+ Err ( std:: io:: Error :: other ( msg) )
254+ }
255+ }
256+ } else {
257+ Ok ( ( ) )
258+ }
259+ }
260+ }
261+
262+ impl Drop for WriterThread {
263+ fn drop ( & mut self ) {
264+ // Best effort join on drop. If the thread panicked or failed, log it for debugging.
265+ // We can't propagate errors from Drop, so we only log them. Thread panics during Drop
266+ // are unusual but can occur if the filter process closes stdin unexpectedly.
267+ if let Err ( _err) = self . join ( ) {
268+ gix_trace:: debug!( err = %_err, "Failed to join writer thread during drop" ) ;
269+ }
270+ }
271+ }
272+
205273/// A utility type to facilitate streaming the output of a filter process.
206274struct ReadFilterOutput {
207275 inner : Option < std:: process:: ChildStdout > ,
208276 /// The child is present if we need its exit code to be positive.
209277 child : Option < ( std:: process:: Child , std:: process:: Command ) > ,
278+ /// The thread writing to stdin, if any. Must be joined when reading is done.
279+ write_thread : Option < WriterThread > ,
210280}
211281
212282pub ( crate ) fn handle_io_err ( err : & std:: io:: Error , running : & mut HashMap < BString , process:: Client > , process : & BStr ) {
@@ -222,9 +292,28 @@ impl std::io::Read for ReadFilterOutput {
222292 fn read ( & mut self , buf : & mut [ u8 ] ) -> std:: io:: Result < usize > {
223293 match self . inner . as_mut ( ) {
224294 Some ( inner) => {
225- let num_read = inner. read ( buf) ?;
295+ let num_read = match inner. read ( buf) {
296+ Ok ( n) => n,
297+ Err ( e) => {
298+ // On read error, ensure we join the writer thread before propagating the error
299+ if let Some ( mut write_thread) = self . write_thread . take ( ) {
300+ // Try to join but prioritize the original read error
301+ if let Err ( _thread_err) = write_thread. join ( ) {
302+ gix_trace:: debug!( thread_err = %_thread_err, read_err = %e, "Writer thread error during failed read" ) ;
303+ }
304+ }
305+ return Err ( e) ;
306+ }
307+ } ;
308+
226309 if num_read == 0 {
227310 self . inner . take ( ) ;
311+
312+ // Join the writer thread first to ensure all data has been written
313+ if let Some ( mut write_thread) = self . write_thread . take ( ) {
314+ write_thread. join ( ) ?;
315+ }
316+
228317 if let Some ( ( mut child, cmd) ) = self . child . take ( ) {
229318 let status = child. wait ( ) ?;
230319 if !status. success ( ) {
0 commit comments