Skip to content
Merged
2 changes: 1 addition & 1 deletion .github/workflows/npm-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
"package/src/Storage.js"
"package/src/Partition.js"
"package/src/Index.js"
"package/src/metadataUtil.js"
"package/src/utils/metadataUtil.js"
)
for f in "${REQUIRED[@]}"; do
echo "$FILES" | grep -qF "$f" || { echo "ERROR: required file '$f' is missing from the package"; exit 1; }
Expand Down
6 changes: 2 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@
"src/Partition/*.js",
"src/Storage/*.js",
"src/WatchesFile.js",
"src/util.js",
"src/fsUtil.js",
"src/metadataUtil.js",
"index.js"
"index.js",
"src/utils/*.js"
],
"license": "MIT",
"maintainers": [
Expand Down
4 changes: 2 additions & 2 deletions src/Consumer.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import stream from 'stream';
import fs from 'fs';
import path from 'path';
import { assert } from './util.js';
import { ensureDirectory } from './fsUtil.js';
import { assert } from './utils/util.js';
import { ensureDirectory } from './utils/fsUtil.js';
import Storage from './Storage/ReadableStorage.js';
const MAX_CATCHUP_BATCH = 10;

Expand Down
90 changes: 46 additions & 44 deletions src/EventStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import events from 'events';
import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js';
import Index from './Index.js';
import Consumer from './Consumer.js';
import { assert, getPropertyAtPath } from './util.js';
import { ensureDirectory, scanForFiles } from './fsUtil.js';
import { buildTypeMatcherFn } from './metadataUtil.js';
import { assert, getPropertyAtPath } from './utils/util.js';
import { ensureDirectory, scanForFiles } from './utils/fsUtil.js';
import { buildTypeMatcherFn } from './utils/metadataUtil.js';

const ExpectedVersion = {
Any: -1,
Expand All @@ -20,6 +20,7 @@ const ExpectedVersion = {
*/
const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type'];
const STREAM_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/:@~+=\-#.][A-Za-z0-9_]+)*$/;
const STORAGE_HOOK_EVENTS = new Set(['preCommit', 'preRead']);

class OptimisticConcurrencyError extends Error {}

Expand Down Expand Up @@ -240,11 +241,8 @@ class EventStore extends events.EventEmitter {
* @returns {this}
*/
on(event, listener) {
if (event === 'preCommit' || event === 'preRead') {
if (event === 'preCommit') {
assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
}
this.storage.on(event, listener);
if (this.isStorageHookEvent(event)) {
this.delegateStorageHookEvent('on', event, listener);
return this;
}
return super.on(event, listener);
Expand All @@ -265,11 +263,8 @@ class EventStore extends events.EventEmitter {
* @returns {this}
*/
once(event, listener) {
if (event === 'preCommit' || event === 'preRead') {
if (event === 'preCommit') {
assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
}
this.storage.once(event, listener);
if (this.isStorageHookEvent(event)) {
this.delegateStorageHookEvent('once', event, listener);
return this;
}
return super.once(event, listener);
Expand All @@ -284,13 +279,24 @@ class EventStore extends events.EventEmitter {
* @returns {this}
*/
off(event, listener) {
if (event === 'preCommit' || event === 'preRead') {
if (this.isStorageHookEvent(event)) {
this.storage.off(event, listener);
return this;
}
return super.off(event, listener);
}

isStorageHookEvent(event) {
return STORAGE_HOOK_EVENTS.has(event);
}

delegateStorageHookEvent(method, event, listener) {
if (event === 'preCommit') {
assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
}
this.storage[method](event, listener);
}

/**
* @inheritDoc
*/
Expand Down Expand Up @@ -429,6 +435,20 @@ class EventStore extends events.EventEmitter {
return type;
}

getExistingQueryTypes(types) {
const queryTypes = [];
for (const type of types) {
if (type in this.streams) {
queryTypes.push(type);
continue;
}
if (!this.typeAccessor) {
throw new Error(`Type stream "${type}" does not exist. Create it with createEventStream() first, or configure typeAccessor to have type streams created automatically on commit.`);
}
}
return queryTypes;
}

/**
* Commit a list of events for the given stream name, which is expected to be at the given version.
* Note that the events committed may still appear in other streams too - the given stream name is only
Expand Down Expand Up @@ -543,20 +563,7 @@ class EventStore extends events.EventEmitter {
*/
query(types, matcher = null, minRevision = 1, raw = false) {
assert(Array.isArray(types) && types.length > 0, 'Must specify a non-empty array of event types for query.');

const queryTypes = [];
for (const type of types) {
if (!(type in this.streams)) {
// No typeAccessor: the stream was never created; we cannot know whether events of
// this type exist in the store, so throw to avoid an unintentional full-store scan.
assert(!!this.typeAccessor, `Type stream "${type}" does not exist. Create it with createEventStream() first, or configure typeAccessor to have type streams created automatically on commit.`);
// typeAccessor is configured: type streams are created on commit, so a missing
// stream simply means no event of this type has been committed yet — treat as empty.
continue;
}
queryTypes.push(type);
}

const queryTypes = this.getExistingQueryTypes(types);
const condition = new CommitCondition(types, matcher, this.storage.length, raw);
const stream = this.fromStreams('_query_' + types.join('_'), queryTypes, minRevision, -1, matcher, raw);
return { stream, condition };
Expand All @@ -574,10 +581,7 @@ class EventStore extends events.EventEmitter {
* @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
*/
getEventStream(streamName, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
if (typeof predicate === 'boolean' && raw === false) {
raw = predicate;
predicate = null;
}
({ predicate, raw } = normalizePredicateRaw(predicate, raw));
if (!(streamName in this.streams)) {
return false;
}
Expand All @@ -596,10 +600,7 @@ class EventStore extends events.EventEmitter {
* @returns {EventStream} The event stream.
*/
getAllEvents(minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
if (typeof predicate === 'boolean' && raw === false) {
raw = predicate;
predicate = null;
}
({ predicate, raw } = normalizePredicateRaw(predicate, raw));
return this.getEventStream('_all', minRevision, maxRevision, predicate, raw);
}

Expand All @@ -616,10 +617,7 @@ class EventStore extends events.EventEmitter {
* @throws {Error} if any of the streams doesn't exist.
*/
fromStreams(streamName, streamNames, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
if (typeof predicate === 'boolean' && raw === false) {
raw = predicate;
predicate = null;
}
({ predicate, raw } = normalizePredicateRaw(predicate, raw));
assert(streamNames instanceof Array, 'Must specify an array of stream names.');

if (streamNames.length === 0) {
Expand Down Expand Up @@ -658,10 +656,7 @@ class EventStore extends events.EventEmitter {
* @throws {Error} If no stream for this category exists.
*/
getEventStreamForCategory(categoryName, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
if (typeof predicate === 'boolean' && raw === false) {
raw = predicate;
predicate = null;
}
({ predicate, raw } = normalizePredicateRaw(predicate, raw));
if (categoryName in this.streams) {
return this.getEventStream(categoryName, minRevision, maxRevision, predicate, raw);
}
Expand Down Expand Up @@ -844,6 +839,13 @@ function parseStreamFromIndexName(indexName) {
return indexName;
}

function normalizePredicateRaw(predicate, raw) {
if (typeof predicate === 'boolean' && raw === false) {
return { predicate: null, raw: predicate };
}
return { predicate, raw };
}

EventStore.Storage = Storage;
EventStore.Index = Index;

Expand Down
4 changes: 2 additions & 2 deletions src/EventStream.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import stream from 'stream';
import { assert } from './util.js';
import { buildRawBufferMatcher, matches } from './metadataUtil.js';
import { assert } from './utils/util.js';
import { buildRawBufferMatcher, matches } from './utils/metadataUtil.js';

const NDJSON_NEWLINE = Buffer.from('\n');

Expand Down
2 changes: 1 addition & 1 deletion src/Index/ReadableIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import fs from 'fs';
import path from 'path';
import events from 'events';
import Entry, { assertValidEntryClass } from '../IndexEntry.js';
import { assert, wrapAndCheck, binarySearch } from '../util.js';
import { assert, wrapAndCheck, binarySearch } from '../utils/util.js';

// node-event-store-index V01
const HEADER_MAGIC = "nesidx01";
Expand Down
6 changes: 3 additions & 3 deletions src/Index/WritableIndex.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import fs from 'fs';
import ReadableIndex, { Entry, CorruptedIndexError, HEADER_MAGIC } from './ReadableIndex.js';
import { assert, assertEqual } from '../util.js';
import { buildMetadataHeader } from '../metadataUtil.js';
import { ensureDirectory } from '../fsUtil.js';
import { assert, assertEqual } from '../utils/util.js';
import { buildMetadataHeader } from '../utils/metadataUtil.js';
import { ensureDirectory } from '../utils/fsUtil.js';

/**
* An index is a simple append-only file that stores an ordered list of entry elements pointing to the actual file position
Expand Down
4 changes: 2 additions & 2 deletions src/IndexMatcher.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getPropertyAtPath } from './util.js';
import { matches } from './metadataUtil.js';
import { getPropertyAtPath } from './utils/util.js';
import { matches } from './utils/metadataUtil.js';

/**
* @typedef {object|function(object):boolean} Matcher
Expand Down
2 changes: 1 addition & 1 deletion src/JoinEventStream.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import EventStream from './EventStream.js';
import {assert, kWayMerge} from './util.js';
import { assert, kWayMerge } from './utils/util.js';

/** Reusable sentinel used for missing or empty per-stream iterators. */
const emptyIterator = Object.freeze({ next() { return { done: true }; } });
Expand Down
43 changes: 29 additions & 14 deletions src/Partition/ReadablePartition.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fs from 'fs';
import path from 'path';
import events from 'events';
import { assert, alignTo, hash, binarySearch } from '../util.js';
import { assert, alignTo, hash, binarySearch } from '../utils/util.js';



Expand Down Expand Up @@ -209,6 +209,30 @@ class ReadablePartition extends events.EventEmitter {
return ({ dataSize, sequenceNumber, time64 });
}

resolveIterationPosition(position) {
return position < 0 ? this.size + position + 1 : position;
}

selectReader(position, size, backwardsHint) {
if (size > 0 && backwardsHint) {
const bufferOffset = DOCUMENT_HEADER_SIZE + size;
const reader = this.prepareReadBufferBackwards(position + bufferOffset, bufferOffset);
return { reader, bufferOffset };
}
return { reader: this.prepareReadBuffer(position), bufferOffset: 0 };
}

assignHeaderOutput(headerOut, header) {
if (headerOut === null) {
return;
}
headerOut.dataSize = header.dataSize;
headerOut.sequenceNumber = header.sequenceNumber;
// Denormalize time64 relative to this partition's epoch so callers can compare
// timestamps across partitions without needing to know the epoch value.
headerOut.time64 = this.metadata.epoch + header.time64;
}

/**
* Read the data from the given position.
*
Expand All @@ -227,10 +251,7 @@ class ReadablePartition extends events.EventEmitter {
assert(this.fd, 'Partition is not opened.');
assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position ${position}. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`);

const bufferOffset = size > 0 && backwardsHint ? DOCUMENT_HEADER_SIZE + size : 0;
const reader = size > 0 && backwardsHint
? this.prepareReadBufferBackwards(position + bufferOffset, bufferOffset)
: this.prepareReadBuffer(position);
const { reader, bufferOffset } = this.selectReader(position, size, backwardsHint);
if (reader.length < DOCUMENT_HEADER_SIZE) {
return false;
}
Expand All @@ -242,13 +263,7 @@ class ReadablePartition extends events.EventEmitter {
let dataPosition = reader.cursor + DOCUMENT_HEADER_SIZE;
const header = this.readDocumentHeader(reader.buffer, reader.cursor, position, size);
const dataSize = header.dataSize;
if (headerOut !== null) {
headerOut.dataSize = header.dataSize;
headerOut.sequenceNumber = header.sequenceNumber;
// Denormalize time64 relative to this partition's epoch so callers can compare
// timestamps across partitions without needing to know the epoch value.
headerOut.time64 = this.metadata.epoch + header.time64;
}
this.assignHeaderOutput(headerOut, header);

const writeSize = this.documentWriteSize(dataSize);
assert(position + writeSize <= this.size, `Invalid document at position ${position}. This may be caused by an unfinished write.`, CorruptFileError);
Expand Down Expand Up @@ -437,7 +452,7 @@ class ReadablePartition extends events.EventEmitter {
* @returns {Generator<Buffer>} A generator that returns all documents in this partition.
*/
*readAll(after = 0, headerOut = null) {
let position = after < 0 ? this.size + after + 1 : after;
let position = this.resolveIterationPosition(after);
const internalHeader = headerOut !== null ? headerOut : {};
let data;
while ((data = this.readFrom(position, 0, internalHeader)) !== false) {
Expand All @@ -457,7 +472,7 @@ class ReadablePartition extends events.EventEmitter {
* @returns {Generator<Buffer>} A generator that returns all documents in this partition in reverse order.
*/
*readAllBackwards(before = -1, headerOut = null) {
let position = before < 0 ? this.size + before + 1 : before;
let position = this.resolveIterationPosition(before);
const internalHeader = headerOut !== null ? headerOut : {};
while ((position = this.findDocumentPositionBefore(position)) !== false) {
const data = this.readFrom(position, 0, internalHeader, true);
Expand Down
Loading
Loading