From 3e0617e6cdf4762438200981296a3a7afb8ec7fd Mon Sep 17 00:00:00 2001 From: Justin Schlechte Date: Wed, 16 Mar 2022 11:19:34 -0400 Subject: [PATCH 1/2] add maxChanges parameter --- index.js | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index 43c2fdc..a22388e 100644 --- a/index.js +++ b/index.js @@ -1,13 +1,15 @@ const EventEmitter = require('events'); -const _getReadQuery = function(walOptions, slotName){ +const _getReadQuery = function(walOptions, slotName, maxChanges){ let changesSql = ''; Object.keys(walOptions).forEach(function(option){ const value = walOptions[option]; changesSql += `, '${option}', '${value}'`; }); - const sql = `SELECT * FROM pg_catalog.pg_logical_slot_get_changes('${slotName}', NULL, NULL${changesSql});`; + if (maxChanges == undefined) maxChanges = "NULL"; + + const sql = `SELECT * FROM pg_catalog.pg_logical_slot_get_changes('${slotName}', NULL, ${maxChanges}${changesSql});`; return { text: sql, @@ -27,8 +29,20 @@ const _init = async function(client, slotName, temporary){ } }; +/* + * Wal2JSONListener will emit 'changes' event containing the rows as returned + * from pg_catalog.pg_logical_slot_get_changes. This event may contain any number of rows. + * + * To limit the number of rows, use the maxChanges parameter. It is important to note + * that the maxChanges value is directly passed to pg_catalog.pg_logical_slot_get_changes as the + * upto_nchanges parameter, as documented at https://www.postgresql.org/docs/12/functions-admin.html. + * Specifically, + * > If upto_nchanges is non-NULL, decoding will stop when the number of rows produced by decoding + * > exceeds the specified value. Note, however, that the actual number of rows returned may be larger, + * > since this limit is only checked after adding the rows produced when decoding each new transaction commit. + */ class Wal2JSONListener extends EventEmitter { - constructor(client, {slotName, timeout, temporary}, walOptions={}) { + constructor(client, {slotName, timeout, temporary, maxChanges}, walOptions={}) { super(); this.slotName = slotName || 'test_slot'; this.walOptions = walOptions; @@ -37,7 +51,8 @@ class Wal2JSONListener extends EventEmitter { this.client = client; this.timeout = timeout; this.running = false; - this.readQuery = _getReadQuery(this.walOptions, this.slotName); + this.maxChanges = maxChanges + this.readQuery = _getReadQuery(this.walOptions, this.slotName, this.maxChanges); this.client.connect(); } From aadbcce5de38b346a08cb9b5aec3a62763797d83 Mon Sep 17 00:00:00 2001 From: Justin Schlechte Date: Wed, 16 Mar 2022 11:24:19 -0400 Subject: [PATCH 2/2] fix jsdoc --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index a22388e..3b2b9cf 100644 --- a/index.js +++ b/index.js @@ -29,7 +29,7 @@ const _init = async function(client, slotName, temporary){ } }; -/* +/** * Wal2JSONListener will emit 'changes' event containing the rows as returned * from pg_catalog.pg_logical_slot_get_changes. This event may contain any number of rows. *