From daf7669d1ac47470bd17f87a51bc134d8a5cfc16 Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Sun, 27 Apr 2025 10:31:54 +0100 Subject: [PATCH 1/5] [human]Start publish work --- .gitignore | 1 + library/build.gradle | 51 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 68d8449..5be3e12 100644 --- a/.gitignore +++ b/.gitignore @@ -270,3 +270,4 @@ bin **/terraform.tfstate **/terraform.tfstate.backup app/infra/tf/terraform.tfvars +gradle.properties diff --git a/library/build.gradle b/library/build.gradle index 06cd6b9..59173e7 100644 --- a/library/build.gradle +++ b/library/build.gradle @@ -1,7 +1,12 @@ +import com.vanniktech.maven.publish.SonatypeHost +import com.vanniktech.maven.publish.JavaLibrary +import com.vanniktech.maven.publish.JavadocJar + plugins { id 'java' id 'com.adarshr.test-logger' version '4.0.0' id 'com.google.protobuf' version '0.9.2' + id "com.vanniktech.maven.publish" version "0.31.0" } compileJava { @@ -10,7 +15,7 @@ compileJava { } group = 'com.p14n' -version = '1.0-SNAPSHOT' +version = '1.0.0-SNAPSHOT' repositories { mavenCentral() @@ -133,3 +138,47 @@ sourceSets { } } } + +tasks.withType(Jar) { + duplicatesStrategy = DuplicatesStrategy.EXCLUDE +} + +mavenPublishing { + + configure(new JavaLibrary(new JavadocJar.Javadoc(), true)) + + //publishToMavenCentral(SonatypeHost.DEFAULT) + // or when publishing to https://s01.oss.sonatype.org + //publishToMavenCentral(SonatypeHost.S01) + // or when publishing to https://central.sonatype.com/ + publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL) + + signAllPublications() + + coordinates("com.p14n", "postevent", version) + + pom { + name = "Postevent" + description = 'A reliable event publishing and consumption system using PostgreSQL and gRPC' + inceptionYear = "2025" + url = "https://github.com/p14n/postevent/" + licenses { + license { + name = 'MIT License' + url = 'https://opensource.org/licenses/MIT' + } + } + developers { + developer { + id = 'p14n' + name = 'Dean Chapman' + email = 'dean@p14n.com' + } + } + scm { + connection = 'scm:git:git://github.com/p14n/postevent.git' + developerConnection = 'scm:git:ssh://github.com:p14n/postevent.git' + url = 'https://github.com/p14n/postevent' + } + } +} \ No newline at end of file From f022ac7d5d431b05b7f2e06d9f05325f10c18b45 Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Sun, 27 Apr 2025 21:14:47 +0100 Subject: [PATCH 2/5] [augment] Add javadoc --- library/build.gradle | 5 + .../com/p14n/postevent/ConsumerServer.java | 79 ++++++++++- .../com/p14n/postevent/LocalConsumer.java | 68 ++++++++- .../postevent/LocalPersistentConsumer.java | 96 ++++++++++++- .../java/com/p14n/postevent/Publisher.java | 55 ++++++- .../postevent/RemotePersistentConsumer.java | 102 ++++++++++++- .../p14n/postevent/broker/AsyncExecutor.java | 25 ++++ .../postevent/broker/DefaultExecutor.java | 45 ++++++ .../broker/DefaultMessageBroker.java | 132 +++++++++++++++-- .../postevent/broker/EventMessageBroker.java | 51 ++++++- .../p14n/postevent/broker/SystemEvent.java | 54 ++++++- .../postevent/broker/SystemEventBroker.java | 51 ++++++- .../postevent/broker/TransactionalBroker.java | 72 +++++++++- .../postevent/broker/TransactionalEvent.java | 52 +++++++ .../MessageBrokerGrpcClient.java | 73 +++++++++- .../MessageBrokerGrpcServer.java | 76 +++++++++- .../p14n/postevent/catchup/CatchupServer.java | 72 ++++++++++ .../catchup/CatchupServerInterface.java | 36 ++++- .../postevent/catchup/CatchupService.java | 134 ++++++++++++++++-- .../catchup/OneAtATimeBehaviour.java | 54 ++++++- .../postevent/catchup/PersistentBroker.java | 87 +++++++++++- .../catchup/UnprocessedSubmitter.java | 42 ++++++ .../{grpc => remote}/CatchupGrpcClient.java | 86 ++++++++++- .../{grpc => remote}/CatchupGrpcServer.java | 83 ++++++++++- .../com/p14n/postevent/data/ConfigData.java | 75 ++++++++-- .../java/com/p14n/postevent/data/Event.java | 92 ++++++++++-- .../p14n/postevent/data/PostEventConfig.java | 85 +++++++++-- .../com/p14n/postevent/data/Traceable.java | 51 ++++++- .../data/UnprocessedEventFinder.java | 52 +++++-- .../com/p14n/postevent/db/DatabaseSetup.java | 114 ++++++++++++++- .../main/java/com/p14n/postevent/db/SQL.java | 80 ++++++++++- .../postevent/debezium/DebeziumServer.java | 88 +++++++++++- .../p14n/postevent/debezium/Functions.java | 57 +++++++- .../postevent/processor/OrderedProcessor.java | 1 + .../postevent/telemetry/BrokerMetrics.java | 48 ++++++- .../postevent/telemetry/MapTextMapGetter.java | 40 +++++- .../telemetry/OpenTelemetryFunctions.java | 107 +++++++++++--- .../MessageBrokerGrpcIntegrationTest.java | 2 +- .../CatchupGrpcIntegrationTest.java | 4 +- 39 files changed, 2378 insertions(+), 148 deletions(-) rename library/src/main/java/com/p14n/postevent/broker/{grpc => remote}/MessageBrokerGrpcClient.java (64%) rename library/src/main/java/com/p14n/postevent/broker/{grpc => remote}/MessageBrokerGrpcServer.java (65%) rename library/src/main/java/com/p14n/postevent/catchup/{grpc => remote}/CatchupGrpcClient.java (58%) rename library/src/main/java/com/p14n/postevent/catchup/{grpc => remote}/CatchupGrpcServer.java (61%) rename library/src/test/java/com/p14n/postevent/broker/{grpc => remote}/MessageBrokerGrpcIntegrationTest.java (99%) rename library/src/test/java/com/p14n/postevent/catchup/{grpc => remote}/CatchupGrpcIntegrationTest.java (96%) diff --git a/library/build.gradle b/library/build.gradle index 59173e7..8081510 100644 --- a/library/build.gradle +++ b/library/build.gradle @@ -143,6 +143,11 @@ tasks.withType(Jar) { duplicatesStrategy = DuplicatesStrategy.EXCLUDE } +javadoc { + exclude "**/grpc/**" + source = sourceSets.main.allJava +} + mavenPublishing { configure(new JavaLibrary(new JavadocJar.Javadoc(), true)) diff --git a/library/src/main/java/com/p14n/postevent/ConsumerServer.java b/library/src/main/java/com/p14n/postevent/ConsumerServer.java index 3200ff6..3c11677 100644 --- a/library/src/main/java/com/p14n/postevent/ConsumerServer.java +++ b/library/src/main/java/com/p14n/postevent/ConsumerServer.java @@ -9,9 +9,9 @@ import com.p14n.postevent.broker.AsyncExecutor; import com.p14n.postevent.broker.DefaultExecutor; import com.p14n.postevent.broker.EventMessageBroker; -import com.p14n.postevent.broker.grpc.MessageBrokerGrpcServer; +import com.p14n.postevent.broker.remote.MessageBrokerGrpcServer; import com.p14n.postevent.catchup.CatchupServer; -import com.p14n.postevent.catchup.grpc.CatchupGrpcServer; +import com.p14n.postevent.catchup.remote.CatchupGrpcServer; import com.p14n.postevent.data.ConfigData; import io.grpc.Server; @@ -21,6 +21,38 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A server that manages event consumption and distribution. + * ConsumerServer handles incoming events, manages their persistence, + * and coordinates event distribution to subscribers. + * + *

+ * This server can be configured to handle multiple topics and supports + * both local and remote consumers through gRPC connections. + *

+ * + *

+ * Key features: + *

+ * + * + *

+ * Example usage: + *

+ * + *
{@code
+ * var config = new ConfigData("myapp", Set.of("orders"), "localhost", 5432,
+ *         "postgres", "postgres", "postgres", 10);
+ * var server = new ConsumerServer(dataSource, config, OpenTelemetry.noop());
+ * server.start(8080);
+ * }
+ */ public class ConsumerServer implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(ConsumerServer.class); @@ -31,10 +63,25 @@ public class ConsumerServer implements AutoCloseable { private AsyncExecutor asyncExecutor; OpenTelemetry ot; + /** + * Creates a new ConsumerServer instance with default executor configuration. + * + * @param ds The datasource for event persistence + * @param cfg The configuration for the consumer server + * @param ot The OpenTelemetry instance for monitoring and tracing + */ public ConsumerServer(DataSource ds, ConfigData cfg, OpenTelemetry ot) { this(ds, cfg, new DefaultExecutor(2), ot); } + /** + * Creates a new ConsumerServer instance with custom executor configuration. + * + * @param ds The datasource for event persistence + * @param cfg The configuration for the consumer server + * @param asyncExecutor The executor for handling asynchronous operations + * @param ot The OpenTelemetry instance for monitoring and tracing + */ public ConsumerServer(DataSource ds, ConfigData cfg, AsyncExecutor asyncExecutor, OpenTelemetry ot) { this.ds = ds; this.cfg = cfg; @@ -42,14 +89,30 @@ public ConsumerServer(DataSource ds, ConfigData cfg, AsyncExecutor asyncExecutor this.ot = ot; } + /** + * Starts the consumer server on the specified port. + * Initializes the event broker, local consumer, and gRPC services. + * + * @param port The port number to listen on + * @throws IOException If the server fails to start + * @throws InterruptedException If the server startup is interrupted + */ public void start(int port) throws IOException, InterruptedException { start(ServerBuilder.forPort(port)); } + /** + * Starts the consumer server with a custom server builder configuration. + * Initializes the event broker, local consumer, and gRPC services. + * + * @param sb The server builder to use for configuration + * @throws IOException If the server fails to start + * @throws InterruptedException If the server startup is interrupted + */ public void start(ServerBuilder sb) throws IOException, InterruptedException { logger.atInfo().log("Starting consumer server"); - var mb = new EventMessageBroker(asyncExecutor, ot,"consumer_server"); + var mb = new EventMessageBroker(asyncExecutor, ot, "consumer_server"); var lc = new LocalConsumer<>(cfg, mb); var grpcServer = new MessageBrokerGrpcServer(mb); var catchupServer = new CatchupServer(ds); @@ -75,6 +138,10 @@ public void start(ServerBuilder sb) throws IOException, InterruptedException closeables = List.of(lc, mb, asyncExecutor); } + /** + * Stops the consumer server and releases all resources. + * Shuts down the gRPC server and closes all managed resources. + */ public void stop() { logger.atInfo().log("Stopping consumer server"); @@ -93,6 +160,12 @@ public void stop() { logger.atInfo().log("Consumer server stopped"); } + /** + * Stops the consumer server and releases all resources. + * Implementation of AutoCloseable interface. + * + * @throws Exception If an error occurs during shutdown + */ @Override public void close() throws Exception { stop(); diff --git a/library/src/main/java/com/p14n/postevent/LocalConsumer.java b/library/src/main/java/com/p14n/postevent/LocalConsumer.java index c824d40..9656c21 100644 --- a/library/src/main/java/com/p14n/postevent/LocalConsumer.java +++ b/library/src/main/java/com/p14n/postevent/LocalConsumer.java @@ -13,6 +13,36 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A local consumer implementation that captures database changes using Debezium + * and publishes them to a message broker. + * + *

+ * This consumer: + *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * PostEventConfig config = new PostEventConfig(...);
+ * MessageBroker broker = new EventMessageBroker(...);
+ * 
+ * LocalConsumer consumer = new LocalConsumer<>(config, broker);
+ * consumer.start();
+ * 
+ * // When done
+ * consumer.close();
+ * }
+ * + * @param The type of messages that the broker will publish + */ public class LocalConsumer implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(LocalConsumer.class); @@ -21,6 +51,12 @@ public class LocalConsumer implements AutoCloseable { private final PostEventConfig config; private final DatabaseSetup db; + /** + * Creates a new LocalConsumer instance. + * + * @param config The configuration for the consumer and database connection + * @param broker The message broker to publish events to + */ public LocalConsumer(PostEventConfig config, MessageBroker broker) { this.config = config; this.broker = broker; @@ -28,13 +64,30 @@ public LocalConsumer(PostEventConfig config, MessageBroker broker) this.debezium = new DebeziumServer(); } + /** + * Starts the consumer by setting up the database and initializing the Debezium + * server. + * + *

+ * This method: + *

    + *
  • Sets up database tables and configurations for the specified topics
  • + *
  • Configures and starts the Debezium server
  • + *
  • Sets up event processing pipeline from Debezium to the message + * broker
  • + *
+ * + * @throws IOException If there's an error starting the consumer or + * connecting to the database + * @throws InterruptedException If the startup process is interrupted + */ public void start() throws IOException, InterruptedException { logger.atInfo() - .addArgument(config.topics()) // renamed from name() + .addArgument(config.topics()) .log("Starting local consumer for {}"); try { - db.setupAll(config.topics()); // renamed from name() + db.setupAll(config.topics()); Consumer> consumer = record -> { try { Event event = changeEventToEvent(record); @@ -52,10 +105,21 @@ public void start() throws IOException, InterruptedException { } } + /** + * Stops the Debezium server and releases associated resources. + * + * @throws IOException If there's an error stopping the server + */ public void stop() throws IOException { debezium.stop(); } + /** + * Implements AutoCloseable to ensure proper resource cleanup. + * Delegates to {@link #stop()}. + * + * @throws IOException If there's an error during cleanup + */ @Override public void close() throws IOException { stop(); diff --git a/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java b/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java index c7d9794..d917b1c 100644 --- a/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java +++ b/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java @@ -20,6 +20,40 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A local consumer implementation that provides persistent event processing + * with catchup capabilities. + * This consumer ensures reliable event processing by persisting events and + * providing mechanisms + * to handle missed or unprocessed events. + * + *

+ * Key features: + *

    + *
  • Persistent event storage using a configured DataSource
  • + *
  • Automatic catchup for missed events
  • + *
  • Periodic health checks and unprocessed event processing
  • + *
  • OpenTelemetry integration for observability
  • + *
  • Support for batch processing of events
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * var consumer = new LocalPersistentConsumer(dataSource, config, openTelemetry, 100);
+ * consumer.start();
+ * 
+ * // Subscribe to events
+ * consumer.subscribe("orders", event -> {
+ *     // Process the event
+ * });
+ * 
+ * // When done
+ * consumer.close();
+ * }
+ */ public class LocalPersistentConsumer implements AutoCloseable, MessageBroker { private static final Logger logger = LoggerFactory.getLogger(LocalPersistentConsumer.class); private PostEventConfig cfg; @@ -30,6 +64,15 @@ public class LocalPersistentConsumer implements AutoCloseable, MessageBroker subscriber) { return tb.subscribe(topic, subscriber); } + /** + * Unsubscribes from events on the specified topic. + * + * @param topic The topic to unsubscribe from + * @param subscriber The subscriber to remove + * @return true if unsubscription was successful, false otherwise + */ @Override public boolean unsubscribe(String topic, MessageSubscriber subscriber) { return tb.unsubscribe(topic, subscriber); } + /** + * Converts a transactional event. In this implementation, returns the event + * unchanged. + * + * @param m The transactional event to convert + * @return The same transactional event + */ @Override public TransactionalEvent convert(TransactionalEvent m) { return m; } - } diff --git a/library/src/main/java/com/p14n/postevent/Publisher.java b/library/src/main/java/com/p14n/postevent/Publisher.java index 9389e82..9d11f3b 100644 --- a/library/src/main/java/com/p14n/postevent/Publisher.java +++ b/library/src/main/java/com/p14n/postevent/Publisher.java @@ -12,12 +12,50 @@ /** * Publisher class responsible for writing events to a PostgreSQL database. + * This utility class provides static methods for publishing events to specific + * topic tables + * in the database. It ensures data integrity by validating topic names and + * handling + * database connections appropriately. + * + *

+ * The class supports two publishing modes: + *

    + *
  • Publishing with an existing connection
  • + *
  • Publishing with a DataSource (manages connection automatically)
  • + *
+ * + *

+ * Topic names must follow these rules: + *

    + *
  • Cannot be null or empty
  • + *
  • Must contain only lowercase letters and underscores
  • + *
  • Pattern: ^[a-z_]+$
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * // Using an existing connection
+ * Connection conn = ...;
+ * Event event = ...;
+ * Publisher.publish(event, conn, "user_events");
+ *
+ * // Using a DataSource
+ * DataSource ds = ...;
+ * Publisher.publish(event, ds, "order_events");
+ * }
*/ public class Publisher { - private Publisher(){ - + /** + * Private constructor to prevent instantiation of the utility class. + */ + private Publisher() { } + /** * Publishes an event to the specified topic table. * @@ -45,9 +83,20 @@ public static void publish(Event event, Connection connection, String topic) thr } } + /** + * Publishes an event to the specified topic table using a DataSource. + * This method manages the database connection automatically. + * + * @param event The event to publish + * @param ds The DataSource to obtain a connection from + * @param topic The topic/table name to publish to + * @throws SQLException if a database access error occurs + * @throws IllegalArgumentException if the topic is null, empty, or contains + * invalid characters + */ public static void publish(Event event, DataSource ds, String topic) throws SQLException { try (Connection c = ds.getConnection()) { publish(event, c, topic); } } -} \ No newline at end of file +} diff --git a/library/src/main/java/com/p14n/postevent/RemotePersistentConsumer.java b/library/src/main/java/com/p14n/postevent/RemotePersistentConsumer.java index aa9f32f..d6f04be 100644 --- a/library/src/main/java/com/p14n/postevent/RemotePersistentConsumer.java +++ b/library/src/main/java/com/p14n/postevent/RemotePersistentConsumer.java @@ -8,11 +8,11 @@ import com.p14n.postevent.broker.SystemEventBroker; import com.p14n.postevent.broker.TransactionalBroker; import com.p14n.postevent.broker.TransactionalEvent; -import com.p14n.postevent.broker.grpc.MessageBrokerGrpcClient; +import com.p14n.postevent.broker.remote.MessageBrokerGrpcClient; import com.p14n.postevent.catchup.CatchupService; import com.p14n.postevent.catchup.PersistentBroker; import com.p14n.postevent.catchup.UnprocessedSubmitter; -import com.p14n.postevent.catchup.grpc.CatchupGrpcClient; +import com.p14n.postevent.catchup.remote.CatchupGrpcClient; import com.p14n.postevent.data.UnprocessedEventFinder; import io.grpc.ManagedChannel; @@ -29,6 +29,41 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +/** + * A remote consumer implementation that provides persistent event processing + * with catchup capabilities. + * This consumer connects to a remote event source via gRPC and ensures reliable + * event processing + * with persistence and automatic catchup for missed events. + * + *

+ * Key features: + *

+ *
    + *
  • Remote event consumption via gRPC
  • + *
  • Persistent event storage
  • + *
  • Automatic catchup for missed events
  • + *
  • Periodic health checks and catchup triggers
  • + *
  • OpenTelemetry integration for observability
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * var consumer = new RemotePersistentConsumer(openTelemetry, 100);
+ * consumer.start(Set.of("orders", "inventory"), dataSource, "localhost", 8080);
+ *
+ * // Subscribe to events
+ * consumer.subscribe("orders", event -> {
+ *     // Process the event
+ * });
+ *
+ * // When done
+ * consumer.close();
+ * }
+ */ public class RemotePersistentConsumer implements AutoCloseable, MessageBroker { private static final Logger logger = LoggerFactory.getLogger(RemotePersistentConsumer.class); @@ -39,16 +74,37 @@ public class RemotePersistentConsumer implements AutoCloseable, MessageBroker topics, DataSource ds, String host, int port) { start(topics, ds, ManagedChannelBuilder.forAddress(host, port) .keepAliveTime(1, TimeUnit.HOURS) @@ -57,6 +113,14 @@ public void start(Set topics, DataSource ds, String host, int port) { .build()); } + /** + * Starts the consumer with a pre-configured gRPC channel. + * + * @param topics Set of topics to subscribe to + * @param ds DataSource for event persistence + * @param channel Configured gRPC channel + * @throws IllegalStateException if the consumer is already started + */ public void start(Set topics, DataSource ds, ManagedChannel channel) { logger.atInfo().log("Starting consumer client"); @@ -99,6 +163,11 @@ public void start(Set topics, DataSource ds, ManagedChannel channel) { } } + /** + * Closes all resources associated with this consumer. + * This includes the message brokers, gRPC channel, and other closeable + * resources. + */ @Override public void close() { logger.atInfo().log("Closing consumer client"); @@ -117,6 +186,13 @@ public void close() { logger.atInfo().log("Consumer client closed"); } + /** + * Publishes a transactional event to the specified topic. + * + * @param topic The topic to publish to + * @param message The transactional event to publish + * @throws RuntimeException if publishing fails + */ @Override public void publish(String topic, TransactionalEvent message) { try { @@ -126,6 +202,14 @@ public void publish(String topic, TransactionalEvent message) { } } + /** + * Subscribes to events on the specified topic. + * Triggers a catchup event to ensure the subscriber receives any missed events. + * + * @param topic The topic to subscribe to + * @param subscriber The subscriber that will receive events + * @return true if subscription was successful, false otherwise + */ @Override public boolean subscribe(String topic, MessageSubscriber subscriber) { var subscribed = tb.subscribe(topic, subscriber); @@ -133,11 +217,25 @@ public boolean subscribe(String topic, MessageSubscriber sub return subscribed; } + /** + * Unsubscribes from events on the specified topic. + * + * @param topic The topic to unsubscribe from + * @param subscriber The subscriber to remove + * @return true if unsubscription was successful, false otherwise + */ @Override public boolean unsubscribe(String topic, MessageSubscriber subscriber) { return tb.unsubscribe(topic, subscriber); } + /** + * Converts a transactional event. In this implementation, returns the event + * unchanged. + * + * @param m The transactional event to convert + * @return The same transactional event + */ @Override public TransactionalEvent convert(TransactionalEvent m) { return m; diff --git a/library/src/main/java/com/p14n/postevent/broker/AsyncExecutor.java b/library/src/main/java/com/p14n/postevent/broker/AsyncExecutor.java index a259071..33e1a70 100644 --- a/library/src/main/java/com/p14n/postevent/broker/AsyncExecutor.java +++ b/library/src/main/java/com/p14n/postevent/broker/AsyncExecutor.java @@ -3,15 +3,40 @@ import java.util.List; import java.util.concurrent.*; +/** + * Interface for asynchronous task execution. + */ public interface AsyncExecutor extends AutoCloseable { + /** + * Schedules a task for repeated fixed-rate execution. + * + * @param command The task to execute + * @param initialDelay The time to delay first execution + * @param period The period between successive executions + * @param unit The time unit of the initialDelay and period parameters + * @return A ScheduledFuture representing pending completion of the task + */ public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); + /** + * Shuts down the executor and returns a list of runnables that were not + * executed. + * + * @return A list of runnables that were not executed + */ List shutdownNow(); + /** + * Submits a task for execution and returns a Future representing the pending + * + * @param task The task to submit + * @param The type of the task + * @return A Future representing pending completion of the task + */ Future submit(Callable task); } diff --git a/library/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java b/library/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java index ae233d4..3d954c3 100644 --- a/library/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java +++ b/library/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java @@ -6,32 +6,77 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; +/** + * Default implementation of {@link AsyncExecutor} that provides configurable + * thread pool execution. + * Supports both virtual threads and fixed-size thread pools for task execution, + * along with + * scheduled task capabilities. + * + *

+ * Key features: + *

+ *
    + *
  • Virtual thread pool support for efficient concurrent task execution
  • + *
  • Configurable fixed-size thread pool alternative
  • + *
  • Scheduled task execution with customizable intervals
  • + *
  • Named thread factories for better debugging and monitoring
  • + *
+ */ public class DefaultExecutor implements AsyncExecutor { private final ScheduledExecutorService se; private final ExecutorService es; + /** + * Creates a new executor with a scheduled thread pool and virtual thread pool. + * + * @param scheduledSize the size of the scheduled thread pool + */ public DefaultExecutor(int scheduledSize) { this.se = createScheduledExecutorService(scheduledSize); this.es = createVirtualExecutorService(); } + /** + * Creates a new executor with both scheduled and fixed-size thread pools. + * + * @param scheduledSize the size of the scheduled thread pool + * @param fixedSize the size of the fixed thread pool + */ public DefaultExecutor(int scheduledSize, int fixedSize) { this.se = createScheduledExecutorService(scheduledSize); this.es = createFixedExecutorService(fixedSize); } + /** + * Creates a fixed-size thread pool with named threads. + * + * @param size the number of threads in the pool + * @return a fixed thread pool executor service + */ protected ExecutorService createFixedExecutorService(int size) { return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder().setNameFormat("post-event-fixed-%d").build()); } + /** + * Creates a virtual thread pool for efficient task execution. + * + * @return a virtual thread executor service + */ protected ExecutorService createVirtualExecutorService() { return Executors.newThreadPerTaskExecutor( new ThreadFactoryBuilder().setThreadFactory(Thread.ofVirtual().factory()) .setNameFormat("post-event-virtual-%d").build()); } + /** + * Creates a scheduled thread pool with named threads. + * + * @param size the number of threads in the pool + * @return a scheduled thread pool executor service + */ protected ScheduledExecutorService createScheduledExecutorService(int size) { return Executors.newScheduledThreadPool(size, new ThreadFactoryBuilder().setNameFormat("post-event-scheduled-%d").build()); diff --git a/library/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java b/library/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java index 957b8cc..122ae54 100644 --- a/library/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java +++ b/library/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java @@ -12,28 +12,101 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; +/** + * Abstract base implementation of a message broker that supports + * publish-subscribe messaging patterns. + * Provides thread-safe handling of message publishing and subscription + * management with telemetry support. + * + *

+ * Key features: + *

+ *
    + *
  • Thread-safe message publishing and subscription management
  • + *
  • Asynchronous message delivery to subscribers
  • + *
  • OpenTelemetry integration for metrics and tracing
  • + *
  • Automatic resource cleanup via AutoCloseable
  • + *
+ * + *

+ * The broker maintains a concurrent map of topic subscribers and ensures + * thread-safe operations + * for publishing messages and managing subscriptions. Messages are delivered + * asynchronously to + * subscribers using the provided {@link AsyncExecutor}. + *

+ * + * @param The input message type, must implement {@link Traceable} + * @param The output message type delivered to subscribers + */ public abstract class DefaultMessageBroker implements MessageBroker, AutoCloseable { + /** + * Thread-safe map storing topic subscribers. + * Key is the topic name, value is a thread-safe set of subscribers. + */ protected final ConcurrentHashMap>> topicSubscribers = new ConcurrentHashMap<>(); + + /** + * Flag indicating if the broker has been closed. + */ protected final AtomicBoolean closed = new AtomicBoolean(false); + + /** + * Executor for handling asynchronous message delivery. + */ private final AsyncExecutor asyncExecutor; + + /** + * Metrics collector for broker operations. + */ protected final BrokerMetrics metrics; + + /** + * OpenTelemetry tracer for distributed tracing. + */ protected final Tracer tracer; + /** + * OpenTelemetry instance for metrics and tracing. + */ protected final OpenTelemetry openTelemetry; - public DefaultMessageBroker(OpenTelemetry ot,String scopeName) { - this(new DefaultExecutor(2), ot,scopeName); + /** + * Creates a new broker instance with default executor configuration. + * + * @param ot The OpenTelemetry instance for metrics and tracing + * @param scopeName The scope name for OpenTelemetry instrumentation + */ + public DefaultMessageBroker(OpenTelemetry ot, String scopeName) { + this(new DefaultExecutor(2), ot, scopeName); } - public DefaultMessageBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot,String scopeName) { + /** + * Creates a new broker instance with custom executor configuration. + * + * @param asyncExecutor The executor for handling asynchronous message delivery + * @param ot The OpenTelemetry instance for metrics and tracing + * @param scopeName The scope name for OpenTelemetry instrumentation + */ + public DefaultMessageBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot, String scopeName) { this.asyncExecutor = asyncExecutor; this.metrics = new BrokerMetrics(ot.getMeter(scopeName)); this.tracer = ot.getTracer(scopeName); this.openTelemetry = ot; } + /** + * Checks if a message can be processed for a given topic. + * Validates broker state and message parameters. + * + * @param topic The topic to check + * @param message The message to validate + * @return true if the message can be processed, false otherwise + * @throws IllegalStateException if the broker is closed + * @throws IllegalArgumentException if topic or message is null + */ protected boolean canProcess(String topic, InT message) { if (closed.get()) { throw new IllegalStateException("Broker is closed"); @@ -47,10 +120,27 @@ protected boolean canProcess(String topic, InT message) { throw new IllegalArgumentException("Topic cannot be null"); } - // If no subscribers for this topic, message is silently dropped return topicSubscribers.containsKey(topic) && !topicSubscribers.get(topic).isEmpty(); } + /** + * Converts the input message type to the output message type. + * Must be implemented by concrete classes. + * + * @param message The input message to convert + * @return The converted output message + */ + public abstract OutT convert(InT message); + + /** + * Publishes a message to all subscribers of the specified topic. + * Messages are delivered asynchronously to all subscribers. + * If a subscriber throws an exception during message processing, + * its error handler is invoked and the exception is propagated. + * + * @param topic The topic to publish to + * @param message The message to publish + */ @Override public void publish(String topic, InT message) { if (!canProcess(topic, message)) { @@ -59,13 +149,11 @@ public void publish(String topic, InT message) { metrics.recordPublished(topic); - // Deliver to all subscribers for this topic Set> subscribers = topicSubscribers.get(topic); if (subscribers != null) { - - processWithTelemetry(openTelemetry,tracer, message, "publish_message", () -> { + processWithTelemetry(openTelemetry, tracer, message, "publish_message", () -> { for (MessageSubscriber subscriber : subscribers) { - asyncExecutor.submit(() -> processWithTelemetry(openTelemetry,tracer, message, "process_message", + asyncExecutor.submit(() -> processWithTelemetry(openTelemetry, tracer, message, "process_message", () -> { try { subscriber.onMessage(convert(message)); @@ -82,10 +170,20 @@ public void publish(String topic, InT message) { } return null; }); - } } + /** + * Subscribes a message handler to a specific topic. + * Thread-safe subscription management using ConcurrentHashMap and + * CopyOnWriteArraySet. + * + * @param topic The topic to subscribe to + * @param subscriber The subscriber to add + * @return true if the subscription was added, false if it already existed + * @throws IllegalStateException if the broker is closed + * @throws IllegalArgumentException if topic or subscriber is null + */ @Override public boolean subscribe(String topic, MessageSubscriber subscriber) { if (closed.get()) { @@ -111,6 +209,16 @@ public boolean subscribe(String topic, MessageSubscriber subscriber) { return added; } + /** + * Unsubscribes a message handler from a specific topic. + * Thread-safe unsubscription with automatic topic cleanup when the last + * subscriber is removed. + * + * @param topic The topic to unsubscribe from + * @param subscriber The subscriber to remove + * @return true if the subscription was removed, false if it didn't exist + * @throws IllegalArgumentException if topic or subscriber is null + */ @Override public boolean unsubscribe(String topic, MessageSubscriber subscriber) { if (subscriber == null) { @@ -135,10 +243,14 @@ public boolean unsubscribe(String topic, MessageSubscriber subscriber) { return false; } + /** + * Closes the broker and removes all subscribers. + * After closing, no new subscriptions can be added and no messages can be + * published. + */ @Override public void close() { closed.set(true); topicSubscribers.clear(); } - } diff --git a/library/src/main/java/com/p14n/postevent/broker/EventMessageBroker.java b/library/src/main/java/com/p14n/postevent/broker/EventMessageBroker.java index c450eb5..1123055 100644 --- a/library/src/main/java/com/p14n/postevent/broker/EventMessageBroker.java +++ b/library/src/main/java/com/p14n/postevent/broker/EventMessageBroker.java @@ -1,22 +1,67 @@ package com.p14n.postevent.broker; import com.p14n.postevent.data.Event; - import io.opentelemetry.api.OpenTelemetry; +/** + * A specialized message broker implementation for handling {@link Event} + * messages. + * Extends {@link DefaultMessageBroker} to provide direct pass-through of events + * without any transformation. + * + *

+ * This broker is designed for simple event distribution scenarios where the + * input + * and output event types are identical. It maintains all the thread-safety and + * telemetry capabilities of the parent class while simplifying the event + * handling + * process. + *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * EventMessageBroker broker = new EventMessageBroker(openTelemetry, "events");
+ * broker.subscribe("orders", event -> {
+ *     // Handle the event directly
+ * });
+ * broker.publish("orders", new Event(...));
+ * }
+ */ public class EventMessageBroker extends DefaultMessageBroker { + /** + * Creates a new EventMessageBroker with custom executor configuration. + * + * @param asyncExecutor The executor for handling asynchronous message delivery + * @param ot The OpenTelemetry instance for metrics and tracing + * @param scopeName The scope name for OpenTelemetry instrumentation + */ public EventMessageBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot, String scopeName) { super(asyncExecutor, ot, scopeName); } + /** + * Creates a new EventMessageBroker with default executor configuration. + * + * @param ot The OpenTelemetry instance for metrics and tracing + * @param scopeName The scope name for OpenTelemetry instrumentation + */ public EventMessageBroker(OpenTelemetry ot, String scopeName) { - super(ot,scopeName); + super(ot, scopeName); } + /** + * Implements the conversion method from the parent class. + * Simply returns the input event as-is since no transformation is needed. + * + * @param m The event to convert + * @return The same event instance + */ @Override public Event convert(Event m) { return m; } - } diff --git a/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java b/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java index a627123..ca8d312 100644 --- a/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java +++ b/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java @@ -2,14 +2,66 @@ import com.p14n.postevent.data.Traceable; +/** + * Enumeration of system-level events used for internal coordination and + * control. + * Implements {@link Traceable} to support distributed tracing and event + * correlation. + * + *

+ * Available events: + *

+ *
    + *
  • {@code CatchupRequired}: Signals that a topic needs to catch up on missed + * events
  • + *
  • {@code UnprocessedCheckRequired}: Signals that unprocessed events need to + * be checked
  • + *
  • {@code FetchLatest}: Signals that the latest events should be fetched for + * a topic
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * SystemEvent event = SystemEvent.CatchupRequired.withTopic("orders");
+ * broker.publish(event.topic(), event);
+ * }
+ */ public enum SystemEvent implements Traceable { + /** + * Signals that a topic needs to catch up on missed events. + * Used by the {@link com.p14n.postevent.catchup.CatchupService} to initiate + * catchup processing. + */ CatchupRequired, + + /** + * Signals that unprocessed events need to be checked. + * Used to trigger verification of events that may have been missed or failed + * processing. + */ UnprocessedCheckRequired, - FetchLatest; // New event type + /** + * Signals that the latest events should be fetched for a topic. + * Used to request immediate retrieval of recent events without full catchup. + */ + FetchLatest; + + /** + * The topic this event is associated with. + */ public String topic; + /** + * Associates a topic with this system event. + * + * @param topic the topic to associate with this event + * @return this event instance with the topic set + */ public SystemEvent withTopic(String topic) { this.topic = topic; return this; diff --git a/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java b/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java index 1c2d672..c538a49 100644 --- a/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java +++ b/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java @@ -2,15 +2,49 @@ import io.opentelemetry.api.OpenTelemetry; -public class SystemEventBroker extends - DefaultMessageBroker { +/** + * Specialized message broker for handling system-level events. + * Extends {@link DefaultMessageBroker} to provide a dedicated channel for + * internal system coordination events. + * + *

+ * This broker handles {@link SystemEvent} messages for internal coordination + * such as: + *

    + *
  • Catchup processing signals
  • + *
  • Unprocessed event checks
  • + *
  • Latest event fetch requests
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * SystemEventBroker broker = new SystemEventBroker(openTelemetry);
+ * broker.subscribe(subscriber);
+ * broker.publish(SystemEvent.CatchupRequired.withTopic("orders"));
+ * }
+ */ +public class SystemEventBroker extends DefaultMessageBroker { + /** + * Creates a new system event broker with custom executor configuration. + * + * @param asyncExecutor the executor for handling asynchronous message delivery + * @param ot the OpenTelemetry instance for metrics and tracing + */ public SystemEventBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot) { super(asyncExecutor, ot, "system_events"); } + /** + * Creates a new system event broker with default executor configuration. + * + * @param ot the OpenTelemetry instance for metrics and tracing + */ public SystemEventBroker(OpenTelemetry ot) { - super(ot,"system_events"); + super(ot, "system_events"); } @Override @@ -18,12 +52,21 @@ public SystemEvent convert(SystemEvent m) { return m; } + /** + * Publishes a system event to the default system topic. + * + * @param event the system event to publish + */ public void publish(SystemEvent event) { publish("system", event); } + /** + * Subscribes to system events on the default system topic. + * + * @param subscriber the subscriber to receive system events + */ public void subscribe(MessageSubscriber subscriber) { subscribe("system", subscriber); } - } diff --git a/library/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java b/library/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java index 87cd68d..0ecadce 100644 --- a/library/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java +++ b/library/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java @@ -11,22 +11,85 @@ import java.sql.Connection; +/** + * A specialized message broker that provides transactional delivery of events + * to subscribers. + * Extends {@link DefaultMessageBroker} to ensure events are processed within + * database transactions. + * + *

+ * Key features: + *

+ *
    + *
  • Database transaction support for each message delivery
  • + *
  • Ordered processing of events using {@link OrderedProcessor}
  • + *
  • Integration with OpenTelemetry for transaction monitoring
  • + *
  • Error handling with subscriber notification
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * TransactionalBroker broker = new TransactionalBroker(dataSource, openTelemetry, systemEventBroker);
+ * broker.subscribe("orders", new TransactionalEventHandler());
+ * broker.publish("orders", event);
+ * }
+ */ public class TransactionalBroker extends DefaultMessageBroker { private final DataSource ds; private final SystemEventBroker systemEventBroker; - public TransactionalBroker(DataSource ds, AsyncExecutor asyncExecutor, OpenTelemetry ot, SystemEventBroker systemEventBroker) { + /** + * Creates a new transactional broker with custom executor configuration. + * + * @param ds the data source for database connections + * @param asyncExecutor the executor for handling asynchronous message + * delivery + * @param ot the OpenTelemetry instance for metrics and tracing + * @param systemEventBroker the broker for system events + */ + public TransactionalBroker(DataSource ds, AsyncExecutor asyncExecutor, OpenTelemetry ot, + SystemEventBroker systemEventBroker) { super(asyncExecutor, ot, "transactional_broker"); this.ds = ds; this.systemEventBroker = systemEventBroker; } + /** + * Creates a new transactional broker with default executor configuration. + * + * @param ds the data source for database connections + * @param ot the OpenTelemetry instance for metrics and tracing + * @param systemEventBroker the broker for system events + */ public TransactionalBroker(DataSource ds, OpenTelemetry ot, SystemEventBroker systemEventBroker) { super(ot, "transactional_broker"); this.ds = ds; this.systemEventBroker = systemEventBroker; } + /** + * Publishes a message to subscribers within a database transaction. + * Each subscriber receives the message in its own transaction context. + * + *

+ * The publishing process: + *

+ *
    + *
  1. Validates the message and topic
  2. + *
  3. Creates a new database connection for each subscriber
  4. + *
  5. Processes the message using {@link OrderedProcessor}
  6. + *
  7. Handles delivery within a transaction context
  8. + *
  9. Records metrics for successful delivery
  10. + *
+ * + * @param topic the topic to publish to + * @param message the event message to publish + * @throws IllegalStateException if the broker is closed + * @throws IllegalArgumentException if topic or message is null + */ @Override public void publish(String topic, Event message) { if (!canProcess(topic, message)) { @@ -38,11 +101,8 @@ public void publish(String topic, Event message) { // Deliver to all subscribers for (MessageSubscriber subscriber : topicSubscribers.get(topic)) { try (Connection c = ds.getConnection()) { - processWithTelemetry(openTelemetry, tracer, message, "ordered_process", () -> { - - var op = new OrderedProcessor(systemEventBroker,(connection, event) -> { - + var op = new OrderedProcessor(systemEventBroker, (connection, event) -> { return processWithTelemetry(openTelemetry, tracer, message, "message_transaction", () -> { try { subscriber.onMessage(new TransactionalEvent(connection, event)); @@ -56,12 +116,10 @@ public void publish(String topic, Event message) { return false; } }); - }); op.process(c, message); return null; }); - } catch (Exception e) { try { subscriber.onError(e); diff --git a/library/src/main/java/com/p14n/postevent/broker/TransactionalEvent.java b/library/src/main/java/com/p14n/postevent/broker/TransactionalEvent.java index 616f53c..2912283 100644 --- a/library/src/main/java/com/p14n/postevent/broker/TransactionalEvent.java +++ b/library/src/main/java/com/p14n/postevent/broker/TransactionalEvent.java @@ -5,23 +5,75 @@ import java.sql.Connection; +/** + * A record that wraps an {@link Event} with its associated database + * {@link Connection}, + * enabling transactional processing of events. This class implements + * {@link Traceable} + * to maintain tracing context through the event processing pipeline. + * + *

+ * The TransactionalEvent serves as a container that: + *

    + *
  • Maintains the database connection context for transactional + * operations
  • + *
  • Preserves the original event data and metadata
  • + *
  • Delegates tracing information to the underlying event
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * Connection conn = dataSource.getConnection();
+ * Event event = Event.create(...);
+ * TransactionalEvent txEvent = new TransactionalEvent(conn, event);
+ *
+ * // Use the transactional event
+ * processEventInTransaction(txEvent);
+ * }
+ * + * @param connection The database connection for transactional operations + * @param event The underlying event being processed + */ public record TransactionalEvent(Connection connection, Event event) implements Traceable { + /** + * Returns the unique identifier of the underlying event. + * + * @return the event's unique identifier + */ @Override public String id() { return event.id(); } + /** + * Returns the topic of the underlying event. + * + * @return the event's topic + */ @Override public String topic() { return event.topic(); } + /** + * Returns the subject of the underlying event. + * + * @return the event's subject + */ @Override public String subject() { return event.subject(); } + /** + * Returns the OpenTelemetry trace parent identifier of the underlying event. + * + * @return the event's trace parent identifier + */ @Override public String traceparent() { return event.traceparent(); diff --git a/library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java b/library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcClient.java similarity index 64% rename from library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java rename to library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcClient.java index d939312..6098304 100644 --- a/library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java +++ b/library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcClient.java @@ -1,8 +1,11 @@ -package com.p14n.postevent.broker.grpc; +package com.p14n.postevent.broker.remote; import com.p14n.postevent.broker.AsyncExecutor; import com.p14n.postevent.broker.EventMessageBroker; import com.p14n.postevent.broker.MessageSubscriber; +import com.p14n.postevent.broker.grpc.EventResponse; +import com.p14n.postevent.broker.grpc.MessageBrokerServiceGrpc; +import com.p14n.postevent.broker.grpc.SubscriptionRequest; import com.p14n.postevent.data.Event; import io.grpc.ManagedChannel; @@ -18,14 +21,37 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A gRPC client implementation of the EventMessageBroker that connects to a + * remote message broker service. + * Provides streaming event subscription and publication capabilities over gRPC. + * + *

+ * Key features: + *

+ *
    + *
  • Streaming event subscription via gRPC
  • + *
  • Automatic reconnection handling
  • + *
  • Thread-safe subscription management
  • + *
  • OpenTelemetry integration for observability
  • + *
+ */ public class MessageBrokerGrpcClient extends EventMessageBroker { private static final Logger logger = LoggerFactory.getLogger(MessageBrokerGrpcClient.class); private final MessageBrokerServiceGrpc.MessageBrokerServiceStub asyncStub; - private final Set subscribed = ConcurrentHashMap.newKeySet();; + private final Set subscribed = ConcurrentHashMap.newKeySet(); ManagedChannel channel; + /** + * Creates a new MessageBrokerGrpcClient with the specified host and port. + * + * @param asyncExecutor Executor for handling asynchronous operations + * @param ot OpenTelemetry instance for monitoring + * @param host Remote broker host address + * @param port Remote broker port + */ public MessageBrokerGrpcClient(AsyncExecutor asyncExecutor, OpenTelemetry ot, String host, int port) { this(asyncExecutor, ot, ManagedChannelBuilder.forAddress(host, port) .keepAliveTime(1, TimeUnit.HOURS) @@ -34,12 +60,25 @@ public MessageBrokerGrpcClient(AsyncExecutor asyncExecutor, OpenTelemetry ot, St .build()); } + /** + * Creates a new MessageBrokerGrpcClient with a pre-configured gRPC channel. + * + * @param asyncExecutor Executor for handling asynchronous operations + * @param ot OpenTelemetry instance for monitoring + * @param channel Pre-configured gRPC ManagedChannel + */ public MessageBrokerGrpcClient(AsyncExecutor asyncExecutor, OpenTelemetry ot, ManagedChannel channel) { super(asyncExecutor, ot, "grpc_client_broker"); this.channel = channel; this.asyncStub = MessageBrokerServiceGrpc.newStub(channel); } + /** + * Establishes a streaming subscription to events on the specified topic. + * Creates a gRPC stream observer to handle incoming events and errors. + * + * @param topic Topic to subscribe to + */ public void subscribeToEvents(String topic) { SubscriptionRequest request = SubscriptionRequest.newBuilder() .setTopic(topic) @@ -60,7 +99,6 @@ public void onNext(EventResponse response) { @Override public void onError(Throwable t) { logger.atError().setCause(t).log("Error in event stream"); - // subscribed.remove(topic); } @Override @@ -72,10 +110,14 @@ public void onCompleted() { asyncStub.subscribeToEvents(request, responseObserver); subscribed.add(topic); - // Send the subscription request - } + /** + * Converts a gRPC event response into an internal Event object. + * + * @param grpcEvent The gRPC event response to convert + * @return Converted Event object + */ private Event convertFromGrpcEvent(EventResponse grpcEvent) { OffsetDateTime time = null; if (!grpcEvent.getTime().isEmpty()) { @@ -95,11 +137,24 @@ private Event convertFromGrpcEvent(EventResponse grpcEvent) { grpcEvent.getTraceparent()); } + /** + * {@inheritDoc} + * Publishes an event to the specified topic. + */ @Override public void publish(String topic, Event message) { super.publish(topic, message); } + /** + * {@inheritDoc} + * Subscribes to events on the specified topic, establishing a gRPC stream if + * needed. + * + * @param topic Topic to subscribe to + * @param subscriber Subscriber to receive events + * @return true if subscription was successful, false otherwise + */ @Override public boolean subscribe(String topic, MessageSubscriber subscriber) { if (super.subscribe(topic, subscriber)) { @@ -111,6 +166,14 @@ public boolean subscribe(String topic, MessageSubscriber subscriber) { return false; } + /** + * {@inheritDoc} + * Unsubscribes from events and manages gRPC channel cleanup. + * + * @param topic Topic to unsubscribe from + * @param subscriber Subscriber to remove + * @return true if unsubscription was successful, false otherwise + */ @Override public boolean unsubscribe(String topic, MessageSubscriber subscriber) { boolean unsubscribed = super.unsubscribe(topic, subscriber); diff --git a/library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java b/library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcServer.java similarity index 65% rename from library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java rename to library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcServer.java index 83fe063..5e0beda 100644 --- a/library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java +++ b/library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcServer.java @@ -1,5 +1,8 @@ -package com.p14n.postevent.broker.grpc; +package com.p14n.postevent.broker.remote; +import com.p14n.postevent.broker.grpc.EventResponse; +import com.p14n.postevent.broker.grpc.MessageBrokerServiceGrpc; +import com.p14n.postevent.broker.grpc.SubscriptionRequest; import com.p14n.postevent.data.Event; import com.p14n.postevent.broker.MessageBroker; import com.p14n.postevent.broker.MessageSubscriber; @@ -12,14 +15,58 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A gRPC server implementation that exposes a MessageBroker service for remote + * event subscription. + * This class extends the auto-generated + * MessageBrokerServiceGrpc.MessageBrokerServiceImplBase + * to provide streaming event delivery to remote clients. + * + *

+ * Key features: + *

    + *
  • Streaming event subscription via gRPC
  • + *
  • Thread-safe event delivery
  • + *
  • Automatic cleanup of cancelled subscriptions
  • + *
  • Error handling and propagation to clients
  • + *
  • Structured logging for monitoring and debugging
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * MessageBroker broker = // initialize broker
+ * MessageBrokerGrpcServer server = new MessageBrokerGrpcServer(broker);
+ * 
+ * // Add to gRPC server
+ * Server grpcServer = ServerBuilder.forPort(8080)
+ *     .addService(server)
+ *     .build();
+ * grpcServer.start();
+ * }
+ */ public class MessageBrokerGrpcServer extends MessageBrokerServiceGrpc.MessageBrokerServiceImplBase { private static final Logger logger = LoggerFactory.getLogger(MessageBrokerGrpcServer.class); private final MessageBroker messageBroker; + /** + * Creates a new MessageBrokerGrpcServer with the specified message broker. + * + * @param messageBroker The message broker that will handle event distribution + */ public MessageBrokerGrpcServer(MessageBroker messageBroker) { this.messageBroker = messageBroker; } + /** + * Sends an error response to the client through the gRPC stream. + * + * @param responseObserver The stream observer to send the error to + * @param msg The error message + * @param error The underlying error cause + */ private void errorResponse(StreamObserver responseObserver, String msg, Throwable error) { responseObserver.onError(Status.INTERNAL @@ -28,6 +75,26 @@ private void errorResponse(StreamObserver responseObserver, Strin .asRuntimeException()); } + /** + * Handles subscription requests from clients and establishes a streaming + * connection + * for event delivery. This method implements the gRPC service endpoint for + * event + * subscription. + * + *

+ * The method: + *

    + *
  • Validates the subscription request
  • + *
  • Sets up a message subscriber for the requested topic
  • + *
  • Handles client disconnection and cleanup
  • + *
  • Manages error conditions and client notification
  • + *
+ * + * @param request The subscription request containing the topic to + * subscribe to + * @param responseObserver The stream observer for sending events to the client + */ @Override public void subscribeToEvents(SubscriptionRequest request, StreamObserver responseObserver) { String topic = request.getTopic(); @@ -92,6 +159,13 @@ public void onError(Throwable error) { } } + /** + * Converts an internal Event object to a gRPC EventResponse message. + * Handles all optional fields and ensures proper conversion of data types. + * + * @param event The internal event to convert + * @return The gRPC event response message + */ private EventResponse convertToGrpcEvent(Event event) { EventResponse.Builder builder = EventResponse.newBuilder() .setId(event.id()) diff --git a/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java b/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java index e68a39c..61e1544 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java +++ b/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java @@ -15,15 +15,71 @@ import java.util.ArrayList; import java.util.List; +/** + * Database implementation of the CatchupServerInterface. + * Provides functionality to fetch missed events and latest message IDs from a + * PostgreSQL database. + * Uses a DataSource for connection management and prepared statements for + * secure SQL execution. + * + *

+ * Events are stored in topic-specific tables within the 'postevent' schema. + * Each table + * contains events with sequential IDs (idn) that can be used for range-based + * queries. + *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * DataSource dataSource = // initialize your datasource
+ * CatchupServer server = new CatchupServer(dataSource);
+ * 
+ * // Fetch up to 100 events from sequence 1000 to 2000
+ * List events = server.fetchEvents(1000L, 2000L, 100, "my-topic");
+ * 
+ * // Get the latest event ID
+ * long latestId = server.getLatestMessageId("my-topic");
+ * }
+ */ public class CatchupServer implements CatchupServerInterface { private static final Logger logger = LoggerFactory.getLogger(CatchupServer.class); private final DataSource dataSource; + /** + * Creates a new CatchupServer instance. + * + * @param dataSource The DataSource to use for database connections. + * Should be properly configured with connection pooling and + * credentials. + */ public CatchupServer(DataSource dataSource) { this.dataSource = dataSource; } + /** + * {@inheritDoc} + * + *

+ * This implementation fetches events from a PostgreSQL database using a + * prepared statement + * to prevent SQL injection. Events are retrieved in order by their sequence + * number (idn). + *

+ * + *

+ * The SQL query used is: + * {@code SELECT * FROM postevent. WHERE idn BETWEEN (?+1) AND ? ORDER BY idn LIMIT ?} + *

+ * + * @throws IllegalArgumentException if startAfter is greater than end, + * maxResults is less than or equal to 0, + * or if topic is null or empty + * @throws RuntimeException if there is a database access error + */ @Override public List fetchEvents(long startAfter, long end, int maxResults, String topic) { if (startAfter > end) { @@ -66,6 +122,22 @@ public List fetchEvents(long startAfter, long end, int maxResults, String } } + /** + * {@inheritDoc} + * + *

+ * This implementation queries the PostgreSQL database for the maximum sequence + * number (idn) + * in the specified topic's table. Returns 0 if no events exist. + *

+ * + *

+ * The SQL query used is: {@code SELECT MAX(idn) FROM postevent.} + *

+ * + * @throws IllegalArgumentException if topic is null or empty + * @throws RuntimeException if there is a database access error + */ @Override public long getLatestMessageId(String topic) { if (topic == null || topic.trim().isEmpty()) { diff --git a/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java b/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java index 9ba5ff5..16b09c9 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java +++ b/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java @@ -5,24 +5,54 @@ /** * Interface for fetching events from a data store. + * Provides methods to retrieve events within a specified range and get the + * latest message ID + * for a given topic. This interface is used to implement catch-up functionality + * for clients + * that have missed events or need to synchronize their state. + * + *

+ * Implementations of this interface should: + *

+ *
    + *
  • Handle event retrieval from persistent storage
  • + *
  • Ensure events are returned in order by sequence number (idn)
  • + *
  • Apply appropriate validation for input parameters
  • + *
  • Handle topic-specific event retrieval
  • + *
*/ public interface CatchupServerInterface { /** * Fetches events from a data store within a specified range. + * Returns events where the sequence number (idn) is greater than startAfter + * and less than or equal to end, ordered by sequence number. * * @param startAfter The ID to start fetching events after (exclusive) * @param end The maximum ID to fetch events up to (inclusive) * @param maxResults The maximum number of events to fetch - * @return A list of events within the specified range + * @param topic The topic to fetch events from + * @return A list of events within the specified range, ordered by sequence + * number + * @throws IllegalArgumentException if startAfter is greater than end, + * maxResults is less than or equal to 0, + * or if topic is null or empty + * @throws RuntimeException if there is an error accessing the data + * store */ List fetchEvents(long startAfter, long end, int maxResults, String topic); /** - * Retrieves the latest message ID for a given topic. + * Retrieves the latest message ID (sequence number) for a given topic. + * This can be used to determine the current position in the event stream + * and calculate gaps in sequence numbers. * * @param topic The topic to get the latest message ID for - * @return The latest message ID for the specified topic + * @return The latest message ID for the specified topic, or 0 if no messages + * exist + * @throws IllegalArgumentException if topic is null or empty + * @throws RuntimeException if there is an error accessing the data + * store */ long getLatestMessageId(String topic); } diff --git a/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java b/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java index 254748e..0511d4f 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java +++ b/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java @@ -21,7 +21,29 @@ import javax.sql.DataSource; /** - * Service to handle catching up on missed events for topics. + * Service responsible for managing event catchup operations and maintaining + * high water marks (HWM). + * Implements both MessageSubscriber for SystemEvents and OneAtATimeBehaviour + * for sequential processing. + * + *

+ * Key features: + *

+ *
    + *
  • Event catchup processing for missed events
  • + *
  • High water mark (HWM) management
  • + *
  • Gap detection in event sequences
  • + *
  • Sequential processing of catchup operations
  • + *
+ * + *

+ * The service responds to two types of system events: + *

+ *
    + *
  • {@code SystemEvent.CatchupRequired}: Triggers full catchup + * processing
  • + *
  • {@code SystemEvent.FetchLatest}: Fetches only the latest events
  • + *
*/ public class CatchupService implements MessageSubscriber, OneAtATimeBehaviour { private static final Logger LOGGER = LoggerFactory.getLogger(CatchupService.class); @@ -34,6 +56,13 @@ public class CatchupService implements MessageSubscriber, OneAtATim final AtomicInteger signals = new AtomicInteger(0); final AtomicBoolean running = new AtomicBoolean(false); + /** + * Creates a new CatchupService instance. + * + * @param ds The DataSource for database operations + * @param catchupServer The server interface for fetching catchup events + * @param systemEventBroker The broker for publishing system events + */ public CatchupService(DataSource ds, CatchupServerInterface catchupServer, SystemEventBroker systemEventBroker) { this.datasource = ds; this.catchupServer = catchupServer; @@ -41,12 +70,12 @@ public CatchupService(DataSource ds, CatchupServerInterface catchupServer, Syste } /** - * Catches up a topic by processing events since their last high water - * mark. + * Performs catchup processing for a specific topic. + * Fetches missed events between the current high water mark and the next gap, + * writes them to the messages table, and updates the HWM accordingly. * - * @param topicName The name of the topic - * @return The number of events processed - * @throws SQLException If a database error occurs + * @param topicName The topic to process catchup for + * @return The number of events processed during catchup */ public int catchup(String topicName) { @@ -100,6 +129,16 @@ public int catchup(String topicName) { } } + /** + * Gets the current high water mark (HWM) for a topic from the contiguous_hwm + * table. + * If no HWM exists for the topic, initializes it with 0. + * + * @param connection Database connection + * @param topicName Topic to get HWM for + * @return Current HWM value + * @throws SQLException If database operation fails + */ private long getCurrentHwm(Connection connection, String topicName) throws SQLException { String sql = "SELECT hwm FROM postevent.contiguous_hwm WHERE topic_name = ?"; @@ -118,6 +157,13 @@ private long getCurrentHwm(Connection connection, String topicName) throws SQLEx } } + /** + * Initializes the HWM for a topic with value 0. + * + * @param connection Database connection + * @param topicName Topic to initialize + * @throws SQLException If database operation fails + */ private void initializeHwm(Connection connection, String topicName) throws SQLException { String sql = "INSERT INTO postevent.contiguous_hwm (topic_name, hwm) VALUES (?, 0) ON CONFLICT DO NOTHING"; @@ -128,6 +174,16 @@ private void initializeHwm(Connection connection, String topicName) throws SQLEx getCurrentHwm(connection, topicName); } + /** + * Finds the next gap end (lowest idn greater than current HWM) for a topic. + * + * @param connection Database connection + * @param currentHwm Current HWM value + * @param topicName Topic to check + * @return The ID of the next message after the gap, or currentHwm if no gap + * exists + * @throws SQLException If database operation fails + */ private long findGapEnd(Connection connection, long currentHwm, String topicName) throws SQLException { String sql = "SELECT MIN(idn) as next_idn FROM postevent.messages WHERE topic=? AND idn > ?"; @@ -146,6 +202,14 @@ private long findGapEnd(Connection connection, long currentHwm, String topicName } } + /** + * Writes a batch of events to the messages table. + * + * @param connection Database connection + * @param events List of events to write + * @return Number of events successfully written + * @throws SQLException If database operation fails + */ private int writeEventsToMessagesTable(Connection connection, List events) throws SQLException { int count = 0; String sql = "INSERT INTO postevent.messages (" + @@ -164,6 +228,16 @@ private int writeEventsToMessagesTable(Connection connection, List events return count; } + /** + * Updates the high water mark for a topic using optimistic locking. + * Only updates if the current HWM matches the expected value. + * + * @param connection Database connection + * @param topicName Topic to update + * @param currentHwm Expected current HWM value + * @param newHwm New HWM value to set + * @throws SQLException If database operation fails + */ private void updateHwm(Connection connection, String topicName, long currentHwm, long newHwm) throws SQLException { String sql = "UPDATE postevent.contiguous_hwm SET hwm = ? WHERE topic_name = ? AND hwm = ?"; @@ -182,6 +256,12 @@ private void updateHwm(Connection connection, String topicName, long currentHwm, } } + /** + * Handles incoming system events, processing them one at a time. + * Supports CatchupRequired and FetchLatest events. + * + * @param message The system event to process + */ @Override public void onMessage(SystemEvent message) { if (Objects.requireNonNull(message) == SystemEvent.CatchupRequired) { @@ -191,6 +271,13 @@ public void onMessage(SystemEvent message) { } } + /** + * Fetches and processes the latest event for a topic. + * If successful, triggers a catchup to process any intermediate events. + * + * @param topicName Topic to fetch latest event for + * @return Number of events processed (0 or 1) + */ private int fetchLatest(String topicName) { if (topicName == null) { LOGGER.warn("Topic name is null for fetch latest"); @@ -259,6 +346,15 @@ private static class GapCheckResult { } } + /** + * Processes a result set to check for gaps in message sequence. + * Updates the last contiguous IDN as it processes messages. + * + * @param rs ResultSet containing message IDNs + * @param currentHwm Current HWM value to start checking from + * @return GapCheckResult containing gap status and last contiguous IDN + * @throws SQLException If database operation fails + */ private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQLException { long expectedNext = currentHwm + 1; long lastContiguousIdn = currentHwm; @@ -278,11 +374,11 @@ private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQL /** * Checks for gaps in the message sequence and updates the HWM to the last * contiguous message. - * - * @param topicName The name of the topic - * @param currentHwm The current high water mark to start checking from + * + * @param topicName Topic to check + * @param currentHwm Current HWM value * @return true if a gap was found, false if no gaps were found - * @throws SQLException If a database error occurs + * @throws SQLException If database operation fails */ public boolean hasSequenceGap(String topicName, long currentHwm) throws SQLException { @@ -295,6 +391,15 @@ public boolean hasSequenceGap(String topicName, long currentHwm) throws SQLExcep } } + /** + * Internal version of hasSequenceGap that accepts an existing connection. + * + * @param topicName Topic to check + * @param currentHwm Current HWM value + * @param connection Existing database connection to use + * @return true if a gap was found, false if no gaps were found + * @throws SQLException If database operation fails + */ public boolean hasSequenceGap(String topicName, long currentHwm, Connection connection) throws SQLException { LOGGER.info("Checking for sequence gaps after HWM {} for topic {}", new Object[] { currentHwm, topicName }); @@ -320,6 +425,15 @@ public boolean hasSequenceGap(String topicName, long currentHwm, Connection conn } } + /** + * Updates the HWM to the last contiguous message ID for a topic. + * + * @param topicName Topic to update + * @param currentHwm Current HWM value + * @param connection Database connection + * @return true if HWM was updated, false if no update was needed + * @throws SQLException If database operation fails + */ public boolean updateHwmToLastContiguous(String topicName, long currentHwm, Connection connection) throws SQLException { String sql = "SELECT idn FROM postevent.messages WHERE topic = ? AND idn > ? ORDER BY idn"; diff --git a/library/src/main/java/com/p14n/postevent/catchup/OneAtATimeBehaviour.java b/library/src/main/java/com/p14n/postevent/catchup/OneAtATimeBehaviour.java index 9313f7f..fedbd8e 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/OneAtATimeBehaviour.java +++ b/library/src/main/java/com/p14n/postevent/catchup/OneAtATimeBehaviour.java @@ -3,12 +3,65 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +/** + * Interface defining behavior for executing tasks one at a time with signal + * tracking. + * Provides thread-safe execution of tasks while maintaining a count of pending + * signals + * and ensuring only one task runs at any given time. + * + *

+ * This interface is useful for implementing throttling or sequential processing + * patterns where multiple requests need to be processed one at a time, with the + * ability + * to trigger follow-up actions when new signals arrive during processing. + *

+ * + */ public interface OneAtATimeBehaviour { + /** + * Returns the atomic counter tracking the number of signals received. + * This counter is used to track pending tasks or signals that arrive while + * processing is in progress. + * + * @return AtomicInteger representing the number of pending signals + */ public AtomicInteger getSignals(); + /** + * Returns the atomic boolean indicating whether a task is currently running. + * This flag is used to ensure mutual exclusion between tasks. + * + * @return AtomicBoolean representing the running state + */ public AtomicBoolean getRunning(); + /** + * Executes tasks one at a time with signal tracking. + * This method ensures that only one task runs at a time while maintaining + * a count of signals received during execution. + * + *

+ * The method operates as follows: + *

+ *
    + *
  1. Increments the signal counter to track the incoming request
  2. + *
  3. If a task is already running, returns immediately
  4. + *
  5. If no task is running, acquires the lock and: + *
      + *
    • Resets the signal counter
    • + *
    • Executes the main task
    • + *
    • If new signals arrived during execution, triggers the next task
    • + *
    + *
  6. + *
+ * + * @param todo The main task to execute when acquiring the lock + * @param nextTodo The follow-up task to execute if new signals arrive during + * processing + * @throws RuntimeException if either task throws an exception during execution + */ public default void oneAtATime(Runnable todo, Runnable nextTodo) { getSignals().incrementAndGet(); if (getRunning().get()) { @@ -26,5 +79,4 @@ public default void oneAtATime(Runnable todo, Runnable nextTodo) { } } } - } diff --git a/library/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java b/library/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java index 58359cc..00f3a94 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java +++ b/library/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java @@ -13,6 +13,44 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A message broker implementation that provides persistence for events before + * forwarding them to a target broker. + * This broker ensures reliable event delivery by first persisting events to a + * database and then forwarding them + * to subscribers only after successful persistence. + * + *

+ * Key features: + *

    + *
  • Event persistence before delivery
  • + *
  • High-water mark (HWM) tracking for each topic
  • + *
  • Automatic catchup triggering for missed events
  • + *
  • Transaction management for reliable persistence
  • + *
  • Integration with system event notifications
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * DataSource dataSource = // initialize your datasource
+ * MessageBroker targetBroker = // initialize target broker
+ * SystemEventBroker systemEventBroker = // initialize system event broker
+ * 
+ * PersistentBroker broker = new PersistentBroker<>(
+ *     targetBroker, 
+ *     dataSource, 
+ *     systemEventBroker
+ * );
+ * 
+ * // Use the broker
+ * broker.publish("orders", event);
+ * }
+ * + * @param The type of messages that the target broker handles + */ public class PersistentBroker implements MessageBroker, AutoCloseable, MessageSubscriber { private static final Logger logger = LoggerFactory.getLogger(PersistentBroker.class); private static final String INSERT_SQL = "INSERT INTO postevent.messages (" + SQL.EXT_COLS + @@ -21,9 +59,15 @@ public class PersistentBroker implements MessageBroker, AutoC private final MessageBroker targetBroker; private final DataSource dataSource; - // private final String topicName; private final SystemEventBroker systemEventBroker; + /** + * Creates a new PersistentBroker instance. + * + * @param targetBroker The broker to forward events to after persistence + * @param dataSource The data source for event persistence + * @param systemEventBroker The broker for system event notifications + */ public PersistentBroker(MessageBroker targetBroker, DataSource dataSource, SystemEventBroker systemEventBroker) { @@ -32,6 +76,17 @@ public PersistentBroker(MessageBroker targetBroker, this.systemEventBroker = systemEventBroker; } + /** + * Publishes an event to the specified topic with persistence. + * The event is first persisted to the database, and only after successful + * persistence + * is it forwarded to the target broker. If the high-water mark update fails, + * a catchup event is triggered. + * + * @param topic The topic to publish to + * @param event The event to publish + * @throws RuntimeException if persistence or forwarding fails + */ @Override public void publish(String topic, Event event) { Connection conn = null; @@ -74,26 +129,56 @@ public void publish(String topic, Event event) { } } + /** + * Subscribes to events on the specified topic. + * + * @param topic The topic to subscribe to + * @param subscriber The subscriber that will receive events + * @return true if subscription was successful, false otherwise + */ @Override public boolean subscribe(String topic, MessageSubscriber subscriber) { return targetBroker.subscribe(topic, subscriber); } + /** + * Unsubscribes from events on the specified topic. + * + * @param topic The topic to unsubscribe from + * @param subscriber The subscriber to remove + * @return true if unsubscription was successful, false otherwise + */ @Override public boolean unsubscribe(String topic, MessageSubscriber subscriber) { return targetBroker.unsubscribe(topic, subscriber); } + /** + * Closes the broker and its resources. + */ @Override public void close() { targetBroker.close(); } + /** + * Converts an event message. + * This implementation returns null as conversion is handled by the target + * broker. + * + * @param m The event to convert + * @return null + */ @Override public OutT convert(Event m) { return null; } + /** + * Handles incoming messages by publishing them to the appropriate topic. + * + * @param message The event message to handle + */ @Override public void onMessage(Event message) { logger.atDebug().log("Received event for persistence"); diff --git a/library/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java b/library/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java index c5231f0..c70052c 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java +++ b/library/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java @@ -13,6 +13,31 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +/** + * Service responsible for finding and submitting unprocessed events for + * processing. + * Integrates with {@link UnprocessedEventFinder} to locate events and triggers + * catchup processing through the {@link SystemEventBroker}. + * + *

+ * The submitter periodically checks for unprocessed events and initiates + * processing for their respective topics. It ensures that no events + * are left unprocessed. + *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * DataSource dataSource = // initialize your datasource
+ * SystemEventBroker broker = // initialize your broker
+ * UnprocessedSubmitter submitter = new UnprocessedSubmitter(dataSource, broker);
+ *
+ * // Check for unprocessed events and submit them
+ * submitter.submitUnprocessedEvents();
+ * }
+ */ public class UnprocessedSubmitter implements MessageSubscriber, OneAtATimeBehaviour { private final MessageBroker targetBroker; @@ -23,6 +48,15 @@ public class UnprocessedSubmitter implements MessageSubscriber, One final AtomicInteger signals = new AtomicInteger(0); final AtomicBoolean running = new AtomicBoolean(false); + /** + * Creates a new UnprocessedSubmitter instance. + * + * @param systemEventBroker Broker for publishing system events + * @param ds Data source for database connections + * @param unprocessedEventFinder Finder for unprocessed events + * @param targetBroker Broker for event processing + * @param batchSize Batch size for event processing + */ public UnprocessedSubmitter(SystemEventBroker systemEventBroker, DataSource ds, UnprocessedEventFinder unprocessedEventFinder, MessageBroker targetBroker, int batchSize) { @@ -33,6 +67,14 @@ public UnprocessedSubmitter(SystemEventBroker systemEventBroker, DataSource ds, this.systemEventBroker = systemEventBroker; } + /** + * Finds and submits unprocessed events for catchup processing. + * Queries the database for unprocessed events and triggers catchup + * processing for each unique topic found. + * + * @return Number of topics submitted for processing + * @throws SQLException if a database error occurs + */ private void resubmit() { try (Connection c = ds.getConnection()) { var events = unprocessedEventFinder.findUnprocessedEventsWithLimit(c, batchSize); diff --git a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java b/library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcClient.java similarity index 58% rename from library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java rename to library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcClient.java index adfdf10..6a8472b 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java +++ b/library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcClient.java @@ -1,11 +1,11 @@ -package com.p14n.postevent.catchup.grpc; +package com.p14n.postevent.catchup.remote; import com.p14n.postevent.catchup.CatchupServerInterface; +import com.p14n.postevent.catchup.grpc.*; import com.p14n.postevent.data.Event; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; - import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; @@ -13,23 +13,79 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * gRPC client implementation of the CatchupServerInterface. + * Provides functionality to fetch missed events and latest message IDs from a + * remote gRPC server. + * Implements AutoCloseable to properly manage the gRPC channel lifecycle. + * + *

+ * Key features: + *

+ *
    + *
  • Remote event fetching via gRPC
  • + *
  • Latest message ID retrieval
  • + *
  • Automatic resource cleanup
  • + *
  • Configurable connection settings
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * try (CatchupGrpcClient client = new CatchupGrpcClient("localhost", 50051)) {
+ *     List events = client.fetchEvents(100L, 200L, 50, "my-topic");
+ *     long latestId = client.getLatestMessageId("my-topic");
+ * }
+ * }
+ */ public class CatchupGrpcClient implements CatchupServerInterface, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(CatchupGrpcClient.class); private final ManagedChannel channel; private final CatchupServiceGrpc.CatchupServiceBlockingStub blockingStub; + /** + * Creates a new CatchupGrpcClient with the specified host and port. + * Initializes a plain text connection (no TLS) to the gRPC server. + * + * @param host The hostname of the gRPC server + * @param port The port number of the gRPC server + */ public CatchupGrpcClient(String host, int port) { this(ManagedChannelBuilder.forAddress(host, port) .usePlaintext() .build()); } + /** + * Creates a new CatchupGrpcClient with a pre-configured channel. + * Useful for custom channel configuration or testing. + * + * @param channel The pre-configured gRPC managed channel + */ public CatchupGrpcClient(ManagedChannel channel) { this.channel = channel; this.blockingStub = CatchupServiceGrpc.newBlockingStub(channel); } + /** + * {@inheritDoc} + * + *

+ * This implementation makes a blocking gRPC call to fetch events from the + * remote server. + * Events are converted from the gRPC protocol format to the internal Event + * format. + *

+ * + * @throws IllegalArgumentException if startAfter is greater than end, + * maxResults is less than or equal to 0, + * or if topic is null or empty + * @throws RuntimeException if the gRPC call fails or if there's an + * error converting the response + */ @Override public List fetchEvents(long startAfter, long end, int maxResults, String topic) { logger.atInfo() @@ -63,6 +119,14 @@ public List fetchEvents(long startAfter, long end, int maxResults, String return events; } + /** + * Converts a gRPC event message to the internal Event format. + * If the time field is empty in the gRPC event, the current time is used. + * + * @param grpcEvent The gRPC event message to convert + * @param topic The topic associated with the event + * @return A new Event instance with the converted data + */ private Event convertFromGrpcEvent(com.p14n.postevent.catchup.grpc.Event grpcEvent, String topic) { OffsetDateTime time = null; if (!grpcEvent.getTime().isEmpty()) { @@ -85,6 +149,17 @@ private Event convertFromGrpcEvent(com.p14n.postevent.catchup.grpc.Event grpcEve grpcEvent.getTraceparent()); } + /** + * {@inheritDoc} + * + *

+ * This implementation makes a blocking gRPC call to fetch the latest message ID + * from the remote server. + *

+ * + * @throws IllegalArgumentException if topic is null or empty + * @throws RuntimeException if the gRPC call fails + */ @Override public long getLatestMessageId(String topic) { logger.atInfo() @@ -105,11 +180,16 @@ public long getLatestMessageId(String topic) { } } + /** + * Closes the gRPC channel with a 5-second timeout. + * This method should be called when the client is no longer needed to free up + * resources. + */ @Override public void close() { try { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); - } catch (Exception e) { + } catch (InterruptedException e) { logger.error("Failed to close", e); } } diff --git a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java b/library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcServer.java similarity index 61% rename from library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java rename to library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcServer.java index cf66fe6..cb0cacb 100644 --- a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java +++ b/library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcServer.java @@ -1,6 +1,7 @@ -package com.p14n.postevent.catchup.grpc; +package com.p14n.postevent.catchup.remote; import com.p14n.postevent.catchup.CatchupServerInterface; +import com.p14n.postevent.catchup.grpc.*; import com.p14n.postevent.data.Event; import io.grpc.Server; import io.grpc.ServerBuilder; @@ -13,12 +14,46 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +/** + * gRPC server implementation for the catchup service. + * Provides a gRPC interface for clients to fetch missed events and get latest + * message IDs. + * + *

+ * Key features: + *

+ *
    + *
  • Event catchup functionality via gRPC
  • + *
  • Latest message ID retrieval
  • + *
  • Graceful shutdown handling
  • + *
  • Automatic resource cleanup
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * CatchupServerInterface catchupServer = new CatchupServer(dataSource);
+ * CatchupGrpcServer server = new CatchupGrpcServer(8080, catchupServer);
+ * server.start();
+ * // ... server is running ...
+ * server.stop();
+ * }
+ */ public class CatchupGrpcServer { private static final Logger logger = LoggerFactory.getLogger(CatchupGrpcServer.class); private final int port; private final Server server; + /** + * Creates a new CatchupGrpcServer instance. + * + * @param port The port number to listen on + * @param catchupServer The backend server implementation for handling catchup + * operations + */ public CatchupGrpcServer(int port, CatchupServerInterface catchupServer) { this.port = port; this.server = ServerBuilder.forPort(port) @@ -26,6 +61,12 @@ public CatchupGrpcServer(int port, CatchupServerInterface catchupServer) { .build(); } + /** + * Starts the gRPC server and registers a shutdown hook. + * The server will listen for incoming connections on the configured port. + * + * @throws IOException If the server fails to start + */ public void start() throws IOException { server.start(); logger.atInfo().log("Server started, listening on port {}", port); @@ -40,25 +81,53 @@ public void start() throws IOException { })); } + /** + * Initiates an orderly shutdown of the server. + * Waits for ongoing requests to complete up to a timeout of 30 seconds. + * + * @throws InterruptedException If the shutdown process is interrupted + */ public void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } } + /** + * Blocks until the server is shutdown. + * Useful for keeping the server running in the main thread. + * + * @throws InterruptedException If the blocking is interrupted + */ public void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); } } + /** + * Implementation of the gRPC catchup service. + * Handles incoming gRPC requests by delegating to the backend + * CatchupServerInterface. + */ public static class CatchupServiceImpl extends CatchupServiceGrpc.CatchupServiceImplBase { private final CatchupServerInterface catchupServer; + /** + * Creates a new CatchupServiceImpl instance. + * + * @param catchupServer The backend server interface for catchup operations + */ public CatchupServiceImpl(CatchupServerInterface catchupServer) { this.catchupServer = catchupServer; } + /** + * Handles gRPC requests for getting the latest message ID for a topic. + * + * @param request The request containing the topic name + * @param responseObserver The observer for sending the response + */ @Override public void getLatestMessageId(TopicRequest request, StreamObserver responseObserver) { try { @@ -76,6 +145,12 @@ public void getLatestMessageId(TopicRequest request, StreamObserver responseObserver) { try { @@ -101,6 +176,12 @@ public void fetchEvents(FetchEventsRequest request, StreamObserver + * This class holds all necessary configuration parameters for connecting + * to the database and processing events. + *

+ * + * @param affinity The affinity identifier for this instance + * @param topics Set of topics to handle + * @param dbHost Database host address + * @param dbPort Database port number + * @param dbUser Database username + * @param dbPassword Database password + * @param dbName Database name + * @param pollInterval Poll interval in milliseconds + * @param overrideProps Additional database properties for overriding debezium + * defaults + */ public record ConfigData(String affinity, Set topics, // renamed from name String dbHost, @@ -12,23 +32,48 @@ public record ConfigData(String affinity, String dbName, int pollInterval, Properties overrideProps) implements PostEventConfig { + + /** + * Creates a new ConfigData instance with the specified parameters. + * + * @param affinity The affinity identifier for this instance + * @param topics Set of topics to handle + * @param dbHost Database host address + * @param dbPort Database port number + * @param dbUser Database username + * @param dbPassword Database password + * @param dbName Database name + * @param pollInterval Poll interval in milliseconds + */ public ConfigData(String affinity, - Set topics, // renamed from name - String dbHost, - int dbPort, - String dbUser, - String dbPassword, - String dbName, - int pollInterval) { - this(affinity, topics, dbHost, dbPort, dbUser, dbPassword, dbName, pollInterval,null); + Set topics, + String dbHost, + int dbPort, + String dbUser, + String dbPassword, + String dbName, + int pollInterval) { + this(affinity, topics, dbHost, dbPort, dbUser, dbPassword, dbName, pollInterval, null); } + + /** + * Creates a new ConfigData instance with the specified parameters. + * + * @param affinity The affinity identifier for this instance + * @param topics Set of topics to handle + * @param dbHost Database host address + * @param dbPort Database port number + * @param dbUser Database username + * @param dbPassword Database password + * @param dbName Database name + */ public ConfigData(String affinity, - Set topics, // renamed from name - String dbHost, - int dbPort, - String dbUser, - String dbPassword, - String dbName) { - this(affinity, topics, dbHost, dbPort, dbUser, dbPassword, dbName, 500,null); + Set topics, // renamed from name + String dbHost, + int dbPort, + String dbUser, + String dbPassword, + String dbName) { + this(affinity, topics, dbHost, dbPort, dbUser, dbPassword, dbName, 500, null); } } diff --git a/library/src/main/java/com/p14n/postevent/data/Event.java b/library/src/main/java/com/p14n/postevent/data/Event.java index 415602b..89c00e8 100644 --- a/library/src/main/java/com/p14n/postevent/data/Event.java +++ b/library/src/main/java/com/p14n/postevent/data/Event.java @@ -4,22 +4,74 @@ /** * Record representing an event to be published to the database. + * This immutable class encapsulates all the information needed for event + * processing + * and persistence. + * + *

+ * An event consists of: + *

+ *
    + *
  • Mandatory fields: id, source, type
  • + *
  • Optional metadata: datacontenttype, dataschema, subject, traceparent
  • + *
  • Payload: data as byte array
  • + *
  • System fields: time, idn (sequence number), topic
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * Event event = Event.create(
+ *         "event-123",
+ *         "order-service",
+ *         "order.created",
+ *         "application/json",
+ *         "order-schema-v1",
+ *         "orders",
+ *         jsonData.getBytes(),
+ *         null);
+ * }
+ * + * @param id Unique identifier for the event + * @param source System that generated the event + * @param type Event type identifier + * @param datacontenttype MIME type of the data payload + * @param dataschema Schema identifier for the data payload + * @param subject Subject or category of the event + * @param data Event payload as byte array + * @param time Timestamp when the event occurred + * @param idn Sequential identifier number + * @param topic Topic or stream identifier + * @param traceparent OpenTelemetry trace parent identifier */ -public record Event(String id, - String source, - String type, - String datacontenttype, - String dataschema, - String subject, - byte[] data, - Instant time, - Long idn, - String topic, - String traceparent) implements Traceable { +public record Event( + String id, + String source, + String type, + String datacontenttype, + String dataschema, + String subject, + byte[] data, + Instant time, + Long idn, + String topic, + String traceparent) implements Traceable { /** * Creates a new Event instance with validation of required fields. + * This is a convenience method that sets time, idn, and topic to null. * + * @param id Unique identifier for the event + * @param source System that generated the event + * @param type Event type identifier + * @param datacontenttype MIME type of the data payload + * @param dataschema Schema identifier for the data payload + * @param subject Subject or category of the event + * @param data Event payload as byte array + * @param traceparent OpenTelemetry trace parent identifier + * @return A new Event instance * @throws IllegalArgumentException if any required field is null or empty */ public static Event create(String id, String source, String type, String datacontenttype, String dataschema, @@ -27,6 +79,24 @@ public static Event create(String id, String source, String type, String datacon return create(id, source, type, datacontenttype, dataschema, subject, data, null, null, null, traceparent); } + /** + * Creates a new Event instance with validation of required fields and all + * optional fields. + * + * @param id Unique identifier for the event + * @param source System that generated the event + * @param type Event type identifier + * @param datacontenttype MIME type of the data payload + * @param dataschema Schema identifier for the data payload + * @param subject Subject or category of the event + * @param data Event payload as byte array + * @param time Timestamp when the event occurred + * @param idn Sequential identifier number + * @param topic Topic or stream identifier + * @param traceparent OpenTelemetry trace parent identifier + * @return A new Event instance + * @throws IllegalArgumentException if id, source, or type is null or empty + */ public static Event create(String id, String source, String type, String datacontenttype, String dataschema, String subject, byte[] data, Instant time, Long idn, String topic, String traceparent) { if (id == null || id.trim().isEmpty()) { diff --git a/library/src/main/java/com/p14n/postevent/data/PostEventConfig.java b/library/src/main/java/com/p14n/postevent/data/PostEventConfig.java index d94b898..93857a5 100644 --- a/library/src/main/java/com/p14n/postevent/data/PostEventConfig.java +++ b/library/src/main/java/com/p14n/postevent/data/PostEventConfig.java @@ -3,30 +3,93 @@ import java.util.Properties; import java.util.Set; +/** + * Configuration interface for PostEvent system settings. + * Defines the essential configuration parameters needed for event processing + * and database connectivity. + */ public interface PostEventConfig { - public String affinity(); + /** + * Gets the affinity identifier for this instance. + * The affinity is used to identify related event processors. + * + * @return The affinity string identifier + */ + String affinity(); - public Set topics(); // Changed from single topic to set + /** + * Gets the set of topics this instance will handle. + * + * @return Set of topic names + */ + Set topics(); - public String dbHost(); + /** + * Gets the database host address. + * + * @return The database host address + */ + String dbHost(); - public int dbPort(); + /** + * Gets the database port number. + * + * @return The database port number + */ + int dbPort(); - public String dbUser(); + /** + * Gets the database username. + * + * @return The database username + */ + String dbUser(); - public String dbPassword(); + /** + * Gets the database password. + * + * @return The database password + */ + String dbPassword(); - public String dbName(); + /** + * Gets the database name. + * + * @return The database name + */ + String dbName(); - public Properties overrideProps(); + /** + * Gets additional database properties for overriding defaults. + * + * @return Properties object containing override values + */ + Properties overrideProps(); - public default int startupTimeoutSeconds() { + /** + * Gets the startup timeout in seconds. + * Default is 30 seconds. + * + * @return The startup timeout in seconds + */ + default int startupTimeoutSeconds() { return 30; } - public default String jdbcUrl() { + /** + * Constructs the JDBC URL for database connection. + * + * @return The complete JDBC URL string + */ + default String jdbcUrl() { return String.format("jdbc:postgresql://%s:%d/%s", dbHost(), dbPort(), dbName()); } - public int pollInterval(); + + /** + * Gets the poll interval for checking new events. + * + * @return The poll interval in milliseconds + */ + int pollInterval(); } diff --git a/library/src/main/java/com/p14n/postevent/data/Traceable.java b/library/src/main/java/com/p14n/postevent/data/Traceable.java index 8a0f0e6..c5d893d 100644 --- a/library/src/main/java/com/p14n/postevent/data/Traceable.java +++ b/library/src/main/java/com/p14n/postevent/data/Traceable.java @@ -1,11 +1,54 @@ package com.p14n.postevent.data; +/** + * Interface for objects that can be traced and identified in a distributed + * system. + * Provides essential methods for tracking and correlating events across + * services. + * + *

+ * Key attributes: + *

+ *
    + *
  • {@code id}: Unique identifier for the traceable object
  • + *
  • {@code topic}: Message routing or categorization identifier
  • + *
  • {@code subject}: Business domain or entity identifier
  • + *
  • {@code traceparent}: OpenTelemetry trace context identifier
  • + *
+ * + *

+ * This interface is typically implemented by event and message classes to + * support + * distributed tracing and message correlation. + *

+ */ public interface Traceable { - public String id(); - public String topic(); + /** + * Returns the unique identifier of the traceable object. + * + * @return the unique identifier string + */ + String id(); - public String subject(); + /** + * Returns the topic or stream identifier for message routing. + * + * @return the topic string + */ + String topic(); - public String traceparent(); + /** + * Returns the business domain subject or entity identifier. + * + * @return the subject string + */ + String subject(); + + /** + * Returns the OpenTelemetry trace parent identifier for distributed tracing. + * + * @return the trace parent string + */ + String traceparent(); } diff --git a/library/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java b/library/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java index 61304ec..697fc69 100644 --- a/library/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java +++ b/library/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java @@ -11,18 +11,46 @@ import org.slf4j.LoggerFactory; /** - * Responsible for finding all unprocessed events in the messages table. - * Unprocessed events have a status of 'u'. + * Responsible for finding unprocessed events in the messages table. + * Provides methods to query and retrieve events with a status of 'u' + * (unprocessed) + * using various filtering criteria. + * + *

+ * The finder uses prepared statements for secure SQL execution and includes + * logging for operational visibility. All database operations require an + * external + * {@link Connection} to be provided. + *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * UnprocessedEventFinder finder = new UnprocessedEventFinder();
+ * try (Connection conn = dataSource.getConnection()) {
+ *     List events = finder.findUnprocessedEvents(conn);
+ *     // Process the events...
+ * }
+ * }
*/ public class UnprocessedEventFinder { private static final Logger logger = LoggerFactory.getLogger(UnprocessedEventFinder.class); + /** + * Creates a new UnprocessedEventFinder instance. + */ + public UnprocessedEventFinder() { + } + /** * Finds all unprocessed events in the messages table. + * Events are ordered by their sequence number (idn) in ascending order. * * @param connection Database connection to use - * @return List of unprocessed events ordered by creation time (ascending) + * @return List of unprocessed events * @throws SQLException if a database error occurs */ public List findUnprocessedEvents(Connection connection) throws SQLException { @@ -44,11 +72,11 @@ public List findUnprocessedEvents(Connection connection) throws SQLExcept /** * Finds unprocessed events for a specific subject. + * Events are ordered by creation time in ascending order. * * @param connection Database connection to use * @param subject Subject to filter by - * @return List of unprocessed events for the subject ordered by creation time - * (ascending) + * @return List of unprocessed events for the subject * @throws SQLException if a database error occurs */ public List findUnprocessedEventsBySubject(Connection connection, String subject) throws SQLException { @@ -72,15 +100,16 @@ public List findUnprocessedEventsBySubject(Connection connection, String /** * Finds a limited number of unprocessed events. + * Events are ordered by creation time in ascending order. * * @param connection Database connection to use * @param limit Maximum number of events to return - * @return List of unprocessed events ordered by creation time (ascending), - * limited to the specified count - * @throws SQLException if a database error occurs + * @return List of unprocessed events, limited to the specified count + * @throws SQLException if a database error occurs + * @throws IllegalArgumentException if limit is less than or equal to 0 */ public List findUnprocessedEventsWithLimit(Connection connection, int limit) throws SQLException { - logger.atInfo().log("Finding up to " + limit + " unprocessed events"); + logger.atInfo().log("Finding up to {} unprocessed events", limit); String sql = "SELECT id, source, type, datacontenttype, dataschema, subject, data, " + "time, idn, topic, traceparent FROM postevent.messages " + @@ -99,7 +128,6 @@ public List findUnprocessedEventsWithLimit(Connection connection, int lim } } - /** * Processes a result set and converts each row to an Event object. * @@ -167,11 +195,11 @@ public int countUnprocessedEvents(Connection connection) throws SQLException { if (rs.next()) { int count = rs.getInt(1); - logger.atInfo().log("Found " + count + " unprocessed events"); + logger.atInfo().log("Found {} unprocessed events", count); return count; } return 0; } } -} \ No newline at end of file +} diff --git a/library/src/main/java/com/p14n/postevent/db/DatabaseSetup.java b/library/src/main/java/com/p14n/postevent/db/DatabaseSetup.java index cf49bd6..d74e662 100644 --- a/library/src/main/java/com/p14n/postevent/db/DatabaseSetup.java +++ b/library/src/main/java/com/p14n/postevent/db/DatabaseSetup.java @@ -15,25 +15,75 @@ import java.util.ArrayList; import java.util.Set; +/** + * Handles PostgreSQL database setup and initialization for the PostEvent + * system. + * This class manages schema creation, table initialization, and replication + * slot cleanup. + * + *

+ * Key responsibilities include: + *

    + *
  • Creating the PostEvent schema
  • + *
  • Initializing topic-specific tables
  • + *
  • Setting up message tracking tables
  • + *
  • Managing replication slots
  • + *
  • Providing connection pool setup
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * PostEventConfig config = // initialize configuration
+ * DatabaseSetup setup = new DatabaseSetup(config);
+ * 
+ * // Setup all required tables for given topics
+ * setup.setupAll(Set.of("orders", "inventory"));
+ * 
+ * // Create connection pool
+ * DataSource pool = DatabaseSetup.createPool(config);
+ * }
+ */ public class DatabaseSetup { - // SELECT 'select pg_drop_replication_slot(''' || slot_name ||''');' FROM - // pg_replication_slots where active=false and slot_name like 'postevent%'; private static final Logger logger = LoggerFactory.getLogger(DatabaseSetup.class); private final String jdbcUrl; private final String username; private final String password; + /** + * Creates a new DatabaseSetup instance using configuration from + * PostEventConfig. + * + * @param cfg Configuration containing database connection details + */ public DatabaseSetup(PostEventConfig cfg) { this(cfg.jdbcUrl(), cfg.dbUser(), cfg.dbPassword()); } + /** + * Creates a new DatabaseSetup instance with explicit connection parameters. + * + * @param jdbcUrl PostgreSQL JDBC URL + * @param username Database username + * @param password Database password + */ public DatabaseSetup(String jdbcUrl, String username, String password) { this.jdbcUrl = jdbcUrl; this.username = username; this.password = password; } + /** + * Performs complete database setup for multiple topics. + * Creates schema, message tables, and topic-specific tables. + * + * @param topics Set of topic names to initialize + * @return this instance for method chaining + * @throws RuntimeException if database operations fail + */ public DatabaseSetup setupAll(Set topics) { createSchemaIfNotExists(); createMessagesTableIfNotExists(); @@ -43,10 +93,23 @@ public DatabaseSetup setupAll(Set topics) { return this; } + /** + * Performs complete database setup for a single topic. + * + * @param topic Topic name to initialize + * @return this instance for method chaining + * @throws RuntimeException if database operations fail + */ public DatabaseSetup setupAll(String topic) { return setupAll(Set.of(topic)); } + /** + * Removes inactive replication slots with names starting with 'postevent'. + * This cleanup prevents accumulation of unused slots. + * + * @return this instance for method chaining + */ public DatabaseSetup clearOldSlots() { try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) { @@ -73,6 +136,12 @@ public DatabaseSetup clearOldSlots() { return this; } + /** + * Creates the PostEvent schema if it doesn't exist. + * + * @return this instance for method chaining + * @throws RuntimeException if schema creation fails + */ public DatabaseSetup createSchemaIfNotExists() { try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) { @@ -88,6 +157,15 @@ public DatabaseSetup createSchemaIfNotExists() { return this; } + /** + * Creates a topic-specific table if it doesn't exist. + * The table stores event data with unique constraints on ID and source. + * + * @param topic Name of the topic table to create + * @return this instance for method chaining + * @throws IllegalArgumentException if topic name is invalid + * @throws RuntimeException if table creation fails + */ public DatabaseSetup createTableIfNotExists(String topic) { if (topic == null || topic.trim().isEmpty()) { throw new IllegalArgumentException("Topic name cannot be null or empty"); @@ -124,6 +202,14 @@ traceparent VARCHAR(55), return this; } + /** + * Creates the messages table if it doesn't exist. + * This table stores all events with their processing status and supports + * efficient querying. + * + * @return this instance for method chaining + * @throws RuntimeException if table creation fails + */ public DatabaseSetup createMessagesTableIfNotExists() { try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) { @@ -147,12 +233,12 @@ PRIMARY KEY (topic,idn) stmt.execute(sql); - // Existing index for hasUnprocessedPriorEvents + // Index for hasUnprocessedPriorEvents query stmt.execute(""" CREATE INDEX IF NOT EXISTS idx_messages_subject_topic_idn_status ON postevent.messages (subject, topic, idn, status)"""); - // New index for findUnprocessedEvents query + // Index for findUnprocessedEvents query stmt.execute(""" CREATE INDEX IF NOT EXISTS idx_messages_status_time ON postevent.messages (status, time)"""); @@ -166,6 +252,14 @@ PRIMARY KEY (topic,idn) return this; } + /** + * Creates the contiguous high-water mark table if it doesn't exist. + * This table tracks the latest continuously processed message ID for each + * topic. + * + * @return this instance for method chaining + * @throws RuntimeException if table creation fails + */ public DatabaseSetup createContiguousHwmTableIfNotExists() { try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) { @@ -186,10 +280,22 @@ topic_name VARCHAR(255) PRIMARY KEY, return this; } + /** + * Creates a database connection using the configured credentials. + * + * @return A new database Connection + * @throws SQLException if connection fails + */ private Connection getConnection() throws SQLException { return DriverManager.getConnection(jdbcUrl, username, password); } + /** + * Creates and configures a connection pool using HikariCP. + * + * @param cfg Configuration containing database connection details + * @return Configured DataSource + */ public static DataSource createPool(PostEventConfig cfg) { HikariDataSource ds = new HikariDataSource(); ds.setJdbcUrl(cfg.jdbcUrl()); diff --git a/library/src/main/java/com/p14n/postevent/db/SQL.java b/library/src/main/java/com/p14n/postevent/db/SQL.java index c55a401..cc8d7f9 100644 --- a/library/src/main/java/com/p14n/postevent/db/SQL.java +++ b/library/src/main/java/com/p14n/postevent/db/SQL.java @@ -1,16 +1,65 @@ package com.p14n.postevent.db; import com.p14n.postevent.data.Event; - import java.sql.*; +/** + * Utility class providing SQL-related constants and helper methods for database + * operations. + * This class handles common database operations such as mapping result sets to + * events, + * setting prepared statement parameters, and managing database resources. + * + *

+ * The class defines two sets of columns: + *

+ *
    + *
  • Core columns: Basic event properties (id, source, type, etc.)
  • + *
  • Extended columns: Core columns plus additional fields (time, idn, + * topic)
  • + *
+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * // Using core columns in a SELECT statement
+ * String query = "SELECT " + SQL.CORE_COLS + " FROM events";
+ *
+ * // Setting event data on a prepared statement
+ * PreparedStatement stmt = connection.prepareStatement(
+ *         "INSERT INTO events (" + SQL.CORE_COLS + ") VALUES (" + SQL.CORE_PH + ")");
+ * SQL.setEventOnStatement(stmt, event);
+ * }
+ */ public class SQL { + /** Private constructor to prevent instantiation of utility class */ + private SQL() { + } + + /** Column names for core event properties */ public static String CORE_COLS = "id, source, type, datacontenttype, dataschema, subject, data, traceparent"; + + /** Column names including both core and extended properties */ public static String EXT_COLS = CORE_COLS + ", time, idn, topic"; + + /** Placeholder parameters for core columns in prepared statements */ public static String CORE_PH = "?,?,?,?,?,?,?,?"; + + /** Placeholder parameters for extended columns in prepared statements */ public static String EXT_PH = CORE_PH + ",?,?,?"; + /** + * Creates an Event object from a ResultSet row. + * Maps database columns to Event properties using the current row data. + * + * @param rs ResultSet positioned at the row to map + * @param topic Topic name for the event (if not present in ResultSet) + * @return New Event instance populated with ResultSet data + * @throws SQLException if any database access error occurs + */ public static Event eventFromResultSet(ResultSet rs, String topic) throws SQLException { return Event.create( rs.getString("id"), @@ -26,6 +75,14 @@ public static Event eventFromResultSet(ResultSet rs, String topic) throws SQLExc rs.getString("traceparent")); } + /** + * Sets core event properties on a PreparedStatement. + * Maps Event properties to the first 8 parameters of the statement. + * + * @param stmt PreparedStatement to set parameters on + * @param event Event containing the data to set + * @throws SQLException if any database access error occurs + */ public static void setEventOnStatement(PreparedStatement stmt, Event event) throws SQLException { stmt.setString(1, event.id()); stmt.setString(2, event.source()); @@ -37,12 +94,26 @@ public static void setEventOnStatement(PreparedStatement stmt, Event event) thro stmt.setString(8, event.traceparent()); } + /** + * Sets extended event properties (time, IDN, and topic) on a PreparedStatement. + * Maps these properties to parameters 9-11 of the statement. + * + * @param stmt PreparedStatement to set parameters on + * @param event Event containing the data to set + * @throws SQLException if any database access error occurs + */ public static void setTimeIDNAndTopic(PreparedStatement stmt, Event event) throws SQLException { stmt.setTimestamp(9, Timestamp.from(event.time())); stmt.setLong(10, event.idn()); stmt.setString(11, event.topic()); } + /** + * Safely closes a database connection. + * Handles null connections and suppresses any exceptions during closure. + * + * @param conn Connection to close (may be null) + */ public static void closeConnection(Connection conn) { if (conn != null) { try { @@ -55,6 +126,13 @@ public static void closeConnection(Connection conn) { } } + /** + * Handles SQLException by attempting to rollback the transaction. + * If rollback fails, the rollback exception is added as a suppressed exception. + * + * @param e Original SQLException that triggered the rollback + * @param conn Connection to rollback (may be null) + */ public static void handleSQLException(SQLException e, Connection conn) { if (conn != null) { try { diff --git a/library/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java b/library/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java index 9a839a6..b3fc625 100644 --- a/library/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java +++ b/library/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java @@ -19,9 +19,69 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Server implementation for Debezium Change Data Capture (CDC). + * Manages a Debezium engine instance to capture database changes and forward + * them to a consumer. + * Supports PostgreSQL CDC using the pgoutput plugin with configurable topics + * and affinity. + * + *

+ * The server uses JDBC offset storage for tracking progress and supports + * filtered publication mode. + * It automatically manages replication slots and cleanup on shutdown. + *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * PostEventConfig config = // initialize configuration
+ * DebeziumServer server = new DebeziumServer();
+ *
+ * // Start the server with a consumer
+ * server.start(config, event -> {
+ *     // Process the change event
+ *     System.out.println("Received change: " + event);
+ * });
+ *
+ * // Shutdown when done
+ * server.stop();
+ * }
+ */ public class DebeziumServer { private static final Logger logger = LoggerFactory.getLogger(DebeziumServer.class); + /** + * Constructs a new DebeziumServer instance. + */ + public DebeziumServer() { + } + + /** + * Creates Debezium configuration properties for PostgreSQL CDC. + * + *

+ * Configures: + *

+ *
    + *
  • PostgreSQL connector with pgoutput plugin
  • + *
  • JDBC offset storage with affinity-based partitioning
  • + *
  • Filtered publication mode for specified topics
  • + *
  • Automatic replication slot management
  • + *
+ * + * @param affinity Identifier for this instance, used for offset tracking + * @param topics Set of topics to monitor for changes + * @param dbHost Database host address + * @param dbPort Database port + * @param dbUser Database username + * @param dbPassword Database password + * @param dbName Database name + * @param pollInterval Interval in milliseconds between polls + * @return Properties configured for Debezium PostgreSQL connector + */ public static Properties props( String affinity, Set topics, @@ -47,7 +107,7 @@ public static Properties props( props.setProperty("offset.storage.jdbc.user", dbUser); props.setProperty("offset.storage.jdbc.password", dbPassword); props.setProperty("offset.flush.interval.ms", "1000"); - props.setProperty("poll.interval.ms",String.valueOf(pollInterval)); + props.setProperty("poll.interval.ms", String.valueOf(pollInterval)); props.setProperty("database.hostname", dbHost); props.setProperty("plugin.name", "pgoutput"); props.setProperty("database.port", dbPort); @@ -81,6 +141,24 @@ public static Properties props( private ExecutorService executor; private DebeziumEngine> engine; + /** + * Starts the Debezium engine with the specified configuration and consumer. + * + *

+ * Initializes a single-threaded executor and configures the Debezium engine to: + *

    + *
  • Use JSON format for change events
  • + *
  • Monitor configured topics for changes
  • + *
  • Forward events to the provided consumer
  • + *
+ * + * @param cfg Configuration for the Debezium engine + * @param consumer Consumer to process change events + * @throws IllegalStateException if the consumer is null, config is null, or + * startup timeout is exceeded + * @throws IOException if engine initialization fails + * @throws InterruptedException if startup is interrupted + */ public void start(PostEventConfig cfg, Consumer> consumer) throws IOException, InterruptedException { if (consumer == null) { @@ -121,6 +199,12 @@ public void taskStarted() { logger.atInfo().log("Debezium engine started successfully"); } + /** + * Stops the Debezium engine and executor service. + * Ensures clean shutdown of resources with a 5-second timeout. + * + * @throws IOException if engine shutdown fails + */ public void stop() throws IOException { if (executor != null) { executor.shutdown(); @@ -139,7 +223,5 @@ public void stop() throws IOException { logger.error("Shutdown interrupted", e); } } - } - } diff --git a/library/src/main/java/com/p14n/postevent/debezium/Functions.java b/library/src/main/java/com/p14n/postevent/debezium/Functions.java index 24404cb..d9b75ec 100644 --- a/library/src/main/java/com/p14n/postevent/debezium/Functions.java +++ b/library/src/main/java/com/p14n/postevent/debezium/Functions.java @@ -8,15 +8,70 @@ import java.io.IOException; import java.time.Instant; +/** + * Utility class providing functions for converting Debezium change events into + * application Events. + * Handles the parsing and transformation of CDC (Change Data Capture) events + * from + * Debezium into the application's event model. + * + *

+ * The class expects Debezium events in JSON format with a specific structure + * containing: + *

    + *
  • payload.after - The new state of the record
  • + *
  • payload.source.table - The source table name
  • + *
  • Various event metadata fields (id, source, type, etc.)
  • + *
+ */ public class Functions { + + /** Private constructor to prevent instantiation of utility class */ + private Functions() { + } + + /** JSON ObjectMapper instance for parsing Debezium event payloads */ private final static ObjectMapper mapper = new ObjectMapper(); + /** + * Safely extracts text from a JsonNode, returning null if the node is null. + * + * @param j The JsonNode to extract text from + * @return The text value of the node, or null if the node is null + */ private static String safeText(JsonNode j) { - if(j != null){ + if (j != null) { return j.asText(); } return null; } + + /** + * Converts a Debezium ChangeEvent into an application Event. + * Parses the JSON payload and extracts relevant fields to construct a new Event + * instance. + * + *

+ * The method expects the following fields in the change event payload: + *

    + *
  • id - Event identifier
  • + *
  • source - Event source
  • + *
  • type - Event type
  • + *
  • datacontenttype - Content type of the data
  • + *
  • dataschema - Schema of the data
  • + *
  • subject - Event subject
  • + *
  • data - Binary event data
  • + *
  • time - Event timestamp
  • + *
  • idn - Sequential identifier
  • + *
  • traceparent - OpenTelemetry trace parent
  • + *
+ * + * @param record The Debezium ChangeEvent to convert + * @return A new Event instance, or null if the record doesn't contain required + * fields + * @throws IOException If there's an error parsing the JSON or extracting binary + * data + */ public static Event changeEventToEvent(ChangeEvent record) throws IOException { var actualObj = mapper.readTree(record.value()); var payload = actualObj.get("payload"); diff --git a/library/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java b/library/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java index 5fe8fa0..362f1c4 100644 --- a/library/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java +++ b/library/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java @@ -26,6 +26,7 @@ public class OrderedProcessor { /** * Creates a new OrderedProcessor with the specified processing function. * + * @param systemEventBroker The broker for system events * @param processorFunction Function that processes an event with a database * connection * and returns true if processing was successful diff --git a/library/src/main/java/com/p14n/postevent/telemetry/BrokerMetrics.java b/library/src/main/java/com/p14n/postevent/telemetry/BrokerMetrics.java index 89ddfc6..54b1af4 100644 --- a/library/src/main/java/com/p14n/postevent/telemetry/BrokerMetrics.java +++ b/library/src/main/java/com/p14n/postevent/telemetry/BrokerMetrics.java @@ -6,11 +6,33 @@ import io.opentelemetry.api.metrics.LongUpDownCounter; import io.opentelemetry.api.metrics.Meter; +/** + * Manages OpenTelemetry metrics for message broker operations. + * Tracks the number of published messages, received messages, and active + * subscribers + * across different topics. + * + *

+ * This class provides three main metrics: + *

+ *
    + *
  • messages_published: Counter for total messages published per topic
  • + *
  • messages_received: Counter for total messages received by subscribers per + * topic
  • + *
  • active_subscribers: Up/down counter for current number of subscribers per + * topic
  • + *
+ */ public class BrokerMetrics { private final LongCounter publishedMessages; private final LongCounter receivedMessages; private final LongUpDownCounter activeSubscribers; + /** + * Creates a new BrokerMetrics instance with the provided OpenTelemetry meter. + * + * @param meter OpenTelemetry meter used to create the metric instruments + */ public BrokerMetrics(Meter meter) { publishedMessages = meter.counterBuilder("messages_published") .setDescription("Number of messages published") @@ -25,23 +47,47 @@ public BrokerMetrics(Meter meter) { .build(); } + /** + * Records a message publication event for the specified topic. + * Increments the published messages counter with the topic attribute. + * + * @param topic The topic the message was published to + */ public void recordPublished(String topic) { publishedMessages.add(1, Attributes.of( AttributeKey.stringKey("topic"), topic)); } + /** + * Records a message reception event for the specified topic. + * Increments the received messages counter with the topic attribute. + * + * @param topic The topic the message was received from + */ public void recordReceived(String topic) { receivedMessages.add(1, Attributes.of( AttributeKey.stringKey("topic"), topic)); } + /** + * Records the addition of a subscriber for the specified topic. + * Increments the active subscribers counter with the topic attribute. + * + * @param topic The topic the subscriber was added to + */ public void recordSubscriberAdded(String topic) { activeSubscribers.add(1, Attributes.of( AttributeKey.stringKey("topic"), topic)); } + /** + * Records the removal of a subscriber for the specified topic. + * Decrements the active subscribers counter with the topic attribute. + * + * @param topic The topic the subscriber was removed from + */ public void recordSubscriberRemoved(String topic) { activeSubscribers.add(-1, Attributes.of( AttributeKey.stringKey("topic"), topic)); } -} \ No newline at end of file +} diff --git a/library/src/main/java/com/p14n/postevent/telemetry/MapTextMapGetter.java b/library/src/main/java/com/p14n/postevent/telemetry/MapTextMapGetter.java index 6ad92fa..1084f48 100644 --- a/library/src/main/java/com/p14n/postevent/telemetry/MapTextMapGetter.java +++ b/library/src/main/java/com/p14n/postevent/telemetry/MapTextMapGetter.java @@ -4,15 +4,53 @@ import java.util.Map; +/** + * Implementation of OpenTelemetry's TextMapGetter interface for extracting + * context from a Map. + * This class enables the extraction of trace context information from a + * String-to-String Map carrier, + * which is commonly used in distributed tracing scenarios. + * + *

+ * Example usage: + *

+ * + *
{@code
+ * Map carrier = new HashMap<>();
+ * carrier.put("traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
+ * 
+ * MapTextMapGetter getter = new MapTextMapGetter();
+ * String traceParent = getter.get(carrier, "traceparent");
+ * }
+ */ public class MapTextMapGetter implements TextMapGetter> { + + /** Private constructor to prevent instantiation of utility class */ + public MapTextMapGetter() { + } + + /** + * Retrieves a value from the carrier Map using the specified key. + * + * @param carrier The Map containing the context information + * @param key The key whose value should be retrieved + * @return The value associated with the key, or null if not present + * @throws AssertionError if the carrier is null + */ @Override public String get(Map carrier, String key) { assert carrier != null; return carrier.get(key); } + /** + * Returns all keys from the carrier Map. + * + * @param carrier The Map containing the context information + * @return An Iterable of all keys in the carrier + */ @Override public Iterable keys(Map carrier) { return carrier.keySet(); } -} \ No newline at end of file +} diff --git a/library/src/main/java/com/p14n/postevent/telemetry/OpenTelemetryFunctions.java b/library/src/main/java/com/p14n/postevent/telemetry/OpenTelemetryFunctions.java index f6a18ce..380debe 100644 --- a/library/src/main/java/com/p14n/postevent/telemetry/OpenTelemetryFunctions.java +++ b/library/src/main/java/com/p14n/postevent/telemetry/OpenTelemetryFunctions.java @@ -11,18 +11,51 @@ import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.api.OpenTelemetry; +/** + * Utility class providing OpenTelemetry instrumentation functions for + * distributed tracing. + * Supports trace context propagation, span creation, and execution with + * telemetry. + * + *

+ * Key features: + *

+ *
    + *
  • Trace context serialization and deserialization
  • + *
  • Automated span management with error handling
  • + *
  • Support for traceable events with metadata
  • + *
+ */ public class OpenTelemetryFunctions { + + /** Private constructor to prevent instantiation of utility class */ + private OpenTelemetryFunctions() { + } + + /** + * Serializes the current trace context into a string format. + * Useful for passing trace context across process boundaries. + * + * @param ot OpenTelemetry instance to use for propagation + * @return String representation of the trace context (traceparent) + */ public static String serializeTraceContext(OpenTelemetry ot) { Map carrier = new HashMap<>(); TextMapSetter> setter = Map::put; ot.getPropagators().getTextMapPropagator().inject(Context.current(), carrier, setter); - return carrier.get("traceparent"); // Serialized trace context + return carrier.get("traceparent"); } + /** + * Deserializes a trace context string back into an OpenTelemetry Context. + * + * @param ot OpenTelemetry instance to use for propagation + * @param traceparent Serialized trace context string + * @return Reconstructed OpenTelemetry Context + */ public static Context deserializeTraceContext(OpenTelemetry ot, String traceparent) { Map carrier = new HashMap<>(); carrier.put("traceparent", traceparent); @@ -30,18 +63,33 @@ public static Context deserializeTraceContext(OpenTelemetry ot, String tracepare new MapTextMapGetter()); } - public static T processWithTelemetry(OpenTelemetry ot,Tracer tracer, String spanName, String topic, String eventId, - String subject, - String traceparent, - Supplier action) { - + /** + * Executes an action within a new trace span with detailed attributes. + * + * @param Return type of the action + * @param ot OpenTelemetry instance + * @param tracer Tracer to create spans + * @param spanName Name of the span to create + * @param topic Topic attribute for the span + * @param eventId Event ID attribute for the span + * @param subject Subject attribute for the span + * @param traceparent Parent trace context (optional) + * @param action Action to execute within the span + * @return Result of the action execution + * @throws RuntimeException if the action throws an exception + */ + public static T processWithTelemetry(OpenTelemetry ot, Tracer tracer, String spanName, String topic, + String eventId, + String subject, + String traceparent, + Supplier action) { Context parentContext = traceparent == null ? null - : OpenTelemetryFunctions.deserializeTraceContext(ot, traceparent); + : OpenTelemetryFunctions.deserializeTraceContext(ot, traceparent); SpanBuilder sb = tracer.spanBuilder(spanName) - .setAttribute("topic", topic) - .setAttribute("event.id", eventId) - .setAttribute("subject", subject); - if(parentContext !=null) { + .setAttribute("topic", topic) + .setAttribute("event.id", eventId) + .setAttribute("subject", subject); + if (parentContext != null) { sb.setParent(parentContext); } Span span = sb.startSpan(); @@ -55,9 +103,18 @@ public static T processWithTelemetry(OpenTelemetry ot,Tracer tracer, String } } + /** + * Executes an action within a new trace span with minimal configuration. + * + * @param Return type of the action + * @param tracer Tracer to create spans + * @param spanName Name of the span to create + * @param action Action to execute within the span + * @return Result of the action execution + * @throws RuntimeException if the action throws an exception + */ public static T processWithTelemetry(Tracer tracer, String spanName, - Supplier action) { - + Supplier action) { SpanBuilder sb = tracer.spanBuilder(spanName); Span span = sb.startSpan(); try (Scope scope = span.makeCurrent()) { @@ -70,12 +127,22 @@ public static T processWithTelemetry(Tracer tracer, String spanName, } } - public static T processWithTelemetry(OpenTelemetry ot,Tracer tracer, Traceable event, String spanName, + /** + * Executes an action within a new trace span using a Traceable event for + * attributes. + * + * @param Return type of the action + * @param ot OpenTelemetry instance + * @param tracer Tracer to create spans + * @param event Traceable event containing span attributes + * @param spanName Name of the span to create + * @param action Action to execute within the span + * @return Result of the action execution + * @throws RuntimeException if the action throws an exception + */ + public static T processWithTelemetry(OpenTelemetry ot, Tracer tracer, Traceable event, String spanName, Supplier action) { - return processWithTelemetry(ot,tracer, spanName, event.topic(), event.id(), event.subject(),event.traceparent(), action); + return processWithTelemetry(ot, tracer, spanName, event.topic(), event.id(), event.subject(), + event.traceparent(), action); } - } -/* - - */ \ No newline at end of file diff --git a/library/src/test/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcIntegrationTest.java b/library/src/test/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcIntegrationTest.java similarity index 99% rename from library/src/test/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcIntegrationTest.java rename to library/src/test/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcIntegrationTest.java index 6889cc2..440cf0e 100644 --- a/library/src/test/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcIntegrationTest.java +++ b/library/src/test/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcIntegrationTest.java @@ -1,4 +1,4 @@ -package com.p14n.postevent.broker.grpc; +package com.p14n.postevent.broker.remote; import com.p14n.postevent.broker.DefaultExecutor; import com.p14n.postevent.broker.DefaultMessageBroker; diff --git a/library/src/test/java/com/p14n/postevent/catchup/grpc/CatchupGrpcIntegrationTest.java b/library/src/test/java/com/p14n/postevent/catchup/remote/CatchupGrpcIntegrationTest.java similarity index 96% rename from library/src/test/java/com/p14n/postevent/catchup/grpc/CatchupGrpcIntegrationTest.java rename to library/src/test/java/com/p14n/postevent/catchup/remote/CatchupGrpcIntegrationTest.java index ace0e4a..d5a19ba 100644 --- a/library/src/test/java/com/p14n/postevent/catchup/grpc/CatchupGrpcIntegrationTest.java +++ b/library/src/test/java/com/p14n/postevent/catchup/remote/CatchupGrpcIntegrationTest.java @@ -1,6 +1,8 @@ -package com.p14n.postevent.catchup.grpc; +package com.p14n.postevent.catchup.remote; import com.p14n.postevent.catchup.CatchupServerInterface; +import com.p14n.postevent.catchup.remote.CatchupGrpcClient; +import com.p14n.postevent.catchup.remote.CatchupGrpcServer; import com.p14n.postevent.data.Event; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; From f0d1a89b59b493e2b55161cb02357be31f523a4d Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Sun, 27 Apr 2025 21:41:11 +0100 Subject: [PATCH 3/5] [human] Tidy --- .github/workflows/publish.yml | 28 ++++++++++++++++++++++++++++ README.md | 10 ++++------ library/build.gradle | 15 ++------------- 3 files changed, 34 insertions(+), 19 deletions(-) create mode 100644 .github/workflows/publish.yml diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..c09618c --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,28 @@ +name: Publish package to the Maven Central Repository +on: + release: + types: [created] +jobs: + publish: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Java + uses: actions/setup-java@v3 + with: + java-version: '21' + distribution: 'temurin' + + - name: Validate Gradle wrapper + uses: gradle/wrapper-validation-action@v1 + + - name: Publish package + uses: gradle/gradle-build-action@v2 + with: + arguments: publishToMavenCentral + env: + ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.MAVEN_USERNAME }} + ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.MAVEN_PASSWORD }} + ORG_GRADLE_PROJECT_signingInMemoryKey: ${{ secrets.GPG_PRIVATE_KEY }} + ORG_GRADLE_PROJECT_signingInMemoryKeyId: ${{ secrets.GPG_KEY_ID }} + ORG_GRADLE_PROJECT_signingInMemoryKeyPassword: ${{ secrets.GPG_PASSPHRASE }} \ No newline at end of file diff --git a/README.md b/README.md index f123bd2..0d4682d 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ [![Build Status](https://github.com/p14n/postevent/workflows/Test%20PR/badge.svg)](https://github.com/p14n/postevent/actions) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE) -[![Latest Release](https://img.shields.io/github/v/release/yourusername/postevent)](https://github.com/yourusername/postevent/releases) +[![Latest Release](https://img.shields.io/github/v/release/p14n/postevent)](https://github.com/yourusername/p14n/releases) A reliable event publishing and consumption system using PostgreSQL and gRPC, following the CloudEvents specification. @@ -36,7 +36,7 @@ A reliable event publishing and consumption system using PostgreSQL and gRPC, fo #### Gradle ```groovy -implementation 'com.p14n:postevent:1.0.0' +implementation 'com.p14n:postevent:1.0.0-SNAPSHOT' ``` #### Maven @@ -44,7 +44,7 @@ implementation 'com.p14n:postevent:1.0.0' com.p14n postevent - 1.0.0 + 1.0.0-SNAPSHOT ``` @@ -155,8 +155,6 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file ## Support - Create an [Issue](https://github.com/yourusername/postevent/issues) -- Join our [Discord/Slack] community -- Email: support@yourdomain.com ## Acknowledgments @@ -166,4 +164,4 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file ## Project Status -Active development, production-ready. +Active development. diff --git a/library/build.gradle b/library/build.gradle index 8081510..63c9f3d 100644 --- a/library/build.gradle +++ b/library/build.gradle @@ -62,8 +62,6 @@ dependencies { // Instrumentation for GRPC implementation 'io.opentelemetry.instrumentation:opentelemetry-grpc-1.6:1.32.0-alpha' - // Logging integration - //implementation 'io.opentelemetry:opentelemetry-extension-logging:1.32.0' } testlogger { @@ -94,6 +92,7 @@ test { failFast = true testLogging.showStandardStreams = true } + task fastTest( type: Test ) { useJUnitPlatform { includeEngines 'junit-jupiter' @@ -107,11 +106,6 @@ jar { } } -task runHelloWorld(type: JavaExec) { - classpath = sourceSets.main.runtimeClasspath - mainClass = 'com.p14n.postevent.HelloWorld' -} - // Configure Protobuf plugin protobuf { protoc { @@ -129,7 +123,6 @@ protobuf { } } -// Make sure the generated code is included in the source sets sourceSets { main { java { @@ -152,11 +145,7 @@ mavenPublishing { configure(new JavaLibrary(new JavadocJar.Javadoc(), true)) - //publishToMavenCentral(SonatypeHost.DEFAULT) - // or when publishing to https://s01.oss.sonatype.org - //publishToMavenCentral(SonatypeHost.S01) - // or when publishing to https://central.sonatype.com/ - publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL) + publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true) signAllPublications() From b2c250eaa6ab57ea78da98dc5bc4af40419f0b9b Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Sun, 27 Apr 2025 21:44:59 +0100 Subject: [PATCH 4/5] [human] Add CODEOWNERS --- .github/CODEOWNERS | 1 + 1 file changed, 1 insertion(+) create mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..68f805e --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @p14n \ No newline at end of file From 3559132cacb4c1a139275f29d376f36208b53340 Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Sun, 27 Apr 2025 21:45:58 +0100 Subject: [PATCH 5/5] Update library/src/main/java/com/p14n/postevent/Publisher.java Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> --- library/src/main/java/com/p14n/postevent/Publisher.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/library/src/main/java/com/p14n/postevent/Publisher.java b/library/src/main/java/com/p14n/postevent/Publisher.java index 9d11f3b..d00acd0 100644 --- a/library/src/main/java/com/p14n/postevent/Publisher.java +++ b/library/src/main/java/com/p14n/postevent/Publisher.java @@ -95,6 +95,10 @@ public static void publish(Event event, Connection connection, String topic) thr * invalid characters */ public static void publish(Event event, DataSource ds, String topic) throws SQLException { + if (topic == null || topic.trim().isEmpty() || !topic.matches("[a-z_]+")) { + throw new IllegalArgumentException("Invalid topic name: must be non-null, non-empty, and only contain lowercase letters and underscores."); + } + try (Connection c = ds.getConnection()) { publish(event, c, topic); }