diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
new file mode 100644
index 0000000..68f805e
--- /dev/null
+++ b/.github/CODEOWNERS
@@ -0,0 +1 @@
+* @p14n
\ No newline at end of file
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
new file mode 100644
index 0000000..c09618c
--- /dev/null
+++ b/.github/workflows/publish.yml
@@ -0,0 +1,28 @@
+name: Publish package to the Maven Central Repository
+on:
+ release:
+ types: [created]
+jobs:
+ publish:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - name: Set up Java
+ uses: actions/setup-java@v3
+ with:
+ java-version: '21'
+ distribution: 'temurin'
+
+ - name: Validate Gradle wrapper
+ uses: gradle/wrapper-validation-action@v1
+
+ - name: Publish package
+ uses: gradle/gradle-build-action@v2
+ with:
+ arguments: publishToMavenCentral
+ env:
+ ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.MAVEN_USERNAME }}
+ ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.MAVEN_PASSWORD }}
+ ORG_GRADLE_PROJECT_signingInMemoryKey: ${{ secrets.GPG_PRIVATE_KEY }}
+ ORG_GRADLE_PROJECT_signingInMemoryKeyId: ${{ secrets.GPG_KEY_ID }}
+ ORG_GRADLE_PROJECT_signingInMemoryKeyPassword: ${{ secrets.GPG_PASSPHRASE }}
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 68d8449..5be3e12 100644
--- a/.gitignore
+++ b/.gitignore
@@ -270,3 +270,4 @@ bin
**/terraform.tfstate
**/terraform.tfstate.backup
app/infra/tf/terraform.tfvars
+gradle.properties
diff --git a/README.md b/README.md
index f123bd2..0d4682d 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@
[](https://github.com/p14n/postevent/actions)
[](LICENSE)
-[](https://github.com/yourusername/postevent/releases)
+[](https://github.com/yourusername/p14n/releases)
A reliable event publishing and consumption system using PostgreSQL and gRPC, following the CloudEvents specification.
@@ -36,7 +36,7 @@ A reliable event publishing and consumption system using PostgreSQL and gRPC, fo
#### Gradle
```groovy
-implementation 'com.p14n:postevent:1.0.0'
+implementation 'com.p14n:postevent:1.0.0-SNAPSHOT'
```
#### Maven
@@ -44,7 +44,7 @@ implementation 'com.p14n:postevent:1.0.0'
com.p14n
postevent
- 1.0.0
+ 1.0.0-SNAPSHOT
```
@@ -155,8 +155,6 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file
## Support
- Create an [Issue](https://github.com/yourusername/postevent/issues)
-- Join our [Discord/Slack] community
-- Email: support@yourdomain.com
## Acknowledgments
@@ -166,4 +164,4 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file
## Project Status
-Active development, production-ready.
+Active development.
diff --git a/library/build.gradle b/library/build.gradle
index 06cd6b9..63c9f3d 100644
--- a/library/build.gradle
+++ b/library/build.gradle
@@ -1,7 +1,12 @@
+import com.vanniktech.maven.publish.SonatypeHost
+import com.vanniktech.maven.publish.JavaLibrary
+import com.vanniktech.maven.publish.JavadocJar
+
plugins {
id 'java'
id 'com.adarshr.test-logger' version '4.0.0'
id 'com.google.protobuf' version '0.9.2'
+ id "com.vanniktech.maven.publish" version "0.31.0"
}
compileJava {
@@ -10,7 +15,7 @@ compileJava {
}
group = 'com.p14n'
-version = '1.0-SNAPSHOT'
+version = '1.0.0-SNAPSHOT'
repositories {
mavenCentral()
@@ -57,8 +62,6 @@ dependencies {
// Instrumentation for GRPC
implementation 'io.opentelemetry.instrumentation:opentelemetry-grpc-1.6:1.32.0-alpha'
- // Logging integration
- //implementation 'io.opentelemetry:opentelemetry-extension-logging:1.32.0'
}
testlogger {
@@ -89,6 +92,7 @@ test {
failFast = true
testLogging.showStandardStreams = true
}
+
task fastTest( type: Test ) {
useJUnitPlatform {
includeEngines 'junit-jupiter'
@@ -102,11 +106,6 @@ jar {
}
}
-task runHelloWorld(type: JavaExec) {
- classpath = sourceSets.main.runtimeClasspath
- mainClass = 'com.p14n.postevent.HelloWorld'
-}
-
// Configure Protobuf plugin
protobuf {
protoc {
@@ -124,7 +123,6 @@ protobuf {
}
}
-// Make sure the generated code is included in the source sets
sourceSets {
main {
java {
@@ -133,3 +131,48 @@ sourceSets {
}
}
}
+
+tasks.withType(Jar) {
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+}
+
+javadoc {
+ exclude "**/grpc/**"
+ source = sourceSets.main.allJava
+}
+
+mavenPublishing {
+
+ configure(new JavaLibrary(new JavadocJar.Javadoc(), true))
+
+ publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true)
+
+ signAllPublications()
+
+ coordinates("com.p14n", "postevent", version)
+
+ pom {
+ name = "Postevent"
+ description = 'A reliable event publishing and consumption system using PostgreSQL and gRPC'
+ inceptionYear = "2025"
+ url = "https://github.com/p14n/postevent/"
+ licenses {
+ license {
+ name = 'MIT License'
+ url = 'https://opensource.org/licenses/MIT'
+ }
+ }
+ developers {
+ developer {
+ id = 'p14n'
+ name = 'Dean Chapman'
+ email = 'dean@p14n.com'
+ }
+ }
+ scm {
+ connection = 'scm:git:git://github.com/p14n/postevent.git'
+ developerConnection = 'scm:git:ssh://github.com:p14n/postevent.git'
+ url = 'https://github.com/p14n/postevent'
+ }
+ }
+}
\ No newline at end of file
diff --git a/library/src/main/java/com/p14n/postevent/ConsumerServer.java b/library/src/main/java/com/p14n/postevent/ConsumerServer.java
index 3200ff6..3c11677 100644
--- a/library/src/main/java/com/p14n/postevent/ConsumerServer.java
+++ b/library/src/main/java/com/p14n/postevent/ConsumerServer.java
@@ -9,9 +9,9 @@
import com.p14n.postevent.broker.AsyncExecutor;
import com.p14n.postevent.broker.DefaultExecutor;
import com.p14n.postevent.broker.EventMessageBroker;
-import com.p14n.postevent.broker.grpc.MessageBrokerGrpcServer;
+import com.p14n.postevent.broker.remote.MessageBrokerGrpcServer;
import com.p14n.postevent.catchup.CatchupServer;
-import com.p14n.postevent.catchup.grpc.CatchupGrpcServer;
+import com.p14n.postevent.catchup.remote.CatchupGrpcServer;
import com.p14n.postevent.data.ConfigData;
import io.grpc.Server;
@@ -21,6 +21,38 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A server that manages event consumption and distribution.
+ * ConsumerServer handles incoming events, manages their persistence,
+ * and coordinates event distribution to subscribers.
+ *
+ *
+ * This server can be configured to handle multiple topics and supports
+ * both local and remote consumers through gRPC connections.
+ *
+ *
+ *
+ * Key features:
+ *
+ *
+ * - Event persistence using a configured DataSource
+ * - Asynchronous event processing via AsyncExecutor
+ * - gRPC-based remote event distribution
+ * - OpenTelemetry integration for monitoring and tracing
+ * - Catchup functionality for missed events
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * var config = new ConfigData("myapp", Set.of("orders"), "localhost", 5432,
+ * "postgres", "postgres", "postgres", 10);
+ * var server = new ConsumerServer(dataSource, config, OpenTelemetry.noop());
+ * server.start(8080);
+ * }
+ */
public class ConsumerServer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ConsumerServer.class);
@@ -31,10 +63,25 @@ public class ConsumerServer implements AutoCloseable {
private AsyncExecutor asyncExecutor;
OpenTelemetry ot;
+ /**
+ * Creates a new ConsumerServer instance with default executor configuration.
+ *
+ * @param ds The datasource for event persistence
+ * @param cfg The configuration for the consumer server
+ * @param ot The OpenTelemetry instance for monitoring and tracing
+ */
public ConsumerServer(DataSource ds, ConfigData cfg, OpenTelemetry ot) {
this(ds, cfg, new DefaultExecutor(2), ot);
}
+ /**
+ * Creates a new ConsumerServer instance with custom executor configuration.
+ *
+ * @param ds The datasource for event persistence
+ * @param cfg The configuration for the consumer server
+ * @param asyncExecutor The executor for handling asynchronous operations
+ * @param ot The OpenTelemetry instance for monitoring and tracing
+ */
public ConsumerServer(DataSource ds, ConfigData cfg, AsyncExecutor asyncExecutor, OpenTelemetry ot) {
this.ds = ds;
this.cfg = cfg;
@@ -42,14 +89,30 @@ public ConsumerServer(DataSource ds, ConfigData cfg, AsyncExecutor asyncExecutor
this.ot = ot;
}
+ /**
+ * Starts the consumer server on the specified port.
+ * Initializes the event broker, local consumer, and gRPC services.
+ *
+ * @param port The port number to listen on
+ * @throws IOException If the server fails to start
+ * @throws InterruptedException If the server startup is interrupted
+ */
public void start(int port) throws IOException, InterruptedException {
start(ServerBuilder.forPort(port));
}
+ /**
+ * Starts the consumer server with a custom server builder configuration.
+ * Initializes the event broker, local consumer, and gRPC services.
+ *
+ * @param sb The server builder to use for configuration
+ * @throws IOException If the server fails to start
+ * @throws InterruptedException If the server startup is interrupted
+ */
public void start(ServerBuilder> sb) throws IOException, InterruptedException {
logger.atInfo().log("Starting consumer server");
- var mb = new EventMessageBroker(asyncExecutor, ot,"consumer_server");
+ var mb = new EventMessageBroker(asyncExecutor, ot, "consumer_server");
var lc = new LocalConsumer<>(cfg, mb);
var grpcServer = new MessageBrokerGrpcServer(mb);
var catchupServer = new CatchupServer(ds);
@@ -75,6 +138,10 @@ public void start(ServerBuilder> sb) throws IOException, InterruptedException
closeables = List.of(lc, mb, asyncExecutor);
}
+ /**
+ * Stops the consumer server and releases all resources.
+ * Shuts down the gRPC server and closes all managed resources.
+ */
public void stop() {
logger.atInfo().log("Stopping consumer server");
@@ -93,6 +160,12 @@ public void stop() {
logger.atInfo().log("Consumer server stopped");
}
+ /**
+ * Stops the consumer server and releases all resources.
+ * Implementation of AutoCloseable interface.
+ *
+ * @throws Exception If an error occurs during shutdown
+ */
@Override
public void close() throws Exception {
stop();
diff --git a/library/src/main/java/com/p14n/postevent/LocalConsumer.java b/library/src/main/java/com/p14n/postevent/LocalConsumer.java
index c824d40..9656c21 100644
--- a/library/src/main/java/com/p14n/postevent/LocalConsumer.java
+++ b/library/src/main/java/com/p14n/postevent/LocalConsumer.java
@@ -13,6 +13,36 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A local consumer implementation that captures database changes using Debezium
+ * and publishes them to a message broker.
+ *
+ *
+ * This consumer:
+ *
+ * - Sets up and manages database connections for change data capture
+ * - Initializes and controls a Debezium server instance
+ * - Converts Debezium change events to application events
+ * - Publishes converted events to a configured message broker
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * PostEventConfig config = new PostEventConfig(...);
+ * MessageBroker broker = new EventMessageBroker(...);
+ *
+ * LocalConsumer consumer = new LocalConsumer<>(config, broker);
+ * consumer.start();
+ *
+ * // When done
+ * consumer.close();
+ * }
+ *
+ * @param The type of messages that the broker will publish
+ */
public class LocalConsumer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(LocalConsumer.class);
@@ -21,6 +51,12 @@ public class LocalConsumer implements AutoCloseable {
private final PostEventConfig config;
private final DatabaseSetup db;
+ /**
+ * Creates a new LocalConsumer instance.
+ *
+ * @param config The configuration for the consumer and database connection
+ * @param broker The message broker to publish events to
+ */
public LocalConsumer(PostEventConfig config, MessageBroker broker) {
this.config = config;
this.broker = broker;
@@ -28,13 +64,30 @@ public LocalConsumer(PostEventConfig config, MessageBroker broker)
this.debezium = new DebeziumServer();
}
+ /**
+ * Starts the consumer by setting up the database and initializing the Debezium
+ * server.
+ *
+ *
+ * This method:
+ *
+ * - Sets up database tables and configurations for the specified topics
+ * - Configures and starts the Debezium server
+ * - Sets up event processing pipeline from Debezium to the message
+ * broker
+ *
+ *
+ * @throws IOException If there's an error starting the consumer or
+ * connecting to the database
+ * @throws InterruptedException If the startup process is interrupted
+ */
public void start() throws IOException, InterruptedException {
logger.atInfo()
- .addArgument(config.topics()) // renamed from name()
+ .addArgument(config.topics())
.log("Starting local consumer for {}");
try {
- db.setupAll(config.topics()); // renamed from name()
+ db.setupAll(config.topics());
Consumer> consumer = record -> {
try {
Event event = changeEventToEvent(record);
@@ -52,10 +105,21 @@ public void start() throws IOException, InterruptedException {
}
}
+ /**
+ * Stops the Debezium server and releases associated resources.
+ *
+ * @throws IOException If there's an error stopping the server
+ */
public void stop() throws IOException {
debezium.stop();
}
+ /**
+ * Implements AutoCloseable to ensure proper resource cleanup.
+ * Delegates to {@link #stop()}.
+ *
+ * @throws IOException If there's an error during cleanup
+ */
@Override
public void close() throws IOException {
stop();
diff --git a/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java b/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
index c7d9794..d917b1c 100644
--- a/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
+++ b/library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
@@ -20,6 +20,40 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A local consumer implementation that provides persistent event processing
+ * with catchup capabilities.
+ * This consumer ensures reliable event processing by persisting events and
+ * providing mechanisms
+ * to handle missed or unprocessed events.
+ *
+ *
+ * Key features:
+ *
+ * - Persistent event storage using a configured DataSource
+ * - Automatic catchup for missed events
+ * - Periodic health checks and unprocessed event processing
+ * - OpenTelemetry integration for observability
+ * - Support for batch processing of events
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * var consumer = new LocalPersistentConsumer(dataSource, config, openTelemetry, 100);
+ * consumer.start();
+ *
+ * // Subscribe to events
+ * consumer.subscribe("orders", event -> {
+ * // Process the event
+ * });
+ *
+ * // When done
+ * consumer.close();
+ * }
+ */
public class LocalPersistentConsumer implements AutoCloseable, MessageBroker {
private static final Logger logger = LoggerFactory.getLogger(LocalPersistentConsumer.class);
private PostEventConfig cfg;
@@ -30,6 +64,15 @@ public class LocalPersistentConsumer implements AutoCloseable, MessageBroker subscriber) {
return tb.subscribe(topic, subscriber);
}
+ /**
+ * Unsubscribes from events on the specified topic.
+ *
+ * @param topic The topic to unsubscribe from
+ * @param subscriber The subscriber to remove
+ * @return true if unsubscription was successful, false otherwise
+ */
@Override
public boolean unsubscribe(String topic, MessageSubscriber subscriber) {
return tb.unsubscribe(topic, subscriber);
}
+ /**
+ * Converts a transactional event. In this implementation, returns the event
+ * unchanged.
+ *
+ * @param m The transactional event to convert
+ * @return The same transactional event
+ */
@Override
public TransactionalEvent convert(TransactionalEvent m) {
return m;
}
-
}
diff --git a/library/src/main/java/com/p14n/postevent/Publisher.java b/library/src/main/java/com/p14n/postevent/Publisher.java
index 9389e82..d00acd0 100644
--- a/library/src/main/java/com/p14n/postevent/Publisher.java
+++ b/library/src/main/java/com/p14n/postevent/Publisher.java
@@ -12,12 +12,50 @@
/**
* Publisher class responsible for writing events to a PostgreSQL database.
+ * This utility class provides static methods for publishing events to specific
+ * topic tables
+ * in the database. It ensures data integrity by validating topic names and
+ * handling
+ * database connections appropriately.
+ *
+ *
+ * The class supports two publishing modes:
+ *
+ * - Publishing with an existing connection
+ * - Publishing with a DataSource (manages connection automatically)
+ *
+ *
+ *
+ * Topic names must follow these rules:
+ *
+ * - Cannot be null or empty
+ * - Must contain only lowercase letters and underscores
+ * - Pattern: ^[a-z_]+$
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * // Using an existing connection
+ * Connection conn = ...;
+ * Event event = ...;
+ * Publisher.publish(event, conn, "user_events");
+ *
+ * // Using a DataSource
+ * DataSource ds = ...;
+ * Publisher.publish(event, ds, "order_events");
+ * }
*/
public class Publisher {
- private Publisher(){
-
+ /**
+ * Private constructor to prevent instantiation of the utility class.
+ */
+ private Publisher() {
}
+
/**
* Publishes an event to the specified topic table.
*
@@ -45,9 +83,24 @@ public static void publish(Event event, Connection connection, String topic) thr
}
}
+ /**
+ * Publishes an event to the specified topic table using a DataSource.
+ * This method manages the database connection automatically.
+ *
+ * @param event The event to publish
+ * @param ds The DataSource to obtain a connection from
+ * @param topic The topic/table name to publish to
+ * @throws SQLException if a database access error occurs
+ * @throws IllegalArgumentException if the topic is null, empty, or contains
+ * invalid characters
+ */
public static void publish(Event event, DataSource ds, String topic) throws SQLException {
+ if (topic == null || topic.trim().isEmpty() || !topic.matches("[a-z_]+")) {
+ throw new IllegalArgumentException("Invalid topic name: must be non-null, non-empty, and only contain lowercase letters and underscores.");
+ }
+
try (Connection c = ds.getConnection()) {
publish(event, c, topic);
}
}
-}
\ No newline at end of file
+}
diff --git a/library/src/main/java/com/p14n/postevent/RemotePersistentConsumer.java b/library/src/main/java/com/p14n/postevent/RemotePersistentConsumer.java
index aa9f32f..d6f04be 100644
--- a/library/src/main/java/com/p14n/postevent/RemotePersistentConsumer.java
+++ b/library/src/main/java/com/p14n/postevent/RemotePersistentConsumer.java
@@ -8,11 +8,11 @@
import com.p14n.postevent.broker.SystemEventBroker;
import com.p14n.postevent.broker.TransactionalBroker;
import com.p14n.postevent.broker.TransactionalEvent;
-import com.p14n.postevent.broker.grpc.MessageBrokerGrpcClient;
+import com.p14n.postevent.broker.remote.MessageBrokerGrpcClient;
import com.p14n.postevent.catchup.CatchupService;
import com.p14n.postevent.catchup.PersistentBroker;
import com.p14n.postevent.catchup.UnprocessedSubmitter;
-import com.p14n.postevent.catchup.grpc.CatchupGrpcClient;
+import com.p14n.postevent.catchup.remote.CatchupGrpcClient;
import com.p14n.postevent.data.UnprocessedEventFinder;
import io.grpc.ManagedChannel;
@@ -29,6 +29,41 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
+/**
+ * A remote consumer implementation that provides persistent event processing
+ * with catchup capabilities.
+ * This consumer connects to a remote event source via gRPC and ensures reliable
+ * event processing
+ * with persistence and automatic catchup for missed events.
+ *
+ *
+ * Key features:
+ *
+ *
+ * - Remote event consumption via gRPC
+ * - Persistent event storage
+ * - Automatic catchup for missed events
+ * - Periodic health checks and catchup triggers
+ * - OpenTelemetry integration for observability
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * var consumer = new RemotePersistentConsumer(openTelemetry, 100);
+ * consumer.start(Set.of("orders", "inventory"), dataSource, "localhost", 8080);
+ *
+ * // Subscribe to events
+ * consumer.subscribe("orders", event -> {
+ * // Process the event
+ * });
+ *
+ * // When done
+ * consumer.close();
+ * }
+ */
public class RemotePersistentConsumer implements AutoCloseable, MessageBroker {
private static final Logger logger = LoggerFactory.getLogger(RemotePersistentConsumer.class);
@@ -39,16 +74,37 @@ public class RemotePersistentConsumer implements AutoCloseable, MessageBroker topics, DataSource ds, String host, int port) {
start(topics, ds, ManagedChannelBuilder.forAddress(host, port)
.keepAliveTime(1, TimeUnit.HOURS)
@@ -57,6 +113,14 @@ public void start(Set topics, DataSource ds, String host, int port) {
.build());
}
+ /**
+ * Starts the consumer with a pre-configured gRPC channel.
+ *
+ * @param topics Set of topics to subscribe to
+ * @param ds DataSource for event persistence
+ * @param channel Configured gRPC channel
+ * @throws IllegalStateException if the consumer is already started
+ */
public void start(Set topics, DataSource ds, ManagedChannel channel) {
logger.atInfo().log("Starting consumer client");
@@ -99,6 +163,11 @@ public void start(Set topics, DataSource ds, ManagedChannel channel) {
}
}
+ /**
+ * Closes all resources associated with this consumer.
+ * This includes the message brokers, gRPC channel, and other closeable
+ * resources.
+ */
@Override
public void close() {
logger.atInfo().log("Closing consumer client");
@@ -117,6 +186,13 @@ public void close() {
logger.atInfo().log("Consumer client closed");
}
+ /**
+ * Publishes a transactional event to the specified topic.
+ *
+ * @param topic The topic to publish to
+ * @param message The transactional event to publish
+ * @throws RuntimeException if publishing fails
+ */
@Override
public void publish(String topic, TransactionalEvent message) {
try {
@@ -126,6 +202,14 @@ public void publish(String topic, TransactionalEvent message) {
}
}
+ /**
+ * Subscribes to events on the specified topic.
+ * Triggers a catchup event to ensure the subscriber receives any missed events.
+ *
+ * @param topic The topic to subscribe to
+ * @param subscriber The subscriber that will receive events
+ * @return true if subscription was successful, false otherwise
+ */
@Override
public boolean subscribe(String topic, MessageSubscriber subscriber) {
var subscribed = tb.subscribe(topic, subscriber);
@@ -133,11 +217,25 @@ public boolean subscribe(String topic, MessageSubscriber sub
return subscribed;
}
+ /**
+ * Unsubscribes from events on the specified topic.
+ *
+ * @param topic The topic to unsubscribe from
+ * @param subscriber The subscriber to remove
+ * @return true if unsubscription was successful, false otherwise
+ */
@Override
public boolean unsubscribe(String topic, MessageSubscriber subscriber) {
return tb.unsubscribe(topic, subscriber);
}
+ /**
+ * Converts a transactional event. In this implementation, returns the event
+ * unchanged.
+ *
+ * @param m The transactional event to convert
+ * @return The same transactional event
+ */
@Override
public TransactionalEvent convert(TransactionalEvent m) {
return m;
diff --git a/library/src/main/java/com/p14n/postevent/broker/AsyncExecutor.java b/library/src/main/java/com/p14n/postevent/broker/AsyncExecutor.java
index a259071..33e1a70 100644
--- a/library/src/main/java/com/p14n/postevent/broker/AsyncExecutor.java
+++ b/library/src/main/java/com/p14n/postevent/broker/AsyncExecutor.java
@@ -3,15 +3,40 @@
import java.util.List;
import java.util.concurrent.*;
+/**
+ * Interface for asynchronous task execution.
+ */
public interface AsyncExecutor extends AutoCloseable {
+ /**
+ * Schedules a task for repeated fixed-rate execution.
+ *
+ * @param command The task to execute
+ * @param initialDelay The time to delay first execution
+ * @param period The period between successive executions
+ * @param unit The time unit of the initialDelay and period parameters
+ * @return A ScheduledFuture representing pending completion of the task
+ */
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
+ /**
+ * Shuts down the executor and returns a list of runnables that were not
+ * executed.
+ *
+ * @return A list of runnables that were not executed
+ */
List shutdownNow();
+ /**
+ * Submits a task for execution and returns a Future representing the pending
+ *
+ * @param task The task to submit
+ * @param The type of the task
+ * @return A Future representing pending completion of the task
+ */
Future submit(Callable task);
}
diff --git a/library/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java b/library/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java
index ae233d4..3d954c3 100644
--- a/library/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java
+++ b/library/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java
@@ -6,32 +6,77 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+/**
+ * Default implementation of {@link AsyncExecutor} that provides configurable
+ * thread pool execution.
+ * Supports both virtual threads and fixed-size thread pools for task execution,
+ * along with
+ * scheduled task capabilities.
+ *
+ *
+ * Key features:
+ *
+ *
+ * - Virtual thread pool support for efficient concurrent task execution
+ * - Configurable fixed-size thread pool alternative
+ * - Scheduled task execution with customizable intervals
+ * - Named thread factories for better debugging and monitoring
+ *
+ */
public class DefaultExecutor implements AsyncExecutor {
private final ScheduledExecutorService se;
private final ExecutorService es;
+ /**
+ * Creates a new executor with a scheduled thread pool and virtual thread pool.
+ *
+ * @param scheduledSize the size of the scheduled thread pool
+ */
public DefaultExecutor(int scheduledSize) {
this.se = createScheduledExecutorService(scheduledSize);
this.es = createVirtualExecutorService();
}
+ /**
+ * Creates a new executor with both scheduled and fixed-size thread pools.
+ *
+ * @param scheduledSize the size of the scheduled thread pool
+ * @param fixedSize the size of the fixed thread pool
+ */
public DefaultExecutor(int scheduledSize, int fixedSize) {
this.se = createScheduledExecutorService(scheduledSize);
this.es = createFixedExecutorService(fixedSize);
}
+ /**
+ * Creates a fixed-size thread pool with named threads.
+ *
+ * @param size the number of threads in the pool
+ * @return a fixed thread pool executor service
+ */
protected ExecutorService createFixedExecutorService(int size) {
return Executors.newFixedThreadPool(size,
new ThreadFactoryBuilder().setNameFormat("post-event-fixed-%d").build());
}
+ /**
+ * Creates a virtual thread pool for efficient task execution.
+ *
+ * @return a virtual thread executor service
+ */
protected ExecutorService createVirtualExecutorService() {
return Executors.newThreadPerTaskExecutor(
new ThreadFactoryBuilder().setThreadFactory(Thread.ofVirtual().factory())
.setNameFormat("post-event-virtual-%d").build());
}
+ /**
+ * Creates a scheduled thread pool with named threads.
+ *
+ * @param size the number of threads in the pool
+ * @return a scheduled thread pool executor service
+ */
protected ScheduledExecutorService createScheduledExecutorService(int size) {
return Executors.newScheduledThreadPool(size,
new ThreadFactoryBuilder().setNameFormat("post-event-scheduled-%d").build());
diff --git a/library/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java b/library/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java
index 957b8cc..122ae54 100644
--- a/library/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java
+++ b/library/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java
@@ -12,28 +12,101 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
+/**
+ * Abstract base implementation of a message broker that supports
+ * publish-subscribe messaging patterns.
+ * Provides thread-safe handling of message publishing and subscription
+ * management with telemetry support.
+ *
+ *
+ * Key features:
+ *
+ *
+ * - Thread-safe message publishing and subscription management
+ * - Asynchronous message delivery to subscribers
+ * - OpenTelemetry integration for metrics and tracing
+ * - Automatic resource cleanup via AutoCloseable
+ *
+ *
+ *
+ * The broker maintains a concurrent map of topic subscribers and ensures
+ * thread-safe operations
+ * for publishing messages and managing subscriptions. Messages are delivered
+ * asynchronously to
+ * subscribers using the provided {@link AsyncExecutor}.
+ *
+ *
+ * @param The input message type, must implement {@link Traceable}
+ * @param The output message type delivered to subscribers
+ */
public abstract class DefaultMessageBroker
implements MessageBroker, AutoCloseable {
+ /**
+ * Thread-safe map storing topic subscribers.
+ * Key is the topic name, value is a thread-safe set of subscribers.
+ */
protected final ConcurrentHashMap>> topicSubscribers = new ConcurrentHashMap<>();
+
+ /**
+ * Flag indicating if the broker has been closed.
+ */
protected final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /**
+ * Executor for handling asynchronous message delivery.
+ */
private final AsyncExecutor asyncExecutor;
+
+ /**
+ * Metrics collector for broker operations.
+ */
protected final BrokerMetrics metrics;
+
+ /**
+ * OpenTelemetry tracer for distributed tracing.
+ */
protected final Tracer tracer;
+ /**
+ * OpenTelemetry instance for metrics and tracing.
+ */
protected final OpenTelemetry openTelemetry;
- public DefaultMessageBroker(OpenTelemetry ot,String scopeName) {
- this(new DefaultExecutor(2), ot,scopeName);
+ /**
+ * Creates a new broker instance with default executor configuration.
+ *
+ * @param ot The OpenTelemetry instance for metrics and tracing
+ * @param scopeName The scope name for OpenTelemetry instrumentation
+ */
+ public DefaultMessageBroker(OpenTelemetry ot, String scopeName) {
+ this(new DefaultExecutor(2), ot, scopeName);
}
- public DefaultMessageBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot,String scopeName) {
+ /**
+ * Creates a new broker instance with custom executor configuration.
+ *
+ * @param asyncExecutor The executor for handling asynchronous message delivery
+ * @param ot The OpenTelemetry instance for metrics and tracing
+ * @param scopeName The scope name for OpenTelemetry instrumentation
+ */
+ public DefaultMessageBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot, String scopeName) {
this.asyncExecutor = asyncExecutor;
this.metrics = new BrokerMetrics(ot.getMeter(scopeName));
this.tracer = ot.getTracer(scopeName);
this.openTelemetry = ot;
}
+ /**
+ * Checks if a message can be processed for a given topic.
+ * Validates broker state and message parameters.
+ *
+ * @param topic The topic to check
+ * @param message The message to validate
+ * @return true if the message can be processed, false otherwise
+ * @throws IllegalStateException if the broker is closed
+ * @throws IllegalArgumentException if topic or message is null
+ */
protected boolean canProcess(String topic, InT message) {
if (closed.get()) {
throw new IllegalStateException("Broker is closed");
@@ -47,10 +120,27 @@ protected boolean canProcess(String topic, InT message) {
throw new IllegalArgumentException("Topic cannot be null");
}
- // If no subscribers for this topic, message is silently dropped
return topicSubscribers.containsKey(topic) && !topicSubscribers.get(topic).isEmpty();
}
+ /**
+ * Converts the input message type to the output message type.
+ * Must be implemented by concrete classes.
+ *
+ * @param message The input message to convert
+ * @return The converted output message
+ */
+ public abstract OutT convert(InT message);
+
+ /**
+ * Publishes a message to all subscribers of the specified topic.
+ * Messages are delivered asynchronously to all subscribers.
+ * If a subscriber throws an exception during message processing,
+ * its error handler is invoked and the exception is propagated.
+ *
+ * @param topic The topic to publish to
+ * @param message The message to publish
+ */
@Override
public void publish(String topic, InT message) {
if (!canProcess(topic, message)) {
@@ -59,13 +149,11 @@ public void publish(String topic, InT message) {
metrics.recordPublished(topic);
- // Deliver to all subscribers for this topic
Set> subscribers = topicSubscribers.get(topic);
if (subscribers != null) {
-
- processWithTelemetry(openTelemetry,tracer, message, "publish_message", () -> {
+ processWithTelemetry(openTelemetry, tracer, message, "publish_message", () -> {
for (MessageSubscriber subscriber : subscribers) {
- asyncExecutor.submit(() -> processWithTelemetry(openTelemetry,tracer, message, "process_message",
+ asyncExecutor.submit(() -> processWithTelemetry(openTelemetry, tracer, message, "process_message",
() -> {
try {
subscriber.onMessage(convert(message));
@@ -82,10 +170,20 @@ public void publish(String topic, InT message) {
}
return null;
});
-
}
}
+ /**
+ * Subscribes a message handler to a specific topic.
+ * Thread-safe subscription management using ConcurrentHashMap and
+ * CopyOnWriteArraySet.
+ *
+ * @param topic The topic to subscribe to
+ * @param subscriber The subscriber to add
+ * @return true if the subscription was added, false if it already existed
+ * @throws IllegalStateException if the broker is closed
+ * @throws IllegalArgumentException if topic or subscriber is null
+ */
@Override
public boolean subscribe(String topic, MessageSubscriber subscriber) {
if (closed.get()) {
@@ -111,6 +209,16 @@ public boolean subscribe(String topic, MessageSubscriber subscriber) {
return added;
}
+ /**
+ * Unsubscribes a message handler from a specific topic.
+ * Thread-safe unsubscription with automatic topic cleanup when the last
+ * subscriber is removed.
+ *
+ * @param topic The topic to unsubscribe from
+ * @param subscriber The subscriber to remove
+ * @return true if the subscription was removed, false if it didn't exist
+ * @throws IllegalArgumentException if topic or subscriber is null
+ */
@Override
public boolean unsubscribe(String topic, MessageSubscriber subscriber) {
if (subscriber == null) {
@@ -135,10 +243,14 @@ public boolean unsubscribe(String topic, MessageSubscriber subscriber) {
return false;
}
+ /**
+ * Closes the broker and removes all subscribers.
+ * After closing, no new subscriptions can be added and no messages can be
+ * published.
+ */
@Override
public void close() {
closed.set(true);
topicSubscribers.clear();
}
-
}
diff --git a/library/src/main/java/com/p14n/postevent/broker/EventMessageBroker.java b/library/src/main/java/com/p14n/postevent/broker/EventMessageBroker.java
index c450eb5..1123055 100644
--- a/library/src/main/java/com/p14n/postevent/broker/EventMessageBroker.java
+++ b/library/src/main/java/com/p14n/postevent/broker/EventMessageBroker.java
@@ -1,22 +1,67 @@
package com.p14n.postevent.broker;
import com.p14n.postevent.data.Event;
-
import io.opentelemetry.api.OpenTelemetry;
+/**
+ * A specialized message broker implementation for handling {@link Event}
+ * messages.
+ * Extends {@link DefaultMessageBroker} to provide direct pass-through of events
+ * without any transformation.
+ *
+ *
+ * This broker is designed for simple event distribution scenarios where the
+ * input
+ * and output event types are identical. It maintains all the thread-safety and
+ * telemetry capabilities of the parent class while simplifying the event
+ * handling
+ * process.
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * EventMessageBroker broker = new EventMessageBroker(openTelemetry, "events");
+ * broker.subscribe("orders", event -> {
+ * // Handle the event directly
+ * });
+ * broker.publish("orders", new Event(...));
+ * }
+ */
public class EventMessageBroker extends DefaultMessageBroker {
+ /**
+ * Creates a new EventMessageBroker with custom executor configuration.
+ *
+ * @param asyncExecutor The executor for handling asynchronous message delivery
+ * @param ot The OpenTelemetry instance for metrics and tracing
+ * @param scopeName The scope name for OpenTelemetry instrumentation
+ */
public EventMessageBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot, String scopeName) {
super(asyncExecutor, ot, scopeName);
}
+ /**
+ * Creates a new EventMessageBroker with default executor configuration.
+ *
+ * @param ot The OpenTelemetry instance for metrics and tracing
+ * @param scopeName The scope name for OpenTelemetry instrumentation
+ */
public EventMessageBroker(OpenTelemetry ot, String scopeName) {
- super(ot,scopeName);
+ super(ot, scopeName);
}
+ /**
+ * Implements the conversion method from the parent class.
+ * Simply returns the input event as-is since no transformation is needed.
+ *
+ * @param m The event to convert
+ * @return The same event instance
+ */
@Override
public Event convert(Event m) {
return m;
}
-
}
diff --git a/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java b/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java
index a627123..ca8d312 100644
--- a/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java
+++ b/library/src/main/java/com/p14n/postevent/broker/SystemEvent.java
@@ -2,14 +2,66 @@
import com.p14n.postevent.data.Traceable;
+/**
+ * Enumeration of system-level events used for internal coordination and
+ * control.
+ * Implements {@link Traceable} to support distributed tracing and event
+ * correlation.
+ *
+ *
+ * Available events:
+ *
+ *
+ * - {@code CatchupRequired}: Signals that a topic needs to catch up on missed
+ * events
+ * - {@code UnprocessedCheckRequired}: Signals that unprocessed events need to
+ * be checked
+ * - {@code FetchLatest}: Signals that the latest events should be fetched for
+ * a topic
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * SystemEvent event = SystemEvent.CatchupRequired.withTopic("orders");
+ * broker.publish(event.topic(), event);
+ * }
+ */
public enum SystemEvent implements Traceable {
+ /**
+ * Signals that a topic needs to catch up on missed events.
+ * Used by the {@link com.p14n.postevent.catchup.CatchupService} to initiate
+ * catchup processing.
+ */
CatchupRequired,
+
+ /**
+ * Signals that unprocessed events need to be checked.
+ * Used to trigger verification of events that may have been missed or failed
+ * processing.
+ */
UnprocessedCheckRequired,
- FetchLatest; // New event type
+ /**
+ * Signals that the latest events should be fetched for a topic.
+ * Used to request immediate retrieval of recent events without full catchup.
+ */
+ FetchLatest;
+
+ /**
+ * The topic this event is associated with.
+ */
public String topic;
+ /**
+ * Associates a topic with this system event.
+ *
+ * @param topic the topic to associate with this event
+ * @return this event instance with the topic set
+ */
public SystemEvent withTopic(String topic) {
this.topic = topic;
return this;
diff --git a/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java b/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java
index 1c2d672..c538a49 100644
--- a/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java
+++ b/library/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java
@@ -2,15 +2,49 @@
import io.opentelemetry.api.OpenTelemetry;
-public class SystemEventBroker extends
- DefaultMessageBroker {
+/**
+ * Specialized message broker for handling system-level events.
+ * Extends {@link DefaultMessageBroker} to provide a dedicated channel for
+ * internal system coordination events.
+ *
+ *
+ * This broker handles {@link SystemEvent} messages for internal coordination
+ * such as:
+ *
+ * - Catchup processing signals
+ * - Unprocessed event checks
+ * - Latest event fetch requests
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * SystemEventBroker broker = new SystemEventBroker(openTelemetry);
+ * broker.subscribe(subscriber);
+ * broker.publish(SystemEvent.CatchupRequired.withTopic("orders"));
+ * }
+ */
+public class SystemEventBroker extends DefaultMessageBroker {
+ /**
+ * Creates a new system event broker with custom executor configuration.
+ *
+ * @param asyncExecutor the executor for handling asynchronous message delivery
+ * @param ot the OpenTelemetry instance for metrics and tracing
+ */
public SystemEventBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot) {
super(asyncExecutor, ot, "system_events");
}
+ /**
+ * Creates a new system event broker with default executor configuration.
+ *
+ * @param ot the OpenTelemetry instance for metrics and tracing
+ */
public SystemEventBroker(OpenTelemetry ot) {
- super(ot,"system_events");
+ super(ot, "system_events");
}
@Override
@@ -18,12 +52,21 @@ public SystemEvent convert(SystemEvent m) {
return m;
}
+ /**
+ * Publishes a system event to the default system topic.
+ *
+ * @param event the system event to publish
+ */
public void publish(SystemEvent event) {
publish("system", event);
}
+ /**
+ * Subscribes to system events on the default system topic.
+ *
+ * @param subscriber the subscriber to receive system events
+ */
public void subscribe(MessageSubscriber subscriber) {
subscribe("system", subscriber);
}
-
}
diff --git a/library/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java b/library/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java
index 87cd68d..0ecadce 100644
--- a/library/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java
+++ b/library/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java
@@ -11,22 +11,85 @@
import java.sql.Connection;
+/**
+ * A specialized message broker that provides transactional delivery of events
+ * to subscribers.
+ * Extends {@link DefaultMessageBroker} to ensure events are processed within
+ * database transactions.
+ *
+ *
+ * Key features:
+ *
+ *
+ * - Database transaction support for each message delivery
+ * - Ordered processing of events using {@link OrderedProcessor}
+ * - Integration with OpenTelemetry for transaction monitoring
+ * - Error handling with subscriber notification
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * TransactionalBroker broker = new TransactionalBroker(dataSource, openTelemetry, systemEventBroker);
+ * broker.subscribe("orders", new TransactionalEventHandler());
+ * broker.publish("orders", event);
+ * }
+ */
public class TransactionalBroker extends DefaultMessageBroker {
private final DataSource ds;
private final SystemEventBroker systemEventBroker;
- public TransactionalBroker(DataSource ds, AsyncExecutor asyncExecutor, OpenTelemetry ot, SystemEventBroker systemEventBroker) {
+ /**
+ * Creates a new transactional broker with custom executor configuration.
+ *
+ * @param ds the data source for database connections
+ * @param asyncExecutor the executor for handling asynchronous message
+ * delivery
+ * @param ot the OpenTelemetry instance for metrics and tracing
+ * @param systemEventBroker the broker for system events
+ */
+ public TransactionalBroker(DataSource ds, AsyncExecutor asyncExecutor, OpenTelemetry ot,
+ SystemEventBroker systemEventBroker) {
super(asyncExecutor, ot, "transactional_broker");
this.ds = ds;
this.systemEventBroker = systemEventBroker;
}
+ /**
+ * Creates a new transactional broker with default executor configuration.
+ *
+ * @param ds the data source for database connections
+ * @param ot the OpenTelemetry instance for metrics and tracing
+ * @param systemEventBroker the broker for system events
+ */
public TransactionalBroker(DataSource ds, OpenTelemetry ot, SystemEventBroker systemEventBroker) {
super(ot, "transactional_broker");
this.ds = ds;
this.systemEventBroker = systemEventBroker;
}
+ /**
+ * Publishes a message to subscribers within a database transaction.
+ * Each subscriber receives the message in its own transaction context.
+ *
+ *
+ * The publishing process:
+ *
+ *
+ * - Validates the message and topic
+ * - Creates a new database connection for each subscriber
+ * - Processes the message using {@link OrderedProcessor}
+ * - Handles delivery within a transaction context
+ * - Records metrics for successful delivery
+ *
+ *
+ * @param topic the topic to publish to
+ * @param message the event message to publish
+ * @throws IllegalStateException if the broker is closed
+ * @throws IllegalArgumentException if topic or message is null
+ */
@Override
public void publish(String topic, Event message) {
if (!canProcess(topic, message)) {
@@ -38,11 +101,8 @@ public void publish(String topic, Event message) {
// Deliver to all subscribers
for (MessageSubscriber subscriber : topicSubscribers.get(topic)) {
try (Connection c = ds.getConnection()) {
-
processWithTelemetry(openTelemetry, tracer, message, "ordered_process", () -> {
-
- var op = new OrderedProcessor(systemEventBroker,(connection, event) -> {
-
+ var op = new OrderedProcessor(systemEventBroker, (connection, event) -> {
return processWithTelemetry(openTelemetry, tracer, message, "message_transaction", () -> {
try {
subscriber.onMessage(new TransactionalEvent(connection, event));
@@ -56,12 +116,10 @@ public void publish(String topic, Event message) {
return false;
}
});
-
});
op.process(c, message);
return null;
});
-
} catch (Exception e) {
try {
subscriber.onError(e);
diff --git a/library/src/main/java/com/p14n/postevent/broker/TransactionalEvent.java b/library/src/main/java/com/p14n/postevent/broker/TransactionalEvent.java
index 616f53c..2912283 100644
--- a/library/src/main/java/com/p14n/postevent/broker/TransactionalEvent.java
+++ b/library/src/main/java/com/p14n/postevent/broker/TransactionalEvent.java
@@ -5,23 +5,75 @@
import java.sql.Connection;
+/**
+ * A record that wraps an {@link Event} with its associated database
+ * {@link Connection},
+ * enabling transactional processing of events. This class implements
+ * {@link Traceable}
+ * to maintain tracing context through the event processing pipeline.
+ *
+ *
+ * The TransactionalEvent serves as a container that:
+ *
+ * - Maintains the database connection context for transactional
+ * operations
+ * - Preserves the original event data and metadata
+ * - Delegates tracing information to the underlying event
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * Connection conn = dataSource.getConnection();
+ * Event event = Event.create(...);
+ * TransactionalEvent txEvent = new TransactionalEvent(conn, event);
+ *
+ * // Use the transactional event
+ * processEventInTransaction(txEvent);
+ * }
+ *
+ * @param connection The database connection for transactional operations
+ * @param event The underlying event being processed
+ */
public record TransactionalEvent(Connection connection, Event event) implements Traceable {
+ /**
+ * Returns the unique identifier of the underlying event.
+ *
+ * @return the event's unique identifier
+ */
@Override
public String id() {
return event.id();
}
+ /**
+ * Returns the topic of the underlying event.
+ *
+ * @return the event's topic
+ */
@Override
public String topic() {
return event.topic();
}
+ /**
+ * Returns the subject of the underlying event.
+ *
+ * @return the event's subject
+ */
@Override
public String subject() {
return event.subject();
}
+ /**
+ * Returns the OpenTelemetry trace parent identifier of the underlying event.
+ *
+ * @return the event's trace parent identifier
+ */
@Override
public String traceparent() {
return event.traceparent();
diff --git a/library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java b/library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcClient.java
similarity index 64%
rename from library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java
rename to library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcClient.java
index d939312..6098304 100644
--- a/library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java
+++ b/library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcClient.java
@@ -1,8 +1,11 @@
-package com.p14n.postevent.broker.grpc;
+package com.p14n.postevent.broker.remote;
import com.p14n.postevent.broker.AsyncExecutor;
import com.p14n.postevent.broker.EventMessageBroker;
import com.p14n.postevent.broker.MessageSubscriber;
+import com.p14n.postevent.broker.grpc.EventResponse;
+import com.p14n.postevent.broker.grpc.MessageBrokerServiceGrpc;
+import com.p14n.postevent.broker.grpc.SubscriptionRequest;
import com.p14n.postevent.data.Event;
import io.grpc.ManagedChannel;
@@ -18,14 +21,37 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A gRPC client implementation of the EventMessageBroker that connects to a
+ * remote message broker service.
+ * Provides streaming event subscription and publication capabilities over gRPC.
+ *
+ *
+ * Key features:
+ *
+ *
+ * - Streaming event subscription via gRPC
+ * - Automatic reconnection handling
+ * - Thread-safe subscription management
+ * - OpenTelemetry integration for observability
+ *
+ */
public class MessageBrokerGrpcClient extends EventMessageBroker {
private static final Logger logger = LoggerFactory.getLogger(MessageBrokerGrpcClient.class);
private final MessageBrokerServiceGrpc.MessageBrokerServiceStub asyncStub;
- private final Set subscribed = ConcurrentHashMap.newKeySet();;
+ private final Set subscribed = ConcurrentHashMap.newKeySet();
ManagedChannel channel;
+ /**
+ * Creates a new MessageBrokerGrpcClient with the specified host and port.
+ *
+ * @param asyncExecutor Executor for handling asynchronous operations
+ * @param ot OpenTelemetry instance for monitoring
+ * @param host Remote broker host address
+ * @param port Remote broker port
+ */
public MessageBrokerGrpcClient(AsyncExecutor asyncExecutor, OpenTelemetry ot, String host, int port) {
this(asyncExecutor, ot, ManagedChannelBuilder.forAddress(host, port)
.keepAliveTime(1, TimeUnit.HOURS)
@@ -34,12 +60,25 @@ public MessageBrokerGrpcClient(AsyncExecutor asyncExecutor, OpenTelemetry ot, St
.build());
}
+ /**
+ * Creates a new MessageBrokerGrpcClient with a pre-configured gRPC channel.
+ *
+ * @param asyncExecutor Executor for handling asynchronous operations
+ * @param ot OpenTelemetry instance for monitoring
+ * @param channel Pre-configured gRPC ManagedChannel
+ */
public MessageBrokerGrpcClient(AsyncExecutor asyncExecutor, OpenTelemetry ot, ManagedChannel channel) {
super(asyncExecutor, ot, "grpc_client_broker");
this.channel = channel;
this.asyncStub = MessageBrokerServiceGrpc.newStub(channel);
}
+ /**
+ * Establishes a streaming subscription to events on the specified topic.
+ * Creates a gRPC stream observer to handle incoming events and errors.
+ *
+ * @param topic Topic to subscribe to
+ */
public void subscribeToEvents(String topic) {
SubscriptionRequest request = SubscriptionRequest.newBuilder()
.setTopic(topic)
@@ -60,7 +99,6 @@ public void onNext(EventResponse response) {
@Override
public void onError(Throwable t) {
logger.atError().setCause(t).log("Error in event stream");
- // subscribed.remove(topic);
}
@Override
@@ -72,10 +110,14 @@ public void onCompleted() {
asyncStub.subscribeToEvents(request, responseObserver);
subscribed.add(topic);
- // Send the subscription request
-
}
+ /**
+ * Converts a gRPC event response into an internal Event object.
+ *
+ * @param grpcEvent The gRPC event response to convert
+ * @return Converted Event object
+ */
private Event convertFromGrpcEvent(EventResponse grpcEvent) {
OffsetDateTime time = null;
if (!grpcEvent.getTime().isEmpty()) {
@@ -95,11 +137,24 @@ private Event convertFromGrpcEvent(EventResponse grpcEvent) {
grpcEvent.getTraceparent());
}
+ /**
+ * {@inheritDoc}
+ * Publishes an event to the specified topic.
+ */
@Override
public void publish(String topic, Event message) {
super.publish(topic, message);
}
+ /**
+ * {@inheritDoc}
+ * Subscribes to events on the specified topic, establishing a gRPC stream if
+ * needed.
+ *
+ * @param topic Topic to subscribe to
+ * @param subscriber Subscriber to receive events
+ * @return true if subscription was successful, false otherwise
+ */
@Override
public boolean subscribe(String topic, MessageSubscriber subscriber) {
if (super.subscribe(topic, subscriber)) {
@@ -111,6 +166,14 @@ public boolean subscribe(String topic, MessageSubscriber subscriber) {
return false;
}
+ /**
+ * {@inheritDoc}
+ * Unsubscribes from events and manages gRPC channel cleanup.
+ *
+ * @param topic Topic to unsubscribe from
+ * @param subscriber Subscriber to remove
+ * @return true if unsubscription was successful, false otherwise
+ */
@Override
public boolean unsubscribe(String topic, MessageSubscriber subscriber) {
boolean unsubscribed = super.unsubscribe(topic, subscriber);
diff --git a/library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java b/library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcServer.java
similarity index 65%
rename from library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java
rename to library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcServer.java
index 83fe063..5e0beda 100644
--- a/library/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java
+++ b/library/src/main/java/com/p14n/postevent/broker/remote/MessageBrokerGrpcServer.java
@@ -1,5 +1,8 @@
-package com.p14n.postevent.broker.grpc;
+package com.p14n.postevent.broker.remote;
+import com.p14n.postevent.broker.grpc.EventResponse;
+import com.p14n.postevent.broker.grpc.MessageBrokerServiceGrpc;
+import com.p14n.postevent.broker.grpc.SubscriptionRequest;
import com.p14n.postevent.data.Event;
import com.p14n.postevent.broker.MessageBroker;
import com.p14n.postevent.broker.MessageSubscriber;
@@ -12,14 +15,58 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A gRPC server implementation that exposes a MessageBroker service for remote
+ * event subscription.
+ * This class extends the auto-generated
+ * MessageBrokerServiceGrpc.MessageBrokerServiceImplBase
+ * to provide streaming event delivery to remote clients.
+ *
+ *
+ * Key features:
+ *
+ * - Streaming event subscription via gRPC
+ * - Thread-safe event delivery
+ * - Automatic cleanup of cancelled subscriptions
+ * - Error handling and propagation to clients
+ * - Structured logging for monitoring and debugging
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * MessageBroker broker = // initialize broker
+ * MessageBrokerGrpcServer server = new MessageBrokerGrpcServer(broker);
+ *
+ * // Add to gRPC server
+ * Server grpcServer = ServerBuilder.forPort(8080)
+ * .addService(server)
+ * .build();
+ * grpcServer.start();
+ * }
+ */
public class MessageBrokerGrpcServer extends MessageBrokerServiceGrpc.MessageBrokerServiceImplBase {
private static final Logger logger = LoggerFactory.getLogger(MessageBrokerGrpcServer.class);
private final MessageBroker messageBroker;
+ /**
+ * Creates a new MessageBrokerGrpcServer with the specified message broker.
+ *
+ * @param messageBroker The message broker that will handle event distribution
+ */
public MessageBrokerGrpcServer(MessageBroker messageBroker) {
this.messageBroker = messageBroker;
}
+ /**
+ * Sends an error response to the client through the gRPC stream.
+ *
+ * @param responseObserver The stream observer to send the error to
+ * @param msg The error message
+ * @param error The underlying error cause
+ */
private void errorResponse(StreamObserver responseObserver, String msg,
Throwable error) {
responseObserver.onError(Status.INTERNAL
@@ -28,6 +75,26 @@ private void errorResponse(StreamObserver responseObserver, Strin
.asRuntimeException());
}
+ /**
+ * Handles subscription requests from clients and establishes a streaming
+ * connection
+ * for event delivery. This method implements the gRPC service endpoint for
+ * event
+ * subscription.
+ *
+ *
+ * The method:
+ *
+ * - Validates the subscription request
+ * - Sets up a message subscriber for the requested topic
+ * - Handles client disconnection and cleanup
+ * - Manages error conditions and client notification
+ *
+ *
+ * @param request The subscription request containing the topic to
+ * subscribe to
+ * @param responseObserver The stream observer for sending events to the client
+ */
@Override
public void subscribeToEvents(SubscriptionRequest request, StreamObserver responseObserver) {
String topic = request.getTopic();
@@ -92,6 +159,13 @@ public void onError(Throwable error) {
}
}
+ /**
+ * Converts an internal Event object to a gRPC EventResponse message.
+ * Handles all optional fields and ensures proper conversion of data types.
+ *
+ * @param event The internal event to convert
+ * @return The gRPC event response message
+ */
private EventResponse convertToGrpcEvent(Event event) {
EventResponse.Builder builder = EventResponse.newBuilder()
.setId(event.id())
diff --git a/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java b/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java
index e68a39c..61e1544 100644
--- a/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java
+++ b/library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java
@@ -15,15 +15,71 @@
import java.util.ArrayList;
import java.util.List;
+/**
+ * Database implementation of the CatchupServerInterface.
+ * Provides functionality to fetch missed events and latest message IDs from a
+ * PostgreSQL database.
+ * Uses a DataSource for connection management and prepared statements for
+ * secure SQL execution.
+ *
+ *
+ * Events are stored in topic-specific tables within the 'postevent' schema.
+ * Each table
+ * contains events with sequential IDs (idn) that can be used for range-based
+ * queries.
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * DataSource dataSource = // initialize your datasource
+ * CatchupServer server = new CatchupServer(dataSource);
+ *
+ * // Fetch up to 100 events from sequence 1000 to 2000
+ * List events = server.fetchEvents(1000L, 2000L, 100, "my-topic");
+ *
+ * // Get the latest event ID
+ * long latestId = server.getLatestMessageId("my-topic");
+ * }
+ */
public class CatchupServer implements CatchupServerInterface {
private static final Logger logger = LoggerFactory.getLogger(CatchupServer.class);
private final DataSource dataSource;
+ /**
+ * Creates a new CatchupServer instance.
+ *
+ * @param dataSource The DataSource to use for database connections.
+ * Should be properly configured with connection pooling and
+ * credentials.
+ */
public CatchupServer(DataSource dataSource) {
this.dataSource = dataSource;
}
+ /**
+ * {@inheritDoc}
+ *
+ *
+ * This implementation fetches events from a PostgreSQL database using a
+ * prepared statement
+ * to prevent SQL injection. Events are retrieved in order by their sequence
+ * number (idn).
+ *
+ *
+ *
+ * The SQL query used is:
+ * {@code SELECT * FROM postevent. WHERE idn BETWEEN (?+1) AND ? ORDER BY idn LIMIT ?}
+ *
+ *
+ * @throws IllegalArgumentException if startAfter is greater than end,
+ * maxResults is less than or equal to 0,
+ * or if topic is null or empty
+ * @throws RuntimeException if there is a database access error
+ */
@Override
public List fetchEvents(long startAfter, long end, int maxResults, String topic) {
if (startAfter > end) {
@@ -66,6 +122,22 @@ public List fetchEvents(long startAfter, long end, int maxResults, String
}
}
+ /**
+ * {@inheritDoc}
+ *
+ *
+ * This implementation queries the PostgreSQL database for the maximum sequence
+ * number (idn)
+ * in the specified topic's table. Returns 0 if no events exist.
+ *
+ *
+ *
+ * The SQL query used is: {@code SELECT MAX(idn) FROM postevent.}
+ *
+ *
+ * @throws IllegalArgumentException if topic is null or empty
+ * @throws RuntimeException if there is a database access error
+ */
@Override
public long getLatestMessageId(String topic) {
if (topic == null || topic.trim().isEmpty()) {
diff --git a/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java b/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java
index 9ba5ff5..16b09c9 100644
--- a/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java
+++ b/library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java
@@ -5,24 +5,54 @@
/**
* Interface for fetching events from a data store.
+ * Provides methods to retrieve events within a specified range and get the
+ * latest message ID
+ * for a given topic. This interface is used to implement catch-up functionality
+ * for clients
+ * that have missed events or need to synchronize their state.
+ *
+ *
+ * Implementations of this interface should:
+ *
+ *
+ * - Handle event retrieval from persistent storage
+ * - Ensure events are returned in order by sequence number (idn)
+ * - Apply appropriate validation for input parameters
+ * - Handle topic-specific event retrieval
+ *
*/
public interface CatchupServerInterface {
/**
* Fetches events from a data store within a specified range.
+ * Returns events where the sequence number (idn) is greater than startAfter
+ * and less than or equal to end, ordered by sequence number.
*
* @param startAfter The ID to start fetching events after (exclusive)
* @param end The maximum ID to fetch events up to (inclusive)
* @param maxResults The maximum number of events to fetch
- * @return A list of events within the specified range
+ * @param topic The topic to fetch events from
+ * @return A list of events within the specified range, ordered by sequence
+ * number
+ * @throws IllegalArgumentException if startAfter is greater than end,
+ * maxResults is less than or equal to 0,
+ * or if topic is null or empty
+ * @throws RuntimeException if there is an error accessing the data
+ * store
*/
List fetchEvents(long startAfter, long end, int maxResults, String topic);
/**
- * Retrieves the latest message ID for a given topic.
+ * Retrieves the latest message ID (sequence number) for a given topic.
+ * This can be used to determine the current position in the event stream
+ * and calculate gaps in sequence numbers.
*
* @param topic The topic to get the latest message ID for
- * @return The latest message ID for the specified topic
+ * @return The latest message ID for the specified topic, or 0 if no messages
+ * exist
+ * @throws IllegalArgumentException if topic is null or empty
+ * @throws RuntimeException if there is an error accessing the data
+ * store
*/
long getLatestMessageId(String topic);
}
diff --git a/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java b/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java
index 254748e..0511d4f 100644
--- a/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java
+++ b/library/src/main/java/com/p14n/postevent/catchup/CatchupService.java
@@ -21,7 +21,29 @@
import javax.sql.DataSource;
/**
- * Service to handle catching up on missed events for topics.
+ * Service responsible for managing event catchup operations and maintaining
+ * high water marks (HWM).
+ * Implements both MessageSubscriber for SystemEvents and OneAtATimeBehaviour
+ * for sequential processing.
+ *
+ *
+ * Key features:
+ *
+ *
+ * - Event catchup processing for missed events
+ * - High water mark (HWM) management
+ * - Gap detection in event sequences
+ * - Sequential processing of catchup operations
+ *
+ *
+ *
+ * The service responds to two types of system events:
+ *
+ *
+ * - {@code SystemEvent.CatchupRequired}: Triggers full catchup
+ * processing
+ * - {@code SystemEvent.FetchLatest}: Fetches only the latest events
+ *
*/
public class CatchupService implements MessageSubscriber, OneAtATimeBehaviour {
private static final Logger LOGGER = LoggerFactory.getLogger(CatchupService.class);
@@ -34,6 +56,13 @@ public class CatchupService implements MessageSubscriber, OneAtATim
final AtomicInteger signals = new AtomicInteger(0);
final AtomicBoolean running = new AtomicBoolean(false);
+ /**
+ * Creates a new CatchupService instance.
+ *
+ * @param ds The DataSource for database operations
+ * @param catchupServer The server interface for fetching catchup events
+ * @param systemEventBroker The broker for publishing system events
+ */
public CatchupService(DataSource ds, CatchupServerInterface catchupServer, SystemEventBroker systemEventBroker) {
this.datasource = ds;
this.catchupServer = catchupServer;
@@ -41,12 +70,12 @@ public CatchupService(DataSource ds, CatchupServerInterface catchupServer, Syste
}
/**
- * Catches up a topic by processing events since their last high water
- * mark.
+ * Performs catchup processing for a specific topic.
+ * Fetches missed events between the current high water mark and the next gap,
+ * writes them to the messages table, and updates the HWM accordingly.
*
- * @param topicName The name of the topic
- * @return The number of events processed
- * @throws SQLException If a database error occurs
+ * @param topicName The topic to process catchup for
+ * @return The number of events processed during catchup
*/
public int catchup(String topicName) {
@@ -100,6 +129,16 @@ public int catchup(String topicName) {
}
}
+ /**
+ * Gets the current high water mark (HWM) for a topic from the contiguous_hwm
+ * table.
+ * If no HWM exists for the topic, initializes it with 0.
+ *
+ * @param connection Database connection
+ * @param topicName Topic to get HWM for
+ * @return Current HWM value
+ * @throws SQLException If database operation fails
+ */
private long getCurrentHwm(Connection connection, String topicName) throws SQLException {
String sql = "SELECT hwm FROM postevent.contiguous_hwm WHERE topic_name = ?";
@@ -118,6 +157,13 @@ private long getCurrentHwm(Connection connection, String topicName) throws SQLEx
}
}
+ /**
+ * Initializes the HWM for a topic with value 0.
+ *
+ * @param connection Database connection
+ * @param topicName Topic to initialize
+ * @throws SQLException If database operation fails
+ */
private void initializeHwm(Connection connection, String topicName) throws SQLException {
String sql = "INSERT INTO postevent.contiguous_hwm (topic_name, hwm) VALUES (?, 0) ON CONFLICT DO NOTHING";
@@ -128,6 +174,16 @@ private void initializeHwm(Connection connection, String topicName) throws SQLEx
getCurrentHwm(connection, topicName);
}
+ /**
+ * Finds the next gap end (lowest idn greater than current HWM) for a topic.
+ *
+ * @param connection Database connection
+ * @param currentHwm Current HWM value
+ * @param topicName Topic to check
+ * @return The ID of the next message after the gap, or currentHwm if no gap
+ * exists
+ * @throws SQLException If database operation fails
+ */
private long findGapEnd(Connection connection, long currentHwm, String topicName) throws SQLException {
String sql = "SELECT MIN(idn) as next_idn FROM postevent.messages WHERE topic=? AND idn > ?";
@@ -146,6 +202,14 @@ private long findGapEnd(Connection connection, long currentHwm, String topicName
}
}
+ /**
+ * Writes a batch of events to the messages table.
+ *
+ * @param connection Database connection
+ * @param events List of events to write
+ * @return Number of events successfully written
+ * @throws SQLException If database operation fails
+ */
private int writeEventsToMessagesTable(Connection connection, List events) throws SQLException {
int count = 0;
String sql = "INSERT INTO postevent.messages (" +
@@ -164,6 +228,16 @@ private int writeEventsToMessagesTable(Connection connection, List events
return count;
}
+ /**
+ * Updates the high water mark for a topic using optimistic locking.
+ * Only updates if the current HWM matches the expected value.
+ *
+ * @param connection Database connection
+ * @param topicName Topic to update
+ * @param currentHwm Expected current HWM value
+ * @param newHwm New HWM value to set
+ * @throws SQLException If database operation fails
+ */
private void updateHwm(Connection connection, String topicName, long currentHwm, long newHwm)
throws SQLException {
String sql = "UPDATE postevent.contiguous_hwm SET hwm = ? WHERE topic_name = ? AND hwm = ?";
@@ -182,6 +256,12 @@ private void updateHwm(Connection connection, String topicName, long currentHwm,
}
}
+ /**
+ * Handles incoming system events, processing them one at a time.
+ * Supports CatchupRequired and FetchLatest events.
+ *
+ * @param message The system event to process
+ */
@Override
public void onMessage(SystemEvent message) {
if (Objects.requireNonNull(message) == SystemEvent.CatchupRequired) {
@@ -191,6 +271,13 @@ public void onMessage(SystemEvent message) {
}
}
+ /**
+ * Fetches and processes the latest event for a topic.
+ * If successful, triggers a catchup to process any intermediate events.
+ *
+ * @param topicName Topic to fetch latest event for
+ * @return Number of events processed (0 or 1)
+ */
private int fetchLatest(String topicName) {
if (topicName == null) {
LOGGER.warn("Topic name is null for fetch latest");
@@ -259,6 +346,15 @@ private static class GapCheckResult {
}
}
+ /**
+ * Processes a result set to check for gaps in message sequence.
+ * Updates the last contiguous IDN as it processes messages.
+ *
+ * @param rs ResultSet containing message IDNs
+ * @param currentHwm Current HWM value to start checking from
+ * @return GapCheckResult containing gap status and last contiguous IDN
+ * @throws SQLException If database operation fails
+ */
private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQLException {
long expectedNext = currentHwm + 1;
long lastContiguousIdn = currentHwm;
@@ -278,11 +374,11 @@ private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQL
/**
* Checks for gaps in the message sequence and updates the HWM to the last
* contiguous message.
- *
- * @param topicName The name of the topic
- * @param currentHwm The current high water mark to start checking from
+ *
+ * @param topicName Topic to check
+ * @param currentHwm Current HWM value
* @return true if a gap was found, false if no gaps were found
- * @throws SQLException If a database error occurs
+ * @throws SQLException If database operation fails
*/
public boolean hasSequenceGap(String topicName, long currentHwm) throws SQLException {
@@ -295,6 +391,15 @@ public boolean hasSequenceGap(String topicName, long currentHwm) throws SQLExcep
}
}
+ /**
+ * Internal version of hasSequenceGap that accepts an existing connection.
+ *
+ * @param topicName Topic to check
+ * @param currentHwm Current HWM value
+ * @param connection Existing database connection to use
+ * @return true if a gap was found, false if no gaps were found
+ * @throws SQLException If database operation fails
+ */
public boolean hasSequenceGap(String topicName, long currentHwm, Connection connection) throws SQLException {
LOGGER.info("Checking for sequence gaps after HWM {} for topic {}",
new Object[] { currentHwm, topicName });
@@ -320,6 +425,15 @@ public boolean hasSequenceGap(String topicName, long currentHwm, Connection conn
}
}
+ /**
+ * Updates the HWM to the last contiguous message ID for a topic.
+ *
+ * @param topicName Topic to update
+ * @param currentHwm Current HWM value
+ * @param connection Database connection
+ * @return true if HWM was updated, false if no update was needed
+ * @throws SQLException If database operation fails
+ */
public boolean updateHwmToLastContiguous(String topicName, long currentHwm, Connection connection)
throws SQLException {
String sql = "SELECT idn FROM postevent.messages WHERE topic = ? AND idn > ? ORDER BY idn";
diff --git a/library/src/main/java/com/p14n/postevent/catchup/OneAtATimeBehaviour.java b/library/src/main/java/com/p14n/postevent/catchup/OneAtATimeBehaviour.java
index 9313f7f..fedbd8e 100644
--- a/library/src/main/java/com/p14n/postevent/catchup/OneAtATimeBehaviour.java
+++ b/library/src/main/java/com/p14n/postevent/catchup/OneAtATimeBehaviour.java
@@ -3,12 +3,65 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Interface defining behavior for executing tasks one at a time with signal
+ * tracking.
+ * Provides thread-safe execution of tasks while maintaining a count of pending
+ * signals
+ * and ensuring only one task runs at any given time.
+ *
+ *
+ * This interface is useful for implementing throttling or sequential processing
+ * patterns where multiple requests need to be processed one at a time, with the
+ * ability
+ * to trigger follow-up actions when new signals arrive during processing.
+ *
+ *
+ */
public interface OneAtATimeBehaviour {
+ /**
+ * Returns the atomic counter tracking the number of signals received.
+ * This counter is used to track pending tasks or signals that arrive while
+ * processing is in progress.
+ *
+ * @return AtomicInteger representing the number of pending signals
+ */
public AtomicInteger getSignals();
+ /**
+ * Returns the atomic boolean indicating whether a task is currently running.
+ * This flag is used to ensure mutual exclusion between tasks.
+ *
+ * @return AtomicBoolean representing the running state
+ */
public AtomicBoolean getRunning();
+ /**
+ * Executes tasks one at a time with signal tracking.
+ * This method ensures that only one task runs at a time while maintaining
+ * a count of signals received during execution.
+ *
+ *
+ * The method operates as follows:
+ *
+ *
+ * - Increments the signal counter to track the incoming request
+ * - If a task is already running, returns immediately
+ * - If no task is running, acquires the lock and:
+ *
+ * - Resets the signal counter
+ * - Executes the main task
+ * - If new signals arrived during execution, triggers the next task
+ *
+ *
+ *
+ *
+ * @param todo The main task to execute when acquiring the lock
+ * @param nextTodo The follow-up task to execute if new signals arrive during
+ * processing
+ * @throws RuntimeException if either task throws an exception during execution
+ */
public default void oneAtATime(Runnable todo, Runnable nextTodo) {
getSignals().incrementAndGet();
if (getRunning().get()) {
@@ -26,5 +79,4 @@ public default void oneAtATime(Runnable todo, Runnable nextTodo) {
}
}
}
-
}
diff --git a/library/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java b/library/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java
index 58359cc..00f3a94 100644
--- a/library/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java
+++ b/library/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java
@@ -13,6 +13,44 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A message broker implementation that provides persistence for events before
+ * forwarding them to a target broker.
+ * This broker ensures reliable event delivery by first persisting events to a
+ * database and then forwarding them
+ * to subscribers only after successful persistence.
+ *
+ *
+ * Key features:
+ *
+ * - Event persistence before delivery
+ * - High-water mark (HWM) tracking for each topic
+ * - Automatic catchup triggering for missed events
+ * - Transaction management for reliable persistence
+ * - Integration with system event notifications
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * DataSource dataSource = // initialize your datasource
+ * MessageBroker targetBroker = // initialize target broker
+ * SystemEventBroker systemEventBroker = // initialize system event broker
+ *
+ * PersistentBroker broker = new PersistentBroker<>(
+ * targetBroker,
+ * dataSource,
+ * systemEventBroker
+ * );
+ *
+ * // Use the broker
+ * broker.publish("orders", event);
+ * }
+ *
+ * @param The type of messages that the target broker handles
+ */
public class PersistentBroker implements MessageBroker, AutoCloseable, MessageSubscriber {
private static final Logger logger = LoggerFactory.getLogger(PersistentBroker.class);
private static final String INSERT_SQL = "INSERT INTO postevent.messages (" + SQL.EXT_COLS +
@@ -21,9 +59,15 @@ public class PersistentBroker implements MessageBroker, AutoC
private final MessageBroker targetBroker;
private final DataSource dataSource;
- // private final String topicName;
private final SystemEventBroker systemEventBroker;
+ /**
+ * Creates a new PersistentBroker instance.
+ *
+ * @param targetBroker The broker to forward events to after persistence
+ * @param dataSource The data source for event persistence
+ * @param systemEventBroker The broker for system event notifications
+ */
public PersistentBroker(MessageBroker targetBroker,
DataSource dataSource,
SystemEventBroker systemEventBroker) {
@@ -32,6 +76,17 @@ public PersistentBroker(MessageBroker targetBroker,
this.systemEventBroker = systemEventBroker;
}
+ /**
+ * Publishes an event to the specified topic with persistence.
+ * The event is first persisted to the database, and only after successful
+ * persistence
+ * is it forwarded to the target broker. If the high-water mark update fails,
+ * a catchup event is triggered.
+ *
+ * @param topic The topic to publish to
+ * @param event The event to publish
+ * @throws RuntimeException if persistence or forwarding fails
+ */
@Override
public void publish(String topic, Event event) {
Connection conn = null;
@@ -74,26 +129,56 @@ public void publish(String topic, Event event) {
}
}
+ /**
+ * Subscribes to events on the specified topic.
+ *
+ * @param topic The topic to subscribe to
+ * @param subscriber The subscriber that will receive events
+ * @return true if subscription was successful, false otherwise
+ */
@Override
public boolean subscribe(String topic, MessageSubscriber subscriber) {
return targetBroker.subscribe(topic, subscriber);
}
+ /**
+ * Unsubscribes from events on the specified topic.
+ *
+ * @param topic The topic to unsubscribe from
+ * @param subscriber The subscriber to remove
+ * @return true if unsubscription was successful, false otherwise
+ */
@Override
public boolean unsubscribe(String topic, MessageSubscriber subscriber) {
return targetBroker.unsubscribe(topic, subscriber);
}
+ /**
+ * Closes the broker and its resources.
+ */
@Override
public void close() {
targetBroker.close();
}
+ /**
+ * Converts an event message.
+ * This implementation returns null as conversion is handled by the target
+ * broker.
+ *
+ * @param m The event to convert
+ * @return null
+ */
@Override
public OutT convert(Event m) {
return null;
}
+ /**
+ * Handles incoming messages by publishing them to the appropriate topic.
+ *
+ * @param message The event message to handle
+ */
@Override
public void onMessage(Event message) {
logger.atDebug().log("Received event for persistence");
diff --git a/library/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java b/library/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java
index c5231f0..c70052c 100644
--- a/library/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java
+++ b/library/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java
@@ -13,6 +13,31 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Service responsible for finding and submitting unprocessed events for
+ * processing.
+ * Integrates with {@link UnprocessedEventFinder} to locate events and triggers
+ * catchup processing through the {@link SystemEventBroker}.
+ *
+ *
+ * The submitter periodically checks for unprocessed events and initiates
+ * processing for their respective topics. It ensures that no events
+ * are left unprocessed.
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * DataSource dataSource = // initialize your datasource
+ * SystemEventBroker broker = // initialize your broker
+ * UnprocessedSubmitter submitter = new UnprocessedSubmitter(dataSource, broker);
+ *
+ * // Check for unprocessed events and submit them
+ * submitter.submitUnprocessedEvents();
+ * }
+ */
public class UnprocessedSubmitter implements MessageSubscriber, OneAtATimeBehaviour {
private final MessageBroker targetBroker;
@@ -23,6 +48,15 @@ public class UnprocessedSubmitter implements MessageSubscriber, One
final AtomicInteger signals = new AtomicInteger(0);
final AtomicBoolean running = new AtomicBoolean(false);
+ /**
+ * Creates a new UnprocessedSubmitter instance.
+ *
+ * @param systemEventBroker Broker for publishing system events
+ * @param ds Data source for database connections
+ * @param unprocessedEventFinder Finder for unprocessed events
+ * @param targetBroker Broker for event processing
+ * @param batchSize Batch size for event processing
+ */
public UnprocessedSubmitter(SystemEventBroker systemEventBroker, DataSource ds,
UnprocessedEventFinder unprocessedEventFinder,
MessageBroker targetBroker, int batchSize) {
@@ -33,6 +67,14 @@ public UnprocessedSubmitter(SystemEventBroker systemEventBroker, DataSource ds,
this.systemEventBroker = systemEventBroker;
}
+ /**
+ * Finds and submits unprocessed events for catchup processing.
+ * Queries the database for unprocessed events and triggers catchup
+ * processing for each unique topic found.
+ *
+ * @return Number of topics submitted for processing
+ * @throws SQLException if a database error occurs
+ */
private void resubmit() {
try (Connection c = ds.getConnection()) {
var events = unprocessedEventFinder.findUnprocessedEventsWithLimit(c, batchSize);
diff --git a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java b/library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcClient.java
similarity index 58%
rename from library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java
rename to library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcClient.java
index adfdf10..6a8472b 100644
--- a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java
+++ b/library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcClient.java
@@ -1,11 +1,11 @@
-package com.p14n.postevent.catchup.grpc;
+package com.p14n.postevent.catchup.remote;
import com.p14n.postevent.catchup.CatchupServerInterface;
+import com.p14n.postevent.catchup.grpc.*;
import com.p14n.postevent.data.Event;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
-
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
@@ -13,23 +13,79 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * gRPC client implementation of the CatchupServerInterface.
+ * Provides functionality to fetch missed events and latest message IDs from a
+ * remote gRPC server.
+ * Implements AutoCloseable to properly manage the gRPC channel lifecycle.
+ *
+ *
+ * Key features:
+ *
+ *
+ * - Remote event fetching via gRPC
+ * - Latest message ID retrieval
+ * - Automatic resource cleanup
+ * - Configurable connection settings
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * try (CatchupGrpcClient client = new CatchupGrpcClient("localhost", 50051)) {
+ * List events = client.fetchEvents(100L, 200L, 50, "my-topic");
+ * long latestId = client.getLatestMessageId("my-topic");
+ * }
+ * }
+ */
public class CatchupGrpcClient implements CatchupServerInterface, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(CatchupGrpcClient.class);
private final ManagedChannel channel;
private final CatchupServiceGrpc.CatchupServiceBlockingStub blockingStub;
+ /**
+ * Creates a new CatchupGrpcClient with the specified host and port.
+ * Initializes a plain text connection (no TLS) to the gRPC server.
+ *
+ * @param host The hostname of the gRPC server
+ * @param port The port number of the gRPC server
+ */
public CatchupGrpcClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build());
}
+ /**
+ * Creates a new CatchupGrpcClient with a pre-configured channel.
+ * Useful for custom channel configuration or testing.
+ *
+ * @param channel The pre-configured gRPC managed channel
+ */
public CatchupGrpcClient(ManagedChannel channel) {
this.channel = channel;
this.blockingStub = CatchupServiceGrpc.newBlockingStub(channel);
}
+ /**
+ * {@inheritDoc}
+ *
+ *
+ * This implementation makes a blocking gRPC call to fetch events from the
+ * remote server.
+ * Events are converted from the gRPC protocol format to the internal Event
+ * format.
+ *
+ *
+ * @throws IllegalArgumentException if startAfter is greater than end,
+ * maxResults is less than or equal to 0,
+ * or if topic is null or empty
+ * @throws RuntimeException if the gRPC call fails or if there's an
+ * error converting the response
+ */
@Override
public List fetchEvents(long startAfter, long end, int maxResults, String topic) {
logger.atInfo()
@@ -63,6 +119,14 @@ public List fetchEvents(long startAfter, long end, int maxResults, String
return events;
}
+ /**
+ * Converts a gRPC event message to the internal Event format.
+ * If the time field is empty in the gRPC event, the current time is used.
+ *
+ * @param grpcEvent The gRPC event message to convert
+ * @param topic The topic associated with the event
+ * @return A new Event instance with the converted data
+ */
private Event convertFromGrpcEvent(com.p14n.postevent.catchup.grpc.Event grpcEvent, String topic) {
OffsetDateTime time = null;
if (!grpcEvent.getTime().isEmpty()) {
@@ -85,6 +149,17 @@ private Event convertFromGrpcEvent(com.p14n.postevent.catchup.grpc.Event grpcEve
grpcEvent.getTraceparent());
}
+ /**
+ * {@inheritDoc}
+ *
+ *
+ * This implementation makes a blocking gRPC call to fetch the latest message ID
+ * from the remote server.
+ *
+ *
+ * @throws IllegalArgumentException if topic is null or empty
+ * @throws RuntimeException if the gRPC call fails
+ */
@Override
public long getLatestMessageId(String topic) {
logger.atInfo()
@@ -105,11 +180,16 @@ public long getLatestMessageId(String topic) {
}
}
+ /**
+ * Closes the gRPC channel with a 5-second timeout.
+ * This method should be called when the client is no longer needed to free up
+ * resources.
+ */
@Override
public void close() {
try {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- } catch (Exception e) {
+ } catch (InterruptedException e) {
logger.error("Failed to close", e);
}
}
diff --git a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java b/library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcServer.java
similarity index 61%
rename from library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java
rename to library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcServer.java
index cf66fe6..cb0cacb 100644
--- a/library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java
+++ b/library/src/main/java/com/p14n/postevent/catchup/remote/CatchupGrpcServer.java
@@ -1,6 +1,7 @@
-package com.p14n.postevent.catchup.grpc;
+package com.p14n.postevent.catchup.remote;
import com.p14n.postevent.catchup.CatchupServerInterface;
+import com.p14n.postevent.catchup.grpc.*;
import com.p14n.postevent.data.Event;
import io.grpc.Server;
import io.grpc.ServerBuilder;
@@ -13,12 +14,46 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+/**
+ * gRPC server implementation for the catchup service.
+ * Provides a gRPC interface for clients to fetch missed events and get latest
+ * message IDs.
+ *
+ *
+ * Key features:
+ *
+ *
+ * - Event catchup functionality via gRPC
+ * - Latest message ID retrieval
+ * - Graceful shutdown handling
+ * - Automatic resource cleanup
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * CatchupServerInterface catchupServer = new CatchupServer(dataSource);
+ * CatchupGrpcServer server = new CatchupGrpcServer(8080, catchupServer);
+ * server.start();
+ * // ... server is running ...
+ * server.stop();
+ * }
+ */
public class CatchupGrpcServer {
private static final Logger logger = LoggerFactory.getLogger(CatchupGrpcServer.class);
private final int port;
private final Server server;
+ /**
+ * Creates a new CatchupGrpcServer instance.
+ *
+ * @param port The port number to listen on
+ * @param catchupServer The backend server implementation for handling catchup
+ * operations
+ */
public CatchupGrpcServer(int port, CatchupServerInterface catchupServer) {
this.port = port;
this.server = ServerBuilder.forPort(port)
@@ -26,6 +61,12 @@ public CatchupGrpcServer(int port, CatchupServerInterface catchupServer) {
.build();
}
+ /**
+ * Starts the gRPC server and registers a shutdown hook.
+ * The server will listen for incoming connections on the configured port.
+ *
+ * @throws IOException If the server fails to start
+ */
public void start() throws IOException {
server.start();
logger.atInfo().log("Server started, listening on port {}", port);
@@ -40,25 +81,53 @@ public void start() throws IOException {
}));
}
+ /**
+ * Initiates an orderly shutdown of the server.
+ * Waits for ongoing requests to complete up to a timeout of 30 seconds.
+ *
+ * @throws InterruptedException If the shutdown process is interrupted
+ */
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
+ /**
+ * Blocks until the server is shutdown.
+ * Useful for keeping the server running in the main thread.
+ *
+ * @throws InterruptedException If the blocking is interrupted
+ */
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
+ /**
+ * Implementation of the gRPC catchup service.
+ * Handles incoming gRPC requests by delegating to the backend
+ * CatchupServerInterface.
+ */
public static class CatchupServiceImpl extends CatchupServiceGrpc.CatchupServiceImplBase {
private final CatchupServerInterface catchupServer;
+ /**
+ * Creates a new CatchupServiceImpl instance.
+ *
+ * @param catchupServer The backend server interface for catchup operations
+ */
public CatchupServiceImpl(CatchupServerInterface catchupServer) {
this.catchupServer = catchupServer;
}
+ /**
+ * Handles gRPC requests for getting the latest message ID for a topic.
+ *
+ * @param request The request containing the topic name
+ * @param responseObserver The observer for sending the response
+ */
@Override
public void getLatestMessageId(TopicRequest request, StreamObserver responseObserver) {
try {
@@ -76,6 +145,12 @@ public void getLatestMessageId(TopicRequest request, StreamObserver responseObserver) {
try {
@@ -101,6 +176,12 @@ public void fetchEvents(FetchEventsRequest request, StreamObserver
+ * This class holds all necessary configuration parameters for connecting
+ * to the database and processing events.
+ *
+ *
+ * @param affinity The affinity identifier for this instance
+ * @param topics Set of topics to handle
+ * @param dbHost Database host address
+ * @param dbPort Database port number
+ * @param dbUser Database username
+ * @param dbPassword Database password
+ * @param dbName Database name
+ * @param pollInterval Poll interval in milliseconds
+ * @param overrideProps Additional database properties for overriding debezium
+ * defaults
+ */
public record ConfigData(String affinity,
Set topics, // renamed from name
String dbHost,
@@ -12,23 +32,48 @@ public record ConfigData(String affinity,
String dbName,
int pollInterval,
Properties overrideProps) implements PostEventConfig {
+
+ /**
+ * Creates a new ConfigData instance with the specified parameters.
+ *
+ * @param affinity The affinity identifier for this instance
+ * @param topics Set of topics to handle
+ * @param dbHost Database host address
+ * @param dbPort Database port number
+ * @param dbUser Database username
+ * @param dbPassword Database password
+ * @param dbName Database name
+ * @param pollInterval Poll interval in milliseconds
+ */
public ConfigData(String affinity,
- Set topics, // renamed from name
- String dbHost,
- int dbPort,
- String dbUser,
- String dbPassword,
- String dbName,
- int pollInterval) {
- this(affinity, topics, dbHost, dbPort, dbUser, dbPassword, dbName, pollInterval,null);
+ Set topics,
+ String dbHost,
+ int dbPort,
+ String dbUser,
+ String dbPassword,
+ String dbName,
+ int pollInterval) {
+ this(affinity, topics, dbHost, dbPort, dbUser, dbPassword, dbName, pollInterval, null);
}
+
+ /**
+ * Creates a new ConfigData instance with the specified parameters.
+ *
+ * @param affinity The affinity identifier for this instance
+ * @param topics Set of topics to handle
+ * @param dbHost Database host address
+ * @param dbPort Database port number
+ * @param dbUser Database username
+ * @param dbPassword Database password
+ * @param dbName Database name
+ */
public ConfigData(String affinity,
- Set topics, // renamed from name
- String dbHost,
- int dbPort,
- String dbUser,
- String dbPassword,
- String dbName) {
- this(affinity, topics, dbHost, dbPort, dbUser, dbPassword, dbName, 500,null);
+ Set topics, // renamed from name
+ String dbHost,
+ int dbPort,
+ String dbUser,
+ String dbPassword,
+ String dbName) {
+ this(affinity, topics, dbHost, dbPort, dbUser, dbPassword, dbName, 500, null);
}
}
diff --git a/library/src/main/java/com/p14n/postevent/data/Event.java b/library/src/main/java/com/p14n/postevent/data/Event.java
index 415602b..89c00e8 100644
--- a/library/src/main/java/com/p14n/postevent/data/Event.java
+++ b/library/src/main/java/com/p14n/postevent/data/Event.java
@@ -4,22 +4,74 @@
/**
* Record representing an event to be published to the database.
+ * This immutable class encapsulates all the information needed for event
+ * processing
+ * and persistence.
+ *
+ *
+ * An event consists of:
+ *
+ *
+ * - Mandatory fields: id, source, type
+ * - Optional metadata: datacontenttype, dataschema, subject, traceparent
+ * - Payload: data as byte array
+ * - System fields: time, idn (sequence number), topic
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * Event event = Event.create(
+ * "event-123",
+ * "order-service",
+ * "order.created",
+ * "application/json",
+ * "order-schema-v1",
+ * "orders",
+ * jsonData.getBytes(),
+ * null);
+ * }
+ *
+ * @param id Unique identifier for the event
+ * @param source System that generated the event
+ * @param type Event type identifier
+ * @param datacontenttype MIME type of the data payload
+ * @param dataschema Schema identifier for the data payload
+ * @param subject Subject or category of the event
+ * @param data Event payload as byte array
+ * @param time Timestamp when the event occurred
+ * @param idn Sequential identifier number
+ * @param topic Topic or stream identifier
+ * @param traceparent OpenTelemetry trace parent identifier
*/
-public record Event(String id,
- String source,
- String type,
- String datacontenttype,
- String dataschema,
- String subject,
- byte[] data,
- Instant time,
- Long idn,
- String topic,
- String traceparent) implements Traceable {
+public record Event(
+ String id,
+ String source,
+ String type,
+ String datacontenttype,
+ String dataschema,
+ String subject,
+ byte[] data,
+ Instant time,
+ Long idn,
+ String topic,
+ String traceparent) implements Traceable {
/**
* Creates a new Event instance with validation of required fields.
+ * This is a convenience method that sets time, idn, and topic to null.
*
+ * @param id Unique identifier for the event
+ * @param source System that generated the event
+ * @param type Event type identifier
+ * @param datacontenttype MIME type of the data payload
+ * @param dataschema Schema identifier for the data payload
+ * @param subject Subject or category of the event
+ * @param data Event payload as byte array
+ * @param traceparent OpenTelemetry trace parent identifier
+ * @return A new Event instance
* @throws IllegalArgumentException if any required field is null or empty
*/
public static Event create(String id, String source, String type, String datacontenttype, String dataschema,
@@ -27,6 +79,24 @@ public static Event create(String id, String source, String type, String datacon
return create(id, source, type, datacontenttype, dataschema, subject, data, null, null, null, traceparent);
}
+ /**
+ * Creates a new Event instance with validation of required fields and all
+ * optional fields.
+ *
+ * @param id Unique identifier for the event
+ * @param source System that generated the event
+ * @param type Event type identifier
+ * @param datacontenttype MIME type of the data payload
+ * @param dataschema Schema identifier for the data payload
+ * @param subject Subject or category of the event
+ * @param data Event payload as byte array
+ * @param time Timestamp when the event occurred
+ * @param idn Sequential identifier number
+ * @param topic Topic or stream identifier
+ * @param traceparent OpenTelemetry trace parent identifier
+ * @return A new Event instance
+ * @throws IllegalArgumentException if id, source, or type is null or empty
+ */
public static Event create(String id, String source, String type, String datacontenttype, String dataschema,
String subject, byte[] data, Instant time, Long idn, String topic, String traceparent) {
if (id == null || id.trim().isEmpty()) {
diff --git a/library/src/main/java/com/p14n/postevent/data/PostEventConfig.java b/library/src/main/java/com/p14n/postevent/data/PostEventConfig.java
index d94b898..93857a5 100644
--- a/library/src/main/java/com/p14n/postevent/data/PostEventConfig.java
+++ b/library/src/main/java/com/p14n/postevent/data/PostEventConfig.java
@@ -3,30 +3,93 @@
import java.util.Properties;
import java.util.Set;
+/**
+ * Configuration interface for PostEvent system settings.
+ * Defines the essential configuration parameters needed for event processing
+ * and database connectivity.
+ */
public interface PostEventConfig {
- public String affinity();
+ /**
+ * Gets the affinity identifier for this instance.
+ * The affinity is used to identify related event processors.
+ *
+ * @return The affinity string identifier
+ */
+ String affinity();
- public Set topics(); // Changed from single topic to set
+ /**
+ * Gets the set of topics this instance will handle.
+ *
+ * @return Set of topic names
+ */
+ Set topics();
- public String dbHost();
+ /**
+ * Gets the database host address.
+ *
+ * @return The database host address
+ */
+ String dbHost();
- public int dbPort();
+ /**
+ * Gets the database port number.
+ *
+ * @return The database port number
+ */
+ int dbPort();
- public String dbUser();
+ /**
+ * Gets the database username.
+ *
+ * @return The database username
+ */
+ String dbUser();
- public String dbPassword();
+ /**
+ * Gets the database password.
+ *
+ * @return The database password
+ */
+ String dbPassword();
- public String dbName();
+ /**
+ * Gets the database name.
+ *
+ * @return The database name
+ */
+ String dbName();
- public Properties overrideProps();
+ /**
+ * Gets additional database properties for overriding defaults.
+ *
+ * @return Properties object containing override values
+ */
+ Properties overrideProps();
- public default int startupTimeoutSeconds() {
+ /**
+ * Gets the startup timeout in seconds.
+ * Default is 30 seconds.
+ *
+ * @return The startup timeout in seconds
+ */
+ default int startupTimeoutSeconds() {
return 30;
}
- public default String jdbcUrl() {
+ /**
+ * Constructs the JDBC URL for database connection.
+ *
+ * @return The complete JDBC URL string
+ */
+ default String jdbcUrl() {
return String.format("jdbc:postgresql://%s:%d/%s",
dbHost(), dbPort(), dbName());
}
- public int pollInterval();
+
+ /**
+ * Gets the poll interval for checking new events.
+ *
+ * @return The poll interval in milliseconds
+ */
+ int pollInterval();
}
diff --git a/library/src/main/java/com/p14n/postevent/data/Traceable.java b/library/src/main/java/com/p14n/postevent/data/Traceable.java
index 8a0f0e6..c5d893d 100644
--- a/library/src/main/java/com/p14n/postevent/data/Traceable.java
+++ b/library/src/main/java/com/p14n/postevent/data/Traceable.java
@@ -1,11 +1,54 @@
package com.p14n.postevent.data;
+/**
+ * Interface for objects that can be traced and identified in a distributed
+ * system.
+ * Provides essential methods for tracking and correlating events across
+ * services.
+ *
+ *
+ * Key attributes:
+ *
+ *
+ * - {@code id}: Unique identifier for the traceable object
+ * - {@code topic}: Message routing or categorization identifier
+ * - {@code subject}: Business domain or entity identifier
+ * - {@code traceparent}: OpenTelemetry trace context identifier
+ *
+ *
+ *
+ * This interface is typically implemented by event and message classes to
+ * support
+ * distributed tracing and message correlation.
+ *
+ */
public interface Traceable {
- public String id();
- public String topic();
+ /**
+ * Returns the unique identifier of the traceable object.
+ *
+ * @return the unique identifier string
+ */
+ String id();
- public String subject();
+ /**
+ * Returns the topic or stream identifier for message routing.
+ *
+ * @return the topic string
+ */
+ String topic();
- public String traceparent();
+ /**
+ * Returns the business domain subject or entity identifier.
+ *
+ * @return the subject string
+ */
+ String subject();
+
+ /**
+ * Returns the OpenTelemetry trace parent identifier for distributed tracing.
+ *
+ * @return the trace parent string
+ */
+ String traceparent();
}
diff --git a/library/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java b/library/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java
index 61304ec..697fc69 100644
--- a/library/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java
+++ b/library/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java
@@ -11,18 +11,46 @@
import org.slf4j.LoggerFactory;
/**
- * Responsible for finding all unprocessed events in the messages table.
- * Unprocessed events have a status of 'u'.
+ * Responsible for finding unprocessed events in the messages table.
+ * Provides methods to query and retrieve events with a status of 'u'
+ * (unprocessed)
+ * using various filtering criteria.
+ *
+ *
+ * The finder uses prepared statements for secure SQL execution and includes
+ * logging for operational visibility. All database operations require an
+ * external
+ * {@link Connection} to be provided.
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * UnprocessedEventFinder finder = new UnprocessedEventFinder();
+ * try (Connection conn = dataSource.getConnection()) {
+ * List events = finder.findUnprocessedEvents(conn);
+ * // Process the events...
+ * }
+ * }
*/
public class UnprocessedEventFinder {
private static final Logger logger = LoggerFactory.getLogger(UnprocessedEventFinder.class);
+ /**
+ * Creates a new UnprocessedEventFinder instance.
+ */
+ public UnprocessedEventFinder() {
+ }
+
/**
* Finds all unprocessed events in the messages table.
+ * Events are ordered by their sequence number (idn) in ascending order.
*
* @param connection Database connection to use
- * @return List of unprocessed events ordered by creation time (ascending)
+ * @return List of unprocessed events
* @throws SQLException if a database error occurs
*/
public List findUnprocessedEvents(Connection connection) throws SQLException {
@@ -44,11 +72,11 @@ public List findUnprocessedEvents(Connection connection) throws SQLExcept
/**
* Finds unprocessed events for a specific subject.
+ * Events are ordered by creation time in ascending order.
*
* @param connection Database connection to use
* @param subject Subject to filter by
- * @return List of unprocessed events for the subject ordered by creation time
- * (ascending)
+ * @return List of unprocessed events for the subject
* @throws SQLException if a database error occurs
*/
public List findUnprocessedEventsBySubject(Connection connection, String subject) throws SQLException {
@@ -72,15 +100,16 @@ public List findUnprocessedEventsBySubject(Connection connection, String
/**
* Finds a limited number of unprocessed events.
+ * Events are ordered by creation time in ascending order.
*
* @param connection Database connection to use
* @param limit Maximum number of events to return
- * @return List of unprocessed events ordered by creation time (ascending),
- * limited to the specified count
- * @throws SQLException if a database error occurs
+ * @return List of unprocessed events, limited to the specified count
+ * @throws SQLException if a database error occurs
+ * @throws IllegalArgumentException if limit is less than or equal to 0
*/
public List findUnprocessedEventsWithLimit(Connection connection, int limit) throws SQLException {
- logger.atInfo().log("Finding up to " + limit + " unprocessed events");
+ logger.atInfo().log("Finding up to {} unprocessed events", limit);
String sql = "SELECT id, source, type, datacontenttype, dataschema, subject, data, " +
"time, idn, topic, traceparent FROM postevent.messages " +
@@ -99,7 +128,6 @@ public List findUnprocessedEventsWithLimit(Connection connection, int lim
}
}
-
/**
* Processes a result set and converts each row to an Event object.
*
@@ -167,11 +195,11 @@ public int countUnprocessedEvents(Connection connection) throws SQLException {
if (rs.next()) {
int count = rs.getInt(1);
- logger.atInfo().log("Found " + count + " unprocessed events");
+ logger.atInfo().log("Found {} unprocessed events", count);
return count;
}
return 0;
}
}
-}
\ No newline at end of file
+}
diff --git a/library/src/main/java/com/p14n/postevent/db/DatabaseSetup.java b/library/src/main/java/com/p14n/postevent/db/DatabaseSetup.java
index cf49bd6..d74e662 100644
--- a/library/src/main/java/com/p14n/postevent/db/DatabaseSetup.java
+++ b/library/src/main/java/com/p14n/postevent/db/DatabaseSetup.java
@@ -15,25 +15,75 @@
import java.util.ArrayList;
import java.util.Set;
+/**
+ * Handles PostgreSQL database setup and initialization for the PostEvent
+ * system.
+ * This class manages schema creation, table initialization, and replication
+ * slot cleanup.
+ *
+ *
+ * Key responsibilities include:
+ *
+ * - Creating the PostEvent schema
+ * - Initializing topic-specific tables
+ * - Setting up message tracking tables
+ * - Managing replication slots
+ * - Providing connection pool setup
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * PostEventConfig config = // initialize configuration
+ * DatabaseSetup setup = new DatabaseSetup(config);
+ *
+ * // Setup all required tables for given topics
+ * setup.setupAll(Set.of("orders", "inventory"));
+ *
+ * // Create connection pool
+ * DataSource pool = DatabaseSetup.createPool(config);
+ * }
+ */
public class DatabaseSetup {
- // SELECT 'select pg_drop_replication_slot(''' || slot_name ||''');' FROM
- // pg_replication_slots where active=false and slot_name like 'postevent%';
private static final Logger logger = LoggerFactory.getLogger(DatabaseSetup.class);
private final String jdbcUrl;
private final String username;
private final String password;
+ /**
+ * Creates a new DatabaseSetup instance using configuration from
+ * PostEventConfig.
+ *
+ * @param cfg Configuration containing database connection details
+ */
public DatabaseSetup(PostEventConfig cfg) {
this(cfg.jdbcUrl(), cfg.dbUser(), cfg.dbPassword());
}
+ /**
+ * Creates a new DatabaseSetup instance with explicit connection parameters.
+ *
+ * @param jdbcUrl PostgreSQL JDBC URL
+ * @param username Database username
+ * @param password Database password
+ */
public DatabaseSetup(String jdbcUrl, String username, String password) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
}
+ /**
+ * Performs complete database setup for multiple topics.
+ * Creates schema, message tables, and topic-specific tables.
+ *
+ * @param topics Set of topic names to initialize
+ * @return this instance for method chaining
+ * @throws RuntimeException if database operations fail
+ */
public DatabaseSetup setupAll(Set topics) {
createSchemaIfNotExists();
createMessagesTableIfNotExists();
@@ -43,10 +93,23 @@ public DatabaseSetup setupAll(Set topics) {
return this;
}
+ /**
+ * Performs complete database setup for a single topic.
+ *
+ * @param topic Topic name to initialize
+ * @return this instance for method chaining
+ * @throws RuntimeException if database operations fail
+ */
public DatabaseSetup setupAll(String topic) {
return setupAll(Set.of(topic));
}
+ /**
+ * Removes inactive replication slots with names starting with 'postevent'.
+ * This cleanup prevents accumulation of unused slots.
+ *
+ * @return this instance for method chaining
+ */
public DatabaseSetup clearOldSlots() {
try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
@@ -73,6 +136,12 @@ public DatabaseSetup clearOldSlots() {
return this;
}
+ /**
+ * Creates the PostEvent schema if it doesn't exist.
+ *
+ * @return this instance for method chaining
+ * @throws RuntimeException if schema creation fails
+ */
public DatabaseSetup createSchemaIfNotExists() {
try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
@@ -88,6 +157,15 @@ public DatabaseSetup createSchemaIfNotExists() {
return this;
}
+ /**
+ * Creates a topic-specific table if it doesn't exist.
+ * The table stores event data with unique constraints on ID and source.
+ *
+ * @param topic Name of the topic table to create
+ * @return this instance for method chaining
+ * @throws IllegalArgumentException if topic name is invalid
+ * @throws RuntimeException if table creation fails
+ */
public DatabaseSetup createTableIfNotExists(String topic) {
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty");
@@ -124,6 +202,14 @@ traceparent VARCHAR(55),
return this;
}
+ /**
+ * Creates the messages table if it doesn't exist.
+ * This table stores all events with their processing status and supports
+ * efficient querying.
+ *
+ * @return this instance for method chaining
+ * @throws RuntimeException if table creation fails
+ */
public DatabaseSetup createMessagesTableIfNotExists() {
try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
@@ -147,12 +233,12 @@ PRIMARY KEY (topic,idn)
stmt.execute(sql);
- // Existing index for hasUnprocessedPriorEvents
+ // Index for hasUnprocessedPriorEvents query
stmt.execute("""
CREATE INDEX IF NOT EXISTS idx_messages_subject_topic_idn_status
ON postevent.messages (subject, topic, idn, status)""");
- // New index for findUnprocessedEvents query
+ // Index for findUnprocessedEvents query
stmt.execute("""
CREATE INDEX IF NOT EXISTS idx_messages_status_time
ON postevent.messages (status, time)""");
@@ -166,6 +252,14 @@ PRIMARY KEY (topic,idn)
return this;
}
+ /**
+ * Creates the contiguous high-water mark table if it doesn't exist.
+ * This table tracks the latest continuously processed message ID for each
+ * topic.
+ *
+ * @return this instance for method chaining
+ * @throws RuntimeException if table creation fails
+ */
public DatabaseSetup createContiguousHwmTableIfNotExists() {
try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
@@ -186,10 +280,22 @@ topic_name VARCHAR(255) PRIMARY KEY,
return this;
}
+ /**
+ * Creates a database connection using the configured credentials.
+ *
+ * @return A new database Connection
+ * @throws SQLException if connection fails
+ */
private Connection getConnection() throws SQLException {
return DriverManager.getConnection(jdbcUrl, username, password);
}
+ /**
+ * Creates and configures a connection pool using HikariCP.
+ *
+ * @param cfg Configuration containing database connection details
+ * @return Configured DataSource
+ */
public static DataSource createPool(PostEventConfig cfg) {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(cfg.jdbcUrl());
diff --git a/library/src/main/java/com/p14n/postevent/db/SQL.java b/library/src/main/java/com/p14n/postevent/db/SQL.java
index c55a401..cc8d7f9 100644
--- a/library/src/main/java/com/p14n/postevent/db/SQL.java
+++ b/library/src/main/java/com/p14n/postevent/db/SQL.java
@@ -1,16 +1,65 @@
package com.p14n.postevent.db;
import com.p14n.postevent.data.Event;
-
import java.sql.*;
+/**
+ * Utility class providing SQL-related constants and helper methods for database
+ * operations.
+ * This class handles common database operations such as mapping result sets to
+ * events,
+ * setting prepared statement parameters, and managing database resources.
+ *
+ *
+ * The class defines two sets of columns:
+ *
+ *
+ * - Core columns: Basic event properties (id, source, type, etc.)
+ * - Extended columns: Core columns plus additional fields (time, idn,
+ * topic)
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * // Using core columns in a SELECT statement
+ * String query = "SELECT " + SQL.CORE_COLS + " FROM events";
+ *
+ * // Setting event data on a prepared statement
+ * PreparedStatement stmt = connection.prepareStatement(
+ * "INSERT INTO events (" + SQL.CORE_COLS + ") VALUES (" + SQL.CORE_PH + ")");
+ * SQL.setEventOnStatement(stmt, event);
+ * }
+ */
public class SQL {
+ /** Private constructor to prevent instantiation of utility class */
+ private SQL() {
+ }
+
+ /** Column names for core event properties */
public static String CORE_COLS = "id, source, type, datacontenttype, dataschema, subject, data, traceparent";
+
+ /** Column names including both core and extended properties */
public static String EXT_COLS = CORE_COLS + ", time, idn, topic";
+
+ /** Placeholder parameters for core columns in prepared statements */
public static String CORE_PH = "?,?,?,?,?,?,?,?";
+
+ /** Placeholder parameters for extended columns in prepared statements */
public static String EXT_PH = CORE_PH + ",?,?,?";
+ /**
+ * Creates an Event object from a ResultSet row.
+ * Maps database columns to Event properties using the current row data.
+ *
+ * @param rs ResultSet positioned at the row to map
+ * @param topic Topic name for the event (if not present in ResultSet)
+ * @return New Event instance populated with ResultSet data
+ * @throws SQLException if any database access error occurs
+ */
public static Event eventFromResultSet(ResultSet rs, String topic) throws SQLException {
return Event.create(
rs.getString("id"),
@@ -26,6 +75,14 @@ public static Event eventFromResultSet(ResultSet rs, String topic) throws SQLExc
rs.getString("traceparent"));
}
+ /**
+ * Sets core event properties on a PreparedStatement.
+ * Maps Event properties to the first 8 parameters of the statement.
+ *
+ * @param stmt PreparedStatement to set parameters on
+ * @param event Event containing the data to set
+ * @throws SQLException if any database access error occurs
+ */
public static void setEventOnStatement(PreparedStatement stmt, Event event) throws SQLException {
stmt.setString(1, event.id());
stmt.setString(2, event.source());
@@ -37,12 +94,26 @@ public static void setEventOnStatement(PreparedStatement stmt, Event event) thro
stmt.setString(8, event.traceparent());
}
+ /**
+ * Sets extended event properties (time, IDN, and topic) on a PreparedStatement.
+ * Maps these properties to parameters 9-11 of the statement.
+ *
+ * @param stmt PreparedStatement to set parameters on
+ * @param event Event containing the data to set
+ * @throws SQLException if any database access error occurs
+ */
public static void setTimeIDNAndTopic(PreparedStatement stmt, Event event) throws SQLException {
stmt.setTimestamp(9, Timestamp.from(event.time()));
stmt.setLong(10, event.idn());
stmt.setString(11, event.topic());
}
+ /**
+ * Safely closes a database connection.
+ * Handles null connections and suppresses any exceptions during closure.
+ *
+ * @param conn Connection to close (may be null)
+ */
public static void closeConnection(Connection conn) {
if (conn != null) {
try {
@@ -55,6 +126,13 @@ public static void closeConnection(Connection conn) {
}
}
+ /**
+ * Handles SQLException by attempting to rollback the transaction.
+ * If rollback fails, the rollback exception is added as a suppressed exception.
+ *
+ * @param e Original SQLException that triggered the rollback
+ * @param conn Connection to rollback (may be null)
+ */
public static void handleSQLException(SQLException e, Connection conn) {
if (conn != null) {
try {
diff --git a/library/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java b/library/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java
index 9a839a6..b3fc625 100644
--- a/library/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java
+++ b/library/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java
@@ -19,9 +19,69 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Server implementation for Debezium Change Data Capture (CDC).
+ * Manages a Debezium engine instance to capture database changes and forward
+ * them to a consumer.
+ * Supports PostgreSQL CDC using the pgoutput plugin with configurable topics
+ * and affinity.
+ *
+ *
+ * The server uses JDBC offset storage for tracking progress and supports
+ * filtered publication mode.
+ * It automatically manages replication slots and cleanup on shutdown.
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * PostEventConfig config = // initialize configuration
+ * DebeziumServer server = new DebeziumServer();
+ *
+ * // Start the server with a consumer
+ * server.start(config, event -> {
+ * // Process the change event
+ * System.out.println("Received change: " + event);
+ * });
+ *
+ * // Shutdown when done
+ * server.stop();
+ * }
+ */
public class DebeziumServer {
private static final Logger logger = LoggerFactory.getLogger(DebeziumServer.class);
+ /**
+ * Constructs a new DebeziumServer instance.
+ */
+ public DebeziumServer() {
+ }
+
+ /**
+ * Creates Debezium configuration properties for PostgreSQL CDC.
+ *
+ *
+ * Configures:
+ *
+ *
+ * - PostgreSQL connector with pgoutput plugin
+ * - JDBC offset storage with affinity-based partitioning
+ * - Filtered publication mode for specified topics
+ * - Automatic replication slot management
+ *
+ *
+ * @param affinity Identifier for this instance, used for offset tracking
+ * @param topics Set of topics to monitor for changes
+ * @param dbHost Database host address
+ * @param dbPort Database port
+ * @param dbUser Database username
+ * @param dbPassword Database password
+ * @param dbName Database name
+ * @param pollInterval Interval in milliseconds between polls
+ * @return Properties configured for Debezium PostgreSQL connector
+ */
public static Properties props(
String affinity,
Set topics,
@@ -47,7 +107,7 @@ public static Properties props(
props.setProperty("offset.storage.jdbc.user", dbUser);
props.setProperty("offset.storage.jdbc.password", dbPassword);
props.setProperty("offset.flush.interval.ms", "1000");
- props.setProperty("poll.interval.ms",String.valueOf(pollInterval));
+ props.setProperty("poll.interval.ms", String.valueOf(pollInterval));
props.setProperty("database.hostname", dbHost);
props.setProperty("plugin.name", "pgoutput");
props.setProperty("database.port", dbPort);
@@ -81,6 +141,24 @@ public static Properties props(
private ExecutorService executor;
private DebeziumEngine> engine;
+ /**
+ * Starts the Debezium engine with the specified configuration and consumer.
+ *
+ *
+ * Initializes a single-threaded executor and configures the Debezium engine to:
+ *
+ * - Use JSON format for change events
+ * - Monitor configured topics for changes
+ * - Forward events to the provided consumer
+ *
+ *
+ * @param cfg Configuration for the Debezium engine
+ * @param consumer Consumer to process change events
+ * @throws IllegalStateException if the consumer is null, config is null, or
+ * startup timeout is exceeded
+ * @throws IOException if engine initialization fails
+ * @throws InterruptedException if startup is interrupted
+ */
public void start(PostEventConfig cfg,
Consumer> consumer) throws IOException, InterruptedException {
if (consumer == null) {
@@ -121,6 +199,12 @@ public void taskStarted() {
logger.atInfo().log("Debezium engine started successfully");
}
+ /**
+ * Stops the Debezium engine and executor service.
+ * Ensures clean shutdown of resources with a 5-second timeout.
+ *
+ * @throws IOException if engine shutdown fails
+ */
public void stop() throws IOException {
if (executor != null) {
executor.shutdown();
@@ -139,7 +223,5 @@ public void stop() throws IOException {
logger.error("Shutdown interrupted", e);
}
}
-
}
-
}
diff --git a/library/src/main/java/com/p14n/postevent/debezium/Functions.java b/library/src/main/java/com/p14n/postevent/debezium/Functions.java
index 24404cb..d9b75ec 100644
--- a/library/src/main/java/com/p14n/postevent/debezium/Functions.java
+++ b/library/src/main/java/com/p14n/postevent/debezium/Functions.java
@@ -8,15 +8,70 @@
import java.io.IOException;
import java.time.Instant;
+/**
+ * Utility class providing functions for converting Debezium change events into
+ * application Events.
+ * Handles the parsing and transformation of CDC (Change Data Capture) events
+ * from
+ * Debezium into the application's event model.
+ *
+ *
+ * The class expects Debezium events in JSON format with a specific structure
+ * containing:
+ *
+ * - payload.after - The new state of the record
+ * - payload.source.table - The source table name
+ * - Various event metadata fields (id, source, type, etc.)
+ *
+ */
public class Functions {
+
+ /** Private constructor to prevent instantiation of utility class */
+ private Functions() {
+ }
+
+ /** JSON ObjectMapper instance for parsing Debezium event payloads */
private final static ObjectMapper mapper = new ObjectMapper();
+ /**
+ * Safely extracts text from a JsonNode, returning null if the node is null.
+ *
+ * @param j The JsonNode to extract text from
+ * @return The text value of the node, or null if the node is null
+ */
private static String safeText(JsonNode j) {
- if(j != null){
+ if (j != null) {
return j.asText();
}
return null;
}
+
+ /**
+ * Converts a Debezium ChangeEvent into an application Event.
+ * Parses the JSON payload and extracts relevant fields to construct a new Event
+ * instance.
+ *
+ *
+ * The method expects the following fields in the change event payload:
+ *
+ * - id - Event identifier
+ * - source - Event source
+ * - type - Event type
+ * - datacontenttype - Content type of the data
+ * - dataschema - Schema of the data
+ * - subject - Event subject
+ * - data - Binary event data
+ * - time - Event timestamp
+ * - idn - Sequential identifier
+ * - traceparent - OpenTelemetry trace parent
+ *
+ *
+ * @param record The Debezium ChangeEvent to convert
+ * @return A new Event instance, or null if the record doesn't contain required
+ * fields
+ * @throws IOException If there's an error parsing the JSON or extracting binary
+ * data
+ */
public static Event changeEventToEvent(ChangeEvent record) throws IOException {
var actualObj = mapper.readTree(record.value());
var payload = actualObj.get("payload");
diff --git a/library/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java b/library/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java
index 5fe8fa0..362f1c4 100644
--- a/library/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java
+++ b/library/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java
@@ -26,6 +26,7 @@ public class OrderedProcessor {
/**
* Creates a new OrderedProcessor with the specified processing function.
*
+ * @param systemEventBroker The broker for system events
* @param processorFunction Function that processes an event with a database
* connection
* and returns true if processing was successful
diff --git a/library/src/main/java/com/p14n/postevent/telemetry/BrokerMetrics.java b/library/src/main/java/com/p14n/postevent/telemetry/BrokerMetrics.java
index 89ddfc6..54b1af4 100644
--- a/library/src/main/java/com/p14n/postevent/telemetry/BrokerMetrics.java
+++ b/library/src/main/java/com/p14n/postevent/telemetry/BrokerMetrics.java
@@ -6,11 +6,33 @@
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
+/**
+ * Manages OpenTelemetry metrics for message broker operations.
+ * Tracks the number of published messages, received messages, and active
+ * subscribers
+ * across different topics.
+ *
+ *
+ * This class provides three main metrics:
+ *
+ *
+ * - messages_published: Counter for total messages published per topic
+ * - messages_received: Counter for total messages received by subscribers per
+ * topic
+ * - active_subscribers: Up/down counter for current number of subscribers per
+ * topic
+ *
+ */
public class BrokerMetrics {
private final LongCounter publishedMessages;
private final LongCounter receivedMessages;
private final LongUpDownCounter activeSubscribers;
+ /**
+ * Creates a new BrokerMetrics instance with the provided OpenTelemetry meter.
+ *
+ * @param meter OpenTelemetry meter used to create the metric instruments
+ */
public BrokerMetrics(Meter meter) {
publishedMessages = meter.counterBuilder("messages_published")
.setDescription("Number of messages published")
@@ -25,23 +47,47 @@ public BrokerMetrics(Meter meter) {
.build();
}
+ /**
+ * Records a message publication event for the specified topic.
+ * Increments the published messages counter with the topic attribute.
+ *
+ * @param topic The topic the message was published to
+ */
public void recordPublished(String topic) {
publishedMessages.add(1, Attributes.of(
AttributeKey.stringKey("topic"), topic));
}
+ /**
+ * Records a message reception event for the specified topic.
+ * Increments the received messages counter with the topic attribute.
+ *
+ * @param topic The topic the message was received from
+ */
public void recordReceived(String topic) {
receivedMessages.add(1, Attributes.of(
AttributeKey.stringKey("topic"), topic));
}
+ /**
+ * Records the addition of a subscriber for the specified topic.
+ * Increments the active subscribers counter with the topic attribute.
+ *
+ * @param topic The topic the subscriber was added to
+ */
public void recordSubscriberAdded(String topic) {
activeSubscribers.add(1, Attributes.of(
AttributeKey.stringKey("topic"), topic));
}
+ /**
+ * Records the removal of a subscriber for the specified topic.
+ * Decrements the active subscribers counter with the topic attribute.
+ *
+ * @param topic The topic the subscriber was removed from
+ */
public void recordSubscriberRemoved(String topic) {
activeSubscribers.add(-1, Attributes.of(
AttributeKey.stringKey("topic"), topic));
}
-}
\ No newline at end of file
+}
diff --git a/library/src/main/java/com/p14n/postevent/telemetry/MapTextMapGetter.java b/library/src/main/java/com/p14n/postevent/telemetry/MapTextMapGetter.java
index 6ad92fa..1084f48 100644
--- a/library/src/main/java/com/p14n/postevent/telemetry/MapTextMapGetter.java
+++ b/library/src/main/java/com/p14n/postevent/telemetry/MapTextMapGetter.java
@@ -4,15 +4,53 @@
import java.util.Map;
+/**
+ * Implementation of OpenTelemetry's TextMapGetter interface for extracting
+ * context from a Map.
+ * This class enables the extraction of trace context information from a
+ * String-to-String Map carrier,
+ * which is commonly used in distributed tracing scenarios.
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * Map carrier = new HashMap<>();
+ * carrier.put("traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
+ *
+ * MapTextMapGetter getter = new MapTextMapGetter();
+ * String traceParent = getter.get(carrier, "traceparent");
+ * }
+ */
public class MapTextMapGetter implements TextMapGetter