From 73a2d8bf0a38c59f01ca3603d939c08e11f50959 Mon Sep 17 00:00:00 2001 From: William Chong Date: Thu, 5 Jun 2025 10:01:25 +0400 Subject: [PATCH 1/2] Test subscribe to all with truncated stream --- .../test/src/streams/subscribeToAll.test.ts | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/packages/test/src/streams/subscribeToAll.test.ts b/packages/test/src/streams/subscribeToAll.test.ts index fdb1d776..cb160a1d 100644 --- a/packages/test/src/streams/subscribeToAll.test.ts +++ b/packages/test/src/streams/subscribeToAll.test.ts @@ -16,6 +16,10 @@ import { jsonEvent, ResolvedEvent, END, + streamNameFilter, + START, + FORWARDS, + AllStreamResolvedEvent, } from "@kurrent/kurrentdb-client"; const asyncPipeline = promisify(pipeline); @@ -465,4 +469,121 @@ describe("subscribeToAll", () => { expect(writeStream.ids).toHaveLength(9); }); }); + + describe("should handle stream truncation properly", () => { + test.only("caught up received when no events pass the filter", async () => { + const STREAM_NAME = "json_stream_name"; + const defer = new Defer(); + let caughtUpReceived = false; + let eventCount = 0; + + await client.appendToStream(STREAM_NAME, jsonTestEvents(10)); + + const subscribe = client.subscribeToAll({ + fromPosition: START, + filter: streamNameFilter({ prefixes: ["passthrough-filter"] }), + }); + + subscribe + .on("data", (resolvedEvent) => { + eventCount++; + }) + .on("caughtUp", () => { + caughtUpReceived = true; + subscribe.unsubscribe(); + defer.resolve(); + }) + .on("error", (error) => { + defer.reject(error); + }); + + try { + await defer.promise; + expect(caughtUpReceived).toBe(true); + expect(eventCount).toBe(0); + } catch (error) { + throw error; + } + }) + + test.only("subscribeToAll with filter on truncated stream", async () => { + const TRUNCATED_STREAM = "truncated_stream_test"; + const TRUNCATE_BEFORE = 20; + const CREATED_EVENTS = 25; + const deferAll = new Defer(); + const deferStream = new Defer(); + let caughtUpReceivedAll = false; + let caughtUpReceivedStream = false; + + var collected_subscribe_to_all: AllStreamResolvedEvent[] = [] + var collected_subscribe_to_stream: ResolvedEvent[] = [] + var collected_read_all: AllStreamResolvedEvent[] = [] + + await client.appendToStream(TRUNCATED_STREAM, jsonTestEvents(CREATED_EVENTS)); + + await client.setStreamMetadata(TRUNCATED_STREAM, { truncateBefore: TRUNCATE_BEFORE }); + + const subscription_all = client.subscribeToAll({ + fromPosition: START, + filter: streamNameFilter({ prefixes: [TRUNCATED_STREAM] }), + }); + + subscription_all + .on("data", (resolvedEvent) => { + collected_subscribe_to_all.push(resolvedEvent); + }) + .on("caughtUp", () => { + caughtUpReceivedAll = true; + subscription_all.unsubscribe(); + deferAll.resolve(); + }) + .on("error", (error) => { + deferAll.reject(error); + }); + + const subscription_stream = client.subscribeToStream(TRUNCATED_STREAM, { + fromRevision: START, + }); + + subscription_stream + .on("data", (resolvedEvent) => { + collected_subscribe_to_stream.push(resolvedEvent); + }) + .on("caughtUp", () => { + caughtUpReceivedStream = true; + subscription_stream.unsubscribe(); + deferStream.resolve(); + }) + .on("error", (error) => { + deferStream.reject(error); + }); + + const read_all_events = client.readAll({ + direction: FORWARDS, + fromPosition: START, + filter: streamNameFilter({ prefixes: [TRUNCATED_STREAM] }), + }); + + for await (const resolvedEvent of read_all_events) { + collected_read_all.push(resolvedEvent); + } + + try { + await Promise.all([deferAll.promise, deferStream.promise]); + + expect(caughtUpReceivedAll).toBe(true); + expect(caughtUpReceivedStream).toBe(true); + + expect(collected_subscribe_to_all.length).toBe(25); + expect(collected_read_all.length).toBe(25); + expect(collected_subscribe_to_stream.length).toBe(CREATED_EVENTS - TRUNCATE_BEFORE); + + expect(collected_subscribe_to_all.at(-1)?.event?.revision).toBe(24n); + expect(collected_read_all.at(-1)?.event?.revision).toBe(24); + expect(collected_subscribe_to_stream.at(-1)?.event?.revision).toBe(24n); + } catch (error) { + throw error; + } + }); + }); }); From 11f2d53d78838ee4247a27342af0146c0664fd44 Mon Sep 17 00:00:00 2001 From: William Chong Date: Thu, 5 Jun 2025 10:27:45 +0400 Subject: [PATCH 2/2] Add comment --- packages/test/src/streams/subscribeToAll.test.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/packages/test/src/streams/subscribeToAll.test.ts b/packages/test/src/streams/subscribeToAll.test.ts index cb160a1d..cadb3b56 100644 --- a/packages/test/src/streams/subscribeToAll.test.ts +++ b/packages/test/src/streams/subscribeToAll.test.ts @@ -518,6 +518,7 @@ describe("subscribeToAll", () => { var collected_subscribe_to_all: AllStreamResolvedEvent[] = [] var collected_subscribe_to_stream: ResolvedEvent[] = [] var collected_read_all: AllStreamResolvedEvent[] = [] + var collected_read_stream: ResolvedEvent[] = [] await client.appendToStream(TRUNCATED_STREAM, jsonTestEvents(CREATED_EVENTS)); @@ -568,6 +569,15 @@ describe("subscribeToAll", () => { collected_read_all.push(resolvedEvent); } + const read_stream_events = client.readStream(TRUNCATED_STREAM, { + direction: FORWARDS, + fromRevision: START, + }); + + for await (const resolvedEvent of read_stream_events) { + collected_read_stream.push(resolvedEvent); + } + try { await Promise.all([deferAll.promise, deferStream.promise]); @@ -576,11 +586,17 @@ describe("subscribeToAll", () => { expect(collected_subscribe_to_all.length).toBe(25); expect(collected_read_all.length).toBe(25); + + // After truncation, the counts will be different - the truncation is stored in the stream metadata, + // and the metadata is not applied when reading/subscribing to $all + // so your $all reads and subscriptions will still include all the events until they are scavenged expect(collected_subscribe_to_stream.length).toBe(CREATED_EVENTS - TRUNCATE_BEFORE); + expect(collected_read_stream.length).toBe(CREATED_EVENTS - TRUNCATE_BEFORE); expect(collected_subscribe_to_all.at(-1)?.event?.revision).toBe(24n); expect(collected_read_all.at(-1)?.event?.revision).toBe(24); expect(collected_subscribe_to_stream.at(-1)?.event?.revision).toBe(24n); + expect(collected_read_stream.at(-1)?.event?.revision).toBe(24); } catch (error) { throw error; }