-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.ts
More file actions
368 lines (345 loc) · 11.5 KB
/
client.ts
File metadata and controls
368 lines (345 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
import {
Client,
escapeIdentifier,
escapeLiteral,
type ClientConfig,
} from "pg";
import type {
TxOBEventSchemaMap,
TxOBSchemaOutput,
TxOBEvent,
TxOBProcessorClient,
TxOBProcessorClientOpts,
TxOBTransactionProcessorClient,
WakeupEmitter,
} from "../processor.js";
import { EventEmitter } from "node:events";
/**
 * Minimal database handle used throughout this module: any object exposing a
 * `query` method with the same signature as `pg`'s `Client#query`.
 */
interface Querier {
// Typed as `Client["query"]`, so it accepts the same arguments as `Client#query`.
query: Client["query"];
}
// TODO: leverage the signal option that comes in on options for `getEventsToProcess` and `getEventByIdForUpdateSkipLocked`
// to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774
/**
 * Options accepted by `createProcessorClient`.
 *
 * `TEventSchemas` maps event type names to their schemas and drives the
 * type-level shape of the returned processor client.
 */
export type CreateProcessorClientOpts<
TEventSchemas extends TxOBEventSchemaMap<string>,
> = {
// Handle used to run every SQL statement issued by the processor client.
querier: Querier;
// Name of the events table; `createProcessorClient` defaults it to "events".
table?: string;
// Maximum number of rows fetched per `getEventsToProcess` call; defaults to 100.
limit?: number;
// Event schema map; supplies the `TEventSchemas` type parameter.
eventSchemas: TEventSchemas;
};
/**
 * Creates a processor client that reads and writes outbox events in Postgres
 * through the supplied `querier`.
 *
 * The returned client exposes:
 * - `getEventsToProcess`: selects the `id` and `errors` of unprocessed events
 *   whose backoff has elapsed and whose error count is below `maxErrors`,
 *   oldest first, capped at `limit` rows.
 * - `transaction`: runs the callback between `BEGIN` and `COMMIT`, issuing
 *   `ROLLBACK` if the callback (or the commit) throws.
 *
 * NOTE(review): `BEGIN`/`COMMIT`/`ROLLBACK` are sent through `querier.query`,
 * so this presumably expects `querier` to be a single dedicated connection
 * rather than a pool — confirm against callers.
 *
 * @param opts - Querier, optional table name (default `"events"`), optional
 *   batch limit (default `100`) and the event schema map.
 * @returns The processor client bound to `opts.querier`.
 */
export const createProcessorClient = <
  const TEventSchemas extends TxOBEventSchemaMap<string>,
>(
  opts: CreateProcessorClientOpts<TEventSchemas>,
): TxOBProcessorClient<
  keyof TEventSchemas & string,
  {
    [TType in keyof TEventSchemas & string]: TxOBSchemaOutput<TEventSchemas[TType]>;
  }
> => {
  // Local aliases for the generic shapes that are repeated throughout the body.
  type EventType = keyof TEventSchemas & string;
  type EventDataByType = {
    [TType in EventType]: TxOBSchemaOutput<TEventSchemas[TType]>;
  };
  type OutboxEvent = TxOBEvent<
    EventType,
    TxOBSchemaOutput<TEventSchemas[EventType]>
  >;
  type PendingEvent = Pick<OutboxEvent, "id" | "errors">;

  const { querier, table = "events", limit = 100 } = opts;
  // Identifiers cannot be bound as parameters, so the table name is escaped once here.
  const tableIdentifier = escapeIdentifier(table);

  const getEventsToProcess = async (
    opts: TxOBProcessorClientOpts,
  ): Promise<PendingEvent[]> => {
    // The limit is bound as a parameter instead of being spliced into the SQL
    // text, so a non-numeric runtime value can never alter the statement.
    const result = await querier.query<PendingEvent>(
      `SELECT id, errors FROM ${tableIdentifier} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 ORDER BY timestamp ASC LIMIT $2`,
      [opts.maxErrors, limit],
    );
    return result.rows;
  };

  const transaction: TxOBProcessorClient<
    EventType,
    EventDataByType
  >["transaction"] = async (
    fn: (
      txProcessorClient: TxOBTransactionProcessorClient<
        EventType,
        EventDataByType
      >,
    ) => Promise<void>,
  ): Promise<void> => {
    try {
      await querier.query("BEGIN");
      await fn({
        // Locks the row for this transaction; rows already locked elsewhere
        // are skipped (SKIP LOCKED), in which case `null` is returned.
        getEventByIdForUpdateSkipLocked: async (
          eventId: OutboxEvent["id"],
          opts: TxOBProcessorClientOpts,
        ): Promise<OutboxEvent | null> => {
          const result = await querier.query<OutboxEvent>(
            `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${tableIdentifier} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`,
            [eventId, opts.maxErrors],
          );
          // Rely on the rows themselves rather than `rowCount` (nullable in
          // pg's typings) so a missing row is always reported as `null`.
          return result.rows[0] ?? null;
        },
        updateEvent: async (event: OutboxEvent): Promise<void> => {
          await querier.query(
            `UPDATE ${tableIdentifier} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`,
            [
              event.handler_results,
              event.errors,
              event.processed_at,
              event.backoff_until,
              event.id,
            ],
          );
        },
        createEvent: async (
          event: Omit<OutboxEvent, "processed_at" | "backoff_until">,
        ): Promise<void> => {
          await querier.query(
            `INSERT INTO ${tableIdentifier} (id, timestamp, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
            [
              event.id,
              event.timestamp,
              event.type,
              event.data,
              event.correlation_id,
              event.handler_results,
              event.errors,
            ],
          );
        },
      });
      await querier.query("COMMIT");
    } catch (error) {
      try {
        await querier.query("ROLLBACK");
      } catch (rollbackError) {
        // Surface both failures; the original error is preserved as `cause`.
        const message = error instanceof Error ? error.message : String(error);
        const rollbackMessage =
          rollbackError instanceof Error
            ? rollbackError.message
            : String(rollbackError);
        throw new Error(
          `Transaction failed: ${message} (rollback also failed: ${rollbackMessage})`,
          { cause: error },
        );
      }
      throw error;
    }
  };

  return {
    getEventsToProcess,
    transaction,
  };
};
/**
 * Options for `createWakeupEmitter`, discriminated on `createTrigger`:
 * when it is `true` a `querier` is mandatory, otherwise `querier` is optional.
 */
type CreateWakeupEmitterOpts =
| {
// Connection settings handed to `new Client(...)` for the dedicated LISTEN connection.
listenClientConfig: ClientConfig;
// NOTIFY channel name; `createWakeupEmitter` defaults it to "txob_events" and lowercases it.
channel?: string;
// Request creation of the wakeup trigger as part of emitter setup.
createTrigger: true;
// Events table the trigger is attached to; defaults to "events".
table?: string;
// Handle used to run the trigger-creation statements.
querier: Querier;
}
| {
// Connection settings handed to `new Client(...)` for the dedicated LISTEN connection.
listenClientConfig: ClientConfig;
// NOTIFY channel name; `createWakeupEmitter` defaults it to "txob_events" and lowercases it.
channel?: string;
// Omitted or `false`: no trigger is created by `createWakeupEmitter`.
createTrigger?: false;
// Events table name; defaults to "events".
table?: string;
// Optional in this variant.
querier?: Querier;
};
/**
 * Creates a Postgres NOTIFY-based wakeup emitter for reducing polling frequency.
 * A dedicated `pg` `Client` is opened for LISTEN, separate from the querier used
 * for regular queries.
 *
 * Behavior:
 * - The channel name is lowercased before LISTEN and before being handed to
 *   `createWakeupTrigger`.
 * - Each notification on that channel emits `"wakeup"`.
 * - Connection errors, and an unexpected end of the LISTEN connection, emit
 *   `"error"` on the underlying emitter.
 * - If LISTEN or trigger creation fails, the LISTEN connection is closed
 *   before the error is rethrown, so no connection is leaked.
 * - The returned object also carries a `close()` method (not part of the
 *   `WakeupEmitter` interface) that UNLISTENs and ends the connection.
 *   Closing intentionally does not emit `"error"`, and calling `close()` more
 *   than once is a no-op.
 *
 * @param opts - Options for the wakeup emitter. If `createTrigger` is `true`, `querier` is required.
 * @returns A WakeupEmitter that emits 'wakeup' events when Postgres NOTIFY is received
 * @throws Whatever `connect`, `LISTEN` or trigger creation throws.
 */
export const createWakeupEmitter = async (
  opts: CreateWakeupEmitterOpts,
): Promise<WakeupEmitter> => {
  const {
    listenClientConfig,
    channel = "txob_events",
    table = "events",
  } = opts;
  const emitter = new EventEmitter();
  // The channel is quoted via escapeIdentifier below, which makes it
  // case-sensitive; normalising to lowercase keeps LISTEN and NOTIFY in sync.
  const listenChannel = channel.toLowerCase();
  // Set once the connection is being shut down on purpose, so the resulting
  // 'end' event is not reported as a failure.
  let closing = false;

  // Create a separate client for LISTEN
  const listenClient = new Client(listenClientConfig);
  await listenClient.connect();

  try {
    await listenClient.query(`LISTEN ${escapeIdentifier(listenChannel)}`);

    // Handle notifications
    listenClient.on("notification", (msg) => {
      if (msg.channel === listenChannel) {
        emitter.emit("wakeup");
      }
    });

    // Handle connection errors
    listenClient.on("error", (err) => {
      emitter.emit("error", err);
    });

    // Handle disconnection; only an unexpected end is an error.
    listenClient.on("end", () => {
      if (closing) {
        return;
      }
      emitter.emit("error", new Error("Postgres LISTEN connection ended"));
    });

    // Create trigger if requested
    if (opts.createTrigger && opts.querier) {
      await createWakeupTrigger({
        querier: opts.querier,
        table,
        channel: listenChannel,
      });
    }
  } catch (error) {
    // Setup failed after the connection was opened: release it before
    // propagating, otherwise the caller has no handle to close it with.
    closing = true;
    try {
      await listenClient.end();
    } catch {
      // The setup error is the one worth reporting; ignore cleanup failures.
    }
    throw error;
  }

  // Return a WakeupEmitter that wraps the EventEmitter
  return {
    on: (event: "wakeup", listener: () => void) => {
      emitter.on(event, listener);
    },
    off: (event: "wakeup", listener: () => void) => {
      // The connection is intentionally left open when the last listener is
      // removed; use `close()` to release it.
      emitter.off(event, listener);
    },
    // Expose cleanup method (not part of interface but useful)
    close: async () => {
      if (closing) {
        return;
      }
      closing = true;
      try {
        await listenClient.query(`UNLISTEN ${escapeIdentifier(listenChannel)}`);
      } finally {
        // Always end the connection, even if UNLISTEN failed.
        await listenClient.end();
      }
    },
  } as WakeupEmitter & { close: () => Promise<void> };
};
/** Options accepted by `createWakeupTrigger`. */
type CreateWakeupTriggerOpts = {
// Handle used to run the CREATE FUNCTION / CREATE TRIGGER statements.
querier: Querier;
// Table the trigger is attached to; defaults to "events". Also used as the
// suffix of the generated trigger and function names.
table?: string;
// Channel passed to `pg_notify`; defaults to "txob_events".
channel?: string;
};
/**
 * Creates a Postgres trigger that sends NOTIFY when events are inserted.
 * Wakeup signals are primarily for new events - retries after backoff are handled
 * by fallback polling.
 *
 * Two statements are issued through `querier`:
 * 1. `CREATE OR REPLACE FUNCTION txob_wakeup_notify_<table>()`, whose body
 *    calls `pg_notify(<channel>, '')`.
 * 2. `CREATE TRIGGER txob_wakeup_trigger_<table> AFTER INSERT ... FOR EACH ROW`.
 *
 * The channel name is lowercased before being embedded, matching the channel
 * `createWakeupEmitter` listens on (it always LISTENs on the lowercased name).
 *
 * Errors that indicate another caller already created (or is concurrently
 * creating) the same objects are swallowed: SQLSTATE `42710`, SQLSTATE `XX000`
 * with "tuple concurrently updated", and messages reporting that a trigger or
 * relation "already exists". Every other error is rethrown.
 *
 * @param opts - Options for the wakeup trigger
 * @returns Promise that resolves when the trigger is created (or already exists)
 * @throws Any error from `querier.query` that is not one of the ignorable cases above.
 */
export const createWakeupTrigger = async (
  opts: CreateWakeupTriggerOpts,
): Promise<void> => {
  const { querier, table = "events", channel = "txob_events" } = opts;
  const triggerName = `txob_wakeup_trigger_${table}`;
  const functionName = `txob_wakeup_notify_${table}`;

  // pg_notify takes the channel as text, so it is embedded as an escaped
  // literal. Lowercase it so it matches the channel createWakeupEmitter
  // LISTENs on.
  const channelLiteral = escapeLiteral(channel.toLowerCase());

  // The literal is placed inside a dollar-quoted function body. escapeLiteral
  // does not neutralise dollar-quote delimiters, so pick a tag that is
  // guaranteed not to occur inside the literal; otherwise a channel containing
  // the delimiter would terminate the body early.
  let bodyTag = "txob_wakeup";
  while (channelLiteral.includes("$" + bodyTag + "$")) {
    bodyTag += "_";
  }
  const bodyQuote = "$" + bodyTag + "$";

  try {
    // Create the function that sends NOTIFY
    // CREATE OR REPLACE is safe for concurrent execution
    await querier.query(`
CREATE OR REPLACE FUNCTION ${escapeIdentifier(functionName)}()
RETURNS TRIGGER AS ${bodyQuote}
BEGIN
-- Send NOTIFY when a new event is inserted
-- Only trigger on INSERT - retries after backoff are handled by fallback polling
IF TG_OP = 'INSERT' THEN
PERFORM pg_notify(${channelLiteral}, '');
END IF;
RETURN NEW;
END;
${bodyQuote} LANGUAGE plpgsql;
`);
    // Try to create the trigger
    // If it already exists, PostgreSQL will throw a duplicate_object error (42710)
    // which is caught and ignored below
    await querier.query(`
CREATE TRIGGER ${escapeIdentifier(triggerName)}
AFTER INSERT ON ${escapeIdentifier(table)}
FOR EACH ROW
EXECUTE FUNCTION ${escapeIdentifier(functionName)}();
`);
  } catch (error: unknown) {
    // Handle errors that can occur during concurrent trigger creation
    // Note: pg errors may not always be DatabaseError instances, but they have a 'code' field
    const errorCode =
      typeof error === "object" &&
      error !== null &&
      "code" in error &&
      typeof error.code === "string"
        ? error.code
        : undefined;
    const errorMessage = error instanceof Error ? error.message : String(error);

    // 42710: duplicate_object - trigger already exists
    // This is safe to ignore - another process already created it
    if (errorCode === "42710") {
      return;
    }

    // XX000: internal_error - can include "tuple concurrently updated"
    // This happens when multiple processes try to modify system catalogs simultaneously
    if (
      errorCode === "XX000" &&
      errorMessage.includes("tuple concurrently updated")
    ) {
      return;
    }

    // Fallback on the message text for duplicate trigger errors, in case the
    // error code is missing
    if (
      errorMessage.includes("already exists") &&
      (errorMessage.includes("trigger") || errorMessage.includes("relation"))
    ) {
      return;
    }

    // Re-throw other errors
    throw error;
  }
};