From 1250adc456336c20b7bdcec4b32ebb25d358298f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Drigh=C3=A8s?= Date: Mon, 13 Oct 2025 23:46:45 +0200 Subject: [PATCH] initiate redis-stream implementation --- docker-compose.yaml | 10 + packages/redis-stream/LICENSE | 21 ++ packages/redis-stream/README.md | 64 +++++ packages/redis-stream/docgen.json | 7 + .../examples/establish-connection.ts | 34 +++ packages/redis-stream/examples/publisher.ts | 31 +++ packages/redis-stream/examples/subscriber.ts | 32 +++ packages/redis-stream/package.json | 54 ++++ packages/redis-stream/src/RedisConnection.ts | 108 ++++++++ packages/redis-stream/src/RedisError.ts | 46 ++++ .../redis-stream/src/RedisStreamMessage.ts | 30 +++ .../redis-stream/src/RedisStreamPublisher.ts | 111 ++++++++ .../redis-stream/src/RedisStreamSubscriber.ts | 240 ++++++++++++++++++ packages/redis-stream/src/index.ts | 24 ++ .../src/internal/RedisConnection.ts | 160 ++++++++++++ .../redis-stream/test/RedisConnection.test.ts | 42 +++ .../test/RedisStreamPublisher.test.ts | 71 ++++++ .../test/RedisStreamSubscriber.test.ts | 66 +++++ packages/redis-stream/test/dependencies.ts | 7 + packages/redis-stream/tsconfig.build.json | 13 + packages/redis-stream/tsconfig.examples.json | 9 + packages/redis-stream/tsconfig.json | 14 + packages/redis-stream/tsconfig.src.json | 9 + packages/redis-stream/tsconfig.test.json | 9 + packages/redis-stream/vitest.config.ts | 7 + pnpm-lock.yaml | 139 +++++++++- tsconfig.base.json | 9 +- 27 files changed, 1365 insertions(+), 2 deletions(-) create mode 100644 packages/redis-stream/LICENSE create mode 100644 packages/redis-stream/README.md create mode 100644 packages/redis-stream/docgen.json create mode 100644 packages/redis-stream/examples/establish-connection.ts create mode 100644 packages/redis-stream/examples/publisher.ts create mode 100644 packages/redis-stream/examples/subscriber.ts create mode 100644 packages/redis-stream/package.json create mode 100644 packages/redis-stream/src/RedisConnection.ts create mode 100644 packages/redis-stream/src/RedisError.ts create mode 100644 packages/redis-stream/src/RedisStreamMessage.ts create mode 100644 packages/redis-stream/src/RedisStreamPublisher.ts create mode 100644 packages/redis-stream/src/RedisStreamSubscriber.ts create mode 100644 packages/redis-stream/src/index.ts create mode 100644 packages/redis-stream/src/internal/RedisConnection.ts create mode 100644 packages/redis-stream/test/RedisConnection.test.ts create mode 100644 packages/redis-stream/test/RedisStreamPublisher.test.ts create mode 100644 packages/redis-stream/test/RedisStreamSubscriber.test.ts create mode 100644 packages/redis-stream/test/dependencies.ts create mode 100644 packages/redis-stream/tsconfig.build.json create mode 100644 packages/redis-stream/tsconfig.examples.json create mode 100644 packages/redis-stream/tsconfig.json create mode 100644 packages/redis-stream/tsconfig.src.json create mode 100644 packages/redis-stream/tsconfig.test.json create mode 100644 packages/redis-stream/vitest.config.ts diff --git a/docker-compose.yaml b/docker-compose.yaml index fb53a84..ca0fc58 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -18,5 +18,15 @@ services: timeout: 5s retries: 5 + redis: + image: redis:7-alpine + ports: + - 6379:6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + volumes: rabbitmq_data: diff --git a/packages/redis-stream/LICENSE b/packages/redis-stream/LICENSE new file mode 100644 index 0000000..eb9cdd6 --- /dev/null +++ b/packages/redis-stream/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 effect-messaging + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/redis-stream/README.md b/packages/redis-stream/README.md new file mode 100644 index 0000000..b5f8d49 --- /dev/null +++ b/packages/redis-stream/README.md @@ -0,0 +1,64 @@ +# @effect-messaging/redis-stream + +A Redis Stream toolkit for Effect. + +## Installation + +```bash +pnpm add @effect-messaging/redis-stream +``` + +## Usage + +### Publisher + +```typescript +import { + RedisConnection, + RedisStreamPublisher +} from "@effect-messaging/redis-stream" +import { Effect } from "effect" + +const program = Effect.gen(function* () { + const publisher = yield* RedisStreamPublisher.make() + + yield* publisher.publish({ + stream: "my-stream", + data: { message: "Hello World" } + }) +}) + +const runnable = program.pipe( + Effect.provide(RedisStreamPublisher.layer()), + Effect.provide(RedisConnection.layer({ host: "localhost", port: 6379 })) +) + +Effect.runPromise(runnable) +``` + +### Subscriber + +```typescript +import { + RedisConnection, + RedisStreamSubscriber +} from "@effect-messaging/redis-stream" +import { Effect } from "effect" + +const messageHandler = Effect.gen(function* () { + const message = yield* RedisStreamMessage.RedisStreamMessage + yield* Effect.logInfo(`Received: ${JSON.stringify(message.data)}`) +}) + +const program = Effect.gen(function* () { + const subscriber = yield* RedisStreamSubscriber.make("my-stream") + yield* subscriber.subscribe(messageHandler) +}) + +const runnable = program.pipe( + Effect.provide(RedisStreamSubscriber.layer("my-stream")), + Effect.provide(RedisConnection.layer({ host: "localhost", port: 6379 })) +) + +Effect.runPromise(runnable) +``` diff --git a/packages/redis-stream/docgen.json b/packages/redis-stream/docgen.json new file mode 100644 index 0000000..91ada79 --- /dev/null +++ b/packages/redis-stream/docgen.json @@ -0,0 +1,7 @@ +{ + "src": "src", + "out": "docs", + "theme": "minimal", + "includeVersion": true, + "categorizeByGroup": true +} diff --git a/packages/redis-stream/examples/establish-connection.ts b/packages/redis-stream/examples/establish-connection.ts new file mode 100644 index 0000000..5a1844b --- /dev/null +++ b/packages/redis-stream/examples/establish-connection.ts @@ -0,0 +1,34 @@ +import { RedisConnection } from "@effect-messaging/redis-stream" +import { Effect } from "effect" + +const program = Effect.gen(function*(_) { + const connection = yield* RedisConnection.RedisConnection + + // Test the connection with a ping + const pong = yield* connection.ping + yield* Effect.logInfo(`Redis connection established: ${pong}`) + + // Test basic Redis operations + const client = yield* connection.client + yield* Effect.tryPromise({ + try: () => client.set("test-key", "test-value"), + catch: (error) => new Error(`Failed to set key: ${error}`) + }) + + const value = yield* Effect.tryPromise({ + try: () => client.get("test-key"), + catch: (error) => new Error(`Failed to get key: ${error}`) + }) + + yield* Effect.logInfo(`Retrieved value: ${value}`) +}) + +const runnable = program.pipe( + Effect.provide(RedisConnection.layer({ + host: "localhost", + port: 6379 + })) +) + +// Run the program +Effect.runPromise(runnable) diff --git a/packages/redis-stream/examples/publisher.ts b/packages/redis-stream/examples/publisher.ts new file mode 100644 index 0000000..4764888 --- /dev/null +++ b/packages/redis-stream/examples/publisher.ts @@ -0,0 +1,31 @@ +import { RedisConnection, RedisStreamPublisher } from "@effect-messaging/redis-stream" +import { Context, Effect } from "effect" + +class MyPublisher extends Context.Tag("MyPublisher")() {} + +const program = Effect.gen(function*(_) { + const publisher = yield* MyPublisher + + yield* publisher.publish({ + stream: "my-stream", + data: { + message: "Hello World", + timestamp: Date.now().toString(), + type: "greeting" + } + }) + + yield* Effect.logInfo("Message published successfully") +}) + +const runnable = program.pipe( + Effect.provideServiceEffect(MyPublisher, RedisStreamPublisher.make()), + // provide the Redis Connection dependency + Effect.provide(RedisConnection.layer({ + host: "localhost", + port: 6379 + })) +) + +// Run the program +Effect.runPromise(runnable) diff --git a/packages/redis-stream/examples/subscriber.ts b/packages/redis-stream/examples/subscriber.ts new file mode 100644 index 0000000..db1c3fd --- /dev/null +++ b/packages/redis-stream/examples/subscriber.ts @@ -0,0 +1,32 @@ +import { RedisConnection, RedisStreamMessage, RedisStreamSubscriber } from "@effect-messaging/redis-stream" +import { Effect } from "effect" + +const messageHandler = Effect.gen(function*(_) { + const message = yield* RedisStreamMessage.RedisStreamMessage + + // You can add your message processing logic here + yield* Effect.logInfo(`Received message: ${JSON.stringify(message.data)}`) + yield* Effect.logInfo(`Message ID: ${message.id}, Timestamp: ${message.timestamp}`) +}) + +const program = Effect.gen(function*(_) { + const subscriber = yield* RedisStreamSubscriber.make("my-stream", { + blockTimeout: 1000, + count: 10 + }) + + // The subscriber will automatically handle message acknowledgment + // based on the success or failure of the message handler + yield* subscriber.subscribe(messageHandler) +}) + +const runnable = program.pipe( + // provide the Redis Connection dependency + Effect.provide(RedisConnection.layer({ + host: "localhost", + port: 6379 + })) +) + +// Run the program +Effect.runPromise(runnable) diff --git a/packages/redis-stream/package.json b/packages/redis-stream/package.json new file mode 100644 index 0000000..fc69360 --- /dev/null +++ b/packages/redis-stream/package.json @@ -0,0 +1,54 @@ +{ + "name": "@effect-messaging/redis-stream", + "version": "0.1.0", + "type": "module", + "license": "MIT", + "description": "A Redis Stream toolkit for Effect", + "homepage": "https://github.com/spiko-tech/effect-messaging", + "repository": { + "type": "git", + "url": "https://github.com/spiko-tech/effect-messaging.git", + "directory": "packages/redis-stream" + }, + "bugs": { + "url": "https://github.com/spiko-tech/effect-messaging/issues" + }, + "tags": [ + "typescript", + "redis", + "streams" + ], + "keywords": [ + "typescript", + "redis", + "streams" + ], + "publishConfig": { + "access": "public", + "directory": "dist", + "provenance": true + }, + "scripts": { + "codegen": "build-utils prepare-v2", + "build": "pnpm build-esm && pnpm build-annotate && pnpm build-cjs && build-utils pack-v2", + "build-esm": "tsc -b tsconfig.build.json", + "build-cjs": "babel build/esm --plugins @babel/transform-export-namespace-from --plugins @babel/transform-modules-commonjs --out-dir build/cjs --source-maps", + "build-annotate": "babel build/esm --plugins annotate-pure-calls --out-dir build/esm --source-maps", + "check": "tsc -b tsconfig.json", + "test": "vitest", + "coverage": "vitest --coverage" + }, + "devDependencies": { + "@effect-messaging/core": "workspace:^", + "@effect/platform": "^0.92.1", + "@types/node": "^22.14.1", + "effect": "^3.18.3", + "redis": "^4.6.12" + }, + "peerDependencies": { + "@effect-messaging/core": "workspace:^", + "@effect/platform": "^0.92.1", + "effect": "^3.18.3", + "redis": "^4.6.12" + } +} diff --git a/packages/redis-stream/src/RedisConnection.ts b/packages/redis-stream/src/RedisConnection.ts new file mode 100644 index 0000000..220fde8 --- /dev/null +++ b/packages/redis-stream/src/RedisConnection.ts @@ -0,0 +1,108 @@ +/** + * @since 0.1.0 + */ +import * as Context from "effect/Context" +import type * as Duration from "effect/Duration" +import * as Effect from "effect/Effect" +import * as Layer from "effect/Layer" +import type * as Schedule from "effect/Schedule" +import type * as Scope from "effect/Scope" +import type { RedisClientType } from "redis" +import * as internal from "./internal/RedisConnection.js" +import * as RedisError from "./RedisError.js" + +/** + * @category type ids + * @since 0.1.0 + */ +export const TypeId: unique symbol = Symbol.for("@effect-messaging/redis-stream/RedisConnection") + +/** + * @category type ids + * @since 0.1.0 + */ +export type TypeId = typeof TypeId + +/** + * @category models + * @since 0.1.0 + */ +export interface RedisConnection { + readonly [TypeId]: TypeId + readonly client: Effect.Effect + readonly ping: Effect.Effect + readonly quit: Effect.Effect + + /** @internal */ + readonly close: (options: internal.CloseConnectionOptions) => Effect.Effect +} + +/** + * @category tags + * @since 0.1.0 + */ +export const RedisConnection = Context.GenericTag("@effect-messaging/redis-stream/RedisConnection") + +/** + * @category models + * @since 0.1.0 + */ +export type RedisConnectionOptions = { + host?: string + port?: number + password?: string + db?: number + retryConnectionSchedule?: Schedule.Schedule + waitConnectionTimeout?: Duration.DurationInput +} + +/** + * @category constructors + * @since 0.1.0 + */ +export const make = ( + options: RedisConnectionOptions = {} +): Effect.Effect => + Effect.gen(function*() { + const internalConnection = yield* internal.InternalRedisConnection + const provideInternal = Effect.provideService(internal.InternalRedisConnection, internalConnection) + + const connection = yield* Effect.acquireRelease( + Effect.gen(function*() { + yield* internal.initiateConnection + return { + [TypeId]: TypeId as TypeId, + client: internal.getOrWaitClient.pipe(provideInternal), + ping: Effect.gen(function*() { + const client = yield* internal.getOrWaitClient.pipe(provideInternal) + return yield* Effect.tryPromise({ + try: () => client.ping(), + catch: (error) => new RedisError.RedisConnectionError({ reason: "Ping failed", cause: error }) + }) + }) as Effect.Effect, + quit: Effect.gen(function*() { + const client = yield* internal.getOrWaitClient.pipe(provideInternal) + return yield* Effect.tryPromise({ + try: () => client.quit(), + catch: (error) => new RedisError.RedisConnectionError({ reason: "Quit failed", cause: error }) + }) + }) as Effect.Effect, + close: (opts: internal.CloseConnectionOptions = {}) => internal.closeConnection(opts).pipe(provideInternal) + } + }), + (connection) => connection.close() + ) + yield* Effect.forkScoped(internal.keepConnectionAlive) + yield* Effect.forkScoped(internal.monitorConnectionErrors) + return connection + }).pipe( + Effect.provideServiceEffect(internal.InternalRedisConnection, internal.InternalRedisConnection.new(options)) + ) + +/** + * @since 0.1.0 + * @category Layers + */ +export const layer = ( + options: RedisConnectionOptions = {} +): Layer.Layer => Layer.scoped(RedisConnection, make(options)) diff --git a/packages/redis-stream/src/RedisError.ts b/packages/redis-stream/src/RedisError.ts new file mode 100644 index 0000000..1d1c35c --- /dev/null +++ b/packages/redis-stream/src/RedisError.ts @@ -0,0 +1,46 @@ +/** + * @since 0.1.0 + */ +import * as Schema from "effect/Schema" + +/** + * @since 0.1.0 + */ +export const TypeId: unique symbol = Symbol.for("@effect-messaging/redis-stream/RedisError") + +/** + * @since 0.1.0 + */ +export type TypeId = typeof TypeId + +/** + * Represents a Redis Connection Error + * + * @since 0.1.0 + * @category errors + */ +export class RedisConnectionError extends Schema.TaggedError()( + "RedisConnectionError", + { reason: Schema.String, cause: Schema.optional(Schema.Defect) } +) { + /** + * @since 0.1.0 + */ + readonly [TypeId] = TypeId +} + +/** + * Represents a Redis Stream Error + * + * @since 0.1.0 + * @category errors + */ +export class RedisStreamError extends Schema.TaggedError()( + "RedisStreamError", + { reason: Schema.String, cause: Schema.optional(Schema.Defect) } +) { + /** + * @since 0.1.0 + */ + readonly [TypeId] = TypeId +} diff --git a/packages/redis-stream/src/RedisStreamMessage.ts b/packages/redis-stream/src/RedisStreamMessage.ts new file mode 100644 index 0000000..27bff2d --- /dev/null +++ b/packages/redis-stream/src/RedisStreamMessage.ts @@ -0,0 +1,30 @@ +/** + * @since 0.1.0 + */ +import * as Context from "effect/Context" +import * as Layer from "effect/Layer" + +/** + * @category models + * @since 0.1.0 + */ +export interface RedisStreamMessage { + readonly id: string + readonly timestamp: number + readonly data: Record +} + +/** + * @category tags + * @since 0.1.0 + */ +export const RedisStreamMessage = Context.GenericTag( + "@effect-messaging/redis-stream/RedisStreamMessage" +) + +/** + * @since 0.1.0 + * @category Layers + */ +export const layer = (message: RedisStreamMessage): Layer.Layer => + Layer.succeed(RedisStreamMessage, message) diff --git a/packages/redis-stream/src/RedisStreamPublisher.ts b/packages/redis-stream/src/RedisStreamPublisher.ts new file mode 100644 index 0000000..54a3fdc --- /dev/null +++ b/packages/redis-stream/src/RedisStreamPublisher.ts @@ -0,0 +1,111 @@ +/** + * @since 0.1.0 + */ +import * as Publisher from "@effect-messaging/core/Publisher" +import * as PublisherError from "@effect-messaging/core/PublisherError" +import * as Context from "effect/Context" +import * as Effect from "effect/Effect" +import * as Layer from "effect/Layer" +import * as Schedule from "effect/Schedule" +import * as RedisConnection from "./RedisConnection.js" +import * as RedisError from "./RedisError.js" + +/** + * @category type ids + * @since 0.1.0 + */ +export const TypeId: unique symbol = Symbol.for("@effect-messaging/redis-stream/RedisStreamPublisher") + +/** + * @category tags + * @since 0.1.0 + */ +export const RedisStreamPublisher = Context.GenericTag( + "@effect-messaging/redis-stream/RedisStreamPublisher" +) + +/** + * @category type ids + * @since 0.1.0 + */ +export type TypeId = typeof TypeId + +/** + * @category models + * @since 0.1.0 + */ +export interface RedisPublishMessage { + stream: string + data: Record +} + +/** + * @category models + * @since 0.1.0 + */ +export interface RedisStreamPublisher extends Publisher.Publisher { + readonly [TypeId]: TypeId +} + +/** @internal */ +const publish = ( + connection: RedisConnection.RedisConnection, + retrySchedule: Schedule.Schedule +) => +(message: RedisPublishMessage): Effect.Effect => + Effect.gen(function*() { + const client = yield* connection.client.pipe( + Effect.catchTag( + "RedisConnectionError", + (error) => Effect.fail(new PublisherError.PublisherError({ reason: "Connection failed", cause: error })) + ) + ) + yield* Effect.tryPromise({ + try: () => client.xAdd(message.stream, "*", message.data), + catch: (error) => new RedisError.RedisStreamError({ reason: "Failed to publish message", cause: error }) + }).pipe( + Effect.retry(retrySchedule), + Effect.catchTag( + "RedisStreamError", + (error) => Effect.fail(new PublisherError.PublisherError({ reason: "Failed to publish message", cause: error })) + ), + Effect.map(() => undefined) + ) + }) + +/** + * @category constructors + * @since 0.1.0 + */ +export interface RedisStreamPublisherConfig { + readonly retrySchedule?: Schedule.Schedule +} + +/** + * @category constructors + * @since 0.1.0 + */ +export const make = ( + config?: RedisStreamPublisherConfig +): Effect.Effect => + Effect.gen(function*() { + const connection = yield* RedisConnection.RedisConnection + + const publisher: RedisStreamPublisher = { + [TypeId]: TypeId, + [Publisher.TypeId]: Publisher.TypeId, + publish: publish(connection, config?.retrySchedule ?? Schedule.stop) + } + + return publisher + }) + +/** + * @since 0.1.0 + * @category Layers + */ +export const layer = (config?: RedisStreamPublisherConfig): Layer.Layer< + RedisStreamPublisher, + never, + RedisConnection.RedisConnection +> => Layer.effect(RedisStreamPublisher, make(config)) diff --git a/packages/redis-stream/src/RedisStreamSubscriber.ts b/packages/redis-stream/src/RedisStreamSubscriber.ts new file mode 100644 index 0000000..665bdf6 --- /dev/null +++ b/packages/redis-stream/src/RedisStreamSubscriber.ts @@ -0,0 +1,240 @@ +/** + * @since 0.1.0 + */ +import * as Subscriber from "@effect-messaging/core/Subscriber" +import * as SubscriberError from "@effect-messaging/core/SubscriberError" +import * as Cause from "effect/Cause" +import * as Context from "effect/Context" +import type * as Duration from "effect/Duration" +import * as Effect from "effect/Effect" +import * as Function from "effect/Function" +import * as Layer from "effect/Layer" +import * as Predicate from "effect/Predicate" +import * as Stream from "effect/Stream" +import * as RedisConnection from "./RedisConnection.js" +import * as RedisError from "./RedisError.js" +import * as RedisStreamMessage from "./RedisStreamMessage.js" + +/** + * @category type ids + * @since 0.1.0 + */ +export const TypeId: unique symbol = Symbol.for("@effect-messaging/redis-stream/RedisStreamSubscriber") + +/** + * @category tags + * @since 0.1.0 + */ +export const RedisStreamSubscriber = Context.GenericTag( + "@effect-messaging/redis-stream/RedisStreamSubscriber" +) + +/** + * @category type ids + * @since 0.1.0 + */ +export type TypeId = typeof TypeId + +/** + * @category models + * @since 0.1.0 + */ +export interface RedisStreamSubscriber extends Subscriber.Subscriber { + readonly [TypeId]: TypeId +} + +const ATTR_SERVER_ADDRESS = "server.address" as const +const ATTR_SERVER_PORT = "server.port" as const +const ATTR_MESSAGING_DESTINATION_NAME = "messaging.destination.name" as const +const ATTR_MESSAGING_OPERATION_NAME = "messaging.operation.name" as const +const ATTR_MESSAGING_OPERATION_TYPE = "messaging.operation.type" as const +const ATTR_MESSAGING_SYSTEM = "messaging.system" as const +const ATTR_MESSAGING_MESSAGE_ID = "messaging.message.id" as const +const ATTR_MESSAGING_DESTINATION_SUBSCRIPTION_NAME = "messaging.destination.subscription.name" as const + +/** @internal */ +const subscribe = ( + connection: RedisConnection.RedisConnection, + streamName: string, + options: RedisStreamSubscriberOptions +) => +( + handler: Effect.Effect +) => + Effect.gen(function*() { + const client = yield* connection.client.pipe( + Effect.catchTag( + "RedisConnectionError", + (error) => Effect.fail(new SubscriberError.SubscriberError({ reason: "Connection failed", cause: error })) + ) + ) + let lastId = "0" // Start from the beginning + + const consumeStream = Stream.repeatEffect( + Effect.gen(function*() { + const result = yield* Effect.tryPromise({ + try: () => + client.xRead({ + key: streamName, + id: lastId + }, { + BLOCK: options.blockTimeout ?? 1000, + COUNT: options.count ?? 10 + }), + catch: (error) => new RedisError.RedisStreamError({ reason: "Failed to read from stream", cause: error }) + }) + + if (result) { + const streamData = result[0] + if (streamData && streamData.messages.length > 0) { + lastId = streamData.messages[streamData.messages.length - 1].id + return streamData.messages.map((msg: any) => ({ + id: msg.id, + timestamp: parseInt(msg.id.split("-")[0], 10), + data: msg.message + })) + } + } + return [] + }) + ) + + return yield* consumeStream.pipe( + Stream.flatMap((messages) => Stream.fromIterable(messages)), + Stream.runForEach((message: any) => + Effect.fork( + Effect.useSpan( + `redis.xread ${streamName}`, + { + kind: "consumer", + captureStackTrace: false, + attributes: { + [ATTR_SERVER_ADDRESS]: "localhost", // Could be enhanced to get from connection + [ATTR_SERVER_PORT]: 6379, + [ATTR_MESSAGING_MESSAGE_ID]: message.id, + [ATTR_MESSAGING_SYSTEM]: "redis", + [ATTR_MESSAGING_DESTINATION_SUBSCRIPTION_NAME]: streamName, + [ATTR_MESSAGING_DESTINATION_NAME]: streamName, + [ATTR_MESSAGING_OPERATION_TYPE]: "receive" + } + }, + (span) => + Effect.gen(function*() { + yield* Effect.logDebug(`redis.xread ${streamName} - ${message.id}`) + yield* handler.pipe( + options.handlerTimeout + ? Effect.timeoutFail({ + duration: options.handlerTimeout, + onTimeout: () => + new SubscriberError.SubscriberError({ reason: `RedisStreamSubscriber: handler timed out` }) + }) + : Function.identity + ) + // Automatic ACK - no explicit acknowledgment needed with XREAD + span.attribute(ATTR_MESSAGING_OPERATION_NAME, "ack") + }).pipe( + Effect.provide(RedisStreamMessage.layer(message as any)), + Effect.tapErrorCause((cause) => + Effect.gen(function*() { + yield* Effect.logError(Cause.pretty(cause)) + span.attribute(ATTR_MESSAGING_OPERATION_NAME, "nack") + span.attribute( + "error.type", + Cause.squashWith( + cause, + (_) => Predicate.hasProperty(_, "tag") ? _.tag : _ instanceof Error ? _.name : `${_}` + ) + ) + span.attribute("error.stack", Cause.pretty(cause)) + span.attribute( + "error.message", + Cause.squashWith( + cause, + (_) => Predicate.hasProperty(_, "reason") ? _.reason : _ instanceof Error ? _.message : `${_}` + ) + ) + // With XREAD, we don't requeue failed messages + }) + ), + options.uninterruptible ? Effect.uninterruptible : Effect.interruptible, + Effect.withParentSpan(span) + ) + ) + ) + ), + Effect.mapError((error) => + new SubscriberError.SubscriberError({ reason: `RedisStreamSubscriber failed to subscribe`, cause: error }) + ) + ) + }) + +/** @internal */ +const healthCheck = ( + connection: RedisConnection.RedisConnection, + streamName: string +): Effect.Effect => + Effect.gen(function*() { + const client = yield* connection.client.pipe( + Effect.catchTag( + "RedisConnectionError", + (error) => Effect.fail(new SubscriberError.SubscriberError({ reason: "Connection failed", cause: error })) + ) + ) + yield* Effect.tryPromise({ + try: () => client.exists(streamName), + catch: (error) => new RedisError.RedisStreamError({ reason: `Healthcheck failed`, cause: error }) + }).pipe( + Effect.catchTag("RedisStreamError", (error) => + new SubscriberError.SubscriberError({ reason: `Healthcheck failed`, cause: error })), + Effect.asVoid + ) + }) + +/** + * @category models + * @since 0.1.0 + */ +export interface RedisStreamSubscriberOptions { + uninterruptible?: boolean + handlerTimeout?: Duration.DurationInput + blockTimeout?: number + count?: number +} + +/** + * @category constructors + * @since 0.1.0 + */ +export const make = ( + streamName: string, + options: RedisStreamSubscriberOptions = {} +): Effect.Effect< + RedisStreamSubscriber, + RedisError.RedisConnectionError, + RedisConnection.RedisConnection +> => + Effect.gen(function*() { + const connection = yield* RedisConnection.RedisConnection + + const subscriber: RedisStreamSubscriber = { + [TypeId]: TypeId, + [Subscriber.TypeId]: Subscriber.TypeId, + subscribe: subscribe(connection, streamName, options) as any, + healthCheck: healthCheck(connection, streamName) + } + + return subscriber + }) + +/** + * @since 0.1.0 + * @category Layers + */ +export const layer = ( + streamName: string, + options: RedisStreamSubscriberOptions = {} +): Layer.Layer< + RedisStreamSubscriber, + RedisError.RedisConnectionError, + RedisConnection.RedisConnection +> => Layer.effect(RedisStreamSubscriber, make(streamName, options)) diff --git a/packages/redis-stream/src/index.ts b/packages/redis-stream/src/index.ts new file mode 100644 index 0000000..09a99ff --- /dev/null +++ b/packages/redis-stream/src/index.ts @@ -0,0 +1,24 @@ +/** + * @since 0.1.0 + */ +export * as RedisConnection from "./RedisConnection.js" + +/** + * @since 0.1.0 + */ +export * as RedisError from "./RedisError.js" + +/** + * @since 0.1.0 + */ +export * as RedisStreamMessage from "./RedisStreamMessage.js" + +/** + * @since 0.1.0 + */ +export * as RedisStreamPublisher from "./RedisStreamPublisher.js" + +/** + * @since 0.1.0 + */ +export * as RedisStreamSubscriber from "./RedisStreamSubscriber.js" diff --git a/packages/redis-stream/src/internal/RedisConnection.ts b/packages/redis-stream/src/internal/RedisConnection.ts new file mode 100644 index 0000000..1d30bd1 --- /dev/null +++ b/packages/redis-stream/src/internal/RedisConnection.ts @@ -0,0 +1,160 @@ +import * as Context from "effect/Context" +import * as Duration from "effect/Duration" +import * as Effect from "effect/Effect" +import * as Option from "effect/Option" +import * as Schedule from "effect/Schedule" +import * as Sink from "effect/Sink" +import * as Stream from "effect/Stream" +import * as SubscriptionRef from "effect/SubscriptionRef" +import type { RedisClientType } from "redis" +import { createClient } from "redis" +import { RedisConnectionError } from "../RedisError.js" + +/** @internal */ +export type ConnectionOptions = { + host?: string + port?: number + password?: string + db?: number + retryConnectionSchedule?: Schedule.Schedule + waitConnectionTimeout?: Duration.DurationInput +} + +export class InternalRedisConnection + extends Context.Tag("@effect-messaging/redis-stream/InternalRedisConnection")> + options: ConnectionOptions + retryConnectionSchedule: Schedule.Schedule + waitConnectionTimeout: Duration.DurationInput + }>() +{ + private static defaultRetryConnectionSchedule = Schedule.forever.pipe(Schedule.addDelay(() => 1000)) + private static defaultWaitConnectionTimeout = Duration.seconds(5) + + static new = ( + options: ConnectionOptions + ): Effect.Effect> => + Effect.gen(function*() { + const clientRef = yield* SubscriptionRef.make(Option.none()) + return { + clientRef, + options, + retryConnectionSchedule: options.retryConnectionSchedule ?? + InternalRedisConnection.defaultRetryConnectionSchedule, + waitConnectionTimeout: options.waitConnectionTimeout ?? InternalRedisConnection.defaultWaitConnectionTimeout + } + }) +} + +/** @internal */ +export const getOrWaitClient = Effect.gen(function*() { + const { clientRef, waitConnectionTimeout } = yield* InternalRedisConnection + return yield* clientRef.changes.pipe( + Stream.takeUntil(Option.isSome), + Stream.run(Sink.last()), + Effect.flatten, + Effect.flatten, + Effect.catchTag( + "NoSuchElementException", + () => Effect.dieMessage(`Should never happen: Client should be available here`) + ), + Effect.timeout(waitConnectionTimeout), + Effect.catchTag("TimeoutException", () => new RedisConnectionError({ reason: "Client is not available" })) + ) +}) + +/** @internal */ +export const initiateConnection = Effect.gen(function*() { + const { clientRef, options } = yield* InternalRedisConnection + yield* Effect.annotateCurrentSpan({ host: options.host, port: options.port }) + yield* SubscriptionRef.updateEffect(clientRef, () => + Effect.gen(function*() { + const redisClient = createClient({ + socket: { + host: options.host ?? "localhost", + port: options.port ?? 6379 + }, + ...(options.password && { password: options.password }), + database: options.db ?? 0 + }) + + const client = yield* Effect.tryPromise({ + try: () => redisClient.connect(), + catch: (error) => new RedisConnectionError({ reason: "Failed to establish connection", cause: error }) + }).pipe( + Effect.map(() => redisClient) + ) + return Option.some(client as any) + })) + yield* Effect.logDebug(`RedisConnection: connection established`) +}).pipe( + Effect.withSpan("RedisConnection.initiateConnection") +) + +/** @internal */ +export interface CloseConnectionOptions { + removeAllListeners?: boolean +} + +/** @internal */ +export const closeConnection = ({ removeAllListeners = true }: CloseConnectionOptions = {}) => + Effect.gen(function*() { + const { clientRef } = yield* InternalRedisConnection + const client = yield* getOrWaitClient + yield* Effect.tryPromise({ + try: () => client.quit(), + catch: (error) => new RedisConnectionError({ reason: "Failed to close connection", cause: error }) + }) + yield* SubscriptionRef.set(clientRef, Option.none()) + yield* Effect.logDebug(`RedisConnection: connection closed`) + }).pipe( + Effect.withSpan("RedisConnection.closeConnection"), + Effect.catchAll((error) => Effect.logError(`RedisConnection: error closing connection: ${error}`)) + ) + +/** @internal */ +export const keepConnectionAlive = Effect.gen(function*() { + const { clientRef, retryConnectionSchedule } = yield* InternalRedisConnection + yield* clientRef.changes.pipe( + Stream.runForEach((clientOption) => + Effect.gen(function*() { + if (Option.isSome(clientOption)) { + const client = clientOption.value + yield* Effect.tryPromise({ + try: () => client.ping(), + catch: (error) => new RedisConnectionError({ reason: "Connection health check failed", cause: error }) + }).pipe( + Effect.retry(retryConnectionSchedule), + Effect.forever, + Effect.fork + ) + } + }) + ), + Effect.fork + ) +}).pipe( + Effect.withSpan("RedisConnection.keepConnectionAlive"), + Effect.forkScoped +) + +/** @internal */ +export const monitorConnectionErrors = Effect.gen(function*() { + const { clientRef } = yield* InternalRedisConnection + yield* clientRef.changes.pipe( + Stream.runForEach((clientOption) => + Effect.gen(function*() { + if (Option.isSome(clientOption)) { + const client = clientOption.value + client.on("error", (error) => { + Effect.runSync(Effect.logError(`RedisConnection: client error: ${error}`)) + }) + } + }) + ), + Effect.fork + ) +}).pipe( + Effect.withSpan("RedisConnection.monitorConnectionErrors"), + Effect.forkScoped +) diff --git a/packages/redis-stream/test/RedisConnection.test.ts b/packages/redis-stream/test/RedisConnection.test.ts new file mode 100644 index 0000000..2ab3fc4 --- /dev/null +++ b/packages/redis-stream/test/RedisConnection.test.ts @@ -0,0 +1,42 @@ +import { Effect } from "effect" +import { describe, expect, it } from "vitest" +import { RedisConnection } from "../src" + +describe("RedisConnection", () => { + it("should establish connection", () => { + const program = Effect.gen(function*() { + const connection = yield* RedisConnection.RedisConnection + const pong = yield* connection.ping + expect(pong).toBe("PONG") + }) + + const runnable = program.pipe( + Effect.provide(RedisConnection.layer({ + host: "localhost", + port: 6379 + })) + ) + + return Effect.runPromise(runnable) + }) + + it("should handle connection errors", () => { + const program = Effect.gen(function*() { + const connection = yield* RedisConnection.RedisConnection + yield* connection.ping + }) + + const runnable = program.pipe( + Effect.provide(RedisConnection.layer({ + host: "invalid-host", + port: 9999 + })), + Effect.catchTag("RedisConnectionError", (error) => { + expect(error.reason).toContain("Failed to establish connection") + return Effect.void + }) + ) + + return Effect.runPromise(runnable) + }) +}) diff --git a/packages/redis-stream/test/RedisStreamPublisher.test.ts b/packages/redis-stream/test/RedisStreamPublisher.test.ts new file mode 100644 index 0000000..cfb71ed --- /dev/null +++ b/packages/redis-stream/test/RedisStreamPublisher.test.ts @@ -0,0 +1,71 @@ +import { Effect } from "effect" +import { describe, expect, it } from "vitest" +import { RedisConnection, RedisStreamPublisher } from "../src" + +describe("RedisStreamPublisher", () => { + it("should publish message to stream", () => { + const program = Effect.gen(function*() { + const publisher = yield* RedisStreamPublisher.make() + + yield* publisher.publish({ + stream: "test-stream", + data: { message: "test message", type: "test" } + }) + + // Verify message was published by reading it back + const connection = yield* RedisConnection.RedisConnection + const client = yield* connection.client + + const result = yield* Effect.tryPromise({ + try: () => + client.xRead({ + key: "test-stream", + id: "0" + }, { + COUNT: 1 + }), + catch: (error) => new Error(`Failed to read stream: ${error}`) + }) + + expect(result).toBeDefined() + expect(result![0].messages).toHaveLength(1) + expect(result![0].messages[0].message.message).toBe("test message") + }) + + const runnable = program.pipe( + Effect.provide(RedisStreamPublisher.layer()), + Effect.provide(RedisConnection.layer({ + host: "localhost", + port: 6379 + })) + ) + + return Effect.runPromise(runnable) + }) + + it("should handle publish errors", () => { + const program = Effect.gen(function*() { + const publisher = yield* RedisStreamPublisher.make() + + // Try to publish to an invalid stream (empty string) + yield* publisher.publish({ + stream: "", + data: { message: "test" } + }) + }) + + const runnable = program.pipe( + Effect.provide(RedisStreamPublisher.layer()), + Effect.provide(RedisConnection.layer({ + host: "localhost", + port: 6379 + })), + Effect.catchTag("PublisherError", (error) => { + expect(error.reason).toContain("Failed to publish message") + return Effect.void + }) + ) + + return Effect.runPromise(runnable) + }) +}) diff --git a/packages/redis-stream/test/RedisStreamSubscriber.test.ts b/packages/redis-stream/test/RedisStreamSubscriber.test.ts new file mode 100644 index 0000000..71808bb --- /dev/null +++ b/packages/redis-stream/test/RedisStreamSubscriber.test.ts @@ -0,0 +1,66 @@ +import { Effect } from "effect" +import { describe, expect, it } from "vitest" +import { RedisConnection, RedisStreamMessage, RedisStreamSubscriber } from "../src" + +describe("RedisStreamSubscriber", () => { + it("should subscribe to stream and process messages", () => { + const program = Effect.gen(function*() { + // First, publish a message + const connection = yield* RedisConnection.RedisConnection + const client = yield* connection.client + + yield* Effect.tryPromise({ + try: () => client.xAdd("test-subscriber-stream", "*", { message: "test subscriber message" }), + catch: (error) => new Error(`Failed to publish: ${error}`) + }) + + // Then subscribe and process + const subscriber = yield* RedisStreamSubscriber.make("test-subscriber-stream", { + blockTimeout: 100, + count: 1 + }) + + let messageProcessed = false + const handler = Effect.gen(function*() { + const message = yield* RedisStreamMessage.RedisStreamMessage + expect(message.data.message).toBe("test subscriber message") + messageProcessed = true + }) + + // Run subscription for a short time + yield* subscriber.subscribe(handler).pipe( + Effect.timeout("1 second"), + Effect.catchTag("TimeoutException", () => Effect.void) + ) + + expect(messageProcessed).toBe(true) + }) + + const runnable = program.pipe( + Effect.provide(RedisConnection.layer({ + host: "localhost", + port: 6379 + })) + ) + + return Effect.runPromise(runnable) + }) + + it("should handle health check", () => { + const program = Effect.gen(function*() { + const subscriber = yield* RedisStreamSubscriber.make("test-health-stream") + + // Health check should pass for existing stream + yield* subscriber.healthCheck + }) + + const runnable = program.pipe( + Effect.provide(RedisConnection.layer({ + host: "localhost", + port: 6379 + })) + ) + + return Effect.runPromise(runnable) + }) +}) diff --git a/packages/redis-stream/test/dependencies.ts b/packages/redis-stream/test/dependencies.ts new file mode 100644 index 0000000..29ac267 --- /dev/null +++ b/packages/redis-stream/test/dependencies.ts @@ -0,0 +1,7 @@ +import { RedisConnection } from "@effect-messaging/redis-stream" +import { Effect } from "effect" + +export const TestRedisConnection = RedisConnection.layer({ + host: "localhost", + port: 6379 +}) diff --git a/packages/redis-stream/tsconfig.build.json b/packages/redis-stream/tsconfig.build.json new file mode 100644 index 0000000..b67db04 --- /dev/null +++ b/packages/redis-stream/tsconfig.build.json @@ -0,0 +1,13 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "build/esm", + "rootDir": "src", + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "removeComments": false + }, + "include": ["src/**/*"], + "exclude": ["test/**/*", "examples/**/*", "node_modules", "build", "dist"] +} diff --git a/packages/redis-stream/tsconfig.examples.json b/packages/redis-stream/tsconfig.examples.json new file mode 100644 index 0000000..66c5f08 --- /dev/null +++ b/packages/redis-stream/tsconfig.examples.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "build/examples", + "rootDir": "examples" + }, + "include": ["examples/**/*"], + "exclude": ["src/**/*", "test/**/*", "node_modules", "build", "dist"] +} diff --git a/packages/redis-stream/tsconfig.json b/packages/redis-stream/tsconfig.json new file mode 100644 index 0000000..02be706 --- /dev/null +++ b/packages/redis-stream/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "build", + "composite": true, + "paths": { + "@effect-messaging/core": ["../core/src/index.js"], + "@effect-messaging/core/*": ["../core/src/*.js"], + "@effect-messaging/core/test/*": ["../core/test/*.js"] + } + }, + "include": ["src/**/*"], + "exclude": ["test/**/*", "examples/**/*", "node_modules", "build", "dist"] +} diff --git a/packages/redis-stream/tsconfig.src.json b/packages/redis-stream/tsconfig.src.json new file mode 100644 index 0000000..67c0554 --- /dev/null +++ b/packages/redis-stream/tsconfig.src.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "build/esm", + "rootDir": "src" + }, + "include": ["src/**/*"], + "exclude": ["test/**/*", "examples/**/*", "node_modules", "build", "dist"] +} diff --git a/packages/redis-stream/tsconfig.test.json b/packages/redis-stream/tsconfig.test.json new file mode 100644 index 0000000..5af3c13 --- /dev/null +++ b/packages/redis-stream/tsconfig.test.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "build/test", + "rootDir": "test" + }, + "include": ["test/**/*"], + "exclude": ["src/**/*", "examples/**/*", "node_modules", "build", "dist"] +} diff --git a/packages/redis-stream/vitest.config.ts b/packages/redis-stream/vitest.config.ts new file mode 100644 index 0000000..8c5d3ca --- /dev/null +++ b/packages/redis-stream/vitest.config.ts @@ -0,0 +1,7 @@ +import { defineConfig } from "vitest/config" + +export default defineConfig({ + test: { + environment: "node" + } +}) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9b889c4..a1653a3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -3,6 +3,7 @@ lockfileVersion: '9.0' settings: autoInstallPeers: true excludeLinksFromLockfile: false + injectWorkspacePackages: true importers: @@ -106,7 +107,7 @@ importers: devDependencies: '@effect-messaging/core': specifier: workspace:^ - version: link:../core/dist + version: file:packages/core/dist(effect@3.19.4) '@effect/platform': specifier: ^0.93.2 version: 0.93.2(effect@3.19.4) @@ -128,6 +129,25 @@ importers: version: 3.19.4 publishDirectory: dist + packages/redis-stream: + devDependencies: + '@effect-messaging/core': + specifier: workspace:^ + version: file:packages/core/dist(effect@3.19.4) + '@effect/platform': + specifier: ^0.92.1 + version: 0.92.1(effect@3.19.4) + '@types/node': + specifier: ^22.14.1 + version: 22.19.1 + effect: + specifier: ^3.18.3 + version: 3.19.4 + redis: + specifier: ^4.6.12 + version: 4.7.1 + publishDirectory: dist + packages: '@ampproject/remapping@2.3.0': @@ -299,6 +319,11 @@ packages: '@dprint/typescript@0.91.8': resolution: {integrity: sha512-tuKn4leCPItox1O4uunHcQF0QllDCvPWklnNQIh2PiWWVtRAGltJJnM4Cwj5AciplosD1Hiz7vAY3ew3crLb3A==} + '@effect-messaging/core@file:packages/core/dist': + resolution: {directory: packages/core/dist, type: directory} + peerDependencies: + effect: ^3.19.4 + '@effect/build-utils@0.8.9': resolution: {integrity: sha512-wzunjr8pyNjghLueo5ePQD7vaM3qeO2H/jF57hNYbQM/rxfipBWYKukoqWyp04qGZ+V86FyOHb8zD5+PMZvH2A==} engines: {node: '>=16.17.1'} @@ -324,6 +349,11 @@ packages: engines: {node: '>=0.10.0'} hasBin: true + '@effect/platform@0.92.1': + resolution: {integrity: sha512-XXWCBVwyhaKZISN7aM1fv/3fWDGyxr84ObywnUrL8aHvJLoIeskWFAP/fqw3c5MFCrJ3ZV97RWLbv6JiBQugdg==} + peerDependencies: + effect: ^3.18.1 + '@effect/platform@0.93.2': resolution: {integrity: sha512-IFWF2xuz37tZbyEsf3hwBlcYYqbqJho+ZM871CG92lWJSjcTgvmjCy77qnV0QhTWVdh9BMs12QKzQCMlqz4cJQ==} peerDependencies: @@ -806,6 +836,35 @@ packages: resolution: {integrity: sha512-25L86MyPvnlQoX2MTIV2OiUcb6vJ6aRbFa9pbwByn95INKD5mFH2smgjDhq+fwJoqAgvgbdJLj6Tz7V9X5CFAQ==} engines: {node: ^12.20.0 || ^14.18.0 || >=16.0.0} + '@redis/bloom@1.2.0': + resolution: {integrity: sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==} + peerDependencies: + '@redis/client': ^1.0.0 + + '@redis/client@1.6.1': + resolution: {integrity: sha512-/KCsg3xSlR+nCK8/8ZYSknYxvXHwubJrU82F3Lm1Fp6789VQ0/3RJKfsmRXjqfaTA++23CvC3hqmqe/2GEt6Kw==} + engines: {node: '>=14'} + + '@redis/graph@1.1.1': + resolution: {integrity: sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==} + peerDependencies: + '@redis/client': ^1.0.0 + + '@redis/json@1.0.7': + resolution: {integrity: sha512-6UyXfjVaTBTJtKNG4/9Z8PSpKE6XgSyEb8iwaqDcy+uKrd/DGYHTWkUdnQDyzm727V7p21WUMhsqz5oy65kPcQ==} + peerDependencies: + '@redis/client': ^1.0.0 + + '@redis/search@1.2.0': + resolution: {integrity: sha512-tYoDBbtqOVigEDMAcTGsRlMycIIjwMCgD8eR2t0NANeQmgK/lvxNAvYyb6bZDD4frHRhIHkJu2TBRvB0ERkOmw==} + peerDependencies: + '@redis/client': ^1.0.0 + + '@redis/time-series@1.1.0': + resolution: {integrity: sha512-c1Q99M5ljsIuc4YdaCwfUEXsofakb9c8+Zse2qxTadu8TalLXuAESzLvFAvNVbkmSlvlzIQOLpBCmWI9wTOt+g==} + peerDependencies: + '@redis/client': ^1.0.0 + '@rollup/rollup-android-arm-eabi@4.39.0': resolution: {integrity: sha512-lGVys55Qb00Wvh8DMAocp5kIcaNzEFTmGhfFd88LfaogYTRKrdxgtlO5H6S49v2Nd8R2C6wLOal0qv6/kCkOwA==} cpu: [arm] @@ -955,6 +1014,9 @@ packages: '@types/node@12.20.55': resolution: {integrity: sha512-J8xLz7q2OFulZ2cyGTLE1TbbZcjpno7FaN6zdJNrgAdrJ+DZzh/uFR6YrTb4C+nXakvud8Q4+rbhoIWlYQbUFQ==} + '@types/node@22.19.1': + resolution: {integrity: sha512-LCCV0HdSZZZb34qifBsyWlUmok6W7ouER+oQIGBScS8EsZsQbrtFTUrDX4hOl+CS6p7cnNC4td+qrSVGSCTUfQ==} + '@types/node@24.9.1': resolution: {integrity: sha512-QoiaXANRkSXK6p0Duvt56W208du4P9Uye9hWLWgGMDTEoKPhuenzNcC4vGUmrNkiOKTlIrBoyNQYNpSwfEZXSg==} @@ -1458,6 +1520,10 @@ packages: resolution: {integrity: sha512-JQHZ2QMW6l3aH/j6xCqQThY/9OH4D/9ls34cgkUBiEeocRTU04tHfKPBsUK1PqZCUQM7GiA0IIXJSuXHI64Kbg==} engines: {node: '>=0.8'} + cluster-key-slot@1.1.2: + resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==} + engines: {node: '>=0.10.0'} + color-convert@2.0.1: resolution: {integrity: sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==} engines: {node: '>=7.0.0'} @@ -1983,6 +2049,10 @@ packages: resolution: {integrity: sha512-SFdFmIJi+ybC0vjlHN0ZGVGHc3lgE0DxPAT0djjVg+kjOnSqclqmj0KQ7ykTOLP6YxoqOvuAODGdcHJn+43q3g==} engines: {node: '>= 0.4'} + generic-pool@3.9.0: + resolution: {integrity: sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==} + engines: {node: '>= 4'} + gensync@1.0.0-beta.2: resolution: {integrity: sha512-3hN7NaskYvMDLQY55gnW3NQ+mesEAepTqlg+VEbj7zzqEMBVNhzcGYYeqFo/TlYz6eQiFcp1HcsCZO+nGgS8zg==} engines: {node: '>=6.9.0'} @@ -2832,6 +2902,9 @@ packages: resolution: {integrity: sha512-hOS089on8RduqdbhvQ5Z37A0ESjsqz6qnRcffsMU3495FuTdqSm+7bhJ29JvIOsBDEEnan5DPu9t3To9VRlMzA==} engines: {node: '>=8.10.0'} + redis@4.7.1: + resolution: {integrity: sha512-S1bJDnqLftzHXHP8JsT5II/CtHWQrASX5K96REjWjlmWKrviSOLWmM7QnRLstAWsu1VBBV1ffV6DzCvxNP0UJQ==} + reflect.getprototypeof@1.0.10: resolution: {integrity: sha512-00o4I+DVrefhv+nX0ulyi3biSHCPDe+yLv5o/p6d/UVlirijB8E16FtfwSAi4g3tcqrQ4lRAqQSoFEZJehYEcw==} engines: {node: '>= 0.4'} @@ -3229,6 +3302,9 @@ packages: resolution: {integrity: sha512-nWJ91DjeOkej/TA8pXQ3myruKpKEYgqvpw9lz4OPHj/NWFNluYrjbz9j01CJ8yKQd2g4jFoOkINCTW2I5LEEyw==} engines: {node: '>= 0.4'} + undici-types@6.21.0: + resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==} + undici-types@7.16.0: resolution: {integrity: sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==} @@ -3391,6 +3467,9 @@ packages: yallist@3.1.1: resolution: {integrity: sha512-a4UGQaWPH59mOXUYnAG2ewncQS4i4F43Tv3JoAM+s2VDAmS9NsK8GpDMLrCHPksFT7h3K6TOoUNn2pb7RoXx4g==} + yallist@4.0.0: + resolution: {integrity: sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==} + yocto-queue@0.1.0: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} @@ -3701,6 +3780,10 @@ snapshots: '@dprint/typescript@0.91.8': {} + '@effect-messaging/core@file:packages/core/dist(effect@3.19.4)': + dependencies: + effect: 3.19.4 + '@effect/build-utils@0.8.9': dependencies: micromatch: 4.0.8 @@ -3738,6 +3821,13 @@ snapshots: repeat-string: 1.6.1 strip-color: 0.1.0 + '@effect/platform@0.92.1(effect@3.19.4)': + dependencies: + effect: 3.19.4 + find-my-way-ts: 0.1.6 + msgpackr: 1.11.5 + multipasta: 0.2.7 + '@effect/platform@0.93.2(effect@3.19.4)': dependencies: effect: 3.19.4 @@ -4074,6 +4164,32 @@ snapshots: '@pkgr/core@0.2.2': {} + '@redis/bloom@1.2.0(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + + '@redis/client@1.6.1': + dependencies: + cluster-key-slot: 1.1.2 + generic-pool: 3.9.0 + yallist: 4.0.0 + + '@redis/graph@1.1.1(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + + '@redis/json@1.0.7(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + + '@redis/search@1.2.0(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + + '@redis/time-series@1.1.0(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + '@rollup/rollup-android-arm-eabi@4.39.0': optional: true @@ -4180,6 +4296,10 @@ snapshots: '@types/node@12.20.55': {} + '@types/node@22.19.1': + dependencies: + undici-types: 6.21.0 + '@types/node@24.9.1': dependencies: undici-types: 7.16.0 @@ -4746,6 +4866,8 @@ snapshots: clone@1.0.4: {} + cluster-key-slot@1.1.2: {} + color-convert@2.0.1: dependencies: color-name: 1.1.4 @@ -5440,6 +5562,8 @@ snapshots: generator-function@2.0.1: optional: true + generic-pool@3.9.0: {} + gensync@1.0.0-beta.2: {} get-amd-module-type@6.0.1: @@ -6344,6 +6468,15 @@ snapshots: picomatch: 2.3.1 optional: true + redis@4.7.1: + dependencies: + '@redis/bloom': 1.2.0(@redis/client@1.6.1) + '@redis/client': 1.6.1 + '@redis/graph': 1.1.1(@redis/client@1.6.1) + '@redis/json': 1.0.7(@redis/client@1.6.1) + '@redis/search': 1.2.0(@redis/client@1.6.1) + '@redis/time-series': 1.1.0(@redis/client@1.6.1) + reflect.getprototypeof@1.0.10: dependencies: call-bind: 1.0.8 @@ -6809,6 +6942,8 @@ snapshots: which-boxed-primitive: 1.1.1 optional: true + undici-types@6.21.0: {} + undici-types@7.16.0: {} universalify@0.1.2: {} @@ -7026,4 +7161,6 @@ snapshots: yallist@3.1.1: {} + yallist@4.0.0: {} + yocto-queue@0.1.0: {} diff --git a/tsconfig.base.json b/tsconfig.base.json index 2b50811..62b1b4e 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -42,7 +42,14 @@ "@effect-messaging/amqp/test/*": ["./packages/amqp/test/*.js"], "@effect-messaging/core": ["./packages/core/src/index.js"], "@effect-messaging/core/*": ["./packages/core/src/*.js"], - "@effect-messaging/core/test/*": ["./packages/core/test/*.js"] + "@effect-messaging/core/test/*": ["./packages/core/test/*.js"], + "@effect-messaging/redis-stream": [ + "./packages/redis-stream/src/index.js" + ], + "@effect-messaging/redis-stream/*": ["./packages/redis-stream/src/*.js"], + "@effect-messaging/redis-stream/test/*": [ + "./packages/redis-stream/test/*.js" + ] } } }