diff --git a/.changeset/warm-pandas-double.md b/.changeset/warm-pandas-double.md new file mode 100644 index 00000000..ba6fdc9f --- /dev/null +++ b/.changeset/warm-pandas-double.md @@ -0,0 +1,5 @@ +--- +"@chat-adapter/state-redis": patch +--- + +Fix `createRedisState` typings so `url` appears in IntelliSense and the factory accepts an existing Redis client. diff --git a/packages/state-memory/src/index.test.ts b/packages/state-memory/src/index.test.ts index f98bbd4b..2e3b31b2 100644 --- a/packages/state-memory/src/index.test.ts +++ b/packages/state-memory/src/index.test.ts @@ -1,4 +1,4 @@ -import { beforeEach, describe, expect, it } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import { createMemoryState, type MemoryStateAdapter } from "./index"; describe("MemoryStateAdapter", () => { @@ -186,13 +186,20 @@ describe("MemoryStateAdapter", () => { }); it("should refresh TTL on subsequent appends", async () => { - await adapter.appendToList("list1", { id: 1 }, { ttlMs: 50 }); - await new Promise((resolve) => setTimeout(resolve, 30)); - // Append again — refreshes TTL - await adapter.appendToList("list1", { id: 2 }, { ttlMs: 50 }); + vi.useFakeTimers(); - const result = await adapter.getList("list1"); - expect(result).toEqual([{ id: 1 }, { id: 2 }]); + try { + await adapter.appendToList("list1", { id: 1 }, { ttlMs: 50 }); + await vi.advanceTimersByTimeAsync(30); + + // Append again — refreshes TTL + await adapter.appendToList("list1", { id: 2 }, { ttlMs: 50 }); + + const result = await adapter.getList("list1"); + expect(result).toEqual([{ id: 1 }, { id: 2 }]); + } finally { + vi.useRealTimers(); + } }); it("should keep lists isolated by key", async () => { diff --git a/packages/state-redis/src/index.test.ts b/packages/state-redis/src/index.test.ts index 14df0551..ec419aeb 100644 --- a/packages/state-redis/src/index.test.ts +++ b/packages/state-redis/src/index.test.ts @@ -1,4 +1,6 @@ +import { EventEmitter } from "node:events"; import type { Logger } from "chat"; +import type { RedisClientType } from "redis"; import { describe, expect, it, vi } from "vitest"; import { createRedisState, RedisStateAdapter } from "./index"; @@ -22,6 +24,167 @@ describe("RedisStateAdapter", () => { expect(adapter).toBeInstanceOf(RedisStateAdapter); }); + it("should create an adapter with url and default logger", () => { + const adapter = createRedisState({ + url: "redis://localhost:6379", + }); + + expect(adapter).toBeInstanceOf(RedisStateAdapter); + }); + + it("should accept an existing redis client", () => { + const client = { + close: vi.fn(), + connect: vi.fn(), + isOpen: true, + isReady: true, + on: vi.fn(), + } as unknown as RedisClientType; + + const adapter = createRedisState({ + client, + logger: mockLogger, + }); + + expect(adapter).toBeInstanceOf(RedisStateAdapter); + expect(adapter.getClient()).toBe(client); + }); + + it("should wait for an injected open client to become ready", async () => { + const emitter = new EventEmitter(); + const client = Object.assign(emitter, { + close: vi.fn(), + connect: vi.fn(), + isOpen: true, + isReady: false, + }) as unknown as RedisClientType & { + isReady: boolean; + }; + + const adapter = createRedisState({ + client, + logger: mockLogger, + }); + + let resolved = false; + const connectPromise = adapter.connect().then(() => { + resolved = true; + }); + + await Promise.resolve(); + expect(resolved).toBe(false); + expect(client.connect).not.toHaveBeenCalled(); + + client.isReady = true; + emitter.emit("ready"); + + await connectPromise; + expect(resolved).toBe(true); + }); + + it("should ignore transient errors while waiting for an injected client to recover", async () => { + const emitter = new EventEmitter(); + const client = Object.assign(emitter, { + close: vi.fn(), + connect: vi.fn(), + isOpen: true, + isReady: false, + }) as unknown as RedisClientType & { + isOpen: boolean; + isReady: boolean; + }; + + const adapter = createRedisState({ + client, + logger: mockLogger, + }); + + let resolved = false; + const connectPromise = adapter.connect().then(() => { + resolved = true; + }); + + await Promise.resolve(); + emitter.emit("error", new Error("Socket closed unexpectedly")); + client.isOpen = false; + emitter.emit("reconnecting"); + + await Promise.resolve(); + expect(resolved).toBe(false); + + client.isOpen = true; + client.isReady = true; + emitter.emit("ready"); + + await connectPromise; + expect(resolved).toBe(true); + expect(client.connect).not.toHaveBeenCalled(); + }); + + it("should wait for an injected client to become ready again after reconnecting", async () => { + const emitter = new EventEmitter(); + const client = Object.assign(emitter, { + close: vi.fn(), + connect: vi.fn(), + isOpen: true, + isReady: true, + }) as unknown as RedisClientType & { + isOpen: boolean; + isReady: boolean; + }; + + const adapter = createRedisState({ + client, + logger: mockLogger, + }); + + await adapter.connect(); + expect(client.connect).not.toHaveBeenCalled(); + + client.isReady = false; + client.isOpen = false; + emitter.emit("reconnecting"); + + let resolved = false; + const reconnectPromise = adapter.connect().then(() => { + resolved = true; + }); + + await Promise.resolve(); + expect(resolved).toBe(false); + expect(client.connect).not.toHaveBeenCalled(); + + client.isOpen = true; + client.isReady = true; + emitter.emit("ready"); + + await reconnectPromise; + expect(resolved).toBe(true); + }); + + it("should reject when an injected client ends before becoming ready", async () => { + const emitter = new EventEmitter(); + const client = Object.assign(emitter, { + close: vi.fn(), + connect: vi.fn(), + isOpen: true, + isReady: false, + }) as unknown as RedisClientType; + + const adapter = createRedisState({ + client, + logger: mockLogger, + }); + + const connectPromise = adapter.connect(); + const error = new Error("Socket closed unexpectedly"); + + emitter.emit("error", error); + emitter.emit("end"); + + await expect(connectPromise).rejects.toBe(error); + }); + it("should have appendToList method", () => { const adapter = createRedisState({ url: "redis://localhost:6379", diff --git a/packages/state-redis/src/index.ts b/packages/state-redis/src/index.ts index 92e7dfdf..31ba6e45 100644 --- a/packages/state-redis/src/index.ts +++ b/packages/state-redis/src/index.ts @@ -6,11 +6,31 @@ export interface RedisStateAdapterOptions { /** Key prefix for all Redis keys (default: "chat-sdk") */ keyPrefix?: string; /** Logger instance for error reporting */ - logger: Logger; + logger?: Logger; /** Redis connection URL (e.g., redis://localhost:6379) */ url: string; } +export interface RedisStateClientOptions { + /** Existing redis client instance */ + client: RedisClientType; + /** Key prefix for all Redis keys (default: "chat-sdk") */ + keyPrefix?: string; + /** Logger instance for error reporting */ + logger?: Logger; +} + +export interface CreateRedisStateOptions { + /** Existing redis client instance */ + client?: RedisClientType; + /** Key prefix for all Redis keys (default: "chat-sdk") */ + keyPrefix?: string; + /** Logger instance for error reporting */ + logger?: Logger; + /** Redis connection URL (e.g., redis://localhost:6379) */ + url?: string; +} + /** * Redis state adapter for production use. * @@ -21,18 +41,34 @@ export class RedisStateAdapter implements StateAdapter { private readonly client: RedisClientType; private readonly keyPrefix: string; private readonly logger: Logger; + private readonly ownsClient: boolean; private connected = false; private connectPromise: Promise | null = null; - constructor(options: RedisStateAdapterOptions) { - this.client = createClient({ url: options.url }); + constructor(options: RedisStateAdapterOptions | RedisStateClientOptions) { + if ("client" in options) { + this.client = options.client; + this.ownsClient = false; + } else { + this.client = createClient({ url: options.url }); + this.ownsClient = true; + } this.keyPrefix = options.keyPrefix || "chat-sdk"; - this.logger = options.logger; + this.logger = options.logger ?? new ConsoleLogger("info").child("redis"); // Handle connection errors this.client.on("error", (err) => { this.logger.error("Redis client error", { error: err }); }); + this.client.on("ready", () => { + this.connected = true; + }); + this.client.on("reconnecting", () => { + this.connected = false; + }); + this.client.on("end", () => { + this.connected = false; + }); } private key(type: "sub" | "lock" | "cache" | "queue", id: string): string { @@ -43,16 +79,72 @@ export class RedisStateAdapter implements StateAdapter { return `${this.keyPrefix}:subscriptions`; } + private async waitForReady(): Promise { + if (this.client.isReady) { + return; + } + + await new Promise((resolve, reject) => { + let lastError: unknown; + + const handleReady = () => { + cleanup(); + resolve(); + }; + + const handleError = (error: unknown) => { + lastError = error; + }; + + const handleEnd = () => { + cleanup(); + reject( + lastError ?? + new Error("Redis client connection ended before becoming ready.") + ); + }; + + const cleanup = () => { + this.client.off("ready", handleReady); + this.client.off("error", handleError); + this.client.off("end", handleEnd); + }; + + this.client.on("ready", handleReady); + this.client.on("error", handleError); + this.client.on("end", handleEnd); + + if (this.client.isReady) { + cleanup(); + resolve(); + } + }); + } + async connect(): Promise { - if (this.connected) { + if (this.connected && this.client.isReady) { return; } // Reuse existing connection attempt to avoid race conditions if (!this.connectPromise) { - this.connectPromise = this.client.connect().then(() => { + const connectPromise = (async () => { + if (this.ownsClient && !(this.client.isReady || this.client.isOpen)) { + await this.client.connect(); + } + + await this.waitForReady(); this.connected = true; - }); + })() + .catch((error) => { + throw error; + }) + .finally(() => { + if (this.connectPromise === connectPromise) { + this.connectPromise = null; + } + }); + this.connectPromise = connectPromise; } await this.connectPromise; @@ -60,7 +152,9 @@ export class RedisStateAdapter implements StateAdapter { async disconnect(): Promise { if (this.connected) { - await this.client.close(); + if (this.ownsClient) { + await this.client.close(); + } this.connected = false; this.connectPromise = null; } @@ -321,18 +415,26 @@ function generateToken(): string { } export function createRedisState( - options?: Partial + options: CreateRedisStateOptions = {} ): RedisStateAdapter { - const url = options?.url ?? process.env.REDIS_URL; + if (options.client) { + return new RedisStateAdapter({ + client: options.client, + keyPrefix: options.keyPrefix, + logger: options.logger, + }); + } + + const url = options.url ?? process.env.REDIS_URL; if (!url) { throw new Error( - "Redis url is required. Set REDIS_URL or provide it in options." + "Redis url is required. Set REDIS_URL or provide url in options." ); } const resolved: RedisStateAdapterOptions = { url, - keyPrefix: options?.keyPrefix, - logger: options?.logger ?? new ConsoleLogger("info").child("redis"), + keyPrefix: options.keyPrefix, + logger: options.logger, }; return new RedisStateAdapter(resolved); }