@@ -60,6 +60,7 @@ pub struct Runtime<S> {
6060 service : S ,
6161 config : Arc < Config > ,
6262 client : Arc < ApiClient > ,
63+ concurrency_limit : u32 ,
6364}
6465
6566/// One-time marker to log X-Ray behavior in concurrent mode.
@@ -102,7 +103,9 @@ where
102103 pub fn new ( handler : F ) -> Self {
103104 trace ! ( "Loading config from env" ) ;
104105 let config = Arc :: new ( Config :: from_env ( ) ) ;
105- let pool_size = config. max_concurrency . unwrap_or ( 1 ) . max ( 1 ) as usize ;
106+ let concurrency_limit = max_concurrency_from_env ( ) . unwrap_or ( 1 ) . max ( 1 ) ;
107+ // Strategy: allocate all worker tasks up-front, so size the client pool to match.
108+ let pool_size = concurrency_limit as usize ;
106109 let client = Arc :: new (
107110 ApiClient :: builder ( )
108111 . with_pool_size ( pool_size)
@@ -113,6 +116,7 @@ where
113116 service : wrap_handler ( handler, client. clone ( ) ) ,
114117 config,
115118 client,
119+ concurrency_limit,
116120 }
117121 }
118122}
@@ -149,6 +153,7 @@ impl<S> Runtime<S> {
149153 client : self . client ,
150154 config : self . config ,
151155 service : layer. layer ( self . service ) ,
156+ concurrency_limit : self . concurrency_limit ,
152157 }
153158 }
154159}
@@ -164,9 +169,8 @@ where
164169 /// sequential `run_with_incoming` loop so that the same handler can run on both
165170 /// classic Lambda and Lambda Managed Instances.
166171 pub async fn run_concurrent ( self ) -> Result < ( ) , BoxError > {
167- if self . config . is_concurrent ( ) {
168- let max_concurrency = self . config . max_concurrency . unwrap_or ( 1 ) ;
169- Self :: run_concurrent_inner ( self . service , self . config , self . client , max_concurrency) . await
172+ if self . concurrency_limit > 1 {
173+ Self :: run_concurrent_inner ( self . service , self . config , self . client , self . concurrency_limit ) . await
170174 } else {
171175 let incoming = incoming ( & self . client ) ;
172176 Self :: run_with_incoming ( self . service , self . config , incoming) . await
@@ -178,9 +182,9 @@ where
178182 service : S ,
179183 config : Arc < Config > ,
180184 client : Arc < ApiClient > ,
181- max_concurrency : u32 ,
185+ concurrency_limit : u32 ,
182186 ) -> Result < ( ) , BoxError > {
183- let limit = max_concurrency as usize ;
187+ let limit = concurrency_limit as usize ;
184188
185189 let mut workers = FuturesUnordered :: new ( ) ;
186190 for _ in 1 ..limit {
@@ -321,6 +325,13 @@ async fn next_event_future(client: Arc<ApiClient>) -> Result<http::Response<hype
321325 client. call ( req) . await
322326}
323327
328+ fn max_concurrency_from_env ( ) -> Option < u32 > {
329+ env:: var ( "AWS_LAMBDA_MAX_CONCURRENCY" )
330+ . ok ( )
331+ . and_then ( |v| v. parse :: < u32 > ( ) . ok ( ) )
332+ . filter ( |& c| c > 0 )
333+ }
334+
324335async fn concurrent_worker_loop < S > ( mut service : S , config : Arc < Config > , client : Arc < ApiClient > ) -> Result < ( ) , BoxError >
325336where
326337 S : Service < LambdaInvocation , Response = ( ) , Error = BoxError > ,
@@ -573,6 +584,7 @@ mod endpoint_tests {
573584 client : client. clone ( ) ,
574585 config : Arc :: new ( config) ,
575586 service : wrap_handler ( f, client) ,
587+ concurrency_limit : 1 ,
576588 } ;
577589 let client = & runtime. client ;
578590 let incoming = incoming ( client) . take ( 1 ) ;
@@ -620,14 +632,14 @@ mod endpoint_tests {
620632 version : "1" . to_string ( ) ,
621633 log_stream : "test_stream" . to_string ( ) ,
622634 log_group : "test_log" . to_string ( ) ,
623- max_concurrency : None ,
624635 } ) ;
625636
626637 let client = Arc :: new ( client) ;
627638 let runtime = Runtime {
628639 client : client. clone ( ) ,
629640 config,
630641 service : wrap_handler ( f, client) ,
642+ concurrency_limit : 1 ,
631643 } ;
632644 let client = & runtime. client ;
633645 let incoming = incoming ( client) . take ( 1 ) ;
@@ -651,60 +663,6 @@ mod endpoint_tests {
651663 . await
652664 }
653665
654- #[ test]
655- fn config_parses_max_concurrency ( ) {
656- // Preserve existing env values
657- let prev_fn = env:: var ( "AWS_LAMBDA_FUNCTION_NAME" ) . ok ( ) ;
658- let prev_mem = env:: var ( "AWS_LAMBDA_FUNCTION_MEMORY_SIZE" ) . ok ( ) ;
659- let prev_ver = env:: var ( "AWS_LAMBDA_FUNCTION_VERSION" ) . ok ( ) ;
660- let prev_log_stream = env:: var ( "AWS_LAMBDA_LOG_STREAM_NAME" ) . ok ( ) ;
661- let prev_log_group = env:: var ( "AWS_LAMBDA_LOG_GROUP_NAME" ) . ok ( ) ;
662- let prev_max = env:: var ( "AWS_LAMBDA_MAX_CONCURRENCY" ) . ok ( ) ;
663-
664- env:: set_var ( "AWS_LAMBDA_FUNCTION_NAME" , "test_fn" ) ;
665- env:: set_var ( "AWS_LAMBDA_FUNCTION_MEMORY_SIZE" , "128" ) ;
666- env:: set_var ( "AWS_LAMBDA_FUNCTION_VERSION" , "1" ) ;
667- env:: set_var ( "AWS_LAMBDA_LOG_STREAM_NAME" , "test_stream" ) ;
668- env:: set_var ( "AWS_LAMBDA_LOG_GROUP_NAME" , "test_log" ) ;
669- env:: set_var ( "AWS_LAMBDA_MAX_CONCURRENCY" , "4" ) ;
670-
671- let cfg = Config :: from_env ( ) ;
672- assert_eq ! ( cfg. max_concurrency, Some ( 4 ) ) ;
673- assert ! ( cfg. is_concurrent( ) ) ;
674-
675- // Restore env
676- if let Some ( v) = prev_fn {
677- env:: set_var ( "AWS_LAMBDA_FUNCTION_NAME" , v) ;
678- } else {
679- env:: remove_var ( "AWS_LAMBDA_FUNCTION_NAME" ) ;
680- }
681- if let Some ( v) = prev_mem {
682- env:: set_var ( "AWS_LAMBDA_FUNCTION_MEMORY_SIZE" , v) ;
683- } else {
684- env:: remove_var ( "AWS_LAMBDA_FUNCTION_MEMORY_SIZE" ) ;
685- }
686- if let Some ( v) = prev_ver {
687- env:: set_var ( "AWS_LAMBDA_FUNCTION_VERSION" , v) ;
688- } else {
689- env:: remove_var ( "AWS_LAMBDA_FUNCTION_VERSION" ) ;
690- }
691- if let Some ( v) = prev_log_stream {
692- env:: set_var ( "AWS_LAMBDA_LOG_STREAM_NAME" , v) ;
693- } else {
694- env:: remove_var ( "AWS_LAMBDA_LOG_STREAM_NAME" ) ;
695- }
696- if let Some ( v) = prev_log_group {
697- env:: set_var ( "AWS_LAMBDA_LOG_GROUP_NAME" , v) ;
698- } else {
699- env:: remove_var ( "AWS_LAMBDA_LOG_GROUP_NAME" ) ;
700- }
701- if let Some ( v) = prev_max {
702- env:: set_var ( "AWS_LAMBDA_MAX_CONCURRENCY" , v) ;
703- } else {
704- env:: remove_var ( "AWS_LAMBDA_MAX_CONCURRENCY" ) ;
705- }
706- }
707-
708666 #[ tokio:: test]
709667 async fn concurrent_worker_crash_does_not_stop_other_workers ( ) -> Result < ( ) , Error > {
710668 let next_calls = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
@@ -831,9 +789,9 @@ mod endpoint_tests {
831789 version : "1" . to_string ( ) ,
832790 log_stream : "test_stream" . to_string ( ) ,
833791 log_group : "test_log" . to_string ( ) ,
834- max_concurrency : Some ( 2 ) ,
835792 } ) ,
836793 service : wrap_handler ( handler, client) ,
794+ concurrency_limit : 2 ,
837795 } ;
838796
839797 let res = tokio:: time:: timeout ( Duration :: from_secs ( 2 ) , runtime. run_concurrent ( ) ) . await ;
0 commit comments