Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
const EventEmitter = require('events');

const _getReadQuery = function(walOptions, slotName){
const _getReadQuery = function(walOptions, slotName, maxChanges){
let changesSql = '';
Object.keys(walOptions).forEach(function(option){
const value = walOptions[option];
changesSql += `, '${option}', '${value}'`;
});

const sql = `SELECT * FROM pg_catalog.pg_logical_slot_get_changes('${slotName}', NULL, NULL${changesSql});`;
if (maxChanges == undefined) maxChanges = "NULL";

const sql = `SELECT * FROM pg_catalog.pg_logical_slot_get_changes('${slotName}', NULL, ${maxChanges}${changesSql});`;

return {
text: sql,
Expand All @@ -27,8 +29,20 @@ const _init = async function(client, slotName, temporary){
}
};

/**
* Wal2JSONListener will emit 'changes' event containing the rows as returned
* from pg_catalog.pg_logical_slot_get_changes. This event may contain any number of rows.
*
* To limit the number of rows, use the maxChanges parameter. It is important to note
* that the maxChanges value is directly passed to pg_catalog.pg_logical_slot_get_changes as the
* upto_nchanges parameter, as documented at https://www.postgresql.org/docs/12/functions-admin.html.
* Specifically,
* > If upto_nchanges is non-NULL, decoding will stop when the number of rows produced by decoding
* > exceeds the specified value. Note, however, that the actual number of rows returned may be larger,
* > since this limit is only checked after adding the rows produced when decoding each new transaction commit.
*/
class Wal2JSONListener extends EventEmitter {
constructor(client, {slotName, timeout, temporary}, walOptions={}) {
constructor(client, {slotName, timeout, temporary, maxChanges}, walOptions={}) {
super();
this.slotName = slotName || 'test_slot';
this.walOptions = walOptions;
Expand All @@ -37,7 +51,8 @@ class Wal2JSONListener extends EventEmitter {
this.client = client;
this.timeout = timeout;
this.running = false;
this.readQuery = _getReadQuery(this.walOptions, this.slotName);
this.maxChanges = maxChanges
this.readQuery = _getReadQuery(this.walOptions, this.slotName, this.maxChanges);
this.client.connect();
}

Expand Down