|
1 | | -use crate::{http::header::SET_COOKIE, request::LambdaRequest, tower::ServiceBuilder, Request, RequestExt}; |
| 1 | +use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestExt}; |
2 | 2 | use bytes::Bytes; |
3 | | -pub use http::{self, Response}; |
4 | | -use http_body::Body; |
5 | | -pub use lambda_runtime::{ |
6 | | - self, |
7 | | - tower::{ |
8 | | - util::{MapRequest, MapResponse}, |
9 | | - ServiceExt, |
10 | | - }, |
11 | | - Error, LambdaEvent, MetadataPrelude, Service, StreamResponse, |
12 | | -}; |
13 | | -use lambda_runtime::{tower::util::BoxService, Diagnostic}; |
14 | | -use std::{ |
| 3 | +use core::{ |
15 | 4 | fmt::Debug, |
| 5 | + future::Future, |
16 | 6 | pin::Pin, |
17 | 7 | task::{Context, Poll}, |
18 | 8 | }; |
| 9 | +pub use http::{self, Response}; |
| 10 | +use http_body::Body; |
| 11 | +use lambda_runtime::Diagnostic; |
| 12 | +pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse}; |
| 13 | +use std::marker::PhantomData; |
19 | 14 | use tokio_stream::Stream; |
20 | 15 |
|
21 | | -/// Runs the Lambda runtime with a handler that returns **streaming** HTTP |
| 16 | +/// An adapter that lifts a standard [`Service<Request>`] into a |
| 17 | +/// [`Service<LambdaEvent<LambdaRequest>>`] which produces streaming Lambda HTTP |
22 | 18 | /// responses. |
23 | | -pub fn into_streaming_response<'a, S, B, E>( |
24 | | - handler: S, |
25 | | -) -> BoxService<LambdaEvent<LambdaRequest>, StreamResponse<BodyStream<B>>, E> |
| 19 | +pub struct StreamAdapter<'a, S, B> { |
| 20 | + service: S, |
| 21 | + _phantom_data: PhantomData<&'a B>, |
| 22 | +} |
| 23 | + |
| 24 | +impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B> |
26 | 25 | where |
27 | | - S: Service<Request, Response = Response<B>, Error = E> + Send + 'static, |
| 26 | + S: Service<Request, Response = Response<B>, Error = E>, |
28 | 27 | S::Future: Send + 'a, |
29 | | - E: Debug + Into<Diagnostic> + 'static, |
| 28 | + E: Debug + Into<Diagnostic>, |
30 | 29 | B: Body + Unpin + Send + 'static, |
31 | 30 | B::Data: Into<Bytes> + Send, |
32 | 31 | B::Error: Into<Error> + Send + Debug, |
33 | 32 | { |
34 | | - into_streaming_response_inner::<S, B, E>(handler).boxed() |
| 33 | + fn from(service: S) -> Self { |
| 34 | + StreamAdapter { |
| 35 | + service, |
| 36 | + _phantom_data: PhantomData, |
| 37 | + } |
| 38 | + } |
35 | 39 | } |
36 | 40 |
|
37 | | -#[allow(clippy::type_complexity)] |
38 | | -fn into_streaming_response_inner<'a, S, B, E>( |
39 | | - handler: S, |
40 | | -) -> MapResponse< |
41 | | - MapRequest<S, impl FnMut(LambdaEvent<LambdaRequest>) -> Request>, |
42 | | - impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone, |
43 | | -> |
| 41 | +impl<'a, S, B, E> Service<LambdaEvent<LambdaRequest>> for StreamAdapter<'a, S, B> |
44 | 42 | where |
45 | 43 | S: Service<Request, Response = Response<B>, Error = E>, |
46 | 44 | S::Future: Send + 'a, |
47 | | - E: Debug + Into<Diagnostic>, |
48 | | - B: Body + Unpin + Send + 'static, |
| 45 | + B: Body + Send + 'static, |
49 | 46 | B::Data: Into<Bytes> + Send, |
50 | 47 | B::Error: Into<Error> + Send + Debug, |
| 48 | + E: Debug + Into<Diagnostic>, |
51 | 49 | { |
52 | | - ServiceBuilder::new() |
53 | | - .map_request(|req: LambdaEvent<LambdaRequest>| { |
54 | | - let event: Request = req.payload.into(); |
55 | | - event.with_lambda_context(req.context) |
56 | | - }) |
57 | | - .service(handler) |
58 | | - .map_response(|res: Response<B>| { |
59 | | - let (parts, body) = res.into_parts(); |
| 50 | + type Response = StreamResponse<BodyStream<B>>; |
| 51 | + type Error = E; |
| 52 | + type Future = Pin<Box<dyn Future<Output = Result<Self::Response, E>> + Send + 'a>>; |
60 | 53 |
|
61 | | - let mut prelude_headers = parts.headers; |
| 54 | + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 55 | + self.service.poll_ready(cx) |
| 56 | + } |
62 | 57 |
|
63 | | - let cookies = prelude_headers |
| 58 | + fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future { |
| 59 | + let event: Request = req.payload.into(); |
| 60 | + let fut = self.service.call(event.with_lambda_context(req.context)); |
| 61 | + Box::pin(async move { |
| 62 | + let res = fut.await?; |
| 63 | + let (parts, body) = res.into_parts(); |
| 64 | + |
| 65 | + let mut headers = parts.headers; |
| 66 | + let cookies = headers |
64 | 67 | .get_all(SET_COOKIE) |
65 | 68 | .iter() |
66 | 69 | .map(|c| String::from_utf8_lossy(c.as_bytes()).to_string()) |
67 | | - .collect::<Vec<String>>(); |
68 | | - |
69 | | - prelude_headers.remove(SET_COOKIE); |
| 70 | + .collect::<Vec<_>>(); |
| 71 | + headers.remove(SET_COOKIE); |
70 | 72 |
|
71 | | - let metadata_prelude = MetadataPrelude { |
72 | | - headers: prelude_headers, |
73 | | - status_code: parts.status, |
74 | | - cookies, |
75 | | - }; |
76 | | - |
77 | | - StreamResponse { |
78 | | - metadata_prelude, |
| 73 | + Ok(StreamResponse { |
| 74 | + metadata_prelude: MetadataPrelude { |
| 75 | + headers, |
| 76 | + status_code: parts.status, |
| 77 | + cookies, |
| 78 | + }, |
79 | 79 | stream: BodyStream { body }, |
80 | | - } |
| 80 | + }) |
81 | 81 | }) |
| 82 | + } |
82 | 83 | } |
83 | 84 |
|
84 | 85 | /// Runs the Lambda runtime with a handler that returns **streaming** HTTP |
|
97 | 98 | B::Data: Into<Bytes> + Send, |
98 | 99 | B::Error: Into<Error> + Send + Debug, |
99 | 100 | { |
100 | | - let svc = into_streaming_response_inner(handler); |
101 | | - lambda_runtime::run(svc).await |
| 101 | + lambda_runtime::run(StreamAdapter::from(handler)).await |
102 | 102 | } |
103 | 103 |
|
104 | 104 | pin_project_lite::pin_project! { |
|
0 commit comments