Skip to content

Commit 2653859

Browse files
fix(runtime): refine concurrent worker isolation
- remove shutdown signaling so worker failures stay isolated - track remaining workers and return first infra error only when none remain - treat unexpected clean worker exits as infra errors - add deterministic worker-isolation regression test - align concurrent HTTP doc wording with worker-loop model
1 parent dd08c2f commit 2653859

File tree

2 files changed

+201
-61
lines changed

2 files changed

+201
-61
lines changed

lambda-http/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ where
219219
/// Lambda Managed Instances (concurrent invocations).
220220
///
221221
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
222-
/// will use a concurrent `/next` polling loop with a bounded number of
223-
/// in-flight handler tasks. When the environment variable is unset or `<= 1`,
222+
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own
223+
/// `/next` polling loop. When the environment variable is unset or `<= 1`,
224224
/// it falls back to the same sequential behavior as [`run`], so the same
225225
/// handler can run on both classic Lambda and Lambda Managed Instances.
226226
pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>

lambda-runtime/src/runtime.rs

Lines changed: 199 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ use std::{
1212
env,
1313
fmt::Debug,
1414
future::Future,
15+
io,
1516
sync::{Arc, OnceLock},
1617
};
17-
use tokio::sync::watch;
1818
use tokio_stream::{Stream, StreamExt};
1919
use tower::{Layer, Service, ServiceExt};
2020
use tracing::{error, trace, warn};
@@ -181,50 +181,52 @@ where
181181
max_concurrency: u32,
182182
) -> Result<(), BoxError> {
183183
let limit = max_concurrency as usize;
184-
let (shutdown_tx, shutdown_rx) = watch::channel(false);
185184

186185
let mut workers = FuturesUnordered::new();
187186
for _ in 1..limit {
188187
workers.push(tokio::spawn(concurrent_worker_loop(
189188
service.clone(),
190189
config.clone(),
191190
client.clone(),
192-
shutdown_rx.clone(),
193191
)));
194192
}
195-
workers.push(tokio::spawn(concurrent_worker_loop(
196-
service,
197-
config,
198-
client,
199-
shutdown_rx,
200-
)));
201-
202-
// Track the first infrastructure error to return as the result.
203-
// Note: Handler errors (Err returned from user code) do NOT trigger this;
204-
// they are reported to Lambda via /invocation/{id}/error and the worker
193+
workers.push(tokio::spawn(concurrent_worker_loop(service, config, client)));
194+
195+
// Track the first infrastructure error. A single worker failing should
196+
// not terminate the whole runtime (LMI keeps running with the remaining
197+
// healthy workers). We only return an error once there are no workers
198+
// left (i.e., we cannot keep at least 1 worker alive).
199+
//
200+
// Note: Handler errors (Err returned from user code) do NOT trigger this.
201+
// They are reported to Lambda via /invocation/{id}/error and the worker
205202
// continues. This only captures unrecoverable runtime failures like
206-
// network errors, API client failures, or worker panics.
203+
// API client failures, runtime panics, etc.
207204
let mut first_error: Option<BoxError> = None;
205+
let mut remaining_workers = limit;
208206
while let Some(result) = futures::StreamExt::next(&mut workers).await {
207+
remaining_workers = remaining_workers.saturating_sub(1);
209208
match result {
210-
Ok(Ok(())) => {}
209+
Ok(Ok(())) => {
210+
// `concurrent_worker_loop` runs indefinitely, so an Ok return indicates
211+
// an unexpected worker exit; we still decrement because the task is gone.
212+
warn!(remaining_workers, "Concurrent worker exited unexpectedly without error");
213+
if first_error.is_none() {
214+
first_error = Some(Box::new(io::Error::other(
215+
"all concurrent workers exited unexpectedly without error",
216+
)));
217+
}
218+
}
211219
Ok(Err(err)) => {
220+
error!(error = %err, remaining_workers, "Concurrent worker exited with error");
212221
if first_error.is_none() {
213-
error!(error = %err, "Concurrent worker exited with error; shutting down");
214-
let _ = shutdown_tx.send(true);
215222
first_error = Some(err);
216-
} else {
217-
error!(error = %err, "Concurrent worker exited with error");
218223
}
219224
}
220225
Err(join_err) => {
221226
let err: BoxError = Box::new(join_err);
227+
error!(error = %err, remaining_workers, "Concurrent worker panicked");
222228
if first_error.is_none() {
223-
error!(error = %err, "Concurrent worker panicked; shutting down");
224-
let _ = shutdown_tx.send(true);
225229
first_error = Some(err);
226-
} else {
227-
error!(error = %err, "Concurrent worker panicked");
228230
}
229231
}
230232
}
@@ -316,47 +318,22 @@ async fn next_event_future(client: Arc<ApiClient>) -> Result<http::Response<hype
316318
client.call(req).await
317319
}
318320

319-
async fn concurrent_worker_loop<S>(
320-
mut service: S,
321-
config: Arc<Config>,
322-
client: Arc<ApiClient>,
323-
mut shutdown_rx: watch::Receiver<bool>,
324-
) -> Result<(), BoxError>
321+
async fn concurrent_worker_loop<S>(mut service: S, config: Arc<Config>, client: Arc<ApiClient>) -> Result<(), BoxError>
325322
where
326323
S: Service<LambdaInvocation, Response = (), Error = BoxError>,
327324
S::Future: Send,
328325
{
329326
loop {
330-
if *shutdown_rx.borrow() {
331-
break;
332-
}
333-
334-
tokio::select! {
335-
changed = shutdown_rx.changed() => {
336-
match changed {
337-
Ok(()) => {
338-
if *shutdown_rx.borrow() {
339-
break;
340-
}
341-
}
342-
Err(_) => break,
343-
}
327+
let event = match next_event_future(client.clone()).await {
328+
Ok(event) => event,
329+
Err(e) => {
330+
warn!(error = %e, "Error polling /next, retrying");
331+
continue;
344332
}
345-
result = next_event_future(client.clone()) => {
346-
let event = match result {
347-
Ok(event) => event,
348-
Err(e) => {
349-
warn!(error = %e, "Error polling /next, retrying");
350-
continue;
351-
}
352-
};
333+
};
353334

354-
process_invocation(&mut service, &config, event, false).await?;
355-
}
356-
}
335+
process_invocation(&mut service, &config, event, false).await?;
357336
}
358-
359-
Ok(())
360337
}
361338

362339
async fn process_invocation<S>(
@@ -420,13 +397,28 @@ mod endpoint_tests {
420397
requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest},
421398
Config, Diagnostic, Error, Runtime,
422399
};
400+
use bytes::Bytes;
423401
use futures::future::BoxFuture;
424-
use http::{HeaderValue, StatusCode};
425-
use http_body_util::BodyExt;
402+
use http::{HeaderValue, Method, Request, Response, StatusCode};
403+
use http_body_util::{BodyExt, Full};
426404
use httpmock::prelude::*;
427405

406+
use hyper::{body::Incoming, service::service_fn};
407+
use hyper_util::{
408+
rt::{tokio::TokioIo, TokioExecutor},
409+
server::conn::auto::Builder as ServerBuilder,
410+
};
428411
use lambda_runtime_api_client::Client;
429-
use std::{env, sync::Arc};
412+
use std::{
413+
convert::Infallible,
414+
env,
415+
sync::{
416+
atomic::{AtomicUsize, Ordering},
417+
Arc,
418+
},
419+
time::Duration,
420+
};
421+
use tokio::{net::TcpListener, sync::Notify};
430422
use tokio_stream::StreamExt;
431423

432424
#[tokio::test]
@@ -709,4 +701,152 @@ mod endpoint_tests {
709701
env::remove_var("AWS_LAMBDA_MAX_CONCURRENCY");
710702
}
711703
}
704+
705+
#[tokio::test]
706+
async fn concurrent_worker_crash_does_not_stop_other_workers() -> Result<(), Error> {
707+
let next_calls = Arc::new(AtomicUsize::new(0));
708+
let response_calls = Arc::new(AtomicUsize::new(0));
709+
let first_error_served = Arc::new(Notify::new());
710+
711+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
712+
let addr = listener.local_addr().unwrap();
713+
let base: http::Uri = format!("http://{addr}").parse().unwrap();
714+
715+
let server_handle = {
716+
let next_calls = next_calls.clone();
717+
let response_calls = response_calls.clone();
718+
let first_error_served = first_error_served.clone();
719+
tokio::spawn(async move {
720+
loop {
721+
let (tcp, _) = match listener.accept().await {
722+
Ok(v) => v,
723+
Err(_) => return,
724+
};
725+
726+
let next_calls = next_calls.clone();
727+
let response_calls = response_calls.clone();
728+
let first_error_served = first_error_served.clone();
729+
let service = service_fn(move |req: Request<Incoming>| {
730+
let next_calls = next_calls.clone();
731+
let response_calls = response_calls.clone();
732+
let first_error_served = first_error_served.clone();
733+
async move {
734+
let (parts, body) = req.into_parts();
735+
let method = parts.method;
736+
let path = parts.uri.path().to_string();
737+
738+
if method == Method::POST {
739+
// Drain request body to support keep-alive.
740+
let _ = body.collect().await;
741+
}
742+
743+
if method == Method::GET && path == "/2018-06-01/runtime/invocation/next" {
744+
let call_index = next_calls.fetch_add(1, Ordering::SeqCst);
745+
match call_index {
746+
// First worker errors (missing request id header).
747+
0 => {
748+
first_error_served.notify_one();
749+
let res = Response::builder()
750+
.status(StatusCode::OK)
751+
.header("lambda-runtime-deadline-ms", "1542409706888")
752+
.body(Full::new(Bytes::from_static(b"{}")))
753+
.unwrap();
754+
return Ok::<_, Infallible>(res);
755+
}
756+
// Second worker should keep running and process an invocation, even if another worker errors.
757+
1 => {
758+
first_error_served.notified().await;
759+
let res = Response::builder()
760+
.status(StatusCode::OK)
761+
.header("content-type", "application/json")
762+
.header("lambda-runtime-aws-request-id", "good-request")
763+
.header("lambda-runtime-deadline-ms", "1542409706888")
764+
.body(Full::new(Bytes::from_static(b"{}")))
765+
.unwrap();
766+
return Ok::<_, Infallible>(res);
767+
}
768+
// Finally, error the remaining worker so the runtime can terminate and the test can assert behavior.
769+
2 => {
770+
let res = Response::builder()
771+
.status(StatusCode::OK)
772+
.header("lambda-runtime-deadline-ms", "1542409706888")
773+
.body(Full::new(Bytes::from_static(b"{}")))
774+
.unwrap();
775+
return Ok::<_, Infallible>(res);
776+
}
777+
_ => {
778+
let res = Response::builder()
779+
.status(StatusCode::NO_CONTENT)
780+
.body(Full::new(Bytes::new()))
781+
.unwrap();
782+
return Ok::<_, Infallible>(res);
783+
}
784+
}
785+
}
786+
787+
if method == Method::POST && path.ends_with("/response") {
788+
response_calls.fetch_add(1, Ordering::SeqCst);
789+
let res = Response::builder()
790+
.status(StatusCode::OK)
791+
.body(Full::new(Bytes::new()))
792+
.unwrap();
793+
return Ok::<_, Infallible>(res);
794+
}
795+
796+
let res = Response::builder()
797+
.status(StatusCode::NOT_FOUND)
798+
.body(Full::new(Bytes::new()))
799+
.unwrap();
800+
Ok::<_, Infallible>(res)
801+
}
802+
});
803+
804+
let io = TokioIo::new(tcp);
805+
tokio::spawn(async move {
806+
if let Err(err) = ServerBuilder::new(TokioExecutor::new())
807+
.serve_connection(io, service)
808+
.await
809+
{
810+
eprintln!("Error serving connection: {err:?}");
811+
}
812+
});
813+
}
814+
})
815+
};
816+
817+
async fn func(event: crate::LambdaEvent<serde_json::Value>) -> Result<serde_json::Value, Error> {
818+
Ok(event.payload)
819+
}
820+
821+
let handler = crate::service_fn(func);
822+
let client = Arc::new(Client::builder().with_endpoint(base).build()?);
823+
let runtime = Runtime {
824+
client: client.clone(),
825+
config: Arc::new(Config {
826+
function_name: "test_fn".to_string(),
827+
memory: 128,
828+
version: "1".to_string(),
829+
log_stream: "test_stream".to_string(),
830+
log_group: "test_log".to_string(),
831+
max_concurrency: Some(2),
832+
}),
833+
service: wrap_handler(handler, client),
834+
};
835+
836+
let res = tokio::time::timeout(Duration::from_secs(2), runtime.run_concurrent()).await;
837+
assert!(res.is_ok(), "run_concurrent timed out");
838+
assert!(
839+
res.unwrap().is_err(),
840+
"expected runtime to terminate once all workers crashed"
841+
);
842+
843+
assert_eq!(
844+
response_calls.load(Ordering::SeqCst),
845+
1,
846+
"expected remaining worker to keep running after a worker crash"
847+
);
848+
849+
server_handle.abort();
850+
Ok(())
851+
}
712852
}

0 commit comments

Comments
 (0)