4040//! Note: While this process coordinates file transfers, the actual chunked transfer
4141//! mechanism is implemented in the FT worker for improved modularity and performance.
4242//!
43- use crate :: hyperware:: process:: downloads:: {
44- AutoDownloadCompleteRequest , AutoDownloadError , AutoUpdateRequest , DirEntry ,
45- DownloadCompleteRequest , DownloadError , DownloadRequest , DownloadResponse , Entry , FileEntry ,
46- HashMismatch , LocalDownloadRequest , RemoteDownloadRequest , RemoveFileRequest ,
43+ use crate :: hyperware:: process:: {
44+ chain:: { ChainRequest , ChainResponse } ,
45+ downloads:: {
46+ AutoDownloadCompleteRequest , AutoDownloadError , AutoUpdateRequest , DirEntry ,
47+ DownloadCompleteRequest , DownloadError , DownloadRequest , DownloadResponse , Entry ,
48+ FileEntry , HashMismatch , LocalDownloadRequest , RemoteDownloadRequest , RemoveFileRequest ,
49+ } ,
4750} ;
4851use ft_worker_lib:: { spawn_receive_transfer, spawn_send_transfer} ;
4952use hyperware:: process:: downloads:: AutoDownloadSuccess ;
@@ -72,6 +75,7 @@ wit_bindgen::generate!({
7275mod ft_worker_lib;
7376
7477pub const VFS_TIMEOUT : u64 = 5 ; // 5s
78+ pub const CHAIN_TIMEOUT : u64 = 60 ; // 60s
7579
7680#[ derive( Debug , Serialize , Deserialize , process_macros:: SerdeJsonInto ) ]
7781#[ serde( untagged) ] // untagged as a meta-type for all incoming responses
@@ -89,6 +93,15 @@ pub struct AutoUpdateStatus {
8993
9094type AutoUpdates = HashMap < ( PackageId , String ) , AutoUpdateStatus > ;
9195
96+ #[ derive( Debug , Serialize , Deserialize , Clone ) ]
97+ pub struct ManualDownloadStatus {
98+ mirrors_left : Vec < String > , // vec(node/url)
99+ mirrors_failed : Vec < ( String , DownloadError ) > , // vec(node/url, error)
100+ active_mirror : String , // (node/url)
101+ }
102+
103+ type ManualDownloads = HashMap < ( PackageId , String ) , ManualDownloadStatus > ;
104+
92105#[ derive( Debug , Serialize , Deserialize ) ]
93106pub struct State {
94107 // persisted metadata about which packages we are mirroring
@@ -128,6 +141,8 @@ fn init(our: Address) {
128141
129142 // metadata for in-flight auto-updates
130143 let mut auto_updates: AutoUpdates = HashMap :: new ( ) ;
144+ // metadata for in-flight manual downloads (used for mirror retries)
145+ let mut manual_downloads: ManualDownloads = HashMap :: new ( ) ;
131146
132147 loop {
133148 match await_message ( ) {
@@ -139,6 +154,7 @@ fn init(our: Address) {
139154 & mut downloads,
140155 // &mut tmp,
141156 & mut auto_updates,
157+ & mut manual_downloads,
142158 ) {
143159 print_to_terminal ( 1 , & format ! ( "error handling message: {e:?}" ) ) ;
144160 }
@@ -163,6 +179,17 @@ fn init(our: Address) {
163179 // Then remove and get metadata
164180 if let Some ( metadata) = auto_updates. remove ( & key) {
165181 try_next_mirror ( metadata, key, & mut auto_updates, error) ;
182+ } else if let Some ( metadata) = manual_downloads. remove ( & key) {
183+ if let Err ( err) =
184+ try_next_manual_mirror ( metadata, key, & mut manual_downloads, error)
185+ {
186+ print_to_terminal (
187+ 1 ,
188+ & format ! (
189+ "downloads: failed manual mirror retry on send error: {err:?}"
190+ ) ,
191+ ) ;
192+ }
166193 }
167194 }
168195 }
@@ -182,6 +209,7 @@ fn handle_message(
182209 downloads : & mut Directory ,
183210 // _tmp: &mut Directory,
184211 auto_updates : & mut AutoUpdates ,
212+ manual_downloads : & mut ManualDownloads ,
185213) -> anyhow:: Result < ( ) > {
186214 if message. is_request ( ) {
187215 match message. body ( ) . try_into ( ) ? {
@@ -208,6 +236,18 @@ fn handle_message(
208236 desired_version_hash,
209237 } = download_request. clone ( ) ;
210238
239+ let key = (
240+ package_id. clone ( ) . to_process_lib ( ) ,
241+ desired_version_hash. clone ( ) ,
242+ ) ;
243+
244+ if !download_from. starts_with ( "http" )
245+ && !auto_updates. contains_key ( & key)
246+ && !manual_downloads. contains_key ( & key)
247+ {
248+ build_manual_mirror_status ( & download_request, manual_downloads) ?;
249+ }
250+
211251 if download_from. starts_with ( "http" ) {
212252 // use http-client to GET it
213253 Request :: to ( ( "our" , "http-client" , "distro" , "sys" ) )
@@ -310,6 +350,16 @@ fn handle_message(
310350 DownloadError :: InvalidManifest ,
311351 ) ;
312352 }
353+ return Ok ( ( ) ) ;
354+ }
355+
356+ if let Some ( err) = req. err {
357+ if let Some ( metadata) = manual_downloads. remove ( & key) {
358+ try_next_manual_mirror ( metadata, key, manual_downloads, err) ?;
359+ return Ok ( ( ) ) ;
360+ }
361+ } else {
362+ manual_downloads. remove ( & key) ;
313363 }
314364 }
315365 DownloadRequest :: GetFiles ( maybe_id) => {
@@ -534,26 +584,31 @@ fn handle_message(
534584
535585 if let Some ( context) = message. context ( ) {
536586 let download_request = serde_json:: from_slice :: < LocalDownloadRequest > ( context) ?;
587+ let key = (
588+ download_request. package_id . clone ( ) . to_process_lib ( ) ,
589+ download_request. desired_version_hash . clone ( ) ,
590+ ) ;
537591 match download_response {
538592 DownloadResponse :: Err ( e) => {
539593 print_to_terminal ( 1 , & format ! ( "downloads: got error response: {e:?}" ) ) ;
540- let key = (
541- download_request. package_id . clone ( ) . to_process_lib ( ) ,
542- download_request. desired_version_hash . clone ( ) ,
543- ) ;
544-
545594 if let Some ( metadata) = auto_updates. remove ( & key) {
546595 try_next_mirror ( metadata, key, auto_updates, e) ;
547- } else {
548- // If not an auto-update, forward error normally
549- Request :: to ( ( "our" , "main" , "app-store" , "sys" ) )
550- . body ( DownloadCompleteRequest {
551- package_id : download_request. package_id ,
552- version_hash : download_request. desired_version_hash ,
553- err : Some ( e) ,
554- } )
555- . send ( ) ?;
596+ return Ok ( ( ) ) ;
597+ }
598+
599+ if let Some ( metadata) = manual_downloads. remove ( & key) {
600+ try_next_manual_mirror ( metadata, key, manual_downloads, e) ?;
601+ return Ok ( ( ) ) ;
556602 }
603+
604+ // If not handled by retry logic, forward error normally
605+ Request :: to ( ( "our" , "main" , "app-store" , "sys" ) )
606+ . body ( DownloadCompleteRequest {
607+ package_id : download_request. package_id ,
608+ version_hash : download_request. desired_version_hash ,
609+ err : Some ( e) ,
610+ } )
611+ . send ( ) ?;
557612 }
558613 DownloadResponse :: Success => {
559614 // todo: maybe we do something here.
@@ -646,6 +701,149 @@ fn handle_message(
646701 Ok ( ( ) )
647702}
648703
704+ fn build_manual_mirror_status (
705+ download_request : & LocalDownloadRequest ,
706+ manual_downloads : & mut ManualDownloads ,
707+ ) -> anyhow:: Result < ( ) > {
708+ let key = (
709+ download_request. package_id . clone ( ) . to_process_lib ( ) ,
710+ download_request. desired_version_hash . clone ( ) ,
711+ ) ;
712+
713+ if manual_downloads. contains_key ( & key) {
714+ return Ok ( ( ) ) ;
715+ }
716+
717+ let mut mirror_candidates = match fetch_mirror_candidates ( & key. 0 ) {
718+ Ok ( candidates) => candidates,
719+ Err ( err) => {
720+ print_to_terminal (
721+ 1 ,
722+ & format ! ( "downloads: failed to fetch mirrors for manual download: {err:?}" ) ,
723+ ) ;
724+ Vec :: new ( )
725+ }
726+ } ;
727+
728+ // ensure requested mirror is first
729+ match mirror_candidates
730+ . iter ( )
731+ . position ( |m| m == & download_request. download_from )
732+ {
733+ Some ( idx) => {
734+ let requested = mirror_candidates. remove ( idx) ;
735+ mirror_candidates. insert ( 0 , requested) ;
736+ }
737+ None => mirror_candidates. insert ( 0 , download_request. download_from . clone ( ) ) ,
738+ }
739+
740+ if mirror_candidates. len ( ) <= 1 {
741+ return Ok ( ( ) ) ;
742+ }
743+
744+ manual_downloads. insert (
745+ key,
746+ ManualDownloadStatus {
747+ mirrors_left : mirror_candidates[ 1 ..] . to_vec ( ) ,
748+ mirrors_failed : Vec :: new ( ) ,
749+ active_mirror : mirror_candidates[ 0 ] . clone ( ) ,
750+ } ,
751+ ) ;
752+
753+ Ok ( ( ) )
754+ }
755+
756+ fn fetch_mirror_candidates ( package_id : & PackageId ) -> anyhow:: Result < Vec < String > > {
757+ let resp = Request :: to ( ( "our" , "chain" , "app-store" , "sys" ) )
758+ . body ( serde_json:: to_vec ( & ChainRequest :: GetApp (
759+ crate :: hyperware:: process:: main:: PackageId :: from_process_lib ( package_id. clone ( ) ) ,
760+ ) ) ?)
761+ . send_and_await_response ( CHAIN_TIMEOUT ) ??;
762+
763+ let msg = serde_json:: from_slice :: < ChainResponse > ( resp. body ( ) ) ?;
764+
765+ if let ChainResponse :: GetApp ( Some ( app) ) = msg {
766+ if let Some ( metadata) = app. metadata {
767+ let mut seen = HashSet :: new ( ) ;
768+ let mut mirror_candidates: Vec < String > = Vec :: new ( ) ;
769+
770+ if !metadata. properties . publisher . is_empty ( )
771+ && seen. insert ( metadata. properties . publisher . clone ( ) )
772+ {
773+ mirror_candidates. push ( metadata. properties . publisher ) ;
774+ }
775+
776+ for mirror in metadata. properties . mirrors {
777+ if mirror. is_empty ( ) {
778+ continue ;
779+ }
780+ if seen. insert ( mirror. clone ( ) ) {
781+ mirror_candidates. push ( mirror) ;
782+ }
783+ }
784+
785+ return Ok ( mirror_candidates) ;
786+ }
787+ }
788+
789+ Ok ( Vec :: new ( ) )
790+ }
791+
792+ fn try_next_manual_mirror (
793+ mut metadata : ManualDownloadStatus ,
794+ key : ( PackageId , String ) ,
795+ manual_downloads : & mut ManualDownloads ,
796+ error : DownloadError ,
797+ ) -> anyhow:: Result < ( ) > {
798+ print_to_terminal (
799+ 0 ,
800+ & format ! (
801+ "manual_download: got error from mirror {mirror:?} {error:?}, trying next mirror: {next_mirror:?}" ,
802+ mirror = metadata. active_mirror,
803+ error = error,
804+ next_mirror = metadata. mirrors_left. iter( ) . next( ) . cloned( ) ,
805+ ) ,
806+ ) ;
807+
808+ let ( package_id, version_hash) = key. clone ( ) ;
809+
810+ match metadata. mirrors_left . first ( ) . cloned ( ) {
811+ Some ( next_mirror) => {
812+ metadata
813+ . mirrors_failed
814+ . push ( ( metadata. active_mirror . clone ( ) , error) ) ;
815+ metadata. mirrors_left . remove ( 0 ) ;
816+ metadata. active_mirror = next_mirror. clone ( ) ;
817+ manual_downloads. insert ( key, metadata) ;
818+
819+ Request :: to ( ( "our" , "downloads" , "app-store" , "sys" ) )
820+ . body ( serde_json:: to_vec ( & DownloadRequest :: LocalDownload (
821+ LocalDownloadRequest {
822+ package_id : crate :: hyperware:: process:: main:: PackageId :: from_process_lib (
823+ package_id,
824+ ) ,
825+ download_from : next_mirror. clone ( ) ,
826+ desired_version_hash : version_hash,
827+ } ,
828+ ) ) ?)
829+ . send ( ) ?;
830+ }
831+ None => {
832+ Request :: to ( ( "our" , "main" , "app-store" , "sys" ) )
833+ . body ( DownloadCompleteRequest {
834+ package_id : crate :: hyperware:: process:: main:: PackageId :: from_process_lib (
835+ package_id,
836+ ) ,
837+ version_hash,
838+ err : Some ( error) ,
839+ } )
840+ . send ( ) ?;
841+ }
842+ }
843+
844+ Ok ( ( ) )
845+ }
846+
649847/// Try the next available mirror for a download, recording the current mirror's failure
650848fn try_next_mirror (
651849 mut metadata : AutoUpdateStatus ,
0 commit comments