Skip to content

Commit 726cd87

Browse files
committed
add log delivery mechanism
1 parent 823ec6e commit 726cd87

File tree

2 files changed

+429
-0
lines changed

2 files changed

+429
-0
lines changed

src/log-delivery.ts

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import { boundMethod } from 'autobind-decorator'
2+
import { EventEmitter } from 'events';
3+
import CloudWatchLogs, {
4+
InputLogEvent,
5+
PutLogEventsRequest,
6+
PutLogEventsResponse,
7+
} from 'aws-sdk/clients/cloudwatchlogs';
8+
9+
import {
10+
SessionProxy,
11+
} from './proxy';
12+
import { HandlerRequest } from './utils';
13+
14+
15+
type Console = globalThis.Console;
16+
17+
interface ILogOptions {
18+
groupName: string,
19+
stream: string,
20+
session: SessionProxy,
21+
logger?: Console,
22+
}
23+
24+
class LogEmitter extends EventEmitter {}
25+
26+
export class ProviderLogHandler {
27+
private static instance: ProviderLogHandler;
28+
private emitter: LogEmitter;
29+
public client: CloudWatchLogs;
30+
public sequenceToken: string;
31+
public groupName: string;
32+
public stream: string;
33+
private logger: Console;
34+
35+
/**
36+
* The ProviderLogHandler's constructor should always be private to prevent direct
37+
* construction calls with the `new` operator.
38+
*/
39+
private constructor(options: ILogOptions) {
40+
this.stream = options.stream.replace(':', '__');
41+
this.client = options.session.client('CloudWatchLogs') as CloudWatchLogs;
42+
this.sequenceToken = '';
43+
this.logger = options.logger || global.console;
44+
// Attach the logger methods to localized event emitter.
45+
const emitter = new LogEmitter();
46+
this.emitter = emitter;
47+
emitter.on('log', this.logListener);
48+
// Create maps of each logger Function and then alias that.
49+
Object.entries(this.logger).forEach(([key, val]) => {
50+
if (typeof val === 'function') {
51+
if (['log', 'error', 'warn', 'info'].includes(key)) {
52+
this.logger[key as 'log' | 'error' | 'warn' | 'info'] = function() {
53+
// Calls the logger method.
54+
val.apply(this, arguments);
55+
// For adding other event watchers later.
56+
emitter.emit('log', arguments);
57+
};
58+
}
59+
}
60+
});
61+
}
62+
63+
/**
64+
* The static method that controls the access to the singleton instance.
65+
*
66+
* This implementation let you subclass the ProviderLogHandler class while keeping
67+
* just one instance of each subclass around.
68+
*/
69+
public static getInstance(): ProviderLogHandler {
70+
if (!ProviderLogHandler.instance) {
71+
return null;
72+
}
73+
return ProviderLogHandler.instance;
74+
}
75+
76+
public static setup(
77+
request: HandlerRequest, providerSession?: SessionProxy
78+
): void {
79+
const logGroup: string = request.requestData?.providerLogGroupName;
80+
let streamName: string = `${request.awsAccountId}-${request.region}`;
81+
if (request.stackId && request.requestData?.logicalResourceId) {
82+
streamName = `${request.stackId}/${request.requestData.logicalResourceId}`;
83+
}
84+
let logHandler = ProviderLogHandler.getInstance();
85+
if (providerSession && logGroup) {
86+
if (logHandler) {
87+
// This is a re-used lambda container, log handler is already setup, so
88+
// we just refresh the client with new creds.
89+
logHandler.client = providerSession.client('CloudWatchLogs') as CloudWatchLogs;
90+
} else {
91+
// Filter provider messages from platform.
92+
const provider: string = request.resourceType.replace('::', '_').toLowerCase();
93+
logHandler = ProviderLogHandler.instance = new ProviderLogHandler({
94+
groupName: logGroup,
95+
stream: streamName,
96+
session: providerSession,
97+
});
98+
}
99+
}
100+
}
101+
102+
private async createLogGroup(): Promise<void> {
103+
try {
104+
await this.client.createLogGroup({
105+
logGroupName: this.groupName,
106+
}).promise();
107+
} catch(err) {
108+
if (err.code !== 'ResourceAlreadyExistsException') {
109+
throw err;
110+
}
111+
}
112+
}
113+
114+
private async createLogStream(): Promise<void> {
115+
try {
116+
await this.client.createLogStream({
117+
logGroupName: this.groupName,
118+
logStreamName: this.stream,
119+
}).promise();
120+
} catch(err) {
121+
if (err.code !== 'ResourceAlreadyExistsException') {
122+
throw err;
123+
}
124+
}
125+
}
126+
127+
private async putLogEvent(record: InputLogEvent): Promise<void> {
128+
if (!record.timestamp) {
129+
const currentTime = new Date(Date.now());
130+
record.timestamp = Math.round(currentTime.getTime());
131+
}
132+
const logEventsParams: PutLogEventsRequest = {
133+
logGroupName: this.groupName,
134+
logStreamName: this.stream,
135+
logEvents: [ record ],
136+
};
137+
if (this.sequenceToken) {
138+
logEventsParams.sequenceToken = this.sequenceToken;
139+
}
140+
try {
141+
const response: PutLogEventsResponse = await this.client.putLogEvents(logEventsParams).promise();
142+
this.sequenceToken = response.nextSequenceToken;
143+
} catch(err) {
144+
if (err.code === 'DataAlreadyAcceptedException' || err.code === 'InvalidSequenceTokenException') {
145+
this.sequenceToken = (err.message || '').split(' ')[0];
146+
this.putLogEvent(record);
147+
}
148+
}
149+
}
150+
151+
@boundMethod
152+
logListener(...args: any[]): void {
153+
const currentTime = new Date(Date.now());
154+
const record: InputLogEvent = {
155+
message: JSON.stringify(args[0]),
156+
timestamp: Math.round(currentTime.getTime()),
157+
}
158+
try {
159+
this.putLogEvent(record);
160+
} catch(err) {
161+
if (err.message.includes('log group does not exist')) {
162+
this.createLogGroup();
163+
}
164+
this.createLogStream();
165+
this.putLogEvent(record);
166+
}
167+
}
168+
}

0 commit comments

Comments
 (0)