Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ Pass these options inside `config.storageConfig`:
| `indexDirectory` | `string` | `storageDirectory/streams` | Where index files are stored. |
| `partitioner` | `function` | stream-per-event | `(document, sequenceNumber) => partitionName`. |
| `serializer` | `object` | JSON | `{ serialize(doc), deserialize(string) }` — custom serialization. |
| `serializeToBuffer` | `function` | — | Buffer-native serializer `(buffer, doc, helpers) => bytesWritten` overriding `serializer.serialize`. |
| `deserializeFromBuffer` | `function` | — | Buffer-native deserializer `(buffer) => doc` overriding `serializer.deserialize`. |
| `writeBufferSize` | `number` | `16384` | Write-buffer size in bytes. |
| `maxWriteBufferDocuments` | `number` | unlimited | Maximum events per write buffer. |
| `syncOnFlush` | `boolean` | `false` | Call `fsync` on every flush. |
Expand Down Expand Up @@ -242,6 +244,27 @@ const eventstore = new EventStore('my-event-store', {

`@msgpack/msgpack` is often faster than `JSON.parse` for deserialization, while producing smaller files, but makes the storage files non-human-readable.

If your codec works directly with `Buffer`, prefer `serializeToBuffer` / `deserializeFromBuffer` to avoid string conversions:

```javascript
// Configure buffer-native (de)serialization so documents never pass through a string.
const eventstore = new EventStore('my-event-store', {
storageConfig: {
writeBufferSize: 16 * 1024,
// Called with a reusable scratch buffer, the document and a helpers object.
serializeToBuffer: (buffer, doc, { ensureCapacity }) => {
const encoded = myCodec.encode(doc); // returns Buffer
if (encoded.byteLength > buffer.byteLength) {
// The scratch buffer is too small: request a larger one and write into
// the buffer that ensureCapacity returns, not the one passed in.
buffer = ensureCapacity(encoded.byteLength);
}
encoded.copy(buffer, 0);
// Report how many bytes of the buffer make up the serialized document.
return encoded.byteLength;
},
// Receives the stored document bytes as a Buffer and returns the decoded document.
deserializeFromBuffer: (buffer) => myCodec.decode(buffer)
}
});
```

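`ensureCapacity(minSize)` returns a buffer of at least `minSize` bytes; call it when the initially provided buffer is too small. When it returns a new, larger buffer, the bytes already written to the previous buffer are carried over, and your serializer must write the rest of its output into the returned buffer, because the document is read from the most recently returned buffer.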

## Compression

Use the `serializer` option to wrap events in a compression codec.
Expand Down
2 changes: 2 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,8 @@ Inherits all options from `Storage.ReadOnly` plus:
| `config.indexDirectory` | `string` | `config.dataDirectory` | Directory for index files. |
| `config.indexFile` | `string` | `'{storageName}.index'` | File name of the primary index. |
| `config.serializer` | `object` | JSON | Object with `serialize(doc)` and `deserialize(data)` methods. |
| `config.serializeToBuffer` | `function(buffer, doc, helpers)` | — | Buffer-native serializer that returns the number of bytes written and overrides `config.serializer.serialize`. |
| `config.deserializeFromBuffer` | `function(buffer)` | — | Buffer-native deserializer that overrides `config.serializer.deserialize`. |
| `config.readBufferSize` | `number` | `4096` | Read buffer size in bytes. |
| `config.writeBufferSize` | `number` | `16384` | Write buffer size in bytes. |
| `config.maxWriteBufferDocuments` | `number` | `0` (unlimited) | Maximum number of buffered documents before an automatic flush. |
Expand Down
19 changes: 13 additions & 6 deletions src/Partition/WritablePartition.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class WritablePartition extends ReadablePartition {
/**
* Write the given data to the partition without buffering.
* @private
* @param {string} data The (padded) data to write to storage.
* @param {string|Buffer} data The data to write to storage.
* @param {number} dataSize The size of the raw document without padding.
* @param {number} [sequenceNumber] The external sequence number to store with the document.
* @param {function} [callback] A function that will be called when the document is written to disk.
Expand All @@ -208,7 +208,9 @@ class WritablePartition extends ReadablePartition {

let bytesWritten = 0;
bytesWritten += fs.writeSync(this.fd, this.writeMetaBuffer, 0, DOCUMENT_HEADER_SIZE);
bytesWritten += fs.writeSync(this.fd, data);
bytesWritten += Buffer.isBuffer(data)
? fs.writeSync(this.fd, data, 0, dataSize)
: fs.writeSync(this.fd, data);
const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT);
bytesWritten += fs.writeSync(this.fd, DOCUMENT_PAD.substr(0, padSize));
this.writeMetaBuffer.writeUInt32BE(dataSize, 0);
Expand All @@ -223,7 +225,7 @@ class WritablePartition extends ReadablePartition {
/**
* Write the given data to the partition with buffering. Will flush the write buffer if it is necessary.
* @private
* @param {string} data The (padded) data to write to storage.
* @param {string|Buffer} data The data to write to storage.
* @param {number} dataSize The size of the raw document without padding.
* @param {number} [sequenceNumber] The external sequence number to store with the document.
* @param {function} [callback] A function that will be called when the document is written to disk.
Expand All @@ -235,7 +237,12 @@ class WritablePartition extends ReadablePartition {

let bytesWritten = 0;
bytesWritten += this.writeDocumentHeader(this.writeBuffer, this.writeBufferCursor, dataSize, sequenceNumber);
bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8');
if (Buffer.isBuffer(data)) {
data.copy(this.writeBuffer, this.writeBufferCursor + bytesWritten, 0, dataSize);
bytesWritten += dataSize;
} else {
bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8');
}
const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT);
bytesWritten += this.writeBuffer.write(DOCUMENT_PAD.substr(0, padSize), this.writeBufferCursor + bytesWritten, 'utf8');
this.writeBuffer.writeUInt32BE(dataSize, this.writeBufferCursor + bytesWritten);
Expand All @@ -252,7 +259,7 @@ class WritablePartition extends ReadablePartition {

/**
* @api
* @param {string} data The data to write to storage.
* @param {string|Buffer} data The data to write to storage.
* @param {number} [sequenceNumber] The external sequence number to store with the document.
* @param {function} [callback] A function that will be called when the document is written to disk.
* @returns {number|boolean} The file position at which the data was written or false on error.
Expand All @@ -263,7 +270,7 @@ class WritablePartition extends ReadablePartition {
callback = sequenceNumber;
sequenceNumber = null;
}
const dataSize = Buffer.byteLength(data, 'utf8');
const dataSize = Buffer.isBuffer(data) ? data.byteLength : Buffer.byteLength(data, 'utf8');
assert(dataSize <= 64 * 1024 * 1024, 'Document is too large! Maximum is 64 MB');

const dataPosition = this.size;
Expand Down
16 changes: 14 additions & 2 deletions src/Storage/ReadableStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class ReadableStorage extends events.EventEmitter {
* @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
* @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
* @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
* @param {function(Buffer): object} [config.deserializeFromBuffer] Buffer-native deserializer.
* When provided it overrides `config.serializer.deserialize`.
* @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
* @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
* @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
Expand Down Expand Up @@ -78,6 +80,9 @@ class ReadableStorage extends events.EventEmitter {
};
config = Object.assign(defaults, config);
this.serializer = config.serializer;
this.deserializeFromBuffer = typeof config.deserializeFromBuffer === 'function'
? config.deserializeFromBuffer
: null;

this.hmac = createHmac(config.hmacSecret);

Expand Down Expand Up @@ -285,7 +290,14 @@ class ReadableStorage extends events.EventEmitter {
}
const headerOut = {};
const buffer = partition.readFrom(position, size, headerOut, backwardsHint);
return raw ? { buffer, time64: headerOut.time64, sequenceNumber: headerOut.sequenceNumber } : this.serializer.deserialize(buffer.toString('utf8'));
return raw ? { buffer, time64: headerOut.time64, sequenceNumber: headerOut.sequenceNumber } : this.deserializeDocument(buffer);
}

deserializeDocument(buffer) {
if (this.deserializeFromBuffer) {
return this.deserializeFromBuffer(buffer);
}
return this.serializer.deserialize(buffer.toString('utf8'));
}

/**
Expand Down Expand Up @@ -438,7 +450,7 @@ class ReadableStorage extends events.EventEmitter {
*/
buildDocumentEntry(readItem) {
return {
document: this.serializer.deserialize(readItem.data.toString('utf8')),
document: this.deserializeDocument(readItem.data),
// Replicate the index entry structure here, so iteration can be used easily to reindex
entry: readItem.entry
};
Expand Down
47 changes: 45 additions & 2 deletions src/Storage/WritableStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class WritableStorage extends ReadableStorage {
* @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
* @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
* @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
* @param {function(Buffer, object, { ensureCapacity: function(number): Buffer }): number} [config.serializeToBuffer]
* Buffer-native serializer. Receives `(buffer, document, helpers)` and must return written bytes.
* When provided it overrides `config.serializer.serialize`.
* @param {function(Buffer): object} [config.deserializeFromBuffer] Buffer-native deserializer.
* When provided it overrides `config.serializer.deserialize`.
* @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
* @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
* @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
Expand Down Expand Up @@ -65,6 +70,44 @@ class WritableStorage extends ReadableStorage {
this._lockMode = config.lock;
this.partitioner = config.partitioner;
this.partitionIds = {};
this.serializeToBuffer = typeof config.serializeToBuffer === 'function'
? config.serializeToBuffer
: null;
this.serializeBuffer = this.serializeToBuffer
? Buffer.alloc(config.writeBufferSize)
: null;
this.serializeBufferHelpers = this.serializeToBuffer
? { ensureCapacity: (requiredSize) => this.ensureSerializeBufferCapacity(requiredSize) }
: null;
}

// Keep existing bytes when growing so serializers can grow in multiple steps.
ensureSerializeBufferCapacity(requiredSize) {
// Coerce to an unsigned integer to reject negative/fractional sizes via the bounds checks below.
requiredSize = requiredSize >>> 0; // jshint ignore:line
const current = this.serializeBuffer;
if (requiredSize <= current.byteLength) {
return current;
}
let newSize = current.byteLength;
while (newSize < requiredSize) {
newSize *= 2;
}
const next = Buffer.alloc(newSize);
current.copy(next, 0, 0, current.byteLength);
this.serializeBuffer = next;
return next;
}

serializeDocument(document) {
if (!this.serializeToBuffer) {
return this.serializer.serialize(document).toString();
}
const written = this.serializeToBuffer(this.serializeBuffer, document, this.serializeBufferHelpers);
assert(Number.isInteger(written), 'serializeToBuffer must return an integer number of written bytes.');
assert(written > 0, 'serializeToBuffer must write at least one byte.');
assert(written <= this.serializeBuffer.byteLength, 'serializeToBuffer wrote beyond the provided buffer.');
return this.serializeBuffer.subarray(0, written);
}

/**
Expand Down Expand Up @@ -351,8 +394,8 @@ class WritableStorage extends ReadableStorage {
* @returns {number} The 1-based document sequence number in the storage.
*/
write(document, callback) {
const data = this.serializer.serialize(document).toString();
const dataSize = Buffer.byteLength(data, 'utf8');
const data = this.serializeDocument(document);
const dataSize = Buffer.isBuffer(data) ? data.byteLength : Buffer.byteLength(data, 'utf8');

const partitionName = this.partitioner(document, this.index.length + 1);
const partition = this.getPartition(partitionName);
Expand Down
51 changes: 51 additions & 0 deletions test/Storage.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,57 @@ describe('Storage', function() {
expect(storage.read(2)).to.be.eql(doc);
});

it('prefers deserializeFromBuffer over serializer.deserialize', function() {
    // The string-based deserializer throws, so the read only succeeds if it is bypassed.
    function failingDeserialize() {
        throw new Error('serializer.deserialize should not be used');
    }
    const parseFromBuffer = (buffer) => JSON.parse(buffer.toString('utf8'));

    storage = createStorage({
        serializer: { serialize: JSON.stringify, deserialize: failingDeserialize },
        deserializeFromBuffer: parseFromBuffer
    });
    storage.open();
    storage.write({ foo: 'bar' });

    const stored = storage.read(1);
    expect(stored).to.be.eql({ foo: 'bar' });
});

it('supports serializeToBuffer with dynamic buffer growth', function() {
    // Set once the serializer had to request a larger buffer.
    let grew = false;

    // The string-based serializer throws, so writes only succeed if it is bypassed.
    function failingSerialize() {
        throw new Error('serializer.serialize should not be used');
    }

    // Encode as JSON into the provided buffer, growing it through the helper when needed.
    function encodeIntoBuffer(buffer, document, helpers) {
        const encoded = Buffer.from(JSON.stringify(document), 'utf8');
        const encodedSize = encoded.byteLength;
        if (encodedSize > buffer.byteLength) {
            buffer = helpers.ensureCapacity(encodedSize);
            grew = true;
        }
        encoded.copy(buffer, 0, 0, encodedSize);
        return encodedSize;
    }

    storage = createStorage({
        // Small enough that the large document below does not fit without growing.
        writeBufferSize: 64,
        serializer: { serialize: failingSerialize, deserialize: JSON.parse },
        serializeToBuffer: encodeIntoBuffer,
        deserializeFromBuffer: (buffer) => JSON.parse(buffer.toString('utf8'))
    });
    storage.open();

    const largeDocument = { foo: 'x'.repeat(200), nested: { bar: 42 } };
    const smallDocument = { foo: 'bar' };
    storage.write(largeDocument);
    storage.write(smallDocument);

    expect(grew).to.be(true);
    expect(storage.read(1)).to.be.eql(largeDocument);
    expect(storage.read(2)).to.be.eql(smallDocument);
});

describe('concurrency', function() {

it('allows multiple writers to different partitions', function () {
Expand Down