Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/resumable-streams-unit-tests.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@voltagent/resumable-streams": patch
---

Add unit tests for resumable-streams package
5 changes: 4 additions & 1 deletion packages/resumable-streams/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
"redis": "^4.7.0",
"resumable-stream": "^2.2.10"
},
"devDependencies": {},
"devDependencies": {
"@vitest/coverage-v8": "^3.2.4",
"vitest": "^3.2.4"
},
"exports": {
".": {
"import": {
Expand Down
377 changes: 377 additions & 0 deletions packages/resumable-streams/src/chat-session.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,377 @@
import type { ResumableStreamAdapter } from "@voltagent/core";
import { describe, expect, it, vi } from "vitest";
import { createResumableChatSession } from "./chat-session";

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

const makeAdapter = (overrides: Partial<ResumableStreamAdapter> = {}): ResumableStreamAdapter => ({
createStream: vi.fn(async () => "generated-stream-id"),
resumeStream: vi.fn(async () => null),
getActiveStreamId: vi.fn(async () => null),
clearActiveStream: vi.fn(async () => {}),
...overrides,
});

const makeReadableStream = (text = "hello"): ReadableStream<string> => {
return new ReadableStream<string>({
start(controller) {
controller.enqueue(text);
controller.close();
},
});
};

// ---------------------------------------------------------------------------
// createResumableChatSession — validation
// ---------------------------------------------------------------------------

describe("createResumableChatSession", () => {
describe("validation", () => {
it("throws when conversationId is missing", () => {
expect(() =>
createResumableChatSession({
adapter: makeAdapter(),
conversationId: "",
userId: "u1",
}),
).toThrow("conversationId is required");
});

it("throws when userId is missing", () => {
expect(() =>
createResumableChatSession({
adapter: makeAdapter(),
conversationId: "conv1",
userId: "",
}),
).toThrow("userId is required");
});

it("creates a session without throwing for valid inputs", () => {
expect(() =>
createResumableChatSession({
adapter: makeAdapter(),
conversationId: "conv1",
userId: "u1",
}),
).not.toThrow();
});
});

// ---------------------------------------------------------------------------
// createStream
// ---------------------------------------------------------------------------

describe("createStream", () => {
it("delegates to adapter.createStream and returns the stream id", async () => {
const adapter = makeAdapter({
createStream: vi.fn(async () => "sid-123"),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

const streamId = await session.createStream(makeReadableStream());
expect(streamId).toBe("sid-123");
expect(adapter.createStream).toHaveBeenCalledOnce();
});

it("passes conversationId and userId to adapter.createStream", async () => {
const adapter = makeAdapter({
createStream: vi.fn(async () => "sid-xyz"),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv-abc",
userId: "user-42",
});

await session.createStream(makeReadableStream());
expect(adapter.createStream).toHaveBeenCalledWith(
expect.objectContaining({ conversationId: "conv-abc", userId: "user-42" }),
);
});
});

// ---------------------------------------------------------------------------
// resumeStream
// ---------------------------------------------------------------------------

describe("resumeStream", () => {
it("returns null when no stream exists for the given id", async () => {
const adapter = makeAdapter({
resumeStream: vi.fn(async () => null),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

const stream = await session.resumeStream("unknown-id");
expect(stream).toBeNull();
});

it("returns the stream when it exists", async () => {
const readable = makeReadableStream("data");
const adapter = makeAdapter({
resumeStream: vi.fn(async () => readable),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

const stream = await session.resumeStream("known-id");
expect(stream).toBe(readable);
});
});

// ---------------------------------------------------------------------------
// getActiveStreamId
// ---------------------------------------------------------------------------

describe("getActiveStreamId", () => {
it("returns null when no active stream", async () => {
const adapter = makeAdapter({
getActiveStreamId: vi.fn(async () => null),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

const sid = await session.getActiveStreamId();
expect(sid).toBeNull();
});

it("returns the active stream id when one is set", async () => {
const adapter = makeAdapter({
getActiveStreamId: vi.fn(async () => "active-sid"),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

const sid = await session.getActiveStreamId();
expect(sid).toBe("active-sid");
});
});

// ---------------------------------------------------------------------------
// clearActiveStream
// ---------------------------------------------------------------------------

describe("clearActiveStream", () => {
it("calls adapter.clearActiveStream with context", async () => {
const adapter = makeAdapter();
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

await session.clearActiveStream();
expect(adapter.clearActiveStream).toHaveBeenCalledWith(
expect.objectContaining({ conversationId: "conv1", userId: "u1" }),
);
});

it("passes streamId when provided", async () => {
const adapter = makeAdapter();
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

await session.clearActiveStream("sid-to-clear");
expect(adapter.clearActiveStream).toHaveBeenCalledWith(
expect.objectContaining({ streamId: "sid-to-clear" }),
);
});
});

// ---------------------------------------------------------------------------
// consumeSseStream
// ---------------------------------------------------------------------------

describe("consumeSseStream", () => {
it("creates a stream from the provided SSE stream", async () => {
const adapter = makeAdapter({
createStream: vi.fn(async () => "sse-sid"),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

await session.consumeSseStream({ stream: makeReadableStream() });
expect(adapter.createStream).toHaveBeenCalledOnce();
});

it("does not throw when adapter.createStream rejects", async () => {
const adapter = makeAdapter({
createStream: vi.fn(async () => {
throw new Error("persist failed");
}),
});
const logger = { error: vi.fn(), warn: vi.fn(), info: vi.fn(), debug: vi.fn() };
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
logger: logger as never,
});

// Should not throw even when adapter fails
await expect(
session.consumeSseStream({ stream: makeReadableStream() }),
).resolves.toBeUndefined();
expect(logger.error).toHaveBeenCalled();
});
});

// ---------------------------------------------------------------------------
// onFinish
// ---------------------------------------------------------------------------

describe("onFinish", () => {
it("clears the active stream when one has been created", async () => {
const adapter = makeAdapter({
createStream: vi.fn(async () => "fin-sid"),
clearActiveStream: vi.fn(async () => {}),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

await session.createStream(makeReadableStream());
await session.onFinish();
expect(adapter.clearActiveStream).toHaveBeenCalledWith(
expect.objectContaining({ streamId: "fin-sid" }),
);
});

it("does nothing when there is no active stream", async () => {
const adapter = makeAdapter({
getActiveStreamId: vi.fn(async () => null),
clearActiveStream: vi.fn(async () => {}),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

await session.onFinish();
expect(adapter.clearActiveStream).not.toHaveBeenCalled();
});

it("does not throw when clearActiveStream rejects", async () => {
const adapter = makeAdapter({
createStream: vi.fn(async () => "fin-sid"),
clearActiveStream: vi.fn(async () => {
throw new Error("clear failed");
}),
});
const logger = { error: vi.fn(), warn: vi.fn(), info: vi.fn(), debug: vi.fn() };
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
logger: logger as never,
});

await session.createStream(makeReadableStream());
await expect(session.onFinish()).resolves.toBeUndefined();
expect(logger.error).toHaveBeenCalled();
});
});

// ---------------------------------------------------------------------------
// resumeResponse
// ---------------------------------------------------------------------------

describe("resumeResponse", () => {
it("returns 204 when no active stream exists", async () => {
const adapter = makeAdapter({
getActiveStreamId: vi.fn(async () => null),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

const response = await session.resumeResponse();
expect(response.status).toBe(204);
});

it("returns 204 and clears stream when the stream cannot be resumed", async () => {
const adapter = makeAdapter({
getActiveStreamId: vi.fn(async () => "stale-sid"),
resumeStream: vi.fn(async () => null),
clearActiveStream: vi.fn(async () => {}),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

const response = await session.resumeResponse();
expect(response.status).toBe(204);
expect(adapter.clearActiveStream).toHaveBeenCalledWith(
expect.objectContaining({ streamId: "stale-sid" }),
);
});

it("returns 200 with body when the stream can be resumed", async () => {
const readable = makeReadableStream("chunk");
const adapter = makeAdapter({
getActiveStreamId: vi.fn(async () => "live-sid"),
resumeStream: vi.fn(async () => readable),
});
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
});

const response = await session.resumeResponse();
expect(response.status).toBe(200);
expect(response.body).not.toBeNull();
});

it("returns 204 when getActiveStreamId throws", async () => {
const adapter = makeAdapter({
getActiveStreamId: vi.fn(async () => {
throw new Error("network error");
}),
});
const logger = { error: vi.fn(), warn: vi.fn(), info: vi.fn(), debug: vi.fn() };
const session = createResumableChatSession({
adapter,
conversationId: "conv1",
userId: "u1",
logger: logger as never,
});

const response = await session.resumeResponse();
expect(response.status).toBe(204);
expect(logger.error).toHaveBeenCalled();
});
});
});
Loading