Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions packages/test/src/streams/subscribeToAll.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import {
jsonEvent,
ResolvedEvent,
END,
streamNameFilter,
START,
FORWARDS,
AllStreamResolvedEvent,
} from "@kurrent/kurrentdb-client";

const asyncPipeline = promisify(pipeline);
Expand Down Expand Up @@ -465,4 +469,137 @@ describe("subscribeToAll", () => {
expect(writeStream.ids).toHaveLength(9);
});
});

describe("should handle stream truncation properly", () => {
test.only("caught up received when no events pass the filter", async () => {
const STREAM_NAME = "json_stream_name";
const defer = new Defer();
let caughtUpReceived = false;
let eventCount = 0;

await client.appendToStream(STREAM_NAME, jsonTestEvents(10));

const subscribe = client.subscribeToAll({
fromPosition: START,
filter: streamNameFilter({ prefixes: ["passthrough-filter"] }),
});

subscribe
.on("data", (resolvedEvent) => {
eventCount++;
})
.on("caughtUp", () => {
caughtUpReceived = true;
subscribe.unsubscribe();
defer.resolve();
})
.on("error", (error) => {
defer.reject(error);
});

try {
await defer.promise;
expect(caughtUpReceived).toBe(true);
expect(eventCount).toBe(0);
} catch (error) {
throw error;
}
})

test.only("subscribeToAll with filter on truncated stream", async () => {
const TRUNCATED_STREAM = "truncated_stream_test";
const TRUNCATE_BEFORE = 20;
const CREATED_EVENTS = 25;
const deferAll = new Defer();
const deferStream = new Defer();
let caughtUpReceivedAll = false;
let caughtUpReceivedStream = false;

var collected_subscribe_to_all: AllStreamResolvedEvent[] = []
var collected_subscribe_to_stream: ResolvedEvent[] = []
var collected_read_all: AllStreamResolvedEvent[] = []
var collected_read_stream: ResolvedEvent[] = []

await client.appendToStream(TRUNCATED_STREAM, jsonTestEvents(CREATED_EVENTS));

await client.setStreamMetadata(TRUNCATED_STREAM, { truncateBefore: TRUNCATE_BEFORE });

const subscription_all = client.subscribeToAll({
fromPosition: START,
filter: streamNameFilter({ prefixes: [TRUNCATED_STREAM] }),
});

subscription_all
.on("data", (resolvedEvent) => {
collected_subscribe_to_all.push(resolvedEvent);
})
.on("caughtUp", () => {
caughtUpReceivedAll = true;
subscription_all.unsubscribe();
deferAll.resolve();
})
.on("error", (error) => {
deferAll.reject(error);
});

const subscription_stream = client.subscribeToStream(TRUNCATED_STREAM, {
fromRevision: START,
});

subscription_stream
.on("data", (resolvedEvent) => {
collected_subscribe_to_stream.push(resolvedEvent);
})
.on("caughtUp", () => {
caughtUpReceivedStream = true;
subscription_stream.unsubscribe();
deferStream.resolve();
})
.on("error", (error) => {
deferStream.reject(error);
});

const read_all_events = client.readAll({
direction: FORWARDS,
fromPosition: START,
filter: streamNameFilter({ prefixes: [TRUNCATED_STREAM] }),
});

for await (const resolvedEvent of read_all_events) {
collected_read_all.push(resolvedEvent);
}

const read_stream_events = client.readStream(TRUNCATED_STREAM, {
direction: FORWARDS,
fromRevision: START,
});

for await (const resolvedEvent of read_stream_events) {
collected_read_stream.push(resolvedEvent);
}

try {
await Promise.all([deferAll.promise, deferStream.promise]);

expect(caughtUpReceivedAll).toBe(true);
expect(caughtUpReceivedStream).toBe(true);

expect(collected_subscribe_to_all.length).toBe(25);
expect(collected_read_all.length).toBe(25);

// After truncation, the counts will be different - the truncation is stored in the stream metadata,
// and the metadata is not applied when reading/subscribing to $all
// so your $all reads and subscriptions will still include all the events until they are scavenged
expect(collected_subscribe_to_stream.length).toBe(CREATED_EVENTS - TRUNCATE_BEFORE);
expect(collected_read_stream.length).toBe(CREATED_EVENTS - TRUNCATE_BEFORE);

expect(collected_subscribe_to_all.at(-1)?.event?.revision).toBe(24n);
expect(collected_read_all.at(-1)?.event?.revision).toBe(24);
expect(collected_subscribe_to_stream.at(-1)?.event?.revision).toBe(24n);
expect(collected_read_stream.at(-1)?.event?.revision).toBe(24);
} catch (error) {
throw error;
}
});
});
});
Loading