Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions .changeset/rename-publisher-subscriber-to-producer-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
"@effect-messaging/core": major
"@effect-messaging/amqp": major
"@effect-messaging/nats": major
---

Rename Publisher/Subscriber to Producer/Consumer across all packages for industry-standard naming

### Breaking Changes

**@effect-messaging/core:**
- `Publisher` renamed to `Producer`
- `Subscriber` renamed to `Consumer`
- `PublisherError` renamed to `ProducerError`
- `SubscriberError` renamed to `ConsumerError`
- `SubscriberApp` renamed to `ConsumerApp`
- `Producer.publish()` method renamed to `Producer.send()`
- `Consumer.subscribe()` method renamed to `Consumer.serve()`

**@effect-messaging/amqp:**
- `AMQPPublisher` renamed to `AMQPProducer`
- `AMQPSubscriber` renamed to `AMQPConsumer`
- `AMQPSubscriberResponse` renamed to `AMQPConsumerResponse`
- `AMQPProducer.publish()` method renamed to `AMQPProducer.send()`
- `AMQPConsumer.subscribe()` method renamed to `AMQPConsumer.serve()`

**@effect-messaging/nats:**
- `NATSPublisher` renamed to `NATSProducer`
- `NATSSubscriber` renamed to `NATSConsumer`
- `JetStreamPublisher` renamed to `JetStreamProducer`
- `JetStreamSubscriber` renamed to `JetStreamConsumer`
- `JetStreamSubscriberResponse` renamed to `JetStreamConsumerResponse`
- The previous low-level `JetStreamConsumer` wrapper (for NATS consumers) is now exported as `JetStreamConsumerMessages`
- `NATSProducer.publish()` method renamed to `NATSProducer.send()`
- `JetStreamProducer.publish()` method renamed to `JetStreamProducer.send()`
- `NATSConsumer.subscribe()` method renamed to `NATSConsumer.serve()`
- `JetStreamConsumer.subscribe()` method renamed to `JetStreamConsumer.serve()`

### Migration Guide

Update your imports and code references:

```typescript
// Before
import { Publisher, Subscriber } from "@effect-messaging/core"
import { AMQPPublisher, AMQPSubscriber } from "@effect-messaging/amqp"
import { JetStreamPublisher, JetStreamSubscriber } from "@effect-messaging/nats"

// After
import { Producer, Consumer } from "@effect-messaging/core"
import { AMQPProducer, AMQPConsumer } from "@effect-messaging/amqp"
import { JetStreamProducer, JetStreamConsumer } from "@effect-messaging/nats"
```

Update method calls on producer instances:

```typescript
// Before
yield* producer.publish({ ... })

// After
yield* producer.send({ ... })
```

Update method calls on consumer instances:

```typescript
// Before
yield* consumer.subscribe(messageHandler)

// After
yield* consumer.serve(messageHandler)
```
151 changes: 102 additions & 49 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ A message broker toolkit for Effect.
- 🔌 Effectful wrappers for AMQP Connection and Channel
- 🔄 Auto-reconnect functionality when the connection is lost
- 🧘 Seamless consumption continuation after reconnection
- 🔭 Distributed tracing support (spans propagate from publishers to subscribers)
- 🔭 Distributed tracing support (spans propagate from producers to consumers)

### NATS / JetStream features

- 🔌 Effectful wrappers for NATS Connection and JetStream Client
- 📦 Full JetStream support (streams, consumers, publishers, subscribers)
- 🔭 Distributed tracing support (spans propagate from publishers to subscribers)
- 📦 Full JetStream support (streams, consumers, producers)
- 🔭 Distributed tracing support (spans propagate from producers to consumers)

> [!WARNING]
> This project is currently **under development**. Please note that future releases might introduce breaking changes.
Expand Down Expand Up @@ -55,27 +55,27 @@ const runnable = program.pipe(
Effect.runPromise(runnable)
```

#### 2. Create a Publisher
#### 2. Create a Producer

To send messages, create a publisher:
To send messages, create a producer:

```typescript
import {
AMQPChannel,
AMQPConnection,
AMQPPublisher
AMQPProducer
} from "@effect-messaging/amqp"
import { Context, Effect } from "effect"

class MyPublisher extends Context.Tag("MyPublisher")<
MyPublisher,
AMQPPublisher.AMQPPublisher
class MyProducer extends Context.Tag("MyProducer")<
MyProducer,
AMQPProducer.AMQPProducer
>() {}

const program = Effect.gen(function* (_) {
const publisher = yield* MyPublisher
const producer = yield* MyProducer

yield* publisher.publish({
yield* producer.send({
exchange: "my-exchange",
routingKey: "my-routing-key",
content: Buffer.from('{ "hello": "world" }'),
Expand All @@ -91,7 +91,7 @@ const program = Effect.gen(function* (_) {
})

const runnable = program.pipe(
Effect.provideServiceEffect(MyPublisher, AMQPPublisher.make()),
Effect.provideServiceEffect(MyProducer, AMQPProducer.make()),
// provide the AMQP Channel dependency
Effect.provide(AMQPChannel.layer),
// provide the AMQP Connection dependency
Expand All @@ -110,19 +110,21 @@ const runnable = program.pipe(
Effect.runPromise(runnable)
```

#### 3. Create a Subscriber
#### 3. Create a Consumer

To receive messages, create a subscriber:
To receive messages, create a consumer. There are two approaches:

**Option A: Using `serve()` (Layer-based, recommended for production)**

```typescript
import {
AMQPChannel,
AMQPConnection,
AMQPConsumeMessage,
AMQPSubscriber,
AMQPSubscriberResponse
AMQPConsumer,
AMQPConsumerResponse
} from "@effect-messaging/amqp"
import { Effect } from "effect"
import { Effect, Layer } from "effect"

const messageHandler = Effect.gen(function* (_) {
const message = yield* AMQPConsumeMessage.AMQPConsumeMessage
Expand All @@ -134,29 +136,50 @@ const messageHandler = Effect.gen(function* (_) {
// - ack(): Acknowledge successful processing
// - nack({ allUpTo?, requeue? }): Negative acknowledge
// - reject({ requeue? }): Reject the message
return AMQPSubscriberResponse.ack()
return AMQPConsumerResponse.ack()
})

// Create a Layer that manages the consumer lifecycle
const ConsumerLive = Layer.unwrapEffect(
Effect.gen(function* (_) {
const consumer = yield* AMQPConsumer.make("my-queue")
return consumer.serve(messageHandler)
})
)

const ConnectionLive = AMQPConnection.layer({
hostname: "localhost",
port: 5672,
username: "guest",
password: "guest",
heartbeat: 10
})

// Run the consumer as a long-running service
Effect.runPromise(
Layer.launch(ConsumerLive).pipe(
Effect.provide(AMQPChannel.layer),
Effect.provide(ConnectionLive)
)
)
```

**Option B: Using `serveEffect()` (Effect-based, useful for scripts or tests)**

```typescript
const program = Effect.gen(function* (_) {
const subscriber = yield* AMQPSubscriber.make("my-queue")
const consumer = yield* AMQPConsumer.make("my-queue")

// Subscribe to messages - on handler error, messages are nacked automatically
yield* subscriber.subscribe(messageHandler)
// Serve messages - on handler error, messages are nacked automatically
yield* consumer.serveEffect(messageHandler)
})

const runnable = program.pipe(
Effect.scoped,
// provide the AMQP Channel dependency
Effect.provide(AMQPChannel.layer),
// provide the AMQP Connection dependency
Effect.provide(
AMQPConnection.layer({
hostname: "localhost",
port: 5672,
username: "guest",
password: "guest",
heartbeat: 10
})
)
Effect.provide(ConnectionLive)
)

// Run the program
Expand Down Expand Up @@ -186,22 +209,22 @@ const runnable = program.pipe(
Effect.runPromise(runnable)
```

#### 2. Create a JetStream Publisher
#### 2. Create a JetStream Producer

To publish messages to a JetStream stream:

```typescript
import {
JetStreamClient,
JetStreamPublisher,
JetStreamProducer,
NATSConnection
} from "@effect-messaging/nats"
import { Effect } from "effect"

const program = Effect.gen(function* (_) {
const publisher = yield* JetStreamPublisher.make()
const producer = yield* JetStreamProducer.make()

yield* publisher.publish({
yield* producer.send({
subject: "orders.created",
payload: new TextEncoder().encode('{ "orderId": "123" }')
})
Expand All @@ -215,19 +238,21 @@ const runnable = program.pipe(
Effect.runPromise(runnable)
```

#### 3. Create a JetStream Subscriber
#### 3. Create a JetStream Consumer

To consume messages from a JetStream consumer. There are two approaches:

To consume messages from a JetStream consumer:
**Option A: Using `serve()` (Layer-based, recommended for production)**

```typescript
import {
JetStreamClient,
JetStreamMessage,
JetStreamSubscriber,
JetStreamSubscriberResponse,
JetStreamConsumer,
JetStreamConsumerResponse,
NATSConnection
} from "@effect-messaging/nats"
import { Effect } from "effect"
import { Effect, Layer } from "effect"

const messageHandler = Effect.gen(function* (_) {
const message = yield* JetStreamMessage.JetStreamConsumeMessage
Expand All @@ -238,18 +263,46 @@ const messageHandler = Effect.gen(function* (_) {
// - ack(): Acknowledge successful processing
// - nak({ millis? }): Negative acknowledge, optionally delay redelivery
// - term({ reason? }): Terminate message, stop redelivery
return JetStreamSubscriberResponse.ack()
return JetStreamConsumerResponse.ack()
})

// Create a Layer that manages the consumer lifecycle
const ConsumerLive = Layer.unwrapEffect(
Effect.gen(function* (_) {
const client = yield* JetStreamClient.JetStreamClient

// Get an existing consumer (stream and consumer must already exist)
const natsConsumer = yield* client.consumers.get("my-stream", "my-consumer")
const consumer = yield* JetStreamConsumer.fromConsumer(natsConsumer)

return consumer.serve(messageHandler)
})
)

const NATSLive = NATSConnection.layerNode({ servers: ["localhost:4222"] })
const JetStreamLive = JetStreamClient.layer()

// Run the consumer as a long-running service
Effect.runPromise(
Layer.launch(ConsumerLive).pipe(
Effect.provide(JetStreamLive),
Effect.provide(NATSLive)
)
)
```

**Option B: Using `serveEffect()` (Effect-based, useful for scripts or tests)**

```typescript
const program = Effect.gen(function* (_) {
const client = yield* JetStreamClient.JetStreamClient

// Get an existing consumer (stream and consumer must already exist)
const consumer = yield* client.consumers.get("my-stream", "my-consumer")
const subscriber = yield* JetStreamSubscriber.fromConsumer(consumer)
const natsConsumer = yield* client.consumers.get("my-stream", "my-consumer")
const consumer = yield* JetStreamConsumer.fromConsumer(natsConsumer)

// Subscribe to messages - on handler error, messages are nacked automatically
yield* subscriber.subscribe(messageHandler)
// Serve messages - on handler error, messages are nacked automatically
yield* consumer.serveEffect(messageHandler)
})

const runnable = program.pipe(
Expand All @@ -267,8 +320,8 @@ Effect.runPromise(runnable)

**Basic abstractions:**

- [x] Add a `Publisher` interface
- [x] Add a `Subscriber` interface
- [x] Add a `Producer` interface
- [x] Add a `Consumer` interface

**Application-level API for consumer apps:**

Expand All @@ -278,21 +331,21 @@ Effect.runPromise(runnable)
**Higher-level declarative API:**

- [ ] Add declarative API to define messages schemas
- [ ] Generate publisher based on message definitions
- [ ] Generate producer based on message definitions
- [ ] Generate consumer app based on message definitions
- [ ] AsyncAPI specification generation

### AMQP implementation

- [x] Effect wrappers for AMQP Connection & AMQP Channel
- [x] Implement publisher and subscriber
- [x] Implement producer and consumer
- [x] Integration tests
- [x] Add examples & documentation

### NATS implementation

- [x] Effect wrappers for `@nats-io/nats-core` and `@nats-io/jetstream`
- [x] Implement publisher and subscriber
- [x] Implement producer and consumer
- [x] Integration tests
- [x] Add examples & documentation

Expand Down
Loading