@@ -35,32 +35,51 @@ impl SubprocessBackend {
3535 /// Returns the Package (which keeps the temp directory alive)
3636 async fn receive_and_unpack_package (
3737 & self ,
38+ ctx : & tower_telemetry:: Context ,
3839 mut package_stream : Box < dyn tokio:: io:: AsyncRead + Send + Unpin > ,
3940 ) -> Result < Package , Error > {
41+ use tower_telemetry:: { debug, error, info} ;
4042 // Create temp directory for this package
41- let temp_dir = TmpDir :: new ( "tower-package" )
42- . await
43- . map_err ( |_| Error :: PackageCreateFailed ) ?;
43+ let temp_dir = TmpDir :: new ( "tower-package" ) . await . map_err ( |e| {
44+ error ! ( ctx: ctx, "Failed to create temp directory: {:?}" , e) ;
45+ Error :: PackageCreateFailed
46+ } ) ?;
4447
4548 // Save stream to tar.gz file
4649 let tar_gz_path = temp_dir. to_path_buf ( ) . join ( "package.tar.gz" ) ;
47- let mut file = File :: create ( & tar_gz_path)
48- . await
49- . map_err ( |_| Error :: PackageCreateFailed ) ?;
50+ debug ! ( ctx: ctx, "Saving package stream to {:?}" , tar_gz_path) ;
5051
51- tokio:: io:: copy ( & mut package_stream, & mut file)
52+ let mut file = File :: create ( & tar_gz_path) . await . map_err ( |e| {
53+ error ! ( ctx: ctx, "Failed to create package file: {:?}" , e) ;
54+ Error :: PackageCreateFailed
55+ } ) ?;
56+
57+ let bytes_copied = tokio:: io:: copy ( & mut package_stream, & mut file)
5258 . await
53- . map_err ( |_| Error :: PackageCreateFailed ) ?;
59+ . map_err ( |e| {
60+ error ! ( ctx: ctx, "Failed to save package stream: {:?}" , e) ;
61+ Error :: PackageCreateFailed
62+ } ) ?;
63+
64+ debug ! ( ctx: ctx, "Downloaded {} bytes" , bytes_copied) ;
5465
55- file. flush ( ) . await . map_err ( |_| Error :: PackageCreateFailed ) ?;
66+ file. flush ( ) . await . map_err ( |e| {
67+ error ! ( ctx: ctx, "Failed to flush package file: {:?}" , e) ;
68+ Error :: PackageCreateFailed
69+ } ) ?;
5670 drop ( file) ;
5771
5872 // Unpack the package
73+ info ! ( ctx: ctx, "Unpacking package" ) ;
5974 let mut package = Package :: default ( ) ;
6075 package. package_file_path = Some ( tar_gz_path) ;
6176 package. tmp_dir = Some ( temp_dir) ;
62- package. unpack ( ) . await ?;
77+ package. unpack ( ) . await . map_err ( |e| {
78+ error ! ( ctx: ctx, "Failed to unpack package: {:?}" , e) ;
79+ Error :: PackageUnpackFailed
80+ } ) ?;
6381
82+ info ! ( ctx: ctx, "Successfully unpacked package" ) ;
6483 Ok ( package)
6584 }
6685}
@@ -93,7 +112,9 @@ impl ExecutionBackend for SubprocessBackend {
93112 } ;
94113
95114 // Receive package stream and unpack it
96- let mut package = self . receive_and_unpack_package ( spec. package_stream ) . await ?;
115+ let mut package = self
116+ . receive_and_unpack_package ( & spec. telemetry_ctx , spec. package_stream )
117+ . await ?;
97118
98119 let unpacked_path = package
99120 . unpacked_path
0 commit comments