Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,37 @@ com.p14n/postevent {:mvn/version "1.0.0"}

Postevent can be used in both in-process and client/server modes, depending on your architecture. Use LocalPersistentConsumer for in-process, and ConsumerServer/RemotePersistentConsumer for client/server.

#### In-process (LocalPersistentConsumer)

```java
var config = new ConfigData("myapp", Set.of("orders"), "dbhost", 5432,
"dbuser", "dbpassword", "dbname", 100);

// Create a local consumer
try (LocalPersistentConsumer consumer = new LocalPersistentConsumer(config, OpenTelemetry.noop(), 10)) {
consumer.start();
consumer.subscribe("orders", message -> {
System.out.println("Received: " + message);
});
}

// Publish an event
Publisher.publish(Event.create(...), dataSource, "orders");
```

#### Across servers (ConsumerServer/RemotePersistentConsumer)

```java
// Start a consumer server
var config = new ConfigData("myapp", Set.of("orders"), "localhost", 5432,
"postgres", "postgres", "postgres", 10);
var server = new ConsumerServer(dataSource, config, OpenTelemetry.noop());
var config = new ConfigData("myapp", Set.of("orders"), "dbhost", 5432,
"dbuser", "dbpassword", "dbname", 100);

var server = new ConsumerServer(config, OpenTelemetry.noop());
server.start(8080);

// Create a remote consumer
try (RemotePersistentConsumer client = new RemotePersistentConsumer(OpenTelemetry.noop(), 10)) {
client.start(Set.of("orders"), dataSource, "localhost", 8080);
client.start(Set.of("orders"), dataSource, "consumerhost", 8080);
client.subscribe("orders", message -> {
System.out.println("Received: " + message);
});
Expand Down Expand Up @@ -117,7 +138,7 @@ A ConsumerServer can be configured to consume multiple topics. Each topic will
## Performance
Tested on fargate 0.5vCPU/1GB with RDS db.t4g.micro
- Throughput: 200 events/second
- Latency: Y ms average
- Latency: < 50ms ms from topic publish to remote client receive in transaction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (typo): Typo: Duplicate 'ms' unit.

Remove the extra 'ms' unit.

Suggested change
- Latency: < 50ms ms from topic publish to remote client receive in transaction
- Latency: < 50ms from topic publish to remote client receive in transaction


## Contributing

Expand Down
12 changes: 12 additions & 0 deletions library/src/main/java/com/p14n/postevent/ConsumerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.p14n.postevent.catchup.CatchupServer;
import com.p14n.postevent.catchup.remote.CatchupGrpcServer;
import com.p14n.postevent.data.ConfigData;
import com.p14n.postevent.db.DatabaseSetup;

import io.grpc.Server;
import io.grpc.ServerBuilder;
Expand Down Expand Up @@ -63,6 +64,17 @@ public class ConsumerServer implements AutoCloseable {
private AsyncExecutor asyncExecutor;
OpenTelemetry ot;

/**
* Creates a new ConsumerServer instance with default executor configuration and
* database connection pool.
*
* @param cfg The configuration for the consumer server
* @param ot The OpenTelemetry instance for monitoring and tracing
*/
public ConsumerServer(ConfigData cfg, OpenTelemetry ot) {
this(DatabaseSetup.createPool(cfg), cfg, new DefaultExecutor(2), ot);
}

/**
* Creates a new ConsumerServer instance with default executor configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.p14n.postevent.catchup.UnprocessedSubmitter;
import com.p14n.postevent.data.PostEventConfig;
import com.p14n.postevent.data.UnprocessedEventFinder;
import com.p14n.postevent.db.DatabaseSetup;

import io.opentelemetry.api.OpenTelemetry;

Expand Down Expand Up @@ -94,6 +95,18 @@ public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, OpenTelemetry
this(ds, cfg, new DefaultExecutor(2, batchSize), ot, batchSize);
}

/**
* Creates a new LocalPersistentConsumer with default executor configuration and
* default connection pool.
*
* @param cfg The configuration for the consumer
* @param ot OpenTelemetry instance for monitoring
* @param batchSize Maximum number of events to process in a batch
*/
public LocalPersistentConsumer(PostEventConfig cfg, OpenTelemetry ot) {
this(DatabaseSetup.createPool(cfg), cfg, new DefaultExecutor(2, 10), ot, 10);
}

/**
* Starts the consumer, initializing all necessary components and scheduling
* periodic tasks.
Expand Down
Loading