-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathlambda_handler.py
More file actions
120 lines (85 loc) · 3.46 KB
/
lambda_handler.py
File metadata and controls
120 lines (85 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import json
from typing import Any
from urllib.parse import parse_qs
from apim_mock.auth_check import check_authenticated
from apim_mock.handler import handle_request as handle_apim_request
from apim_mock.logging import get_logger
from aws_lambda_powertools.event_handler import (
APIGatewayHttpResolver,
Response,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from jwt.exceptions import InvalidTokenError
from pdm_mock.handler import pdm_routes
_logger = get_logger(__name__)
app = APIGatewayHttpResolver()
app.include_router(pdm_routes)
def _with_default_headers(status_code: int, body: str) -> Response[str]:
return Response(
status_code=status_code,
headers={"Content-Type": "application/json"},
body=body,
)
###### Health Checks ######
@app.get("/_status")
def status() -> Response[str]:
_logger.debug("Status check endpoint called")
return Response(status_code=200, body="OK", headers={"Content-Type": "text/plain"})
@app.get("/")
def root() -> Response[str]:
"""Handler for the preview environment. This simply returns a 200 response with
the request headers, which can be used to verify and debug how the environment
is working and to inspect the incoming request.
Returns:
dict: Diagnostic 200 response with request headers.
"""
event = app.current_event
context = app.append_context
_logger.info("Lambda context: %s", context)
headers = event.get("headers", {}) or {}
# Log headers to CloudWatch
_logger.info("Incoming request headers:")
for k, v in headers.items():
_logger.info("%s: %s", k, v)
response_body = {
"message": "ok",
"headers": headers,
"requestContext": event.get("requestContext", {}),
}
return _with_default_headers(200, body=json.dumps(response_body, indent=2))
##### APIM Mock #####
@app.post("/apim/oauth2/token")
def post_auth() -> Response[str]:
_logger.debug("Authentication Mock called")
payload = app.current_event.decoded_body
if not payload:
_logger.error("No payload provided.")
return Response(status_code=400, body="Bad Request")
parsed_payload: dict[str, Any] = parse_qs(payload)
_logger.debug("Payload received %s", parsed_payload)
try:
response = handle_apim_request(parsed_payload)
except InvalidTokenError as err:
_logger.error("expected exception %s", err)
_logger.error("Type %s", type(err))
error_body = {"error": "invalid_request", "error_description": str(err)}
return _with_default_headers(status_code=400, body=json.dumps(error_body))
except ValueError as err:
_logger.error("expected exception %s", err)
error_body = {"error": "invalid_request", "error_description": str(err)}
return _with_default_headers(status_code=400, body=json.dumps(error_body))
except Exception as err:
_logger.error("unexpected exception %s", err)
_logger.error("Type %s", type(err))
return _with_default_headers(status_code=500, body="Internal Server Error")
return _with_default_headers(
status_code=200,
body=json.dumps(response),
)
def auth_check() -> bool:
headers = app.current_event.headers
token = headers.get("Authorization", "").replace("Bearer ", "")
return check_authenticated(token)
##########
def handler(data: dict[str, Any], context: LambdaContext) -> dict[str, Any]:
return app.resolve(data, context)