Skip to content

Commit ffc49db

Browse files
committed
update log delivery with async calls
1 parent dedc050 commit ffc49db

File tree

2 files changed

+79
-60
lines changed

2 files changed

+79
-60
lines changed

src/log-delivery.ts

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
import { boundMethod } from 'autobind-decorator'
1+
import { boundMethod } from 'autobind-decorator';
22
import { EventEmitter } from 'events';
33
import CloudWatchLogs, {
44
InputLogEvent,
55
PutLogEventsRequest,
66
PutLogEventsResponse,
77
} from 'aws-sdk/clients/cloudwatchlogs';
88

9-
import {
10-
SessionProxy,
11-
} from './proxy';
9+
import { SessionProxy } from './proxy';
1210
import { HandlerRequest } from './utils';
1311

1412

@@ -37,14 +35,15 @@ export class ProviderLogHandler {
3735
* construction calls with the `new` operator.
3836
*/
3937
private constructor(options: ILogOptions) {
38+
this.groupName = options.groupName;
4039
this.stream = options.stream.replace(':', '__');
4140
this.client = options.session.client('CloudWatchLogs') as CloudWatchLogs;
4241
this.sequenceToken = '';
4342
this.logger = options.logger || global.console;
4443
// Attach the logger methods to localized event emitter.
4544
const emitter = new LogEmitter();
4645
this.emitter = emitter;
47-
emitter.on('log', this.logListener);
46+
this.emitter.on('log', this.logListener);
4847
// Create maps of each logger Function and then alias that.
4948
Object.entries(this.logger).forEach(([key, val]) => {
5049
if (typeof val === 'function') {
@@ -105,7 +104,8 @@ export class ProviderLogHandler {
105104
logGroupName: this.groupName,
106105
}).promise();
107106
} catch(err) {
108-
if (err.code !== 'ResourceAlreadyExistsException') {
107+
const errorCode = err.code || err.name;
108+
if (errorCode !== 'ResourceAlreadyExistsException') {
109109
throw err;
110110
}
111111
}
@@ -118,13 +118,14 @@ export class ProviderLogHandler {
118118
logStreamName: this.stream,
119119
}).promise();
120120
} catch(err) {
121-
if (err.code !== 'ResourceAlreadyExistsException') {
121+
const errorCode = err.code || err.name;
122+
if (errorCode !== 'ResourceAlreadyExistsException') {
122123
throw err;
123124
}
124125
}
125126
}
126127

127-
private async putLogEvent(record: InputLogEvent): Promise<void> {
128+
private async putLogEvents(record: InputLogEvent): Promise<void> {
128129
if (!record.timestamp) {
129130
const currentTime = new Date(Date.now());
130131
record.timestamp = Math.round(currentTime.getTime());
@@ -141,28 +142,34 @@ export class ProviderLogHandler {
141142
const response: PutLogEventsResponse = await this.client.putLogEvents(logEventsParams).promise();
142143
this.sequenceToken = response.nextSequenceToken;
143144
} catch(err) {
144-
if (err.code === 'DataAlreadyAcceptedException' || err.code === 'InvalidSequenceTokenException') {
145-
this.sequenceToken = (err.message || '').split(' ')[0];
146-
this.putLogEvent(record);
145+
const errorCode = err.code || err.name;
146+
if (errorCode === 'DataAlreadyAcceptedException' || errorCode === 'InvalidSequenceTokenException') {
147+
this.sequenceToken = (err.message || '').split(' ').pop();
148+
this.putLogEvents(record);
149+
} else {
150+
throw err;
147151
}
148152
}
149153
}
150154

151155
@boundMethod
152-
logListener(...args: any[]): void {
156+
async logListener(...args: any[]): Promise<void> {
153157
const currentTime = new Date(Date.now());
154158
const record: InputLogEvent = {
155159
message: JSON.stringify(args[0]),
156160
timestamp: Math.round(currentTime.getTime()),
157161
}
158162
try {
159-
this.putLogEvent(record);
163+
await this.putLogEvents(record);
160164
} catch(err) {
161-
if (err.message.includes('log group does not exist')) {
162-
this.createLogGroup();
165+
const errorCode = err.code || err.name;
166+
if (errorCode === 'ResourceNotFoundException') {
167+
if (err.message.includes('log group does not exist')) {
168+
await this.createLogGroup();
169+
}
170+
await this.createLogStream();
171+
await this.putLogEvents(record);
163172
}
164-
this.createLogStream();
165-
this.putLogEvent(record);
166173
}
167174
}
168175
}

tests/lib/log-delivery.test.ts

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ describe('when delivering log', () => {
3535

3636
beforeAll(() => {
3737
session = new SessionProxy({});
38+
});
39+
40+
beforeEach(() => {
3841
createLogGroup = mockResult({ ResponseMetadata: { RequestId: 'mock-request' }});
3942
createLogStream = mockResult({ ResponseMetadata: { RequestId: 'mock-request' }});
4043
putLogEvents = mockResult({ ResponseMetadata: { RequestId: 'mock-request' }});
@@ -65,16 +68,14 @@ describe('when delivering log', () => {
6568
stackId: 'an-arn',
6669
})));
6770
ProviderLogHandler.setup(request, session);
68-
// Get a copy of the instance to avoid changing the singleton
71+
// Get a copy of the instance and remove it from class
72+
// to avoid changing the singleton.
6973
const instance = ProviderLogHandler.getInstance();
7074
ProviderLogHandler['instance'] = null;
7175
cwLogs.mockClear();
7276
return instance;
7377
});
7478
providerLogHandler = new Mock();
75-
});
76-
77-
beforeEach(() => {
7879
payload = new HandlerRequest(new Map(Object.entries({
7980
action: Action.Create,
8081
awsAccountId: '123412341234',
@@ -158,90 +159,101 @@ describe('when delivering log', () => {
158159
expect(logHandler).toBeNull();
159160
});
160161

161-
test('log group create success', () => {
162-
providerLogHandler.client.createLogGroup();
162+
test('log group create success', async () => {
163+
await providerLogHandler['createLogGroup']();
163164
expect(createLogGroup).toHaveBeenCalledTimes(1);
164165
});
165166

166-
test('log stream create success', () => {
167-
providerLogHandler.client.createLogStream();
167+
test('log stream create success', async () => {
168+
await providerLogHandler['createLogStream']();
168169
expect(createLogStream).toHaveBeenCalledTimes(1);
169170
});
170171

171172
test('create already exists', () => {
172-
['createLogGroup', 'createLogStream'].forEach((methodName: string) => {
173-
const mockLogsMethod: jest.Mock = jest.fn().mockImplementationOnce(() => {
174-
throw awsUtil.error(new Error(), { code: 'ResourceAlreadyExistsException' });
173+
['createLogGroup', 'createLogStream'].forEach(async (methodName: string) => {
174+
const mockLogsMethod: jest.Mock = jest.fn().mockReturnValue({
175+
promise: jest.fn().mockRejectedValueOnce(
176+
awsUtil.error(new Error(), { code: 'ResourceAlreadyExistsException' })
177+
)
175178
});
176179
providerLogHandler.client[methodName] = mockLogsMethod;
177180
// Should not raise an exception if the log group already exists.
178-
providerLogHandler[methodName]();
181+
await providerLogHandler[methodName]();
179182
expect(mockLogsMethod).toHaveBeenCalledTimes(1);
180183
});
181184
});
182185

183186
test('put log event success', () => {
184-
[null, 'some-seq'].forEach((sequenceToken: string) => {
187+
[null, 'some-seq'].forEach(async (sequenceToken: string) => {
185188
providerLogHandler.sequenceToken = sequenceToken;
186-
const mockPut: jest.Mock = jest.fn().mockImplementationOnce(() => {
187-
return { nextSequenceToken: 'some-other-seq' };
189+
const mockPut: jest.Mock = jest.fn().mockReturnValue({
190+
promise: jest.fn().mockResolvedValueOnce(
191+
{ nextSequenceToken: 'some-other-seq' }
192+
)
188193
});
189194
providerLogHandler.client.putLogEvents = mockPut;
190-
providerLogHandler['putLogEvent']({
195+
await providerLogHandler['putLogEvents']({
191196
message: 'log-msg',
192197
timestamp: 123,
193198
});
194199
expect(mockPut).toHaveBeenCalledTimes(1);
195200
});
196201
});
197202

198-
test('put log event invalid token', () => {
199-
const mockPut: jest.Mock = jest.fn().mockImplementationOnce(() => {
200-
throw awsUtil.error(new Error(), { code: 'InvalidSequenceTokenException' });
201-
})
202-
.mockImplementationOnce(() => {
203-
throw awsUtil.error(new Error(), { code: 'DataAlreadyAcceptedException' });
204-
})
205-
.mockImplementation(() => {
206-
return { nextSequenceToken: 'some-other-seq' };
203+
test('put log event invalid token', async () => {
204+
putLogEvents.mockReturnValue({
205+
promise: jest.fn().mockRejectedValueOnce(
206+
awsUtil.error(new Error(), { code: 'InvalidSequenceTokenException' })
207+
).mockRejectedValueOnce(
208+
awsUtil.error(new Error(), { code: 'DataAlreadyAcceptedException' })
209+
).mockResolvedValue(
210+
{ nextSequenceToken: 'some-other-seq' }
211+
)
207212
});
208-
providerLogHandler.client.putLogEvents = mockPut;
209213
for(let i = 1; i < 4; i++) {
210-
providerLogHandler['putLogEvent']({
214+
await providerLogHandler['putLogEvents']({
211215
message: 'log-msg',
212216
timestamp: i,
213217
});
214218
}
215-
expect(mockPut).toHaveBeenCalledTimes(5);
219+
expect(putLogEvents).toHaveBeenCalledTimes(5);
216220
});
217221

218-
test('emit existing cwl group stream', () => {
219-
const mock: jest.Mock = jest.fn();
220-
providerLogHandler['putLogEvent'] = mock;
222+
test('emit existing cwl group stream', async () => {
223+
const mock: jest.Mock = jest.fn().mockResolvedValue({});
224+
providerLogHandler['putLogEvents'] = mock;
221225
providerLogHandler['emitter'].emit('log', 'log-msg');
226+
await new Promise(resolve => setTimeout(resolve, 300));
222227
expect(mock).toHaveBeenCalledTimes(1);
223228
});
224229

225-
test('emit no group stream', () => {
226-
const putLogEvent: jest.Mock = jest.fn().mockImplementationOnce(() => {
227-
throw awsUtil.error(new Error(), { message: 'log group does not exist' });
228-
});
230+
test('emit no group stream', async () => {
231+
const putLogEvents: jest.Mock = jest.fn().mockResolvedValue({}).mockRejectedValueOnce(
232+
awsUtil.error(new Error(), {
233+
code: 'ResourceNotFoundException',
234+
message: 'log group does not exist',
235+
})
236+
);
229237
const createLogGroup: jest.Mock = jest.fn();
230238
const createLogStream: jest.Mock = jest.fn();
231-
providerLogHandler['putLogEvent'] = putLogEvent;
239+
providerLogHandler['putLogEvents'] = putLogEvents;
232240
providerLogHandler['createLogGroup'] = createLogGroup;
233241
providerLogHandler['createLogStream'] = createLogStream;
234-
providerLogHandler['emitter'].emit('log', 'log-msg');
235-
expect(putLogEvent).toHaveBeenCalledTimes(2);
242+
await providerLogHandler['logListener']('log-msg');
243+
expect(putLogEvents).toHaveBeenCalledTimes(2);
236244
expect(createLogGroup).toHaveBeenCalledTimes(1);
237245
expect(createLogStream).toHaveBeenCalledTimes(1);
238246

239247
// Function createGroup should not be called again if the group already exists.
240-
putLogEvent.mockImplementationOnce(() => {
241-
throw awsUtil.error(new Error(), { message: 'log stream does not exist' });
242-
});
243-
providerLogHandler['emitter'].emit('log', 'log-msg');
244-
expect(putLogEvent).toHaveBeenCalledTimes(4);
248+
putLogEvents.mockRejectedValueOnce(
249+
awsUtil.error(new Error(), {
250+
code: 'ResourceNotFoundException',
251+
message: 'log stream does not exist',
252+
})
253+
);
254+
console.log('log-msg');
255+
await new Promise(resolve => setTimeout(resolve, 300));
256+
expect(putLogEvents).toHaveBeenCalledTimes(4);
245257
expect(createLogGroup).toHaveBeenCalledTimes(1);
246258
expect(createLogStream).toHaveBeenCalledTimes(2);
247259
});

0 commit comments

Comments
 (0)