
Commit 868ac38

feat(runtime,http): add concurrent entrypoints without changing run
1 parent: 7b68388

4 files changed: +132 additions, −38 deletions

lambda-http/src/lib.rs

Lines changed: 23 additions & 5 deletions

@@ -165,9 +165,9 @@ where
 
 impl<'a, R, S, E> From<S> for Adapter<'a, R, S>
 where
-    S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,
-    S::Future: Send + 'static,
-    R: IntoResponse + Send + Sync + 'static,
+    S: Service<Request, Response = R, Error = E>,
+    S::Future: Send + 'a,
+    R: IntoResponse,
 {
     fn from(service: S) -> Self {
         Adapter {
@@ -205,14 +205,32 @@ where
 ///
 /// This takes care of transforming the LambdaEvent into a [`Request`] and then
 /// converting the result into a `LambdaResponse`.
-pub async fn run<R, S, E>(handler: S) -> Result<(), Error>
+pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error>
+where
+    S: Service<Request, Response = R, Error = E>,
+    S::Future: Send + 'a,
+    R: IntoResponse,
+    E: std::fmt::Debug + Into<Diagnostic>,
+{
+    lambda_runtime::run(Adapter::from(handler)).await
+}
+
+/// Starts the Lambda Rust runtime in a mode that is compatible with
+/// Lambda Managed Instances (concurrent invocations).
+///
+/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
+/// will use a concurrent `/next` polling loop with a bounded number of
+/// in-flight handler tasks. When the environment variable is unset or `<= 1`,
+/// it falls back to the same sequential behavior as [`run`], so the same
+/// handler can run on both classic Lambda and Lambda Managed Instances.
+pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>
 where
     S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,
     S::Future: Send + 'static,
     R: IntoResponse + Send + Sync + 'static,
     E: std::fmt::Debug + Into<Diagnostic> + Send + 'static,
 {
-    lambda_runtime::run(Adapter::from(handler)).await
+    lambda_runtime::run_concurrent(Adapter::from(handler)).await
 }
 
 #[cfg(test)]
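
For context, a buffered-response handler can opt into the new entrypoint by changing one call in `main`. The sketch below is illustrative only (the `hello` handler and its response body are made up for the example); it assumes the `service_fn`, `Body`, `Request`, and `Response` re-exports that `lambda_http` already provides.

use lambda_http::{service_fn, Body, Error, Request, Response};

// A trivial buffered handler. `service_fn` over an async fn is
// Clone + Send + 'static, which is what `run_concurrent` requires.
async fn hello(_event: Request) -> Result<Response<Body>, Error> {
    Ok(Response::builder()
        .status(200)
        .body(Body::from("hello from a concurrency-ready handler"))?)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Behaves exactly like `lambda_http::run` unless AWS_LAMBDA_MAX_CONCURRENCY > 1.
    lambda_http::run_concurrent(service_fn(hello)).await
}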

lambda-http/src/streaming.rs

Lines changed: 55 additions & 21 deletions

@@ -9,7 +9,10 @@ use futures_util::{Stream, TryFutureExt};
 pub use http::{self, Response};
 use http_body::Body;
 use lambda_runtime::{
-    tower::{util::BoxCloneService, ServiceBuilder, ServiceExt},
+    tower::{
+        util::{BoxCloneService, MapRequest, MapResponse},
+        ServiceBuilder, ServiceExt,
+    },
     Diagnostic,
 };
 pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
@@ -24,22 +27,10 @@ pub struct StreamAdapter<'a, S, B> {
     _phantom_data: PhantomData<&'a B>,
 }
 
-impl<'a, S, B> Clone for StreamAdapter<'a, S, B>
-where
-    S: Clone,
-{
-    fn clone(&self) -> Self {
-        Self {
-            service: self.service.clone(),
-            _phantom_data: PhantomData,
-        }
-    }
-}
-
 impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
 where
-    S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
-    S::Future: Send + 'static,
+    S: Service<Request, Response = Response<B>, Error = E>,
+    S::Future: Send + 'a,
     B: Body + Unpin + Send + 'static,
     B::Data: Into<Bytes> + Send,
     B::Error: Into<Error> + Send + Debug,
@@ -54,15 +45,15 @@ where
 
 impl<'a, S, B, E> Service<LambdaEvent<LambdaRequest>> for StreamAdapter<'a, S, B>
 where
-    S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
-    S::Future: Send + 'static,
+    S: Service<Request, Response = Response<B>, Error = E>,
+    S::Future: Send + 'a,
     B: Body + Unpin + Send + 'static,
     B::Data: Into<Bytes> + Send,
     B::Error: Into<Error> + Send + Debug,
 {
     type Response = StreamResponse<BodyStream<B>>;
     type Error = E;
-    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
+    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'a>>;
 
     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         self.service.poll_ready(cx)
@@ -87,7 +78,31 @@ where
 /// Used internally by [`run_with_streaming_response`]; not part of the public
 /// API.
 #[allow(clippy::type_complexity)]
-fn into_stream_service<S, B, E>(
+fn into_stream_service<'a, S, B, E>(
+    handler: S,
+) -> MapResponse<
+    MapRequest<S, impl FnMut(LambdaEvent<LambdaRequest>) -> Request>,
+    impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone,
+>
+where
+    S: Service<Request, Response = Response<B>, Error = E>,
+    S::Future: Send + 'a,
+    E: Debug + Into<Diagnostic>,
+    B: Body + Unpin + Send + 'static,
+    B::Data: Into<Bytes> + Send,
+    B::Error: Into<Error> + Send + Debug,
+{
+    ServiceBuilder::new()
+        .map_request(event_to_request as fn(LambdaEvent<LambdaRequest>) -> Request)
+        .service(handler)
+        .map_response(into_stream_response)
+}
+
+/// Builds a streaming-aware Tower service from a `Service<Request>` that can be
+/// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint.
+#[allow(dead_code)]
+#[allow(clippy::type_complexity)]
+fn into_stream_service_boxed<S, B, E>(
     handler: S,
 ) -> BoxCloneService<LambdaEvent<LambdaRequest>, StreamResponse<BodyStream<B>>, E>
 where
@@ -145,7 +160,26 @@ fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
 ///
 /// [AWS docs for response streaming]:
 /// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
-pub async fn run_with_streaming_response<S, B, E>(handler: S) -> Result<(), Error>
+pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
+where
+    S: Service<Request, Response = Response<B>, Error = E>,
+    S::Future: Send + 'a,
+    E: Debug + Into<Diagnostic>,
+    B: Body + Unpin + Send + 'static,
+    B::Data: Into<Bytes> + Send,
+    B::Error: Into<Error> + Send + Debug,
+{
+    lambda_runtime::run(into_stream_service(handler)).await
+}
+
+/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
+/// responses, in a mode that is compatible with Lambda Managed Instances.
+///
+/// This uses a cloneable, boxed service internally so it can be driven by the
+/// concurrent runtime. When `AWS_LAMBDA_MAX_CONCURRENCY` is not set or `<= 1`,
+/// it falls back to the same sequential behavior as [`run_with_streaming_response`].
+#[allow(dead_code)]
+pub async fn run_with_streaming_response_concurrent<S, B, E>(handler: S) -> Result<(), Error>
 where
     S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
     S::Future: Send + 'static,
@@ -154,7 +188,7 @@ where
     B::Data: Into<Bytes> + Send,
     B::Error: Into<Error> + Send + Debug,
 {
-    lambda_runtime::run(into_stream_service(handler)).await
+    lambda_runtime::run_concurrent(into_stream_service_boxed(handler)).await
 }
 
 pin_project_lite::pin_project! {
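
The boxed variant exists because the concurrent runtime needs a service it can clone into each in-flight task, which the lifetime-parameterized `StreamAdapter` no longer guarantees. As a rough illustration of the `map_request` / `map_response` / `boxed_clone` composition used above, here is a toy Tower pipeline on plain types; the `double` service and the string conversions are invented for the example and have nothing to do with the Lambda request types.

use std::convert::Infallible;

use lambda_runtime::tower::{service_fn, util::BoxCloneService, ServiceBuilder, ServiceExt};

// Stand-in for the user's handler.
async fn double(n: u32) -> Result<u32, Infallible> {
    Ok(n * 2)
}

// Adapt the request type on the way in, adapt the response type on the way
// out, then erase the concrete adapter type so the result is cloneable and
// can be handed to concurrently spawned tasks.
fn boxed_pipeline() -> BoxCloneService<String, String, Infallible> {
    ServiceBuilder::new()
        .map_request(|raw: String| raw.parse::<u32>().unwrap_or(0))
        .service(service_fn(double))
        .map_response(|n: u32| format!("doubled: {n}"))
        .boxed_clone()
}

In the commit itself this composition lives in `into_stream_service_boxed`, while the non-boxed `into_stream_service` keeps the concrete `MapResponse<MapRequest<S, _>, _>` type so that handlers that are not `Clone` can still be driven by the sequential `run`.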

lambda-runtime/src/lib.rs

Lines changed: 25 additions & 1 deletion

@@ -126,6 +126,30 @@ where
 /// }
 /// ```
 pub async fn run<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
+where
+    F: Service<LambdaEvent<A>, Response = R>,
+    F::Future: Future<Output = Result<R, F::Error>>,
+    F::Error: Into<Diagnostic> + fmt::Debug,
+    A: for<'de> Deserialize<'de>,
+    R: IntoFunctionResponse<B, S>,
+    B: Serialize,
+    S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
+    D: Into<bytes::Bytes> + Send,
+    E: Into<Error> + Send + Debug,
+{
+    let runtime = Runtime::new(handler).layer(layers::TracingLayer::new());
+    runtime.run().await
+}
+
+/// Starts the Lambda Rust runtime in a mode that is compatible with
+/// Lambda Managed Instances (concurrent invocations).
+///
+/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
+/// will use a concurrent `/next` polling loop with a bounded number of
+/// in-flight handler tasks. When the environment variable is unset or `<= 1`,
+/// it falls back to the same sequential behavior as [`run`], so the same
+/// handler can run on both classic Lambda and Lambda Managed Instances.
+pub async fn run_concurrent<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
 where
     F: Service<LambdaEvent<A>, Response = R> + Clone + Send + 'static,
     F::Future: Future<Output = Result<R, F::Error>> + Send + 'static,
@@ -138,7 +162,7 @@ where
     E: Into<Error> + Send + Debug + 'static,
 {
     let runtime = Runtime::new(handler).layer(layers::TracingLayer::new());
-    runtime.run().await
+    runtime.run_concurrent().await
 }
 
 /// Spawns a task that will be execute a provided async closure when the process
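
A minimal sketch of calling the new entrypoint from a generic (non-HTTP) function: the echo handler below is invented for the example, and it assumes the usual `serde_json::Value` payload and the `service_fn` helper re-exported by `lambda_runtime`.

use lambda_runtime::{service_fn, Error, LambdaEvent};
use serde_json::Value;

// Echo the event payload back; `Value` satisfies the Serialize bound on the
// buffered response type.
async fn echo(event: LambdaEvent<Value>) -> Result<Value, Error> {
    Ok(event.payload)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // With AWS_LAMBDA_MAX_CONCURRENCY unset or <= 1 this is equivalent to
    // lambda_runtime::run; otherwise the concurrent /next loop is used.
    lambda_runtime::run_concurrent(service_fn(echo)).await
}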

lambda-runtime/src/runtime.rs

Lines changed: 29 additions & 11 deletions

@@ -82,12 +82,12 @@ impl<F, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem,
         >,
     >
 where
-    F: Service<LambdaEvent<EventPayload>, Response = Response> + Send + Clone + 'static,
-    F::Future: Future<Output = Result<Response, F::Error>> + Send + 'static,
+    F: Service<LambdaEvent<EventPayload>, Response = Response>,
+    F::Future: Future<Output = Result<Response, F::Error>>,
     F::Error: Into<Diagnostic> + Debug,
-    EventPayload: for<'de> Deserialize<'de> + Send + 'static,
-    Response: IntoFunctionResponse<BufferedResponse, StreamingResponse> + Send + 'static,
-    BufferedResponse: Serialize + Send + 'static,
+    EventPayload: for<'de> Deserialize<'de>,
+    Response: IntoFunctionResponse<BufferedResponse, StreamingResponse>,
+    BufferedResponse: Serialize,
     StreamingResponse: Stream<Item = Result<StreamItem, StreamError>> + Unpin + Send + 'static,
     StreamItem: Into<bytes::Bytes> + Send,
     StreamError: Into<BoxError> + Send + Debug,
@@ -160,19 +160,23 @@ where
     S: Service<LambdaInvocation, Response = (), Error = BoxError> + Clone + Send + 'static,
     S::Future: Send,
 {
-    /// Start the runtime and begin polling for events on the Lambda Runtime API.
-    pub async fn run(self) -> Result<(), BoxError> {
+    /// Start the runtime in concurrent mode when configured for Lambda managed-concurrency.
+    ///
+    /// If `AWS_LAMBDA_MAX_CONCURRENCY` is not set or is `<= 1`, this falls back to the
+    /// sequential `run_with_incoming` loop so that the same handler can run on both
+    /// classic Lambda and Lambda Managed Instances.
+    pub async fn run_concurrent(self) -> Result<(), BoxError> {
         if self.config.is_concurrent() {
             let max_concurrency = self.config.max_concurrency.unwrap_or(1);
-            Self::run_concurrent(self.service, self.config, self.client, max_concurrency).await
+            Self::run_concurrent_inner(self.service, self.config, self.client, max_concurrency).await
         } else {
             let incoming = incoming(&self.client);
             Self::run_with_incoming(self.service, self.config, incoming).await
         }
     }
 
     /// Concurrent processing using windowed long-polls (for Lambda managed-concurrency).
-    async fn run_concurrent(
+    async fn run_concurrent_inner(
         service: S,
         config: Arc<Config>,
         client: Arc<ApiClient>,
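
For intuition, the gating that `Config::is_concurrent` and `max_concurrency` describe could be derived from the environment roughly as follows; this is an illustrative sketch, not the crate's actual `Config` implementation (which is not part of this diff).

// Illustrative only: derive the documented gating from the environment.
fn max_concurrency_from_env() -> Option<usize> {
    std::env::var("AWS_LAMBDA_MAX_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
}

// Unset, unparsable, or <= 1 all mean "use the classic sequential loop".
fn is_concurrent(max_concurrency: Option<usize>) -> bool {
    matches!(max_concurrency, Some(n) if n > 1)
}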
@@ -301,32 +305,46 @@ impl<S> Runtime<S>
 where
     S: Service<LambdaInvocation, Response = (), Error = BoxError>,
 {
+    /// Start the runtime and begin polling for events on the Lambda Runtime API.
+    pub async fn run(self) -> Result<(), BoxError> {
+        let incoming = incoming(&self.client);
+        Self::run_with_incoming(self.service, self.config, incoming).await
+    }
+
     /// Internal utility function to start the runtime with a customized incoming stream.
-    /// This implements the sequential mode of the runtime.
+    /// This implements the core of the [Runtime::run] method.
     pub(crate) async fn run_with_incoming(
         mut service: S,
         config: Arc<Config>,
         incoming: impl Stream<Item = Result<http::Response<hyper::body::Incoming>, BoxError>> + Send,
     ) -> Result<(), BoxError> {
         tokio::pin!(incoming);
-        while let Some(next_event_response) = tokio_stream::StreamExt::next(&mut incoming).await {
+        while let Some(next_event_response) = incoming.next().await {
             trace!("New event arrived (run loop)");
             let event = next_event_response?;
             let (parts, incoming) = event.into_parts();
 
             #[cfg(debug_assertions)]
             if parts.status == http::StatusCode::NO_CONTENT {
+                // Ignore the event if the status code is 204.
+                // This is a way to keep the runtime alive when
+                // there are no events pending to be processed.
                 continue;
             }
 
+            // Build the invocation such that it can be sent to the service right away
+            // when it is ready
             let body = incoming.collect().await?.to_bytes();
             let context = Context::new(invoke_request_id(&parts.headers)?, config.clone(), &parts.headers)?;
             let invocation = LambdaInvocation { parts, body, context };
 
             // Setup Amazon's default tracing data
             amzn_trace_env(&invocation.context);
 
+            // Wait for service to be ready
             let ready = service.ready().await?;
+
+            // Once ready, call the service which will respond to the Lambda runtime API
             ready.call(invocation).await?;
         }
         Ok(())
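
The body of `run_concurrent_inner` is not shown in this diff, but the documented behavior, a bounded number of in-flight handler tasks, follows a common Tokio pattern. The sketch below is a generic illustration of that pattern (semaphore-bounded task spawning), not the crate's actual implementation.

use std::sync::Arc;

use tokio::{sync::Semaphore, task::JoinSet};

// Generic bounded-concurrency loop: spawn each unit of work on its own task,
// with a semaphore capping how many run at the same time.
async fn process_bounded<T, F, Fut>(items: Vec<T>, max_concurrency: usize, handler: F)
where
    T: Send + 'static,
    F: Fn(T) -> Fut + Clone + Send + 'static,
    Fut: std::future::Future<Output = ()> + Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(max_concurrency));
    let mut in_flight = JoinSet::new();

    for item in items {
        // Waits (asynchronously) until one of the permits frees up.
        let permit = semaphore.clone().acquire_owned().await.expect("semaphore closed");
        let handler = handler.clone();
        in_flight.spawn(async move {
            handler(item).await;
            drop(permit); // releasing the permit admits the next item
        });
    }

    // Drain whatever is still running before returning.
    while in_flight.join_next().await.is_some() {}
}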
