Skip to content

Commit 5681923

Browse files
committed
add resource for event handlers
1 parent 5ec6fdc commit 5681923

File tree

2 files changed

+917
-0
lines changed

2 files changed

+917
-0
lines changed

src/resource.ts

Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
import 'reflect-metadata';
2+
import { boundMethod } from 'autobind-decorator';
3+
4+
import { ProgressEvent, SessionProxy } from './proxy';
5+
import { reportProgress } from './callback';
6+
import { BaseHandlerException, InternalFailure, InvalidRequest } from './exceptions';
7+
import {
8+
Action,
9+
BaseResourceModel,
10+
BaseResourceHandlerRequest,
11+
Callable,
12+
Credentials,
13+
HandlerErrorCode,
14+
OperationStatus,
15+
Optional,
16+
Response,
17+
} from './interface';
18+
import { ProviderLogHandler } from './log-delivery';
19+
import { MetricsPublisherProxy } from './metrics';
20+
import { cleanupCloudwatchEvents, rescheduleAfterMinutes } from './scheduler';
21+
import {
22+
Constructor,
23+
HandlerRequest,
24+
LambdaContext,
25+
TestEvent,
26+
UnmodeledRequest,
27+
} from './utils';
28+
29+
const LOGGER = console;
30+
const MUTATING_ACTIONS: [Action, Action, Action] = [Action.Create, Action.Update, Action.Delete];
31+
const INVOCATION_TIMEOUT_MS = 60000;
32+
33+
export type HandlerSignature = Callable<[Optional<SessionProxy>, any, Map<string, any>], Promise<ProgressEvent>>;
34+
export class HandlerSignatures extends Map<Action, HandlerSignature> {};
35+
class HandlerEvents extends Map<Action, string | symbol> {};
36+
37+
/**
38+
* Decorates a method to ensure that the JSON input and output are serialized properly.
39+
*
40+
* @returns {PropertyDescriptor}
41+
*/
42+
function ensureSerialize(target: any, propertyKey: string, descriptor: PropertyDescriptor): PropertyDescriptor {
43+
44+
// Save a reference to the original method this way we keep the values currently in the
45+
// descriptor and don't overwrite what another decorator might have done to the descriptor.
46+
if(descriptor === undefined) {
47+
descriptor = Object.getOwnPropertyDescriptor(target, propertyKey);
48+
}
49+
const originalMethod = descriptor.value;
50+
// Wrapping the original method with new signature.
51+
descriptor.value = async function(event: Object | Map<string, any>, context: any): Promise<any> {
52+
let mappedEvent: Map<string, any>;
53+
if (event instanceof Map) {
54+
mappedEvent = new Map<string, any>(event);
55+
} else {
56+
mappedEvent = new Map<string, any>(Object.entries(event));
57+
}
58+
const progress = await originalMethod.apply(this, [mappedEvent, context]);
59+
// Use the raw event data as a last-ditch attempt to call back if the
60+
// request is invalid.
61+
const serialized = progress.serialize(true, mappedEvent.get('bearerToken'));
62+
return serialized.toObject();
63+
}
64+
return descriptor;
65+
}
66+
67+
/**
68+
* Decorates a method to point to the proper action
69+
*
70+
* @returns {MethodDecorator}
71+
*/
72+
export function handlerEvent(action: Action): MethodDecorator {
73+
return function(target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor): PropertyDescriptor {
74+
if (target instanceof BaseResource) {
75+
const actions: HandlerEvents = Reflect.getMetadata('handlerEvents', target) || new HandlerEvents();
76+
if (!actions.has(action)) {
77+
actions.set(action, propertyKey);
78+
}
79+
Reflect.defineMetadata('handlerEvents', actions, target);
80+
}
81+
if (descriptor) {
82+
return descriptor;
83+
}
84+
}
85+
}
86+
87+
export abstract class BaseResource<T extends BaseResourceModel = BaseResourceModel> {
88+
constructor(
89+
public typeName: string,
90+
private modelCls: Constructor<T>,
91+
private handlers?: HandlerSignatures,
92+
) {
93+
this.typeName = typeName || '';
94+
this.handlers = handlers || new HandlerSignatures();
95+
const actions: HandlerEvents = Reflect.getMetadata('handlerEvents', this) || new HandlerEvents();
96+
actions.forEach((value: string | symbol, key: Action) => {
97+
this.addHandler(key, (this as any)[value]);
98+
});
99+
}
100+
101+
public addHandler = (action: Action, f: HandlerSignature): HandlerSignature => {
102+
this.handlers.set(action, f);
103+
return f;
104+
}
105+
106+
public static scheduleReinvocation = async (
107+
handlerRequest: HandlerRequest,
108+
handlerResponse: ProgressEvent,
109+
context: LambdaContext,
110+
session: SessionProxy,
111+
): Promise<boolean> => {
112+
if (handlerResponse.status !== OperationStatus.InProgress) {
113+
return false;
114+
}
115+
// Modify requestContext dict in-place, so that invoke count is bumped on local
116+
// reinvoke too
117+
const reinvokeContext = handlerRequest.requestContext;
118+
reinvokeContext.invocation = (reinvokeContext.invocation || 0) + 1;
119+
const callbackDelaySeconds = handlerResponse.callbackDelaySeconds;
120+
const remainingMs = context.getRemainingTimeInMillis();
121+
122+
// When a handler requests a sub-minute callback delay, and if the lambda
123+
// invocation has enough runtime (with 20% buffer), we can re-run the handler
124+
// locally otherwise we re-invoke through CloudWatchEvents
125+
const neededMsRemaining = callbackDelaySeconds * 1200 + INVOCATION_TIMEOUT_MS;
126+
if (callbackDelaySeconds < 60 && remainingMs > neededMsRemaining) {
127+
const delay = async (ms: number) => {
128+
await new Promise(r => setTimeout(() => r(), ms));
129+
};
130+
delay(callbackDelaySeconds * 1000);
131+
return true;
132+
}
133+
const callbackDelayMin = Number(callbackDelaySeconds / 60);
134+
rescheduleAfterMinutes(
135+
session,
136+
context.invokedFunctionArn,
137+
callbackDelayMin,
138+
handlerRequest,
139+
);
140+
return false;
141+
}
142+
143+
private invokeHandler = async (
144+
session: Optional<SessionProxy>,
145+
request: BaseResourceHandlerRequest<T>,
146+
action: Action,
147+
callbackContext: Map<string, any>,
148+
): Promise<ProgressEvent> => {
149+
const handle: HandlerSignature = this.handlers.get(action);
150+
if (!handle) {
151+
return ProgressEvent.failed(
152+
HandlerErrorCode.InternalFailure, `No handler for ${action}`
153+
);
154+
}
155+
const progress = await handle(session, request, callbackContext);
156+
const isInProgress = progress.status === OperationStatus.InProgress;
157+
const isMutable = MUTATING_ACTIONS.some(x => x === action);
158+
if (isInProgress && !isMutable) {
159+
throw new InternalFailure('READ and LIST handlers must return synchronously.');
160+
}
161+
return progress;
162+
}
163+
164+
private parseTestRequest = (
165+
eventData: Map<string, any>
166+
): [
167+
Optional<SessionProxy>,
168+
BaseResourceHandlerRequest<T>,
169+
Action,
170+
Map<string, any>,
171+
] => {
172+
let session: SessionProxy;
173+
let request: BaseResourceHandlerRequest<T>;
174+
let action: Action;
175+
let event: TestEvent;
176+
try {
177+
event = new TestEvent(eventData);
178+
const creds = event.credentials as Credentials;
179+
if (!creds) {
180+
throw new Error('Event data is missing required property "credentials".')
181+
}
182+
if (!this.modelCls) {
183+
throw new Error('Missing Model class to be used to deserialize JSON data.')
184+
}
185+
if (event.request instanceof Map) {
186+
event.request = new Map<string, any>(event.request);
187+
} else {
188+
event.request = new Map<string, any>(Object.entries(event.request));
189+
}
190+
request = new UnmodeledRequest(event.request).toModeled<T>(this.modelCls);
191+
192+
session = SessionProxy.getSession(creds, event.region);
193+
action = event.action;
194+
} catch(err) {
195+
LOGGER.error('Invalid request');
196+
throw new InternalFailure(`${err} (${err.name})`);
197+
}
198+
199+
return [session, request, action, event.callbackContext || new Map()];
200+
}
201+
202+
// @ts-ignore
203+
public async testEntrypoint (
204+
eventData: Object | Map<string, any>, context: any
205+
): Promise<Response<BaseResource>>;
206+
@boundMethod
207+
@ensureSerialize
208+
public async testEntrypoint(
209+
eventData: Map<string, any>, context: any
210+
): Promise<ProgressEvent> {
211+
let msg = 'Uninitialized';
212+
let progress: ProgressEvent;
213+
try {
214+
const [ session, request, action, callbackContext ] = this.parseTestRequest(eventData);
215+
progress = await this.invokeHandler(session, request, action, callbackContext);
216+
} catch(err) {
217+
if (err instanceof BaseHandlerException) {
218+
LOGGER.error('Handler error')
219+
progress = err.toProgressEvent();
220+
}
221+
LOGGER.error('Exception caught');
222+
msg = err.message || msg;
223+
progress = ProgressEvent.failed(HandlerErrorCode.InternalFailure, msg);
224+
}
225+
return Promise.resolve(progress);
226+
}
227+
228+
private static parseRequest = (
229+
eventData: Map<string, any>
230+
): [
231+
[Optional<SessionProxy>, Optional<SessionProxy>, SessionProxy],
232+
Action,
233+
Map<string, any>,
234+
HandlerRequest,
235+
] => {
236+
let callerSession: Optional<SessionProxy>;
237+
let platformSession: Optional<SessionProxy>;
238+
let providerSession: SessionProxy;
239+
let action: Action;
240+
let callbackContext: Map<string, any>;
241+
let event: HandlerRequest;
242+
try {
243+
event = HandlerRequest.deserialize(eventData);
244+
if (!event.awsAccountId) {
245+
throw new Error('Event data is missing required property "awsAccountId".')
246+
}
247+
const platformCredentials = event.requestData.platformCredentials;
248+
platformSession = SessionProxy.getSession(platformCredentials);
249+
callerSession = SessionProxy.getSession(event.requestData.callerCredentials);
250+
providerSession = SessionProxy.getSession(event.requestData.providerCredentials);
251+
// Credentials are used when rescheduling, so can't zero them out (for now).
252+
if (!platformSession || !platformCredentials || Object.keys(platformCredentials).length === 0) {
253+
throw new Error('No platform credentials');
254+
}
255+
action = event.action;
256+
callbackContext = event.requestContext.callbackContext || {} as Map<string, any>;
257+
} catch(err) {
258+
LOGGER.error('Invalid request');
259+
throw new InvalidRequest(`${err} (${err.name})`);
260+
}
261+
return [
262+
[callerSession, platformSession, providerSession],
263+
action,
264+
callbackContext,
265+
event,
266+
]
267+
}
268+
269+
private castResourceRequest = (
270+
request: HandlerRequest
271+
): BaseResourceHandlerRequest<T> => {
272+
try {
273+
const unmodeled: UnmodeledRequest = UnmodeledRequest.fromUnmodeled({
274+
clientRequestToken: request.bearerToken,
275+
desiredResourceState: request.requestData.resourceProperties,
276+
previousResourceState: request.requestData.previousResourceProperties,
277+
logicalResourceIdentifier: request.requestData.logicalResourceId,
278+
});
279+
return unmodeled.toModeled<T>(this.modelCls);
280+
} catch(err) {
281+
LOGGER.error('Invalid request');
282+
throw new InvalidRequest(`${err} (${err.name})`);
283+
}
284+
}
285+
286+
// @ts-ignore
287+
public async entrypoint (
288+
eventData: Object | Map<string, any>, context: LambdaContext
289+
): Promise<Response<BaseResource>>;
290+
@boundMethod
291+
@ensureSerialize
292+
public async entrypoint (
293+
eventData: Map<string, any>, context: LambdaContext
294+
): Promise<ProgressEvent> {
295+
296+
let isLogSetup: boolean = false;
297+
let progress: ProgressEvent;
298+
299+
const printOrLog = (message: string): void => {
300+
if (isLogSetup) {
301+
LOGGER.error(message);
302+
} else {
303+
console.log(message);
304+
console.trace();
305+
}
306+
}
307+
308+
try {
309+
const [sessions, action, callback, event] = BaseResource.parseRequest(eventData);
310+
const [callerSession, platformSession, providerSession] = sessions;
311+
ProviderLogHandler.setup(event, providerSession);
312+
isLogSetup = true;
313+
314+
const request = this.castResourceRequest(event);
315+
316+
const metrics = new MetricsPublisherProxy(event.awsAccountId, event.resourceType);
317+
metrics.addMetricsPublisher(platformSession);
318+
metrics.addMetricsPublisher(callerSession);
319+
// Acknowledge the task for first time invocation.
320+
if (!event.requestContext || Object.keys(event.requestContext).length === 0) {
321+
await reportProgress({
322+
session: platformSession,
323+
bearerToken: event.bearerToken,
324+
errorCode: null,
325+
operationStatus: OperationStatus.InProgress,
326+
currentOperationStatus: OperationStatus.Pending,
327+
resourceModel: null,
328+
message: '',
329+
});
330+
} else {
331+
// If this invocation was triggered by a 're-invoke' CloudWatch Event,
332+
// clean it up.
333+
cleanupCloudwatchEvents(
334+
platformSession,
335+
event.requestContext.cloudWatchEventsRuleName || '',
336+
event.requestContext.cloudWatchEventsTargetId || '',
337+
);
338+
}
339+
let invoke: boolean = true;
340+
while (invoke) {
341+
const startTime = new Date(Date.now());
342+
metrics.publishInvocationMetric(startTime, action);
343+
let error: Error;
344+
try {
345+
progress = await this.invokeHandler(
346+
callerSession, request, action, callback
347+
);
348+
} catch(err) {
349+
error = err;
350+
}
351+
const endTime = new Date(Date.now());
352+
const milliseconds: number = endTime.getTime() - startTime.getTime();
353+
metrics.publishDurationMetric(endTime, action, milliseconds);
354+
if (error) {
355+
metrics.publishExceptionMetric(new Date(Date.now()), action, error);
356+
throw error;
357+
}
358+
if (progress.callbackContext) {
359+
const callback = progress.callbackContext;
360+
event.requestContext.callbackContext = callback;
361+
}
362+
if (MUTATING_ACTIONS.includes(event.action)) {
363+
await reportProgress({
364+
session: platformSession,
365+
bearerToken: event.bearerToken,
366+
errorCode: progress.errorCode,
367+
operationStatus: progress.status,
368+
currentOperationStatus: OperationStatus.InProgress,
369+
resourceModel: progress.resourceModel,
370+
message: progress.message,
371+
});
372+
}
373+
invoke = await BaseResource.scheduleReinvocation(
374+
event, progress, context, platformSession
375+
);
376+
}
377+
} catch(err) {
378+
if (err instanceof BaseHandlerException) {
379+
printOrLog('Handler error');
380+
progress = err.toProgressEvent();
381+
} else {
382+
printOrLog('Exception caught');
383+
progress = ProgressEvent.failed(HandlerErrorCode.InternalFailure, err.message);
384+
}
385+
}
386+
387+
return progress;
388+
}
389+
}

0 commit comments

Comments
 (0)