From b6e67bf9454c3bc41077990b879d23f0fab69acd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 11:38:23 +0000 Subject: [PATCH 1/3] Add buffer-native serializer support --- docs/advanced.md | 23 ++++++++++++ docs/api.md | 2 ++ src/Partition/WritablePartition.js | 19 ++++++---- src/Storage/ReadableStorage.js | 22 ++++++++++-- src/Storage/WritableStorage.js | 58 ++++++++++++++++++++++++++++-- test/Storage.spec.js | 45 +++++++++++++++++++++++ 6 files changed, 159 insertions(+), 10 deletions(-) diff --git a/docs/advanced.md b/docs/advanced.md index 60fedfa..a470949 100644 --- a/docs/advanced.md +++ b/docs/advanced.md @@ -70,6 +70,8 @@ Pass these options inside `config.storageConfig`: | `indexDirectory` | `string` | `storageDirectory/streams` | Where index files are stored. | | `partitioner` | `function` | stream-per-event | `(document, sequenceNumber) => partitionName`. | | `serializer` | `object` | JSON | `{ serialize(doc), deserialize(string) }` — custom serialization. | +| `serializeToBuffer` | `function` | — | Buffer-native serializer `(buffer, doc, helpers) => bytesWritten` overriding `serializer.serialize`. | +| `deserializeFromBuffer` | `function` | — | Buffer-native deserializer `(buffer) => doc` overriding `serializer.deserialize`. | | `writeBufferSize` | `number` | `16384` | Write-buffer size in bytes. | | `maxWriteBufferDocuments` | `number` | unlimited | Maximum events per write buffer. | | `syncOnFlush` | `boolean` | `false` | Call `fsync` on every flush. | @@ -242,6 +244,27 @@ const eventstore = new EventStore('my-event-store', { `@msgpack/msgpack` is often faster than `JSON.parse` for deserialization, while producing smaller files, but makes the storage files non-human-readable. 
+If your codec works directly with `Buffer`, prefer `serializeToBuffer` / `deserializeFromBuffer` to avoid string conversions: + +```javascript +const eventstore = new EventStore('my-event-store', { + storageConfig: { + writeBufferSize: 16 * 1024, + serializeToBuffer: (buffer, doc, { ensureCapacity }) => { + const encoded = myCodec.encode(doc); // returns Buffer + if (encoded.byteLength > buffer.byteLength) { + buffer = ensureCapacity(encoded.byteLength); + } + encoded.copy(buffer, 0); + return encoded.byteLength; + }, + deserializeFromBuffer: (buffer) => myCodec.decode(buffer) + } +}); +``` + +`ensureCapacity(minSize)` returns a buffer of at least `minSize` bytes for use when the initially provided buffer is too small. Always write into the returned buffer: when growth is needed a new, larger buffer is allocated (bytes already written are copied over) and the buffer originally passed in is no longer the one the storage reads the document from. + ## Compression Use the `serializer` option to wrap events in a compression codec. diff --git a/docs/api.md b/docs/api.md index 7791dc1..43f6ca8 100644 --- a/docs/api.md +++ b/docs/api.md @@ -552,6 +552,8 @@ Inherits all options from `Storage.ReadOnly` plus: | `config.indexDirectory` | `string` | `config.dataDirectory` | Directory for index files. | | `config.indexFile` | `string` | `'{storageName}.index'` | File name of the primary index. | | `config.serializer` | `object` | JSON | Object with `serialize(doc)` and `deserialize(data)` methods. | +| `config.serializeToBuffer` | `function(buffer, doc, helpers)` | — | Buffer-native serializer that returns written byte count and overrides `config.serializer.serialize`. | +| `config.deserializeFromBuffer` | `function(buffer)` | — | Buffer-native deserializer that overrides `config.serializer.deserialize`. | | `config.readBufferSize` | `number` | `4096` | Read buffer size in bytes. | | `config.writeBufferSize` | `number` | `16384` | Write buffer size in bytes. | | `config.maxWriteBufferDocuments` | `number` | `0` (unlimited) | Maximum number of buffered documents before an automatic flush. 
| diff --git a/src/Partition/WritablePartition.js b/src/Partition/WritablePartition.js index 30a8741..9fb3df5 100644 --- a/src/Partition/WritablePartition.js +++ b/src/Partition/WritablePartition.js @@ -196,7 +196,7 @@ class WritablePartition extends ReadablePartition { /** * Write the given data to the partition without buffering. * @private - * @param {string} data The (padded) data to write to storage. + * @param {string|Buffer} data The data to write to storage. * @param {number} dataSize The size of the raw document without padding. * @param {number} [sequenceNumber] The external sequence number to store with the document. * @param {function} [callback] A function that will be called when the document is written to disk. @@ -208,7 +208,9 @@ class WritablePartition extends ReadablePartition { let bytesWritten = 0; bytesWritten += fs.writeSync(this.fd, this.writeMetaBuffer, 0, DOCUMENT_HEADER_SIZE); - bytesWritten += fs.writeSync(this.fd, data); + bytesWritten += Buffer.isBuffer(data) + ? fs.writeSync(this.fd, data, 0, dataSize) + : fs.writeSync(this.fd, data); const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT); bytesWritten += fs.writeSync(this.fd, DOCUMENT_PAD.substr(0, padSize)); this.writeMetaBuffer.writeUInt32BE(dataSize, 0); @@ -223,7 +225,7 @@ class WritablePartition extends ReadablePartition { /** * Write the given data to the partition with buffering. Will flush the write buffer if it is necessary. * @private - * @param {string} data The (padded) data to write to storage. + * @param {string|Buffer} data The data to write to storage. * @param {number} dataSize The size of the raw document without padding. * @param {number} [sequenceNumber] The external sequence number to store with the document. * @param {function} [callback] A function that will be called when the document is written to disk. 
@@ -235,7 +237,12 @@ class WritablePartition extends ReadablePartition { let bytesWritten = 0; bytesWritten += this.writeDocumentHeader(this.writeBuffer, this.writeBufferCursor, dataSize, sequenceNumber); - bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8'); + if (Buffer.isBuffer(data)) { + data.copy(this.writeBuffer, this.writeBufferCursor + bytesWritten, 0, dataSize); + bytesWritten += dataSize; + } else { + bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8'); + } const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT); bytesWritten += this.writeBuffer.write(DOCUMENT_PAD.substr(0, padSize), this.writeBufferCursor + bytesWritten, 'utf8'); this.writeBuffer.writeUInt32BE(dataSize, this.writeBufferCursor + bytesWritten); @@ -252,7 +259,7 @@ class WritablePartition extends ReadablePartition { /** * @api - * @param {string} data The data to write to storage. + * @param {string|Buffer} data The data to write to storage. * @param {number} [sequenceNumber] The external sequence number to store with the document. * @param {function} [callback] A function that will be called when the document is written to disk. * @returns {number|boolean} The file position at which the data was written or false on error. @@ -263,7 +270,7 @@ class WritablePartition extends ReadablePartition { callback = sequenceNumber; sequenceNumber = null; } - const dataSize = Buffer.byteLength(data, 'utf8'); + const dataSize = Buffer.isBuffer(data) ? data.byteLength : Buffer.byteLength(data, 'utf8'); assert(dataSize <= 64 * 1024 * 1024, 'Document is too large! 
Maximum is 64 MB'); const dataPosition = this.size; diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index 443fe42..68540ed 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -43,6 +43,8 @@ class ReadableStorage extends events.EventEmitter { * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data). * @param {function(object): string} config.serializer.serialize Default is JSON.stringify. * @param {function(string): object} config.serializer.deserialize Default is JSON.parse. + * @param {function(Buffer): object} [config.deserializeFromBuffer] Buffer-native deserializer. + * When provided it overrides `config.serializer.deserialize`. * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'. * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory. * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'. @@ -78,6 +80,9 @@ class ReadableStorage extends events.EventEmitter { }; config = Object.assign(defaults, config); this.serializer = config.serializer; + this.deserializeFromBuffer = typeof config.deserializeFromBuffer === 'function' + ? config.deserializeFromBuffer + : null; this.hmac = createHmac(config.hmacSecret); @@ -285,7 +290,20 @@ class ReadableStorage extends events.EventEmitter { } const headerOut = {}; const buffer = partition.readFrom(position, size, headerOut, backwardsHint); - return raw ? { buffer, time64: headerOut.time64, sequenceNumber: headerOut.sequenceNumber } : this.serializer.deserialize(buffer.toString('utf8')); + return raw ? { buffer, time64: headerOut.time64, sequenceNumber: headerOut.sequenceNumber } : this.deserializeDocument(buffer); + } + + /** + * Deserialize a document buffer using either the buffer-native deserializer or the string serializer fallback. 
+ * @protected + * @param {Buffer} buffer + * @returns {object} + */ + deserializeDocument(buffer) { + if (this.deserializeFromBuffer) { + return this.deserializeFromBuffer(buffer); + } + return this.serializer.deserialize(buffer.toString('utf8')); } /** @@ -438,7 +456,7 @@ class ReadableStorage extends events.EventEmitter { */ buildDocumentEntry(readItem) { return { - document: this.serializer.deserialize(readItem.data.toString('utf8')), + document: this.deserializeDocument(readItem.data), // Replicate the index entry structure here, so iteration can be used easily to reindex entry: readItem.entry }; diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index dbdd41b..236017b 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -30,6 +30,11 @@ class WritableStorage extends ReadableStorage { * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data). * @param {function(object): string} config.serializer.serialize Default is JSON.stringify. * @param {function(string): object} config.serializer.deserialize Default is JSON.parse. + * @param {function(Buffer, object, { ensureCapacity: function(number): Buffer }): number} [config.serializeToBuffer] + * Buffer-native serializer. Receives `(buffer, document, helpers)` and must return written bytes. + * When provided it overrides `config.serializer.serialize`. + * @param {function(Buffer): object} [config.deserializeFromBuffer] Buffer-native deserializer. + * When provided it overrides `config.serializer.deserialize`. * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'. * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory. * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'. 
@@ -65,6 +70,55 @@ class WritableStorage extends ReadableStorage { this._lockMode = config.lock; this.partitioner = config.partitioner; this.partitionIds = {}; + this.serializeToBuffer = typeof config.serializeToBuffer === 'function' + ? config.serializeToBuffer + : null; + this.serializeBuffer = this.serializeToBuffer + ? Buffer.allocUnsafeSlow(config.writeBufferSize) + : null; + this.serializeBufferHelpers = this.serializeToBuffer + ? { ensureCapacity: (requiredSize) => this.ensureSerializeBufferCapacity(requiredSize) } + : null; + } + + /** + * Ensure the serializer buffer can hold at least requiredSize bytes. + * Existing bytes are preserved for serializers that grow in multiple steps. + * @protected + * @param {number} requiredSize + * @returns {Buffer} + */ + ensureSerializeBufferCapacity(requiredSize) { + requiredSize = requiredSize >>> 0; // jshint ignore:line + const current = this.serializeBuffer; + if (requiredSize <= current.byteLength) { + return current; + } + let newSize = current.byteLength; + while (newSize < requiredSize) { + newSize *= 2; + } + const next = Buffer.allocUnsafeSlow(newSize); + current.copy(next, 0, 0, current.byteLength); + this.serializeBuffer = next; + return next; + } + + /** + * Serialize one document either via buffer-native serializer or string serializer fallback. 
+ * @protected + * @param {object} document + * @returns {string|Buffer} + */ + serializeDocument(document) { + if (!this.serializeToBuffer) { + return this.serializer.serialize(document).toString(); + } + const written = this.serializeToBuffer(this.serializeBuffer, document, this.serializeBufferHelpers); + assert(Number.isInteger(written), 'serializeToBuffer must return the number of written bytes.'); + assert(written > 0, 'serializeToBuffer must write at least one byte.'); + assert(written <= this.serializeBuffer.byteLength, 'serializeToBuffer wrote beyond the provided buffer.'); + return this.serializeBuffer.subarray(0, written); } /** @@ -351,8 +405,8 @@ class WritableStorage extends ReadableStorage { * @returns {number} The 1-based document sequence number in the storage. */ write(document, callback) { - const data = this.serializer.serialize(document).toString(); - const dataSize = Buffer.byteLength(data, 'utf8'); + const data = this.serializeDocument(document); + const dataSize = Buffer.isBuffer(data) ? 
data.byteLength : Buffer.byteLength(data, 'utf8'); const partitionName = this.partitioner(document, this.index.length + 1); const partition = this.getPartition(partitionName); diff --git a/test/Storage.spec.js b/test/Storage.spec.js index 3746b23..1d47f29 100644 --- a/test/Storage.spec.js +++ b/test/Storage.spec.js @@ -1198,6 +1198,51 @@ describe('Storage', function() { expect(storage.read(2)).to.be.eql(doc); }); + it('prefers deserializeFromBuffer over serializer.deserialize', function() { + storage = createStorage({ + serializer: { + serialize: JSON.stringify, + deserialize: () => { throw new Error('serializer.deserialize should not be used'); } + }, + deserializeFromBuffer: (buffer) => JSON.parse(buffer.toString('utf8')) + }); + storage.open(); + storage.write({ foo: 'bar' }); + + expect(storage.read(1)).to.be.eql({ foo: 'bar' }); + }); + + it('supports serializeToBuffer with dynamic buffer growth', function() { + let grew = false; + storage = createStorage({ + writeBufferSize: 64, + serializer: { + serialize: () => { throw new Error('serializer.serialize should not be used'); }, + deserialize: JSON.parse + }, + serializeToBuffer: (buffer, document, helpers) => { + const encoded = Buffer.from(JSON.stringify(document), 'utf8'); + if (encoded.byteLength > buffer.byteLength) { + buffer = helpers.ensureCapacity(encoded.byteLength); + grew = true; + } + encoded.copy(buffer, 0, 0, encoded.byteLength); + return encoded.byteLength; + }, + deserializeFromBuffer: (buffer) => JSON.parse(buffer.toString('utf8')) + }); + storage.open(); + + const largeDocument = { foo: 'x'.repeat(200), nested: { bar: 42 } }; + const smallDocument = { foo: 'bar' }; + storage.write(largeDocument); + storage.write(smallDocument); + + expect(grew).to.be(true); + expect(storage.read(1)).to.be.eql(largeDocument); + expect(storage.read(2)).to.be.eql(smallDocument); + }); + describe('concurrency', function() { it('allows multiple writers to different partitions', function () { From 
9b34c865f14f78d28b67b0cdf5ce243a84f7708d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 11:39:55 +0000 Subject: [PATCH 2/3] Refine serializer buffer allocation safety --- src/Storage/WritableStorage.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 236017b..6b48ec9 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -74,7 +74,7 @@ class WritableStorage extends ReadableStorage { ? config.serializeToBuffer : null; this.serializeBuffer = this.serializeToBuffer - ? Buffer.allocUnsafeSlow(config.writeBufferSize) + ? Buffer.alloc(config.writeBufferSize) : null; this.serializeBufferHelpers = this.serializeToBuffer ? { ensureCapacity: (requiredSize) => this.ensureSerializeBufferCapacity(requiredSize) } @@ -89,6 +89,7 @@ class WritableStorage extends ReadableStorage { * @returns {Buffer} */ ensureSerializeBufferCapacity(requiredSize) { + // Coerce to an unsigned integer to reject negative/fractional sizes via the bounds checks below. 
requiredSize = requiredSize >>> 0; // jshint ignore:line const current = this.serializeBuffer; if (requiredSize <= current.byteLength) { @@ -98,7 +99,7 @@ class WritableStorage extends ReadableStorage { while (newSize < requiredSize) { newSize *= 2; } - const next = Buffer.allocUnsafeSlow(newSize); + const next = Buffer.alloc(newSize); current.copy(next, 0, 0, current.byteLength); this.serializeBuffer = next; return next; From 90b4c8c9b69c7e2673ab8d5a9ba94fe7f264b028 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 11:43:03 +0000 Subject: [PATCH 3/3] Polish buffer serializer implementation --- src/Storage/ReadableStorage.js | 6 ------ src/Storage/WritableStorage.js | 16 ++-------------- test/Storage.spec.js | 10 ++++++++-- 3 files changed, 10 insertions(+), 22 deletions(-) diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index 68540ed..8a032e4 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -293,12 +293,6 @@ class ReadableStorage extends events.EventEmitter { return raw ? { buffer, time64: headerOut.time64, sequenceNumber: headerOut.sequenceNumber } : this.deserializeDocument(buffer); } - /** - * Deserialize a document buffer using either the buffer-native deserializer or the string serializer fallback. - * @protected - * @param {Buffer} buffer - * @returns {object} - */ deserializeDocument(buffer) { if (this.deserializeFromBuffer) { return this.deserializeFromBuffer(buffer); diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 6b48ec9..4ab55a8 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -81,13 +81,7 @@ class WritableStorage extends ReadableStorage { : null; } - /** - * Ensure the serializer buffer can hold at least requiredSize bytes. - * Existing bytes are preserved for serializers that grow in multiple steps. 
- * @protected - * @param {number} requiredSize - * @returns {Buffer} - */ + // Keep existing bytes when growing so serializers can grow in multiple steps. ensureSerializeBufferCapacity(requiredSize) { // Coerce to an unsigned integer to reject negative/fractional sizes via the bounds checks below. requiredSize = requiredSize >>> 0; // jshint ignore:line @@ -105,18 +99,12 @@ class WritableStorage extends ReadableStorage { return next; } - /** - * Serialize one document either via buffer-native serializer or string serializer fallback. - * @protected - * @param {object} document - * @returns {string|Buffer} - */ serializeDocument(document) { if (!this.serializeToBuffer) { return this.serializer.serialize(document).toString(); } const written = this.serializeToBuffer(this.serializeBuffer, document, this.serializeBufferHelpers); - assert(Number.isInteger(written), 'serializeToBuffer must return the number of written bytes.'); + assert(Number.isInteger(written), 'serializeToBuffer must return an integer number of written bytes.'); assert(written > 0, 'serializeToBuffer must write at least one byte.'); assert(written <= this.serializeBuffer.byteLength, 'serializeToBuffer wrote beyond the provided buffer.'); return this.serializeBuffer.subarray(0, written); diff --git a/test/Storage.spec.js b/test/Storage.spec.js index 1d47f29..939f6cc 100644 --- a/test/Storage.spec.js +++ b/test/Storage.spec.js @@ -1199,10 +1199,13 @@ describe('Storage', function() { }); it('prefers deserializeFromBuffer over serializer.deserialize', function() { + const deserializerShouldNotBeCalled = () => { + throw new Error('serializer.deserialize should not be used'); + }; storage = createStorage({ serializer: { serialize: JSON.stringify, - deserialize: () => { throw new Error('serializer.deserialize should not be used'); } + deserialize: deserializerShouldNotBeCalled }, deserializeFromBuffer: (buffer) => JSON.parse(buffer.toString('utf8')) }); @@ -1214,10 +1217,13 @@ describe('Storage', function() 
{ it('supports serializeToBuffer with dynamic buffer growth', function() { let grew = false; + const serializerShouldNotBeCalled = () => { + throw new Error('serializer.serialize should not be used'); + }; storage = createStorage({ writeBufferSize: 64, serializer: { - serialize: () => { throw new Error('serializer.serialize should not be used'); }, + serialize: serializerShouldNotBeCalled, deserialize: JSON.parse }, serializeToBuffer: (buffer, document, helpers) => {