From c4813226039c8a4a7b6924cf135e527deb55f9ca Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Mon, 28 Apr 2025 21:27:26 +0100 Subject: [PATCH 1/2] Add simpler constructors and update readme --- README.md | 29 ++++++++++++++++--- .../com/p14n/postevent/ConsumerServer.java | 12 ++++++++ .../postevent/LocalPersistentConsumer.java | 13 +++++++++ 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 329da31..f2ebb68 100644 --- a/README.md +++ b/README.md @@ -56,16 +56,37 @@ com.p14n/postevent {:mvn/version "1.0.0"} Postevent can be used in both in-process and client/server modes, depending on your architecture. Use LocalPersistentConsumer for in-process, and ConsumerServer/RemotePersistentConsumer for client/server. +#### In-process (LocalPersistentConsumer) + +```java +var config = new ConfigData("myapp", Set.of("orders"), "dbhost", 5432, + "dbuser", "dbpassword", "dbname", 100); + +// Create a local consumer +try (LocalPersistentConsumer consumer = new LocalPersistentConsumer(config, OpenTelemetry.noop(), 10)) { + consumer.start(); + consumer.subscribe("orders", message -> { + System.out.println("Received: " + message); + }); +} + +// Publish an event +Publisher.publish(Event.create(...), dataSource, "orders"); +``` + +#### Across servers (ConsumerServer/RemotePersistentConsumer) + ```java // Start a consumer server -var config = new ConfigData("myapp", Set.of("orders"), "localhost", 5432, - "postgres", "postgres", "postgres", 10); -var server = new ConsumerServer(dataSource, config, OpenTelemetry.noop()); +var config = new ConfigData("myapp", Set.of("orders"), "dbhost", 5432, + "dbuser", "dbpassword", "dbname", 100); + +var server = new ConsumerServer(config, OpenTelemetry.noop()); server.start(8080); // Create a remote consumer try (RemotePersistentConsumer client = new RemotePersistentConsumer(OpenTelemetry.noop(), 10)) { - client.start(Set.of("orders"), dataSource, "localhost", 8080); + client.start(Set.of("orders"), dataSource, "consumerhost", 8080); client.subscribe("orders", message -> { System.out.println("Received: " + message); }); diff --git a/library/src/main/java/com/p14n/postevent/ConsumerServer.java b/library/src/main/java/com/p14n/postevent/ConsumerServer.java index 3c11677..672a099 100644 --- a/library/src/main/java/com/p14n/postevent/ConsumerServer.java +++ b/library/src/main/java/com/p14n/postevent/ConsumerServer.java @@ -13,6 +13,7 @@ import com.p14n.postevent.catchup.CatchupServer; import com.p14n.postevent.catchup.remote.CatchupGrpcServer; import com.p14n.postevent.data.ConfigData; +import com.p14n.postevent.db.DatabaseSetup; import io.grpc.Server; import io.grpc.ServerBuilder; @@ -63,6 +64,17 @@ public class ConsumerServer implements AutoCloseable { private AsyncExecutor asyncExecutor; OpenTelemetry ot; + /** + * Creates a new ConsumerServer instance with default executor configuration and + * database connection pool. + * + * @param cfg The configuration for the consumer server + * @param ot The OpenTelemetry instance for monitoring and tracing + */ + public ConsumerServer(ConfigData cfg, OpenTelemetry ot) { + this(DatabaseSetup.createPool(cfg), cfg, new DefaultExecutor(2), ot); + } + /** * Creates a new ConsumerServer instance with default executor configuration. * diff --git a/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java b/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java index d917b1c..c394d2c 100644 --- a/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java +++ b/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java @@ -7,6 +7,7 @@ import com.p14n.postevent.catchup.UnprocessedSubmitter; import com.p14n.postevent.data.PostEventConfig; import com.p14n.postevent.data.UnprocessedEventFinder; +import com.p14n.postevent.db.DatabaseSetup; import io.opentelemetry.api.OpenTelemetry; @@ -94,6 +95,18 @@ public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, OpenTelemetry this(ds, cfg, new DefaultExecutor(2, batchSize), ot, batchSize); } + /** + * Creates a new LocalPersistentConsumer with default executor configuration and + * default connection pool. + * + * @param cfg The configuration for the consumer + * @param ot OpenTelemetry instance for monitoring + * @param batchSize Maximum number of events to process in a batch + */ + public LocalPersistentConsumer(PostEventConfig cfg, OpenTelemetry ot) { + this(DatabaseSetup.createPool(cfg), cfg, new DefaultExecutor(2, 10), ot, 10); + } + /** * Starts the consumer, initializing all necessary components and scheduling * periodic tasks. From 0614844aa7131395f24451708ccb38d6492123c3 Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Mon, 28 Apr 2025 23:30:03 +0100 Subject: [PATCH 2/2] Update README with latency --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f2ebb68..bee236c 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,7 @@ A ConsumerServer can be configured to consume multiple topics. Each topic will ## Performance Tested on fargate 0.5vCPU/1GB with RDS db.t4g.micro - Throughput: 200 events/second -- Latency: Y ms average +- Latency: < 50ms ms from topic publish to remote client receive in transaction ## Contributing