Conversation
Reviewer's GuideThis PR integrates a new Vert.x module with the core postevent library, refactors DatabaseSetup to support DataSource injection and composable setup steps, and adds both server and client-side Vert.x EventBus adapters, message brokers, and codecs to enable real-time and catchup event processing over the EventBus. Sequence diagram for EventBusCatchupClient fetching events via EventBussequenceDiagram
participant Client as EventBusCatchupClient
participant EventBus
participant Service as EventBusCatchupService
participant CatchupServer
Client->>EventBus: request("catchup.fetch_events.{topic}", requestJson)
EventBus->>Service: deliver request
Service->>CatchupServer: fetchEvents(fromId, toId, limit, topic)
CatchupServer-->>Service: List<Event>
Service->>EventBus: reply(eventsJson)
EventBus->>Client: deliver reply
Sequence diagram for EventBusMessageBroker dual-write publishsequenceDiagram
participant Publisher
participant EventBusMessageBroker
participant Database
participant EventBus
Publisher->>EventBusMessageBroker: publish(topic, event)
EventBusMessageBroker->>Database: persist event
EventBusMessageBroker->>EventBus: publish("events.{topic}", event)
EventBus-->>Subscribers: deliver event
Class diagram for new Vert.x EventBus integrationclassDiagram
class EventBusMessageBroker {
+EventBus eventBus
+DataSource dataSource
+publish(String topic, Event event)
+subscribeToEventBus(String topic, MessageSubscriber<Event> subscriber)
+unsubscribe(String topic)
+close()
}
class EventBusCatchupService {
+CatchupServerInterface catchupServer
+EventBus eventBus
+List<MessageConsumer<JsonObject>> fetchEventsConsumers
+List<MessageConsumer<JsonObject>> getLatestMessageIdConsumers
+Set<String> topics
+start()
+stop()
+close()
}
class EventBusCatchupClient {
+EventBus eventBus
+long timeoutSeconds
+fetchEvents(long fromId, long toId, int limit, String topic)
+getLatestMessageId(String topic)
}
class EventCodec {
+encodeToWire(Buffer buffer, Event event)
+decodeFromWire(int pos, Buffer buffer)
+transform(Event event)
+name()
+systemCodecID()
}
class VertxPersistentConsumer {
+start(Set<String> topics, DataSource ds, EventBus eb, EventBusMessageBroker mb)
+publish(String topic, TransactionalEvent message)
+subscribe(String topic, MessageSubscriber<TransactionalEvent> subscriber)
+unsubscribe(String topic, MessageSubscriber<TransactionalEvent> subscriber)
+convert(TransactionalEvent m)
+close()
}
class VertxConsumerServer {
+start(EventBus eb, EventBusMessageBroker mb, Set<String> topics)
}
EventBusMessageBroker --|> EventMessageBroker
EventBusCatchupService --|> AutoCloseable
EventBusCatchupClient --|> CatchupServerInterface
VertxPersistentConsumer --|> AutoCloseable
VertxPersistentConsumer --|> MessageBroker
EventBusMessageBroker o-- EventCodec
VertxConsumerServer o-- EventBusCatchupService
VertxConsumerServer o-- EventBusMessageBroker
VertxPersistentConsumer o-- EventBusCatchupClient
VertxPersistentConsumer o-- EventBusMessageBroker
Class diagram for updated DatabaseSetup with DataSource injection and composable setupclassDiagram
class DatabaseSetup {
-String jdbcUrl
-String username
-String password
-DataSource ds
+DatabaseSetup(String jdbcUrl, String username, String password)
+DatabaseSetup(DataSource ds)
+DatabaseSetup setupAll(Set<String> topics)
+DatabaseSetup setupDebezium()
+DatabaseSetup setupServer(Set<String> topics)
+DatabaseSetup setupClient()
-Connection getConnection()
}
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java:81-86` </location>
<code_context>
this.password = password;
+ this.ds = null;
+ }
+ public DatabaseSetup(DataSource ds) {
+ this.jdbcUrl = null;
+ this.username = null;
+ this.password = null;
+ this.ds = ds;
}
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider validating DataSource argument for null.
Without null validation, passing a null DataSource will cause runtime errors. Please add a check and throw IllegalArgumentException if ds is null.
```suggestion
public DatabaseSetup(DataSource ds) {
if (ds == null) {
throw new IllegalArgumentException("DataSource must not be null");
}
this.jdbcUrl = null;
this.username = null;
this.password = null;
this.ds = ds;
}
```
</issue_to_address>
### Comment 2
<location> `vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java:85-90` </location>
<code_context>
+ logger.atInfo().log("Starting EventBusCatchupService");
+
+ // Register consumer for fetchEvents requests
+ fetchEventsConsumers = topics.stream().map(topic -> {
+ logger.atInfo()
+ .addArgument(FETCH_EVENTS_ADDRESS + topic)
+ .log("EventBusCatchupService started, listening on address: {}");
+
+ return eventBus.consumer(FETCH_EVENTS_ADDRESS + topic, this::handleFetchEvents);
+ }).toList();
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential risk if start() is called multiple times without stop().
Multiple calls to start() without stop() will register additional consumers, causing duplicate event handling. Please add a safeguard to prevent this or clarify the intended behavior in documentation.
</issue_to_address>
### Comment 3
<location> `vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java:135-144` </location>
<code_context>
+ public void subscribeToEventBus(String topic, MessageSubscriber<Event> subscriber) {
</code_context>
<issue_to_address>
**issue (bug_risk):** Overwriting existing consumer for a topic may cause resource leaks.
When adding a new consumer for a topic, ensure any existing consumer is properly unregistered to prevent resource leaks.
</issue_to_address>
### Comment 4
<location> `vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java:117` </location>
<code_context>
+ });
+
+ String eventsJson = future.get(timeoutSeconds, TimeUnit.SECONDS);
+ List<Event> events = Json.decodeValue(eventsJson, List.class);
+
+ logger.atDebug()
</code_context>
<issue_to_address>
**issue (bug_risk):** Deserialization may not produce a List<Event> as expected.
Json.decodeValue with List.class returns a List of LinkedHashMap, not Event objects. Use a TypeReference, such as new TypeReference<List<Event>>() {}, to ensure proper deserialization.
</issue_to_address>
### Comment 5
<location> `vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java:35` </location>
<code_context>
+ @Override
+ public void encodeToWire(Buffer buffer, Event event) {
+ String json = Json.encode(event);
+ byte[] jsonBytes = json.getBytes();
+
+ // Write length prefix followed by JSON bytes
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Specify character encoding when converting String to bytes.
Relying on the platform default charset can cause inconsistent results. Use json.getBytes(StandardCharsets.UTF_8) for predictable encoding.
</issue_to_address>
### Comment 6
<location> `vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java:57` </location>
<code_context>
+
+ // Read JSON bytes and convert to string
+ byte[] jsonBytes = buffer.getBytes(pos + 4, pos + 4 + length);
+ String json = new String(jsonBytes);
+
+ // Deserialize from JSON
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Specify character encoding when constructing String from bytes.
Use new String(jsonBytes, StandardCharsets.UTF_8) to ensure consistent decoding across platforms.
Suggested implementation:
```java
byte[] jsonBytes = buffer.getBytes(pos + 4, pos + 4 + length);
String json = new String(jsonBytes, java.nio.charset.StandardCharsets.UTF_8);
```
If `java.nio.charset.StandardCharsets` is not already imported at the top of the file, add:
import java.nio.charset.StandardCharsets;
</issue_to_address>
### Comment 7
<location> `vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java:98-102` </location>
<code_context>
+ public void close() {
+ logger.atInfo().log("Closing consumer client");
+
+ for (AutoCloseable c : closeables) {
+ try {
+ c.close();
+ } catch (Exception e) {
+ logger.atWarn()
+ .setCause(e)
+ .addArgument(c.getClass().getSimpleName())
</code_context>
<issue_to_address>
**issue (bug_risk):** NullPointerException risk if closeables is not initialized.
If close() is invoked before start(), closeables may be null and cause a NullPointerException. To prevent this, initialize closeables to an empty list or add a null check in close().
</issue_to_address>
### Comment 8
<location> `core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java:96` </location>
<code_context>
* @throws RuntimeException if database operations fail
*/
public DatabaseSetup setupAll(Set<String> topics) {
+ setupClient();
+ setupServer(topics);
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring to centralize schema creation and unify constructors to eliminate duplication and null-checks.
Here are two small refactorings that will remove the duplication and the “null-check” complexity without changing any public API or behavior:
1) Collapse all schema-creation into the single entry point (`setupAll`) so that `setupClient`/`setupServer` no longer need to each call `createSchemaIfNotExists()`.
```java
public DatabaseSetup setupAll(Set<String> topics) {
createSchemaIfNotExists(); // only here
setupClient();
setupServer(topics);
setupDebezium();
return this;
}
public DatabaseSetup setupClient() {
createMessagesTableIfNotExists();
createContiguousHwmTableIfNotExists();
return this;
}
public DatabaseSetup setupServer(Set<String> topics) {
topics.forEach(this::createTableIfNotExists);
return this;
}
public DatabaseSetup setupDebezium() {
clearOldSlots();
return this;
}
```
2) Unify your constructors so you always hold exactly one `DataSource` (and drop the JDBC fields + null‐check in `getConnection()`):
```java
// primary ctor
public DatabaseSetup(DataSource ds) {
this.ds = ds;
}
// convenience ctors chain to primary
public DatabaseSetup(PostEventConfig cfg) {
this(createPool(cfg));
}
public DatabaseSetup(String jdbcUrl, String username, String password) {
this(createPool(new PostEventConfig(jdbcUrl, username, password)));
}
private Connection getConnection() throws SQLException {
return ds.getConnection();
}
```
This way you:
- Call `createSchemaIfNotExists()` exactly once.
- Eliminate the `if(ds!=null)…` branch.
- Keep all four public API methods intact.
</issue_to_address>
### Comment 9
<location> `vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java:89` </location>
<code_context>
+ * @throws Exception If the request fails or times out
+ */
+ @Override
+ public List<Event> fetchEvents(long fromId, long toId, int limit, String topic) {
+ logger.atDebug()
+ .addArgument(fromId)
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the duplicated request-handling logic into a single private helper method to simplify both public methods.
Here’s one way to collapse all of that boilerplate into a single private helper, and then rewrite both public methods to be just three lines each. The helper handles:
- building the `CompletableFuture`
- wiring up the Vert.x `request(...)` callback
- blocking with timeout
- propagating failures
```java
// add to EventBusCatchupClient
private <R> R requestAndDecode(
String address,
JsonObject payload,
Function<Object, R> decoder
) {
try {
CompletableFuture<Object> fut = new CompletableFuture<>();
eventBus.request(address, payload, ar -> {
if (ar.succeeded()) {
fut.complete(ar.result().body());
} else {
fut.completeExceptionally(
new RuntimeException("Bus request failed: " + ar.cause().getMessage(), ar.cause())
);
}
});
Object body = fut.get(timeoutSeconds, TimeUnit.SECONDS);
return decoder.apply(body);
} catch (Exception e) {
throw new RuntimeException("Request to " + address + " failed", e);
}
}
```
Then your two methods collapse to:
```java
@Override
public List<Event> fetchEvents(long fromId, long toId, int limit, String topic) {
JsonObject req = new JsonObject()
.put("fromId", fromId)
.put("toId", toId)
.put("limit", limit)
.put("topic", topic);
// decode the reply-body string into List<Event>
return requestAndDecode(
FETCH_EVENTS_ADDRESS + topic,
req,
body -> Json.decodeValue((String) body, new TypeReference<List<Event>>(){} )
);
}
@Override
public long getLatestMessageId(String topic) {
JsonObject req = new JsonObject().put("topic", topic);
// extract "latestId" from the returned JsonObject
return requestAndDecode(
GET_LATEST_MESSAGE_ID_ADDRESS + topic,
req,
body -> ((JsonObject) body).getLong("latestId")
);
}
```
This preserves all logging & timeouts (move logs into callers if desired), but removes the duplicated future-and-callback boilerplate.
</issue_to_address>
### Comment 10
<location> `vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java:40` </location>
<code_context>
+ this.batchSize = batchSize;
+ }
+
+ public void start(Set<String> topics, DataSource ds,EventBus eb, EventBusMessageBroker mb) {
+ logger.atInfo().log("Starting consumer client");
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the start method by extracting logical blocks into well-named helper methods to improve readability and maintainability.
You can dramatically simplify start(...) by extracting each block into a well-named helper. For example:
```java
public void start(Set<String> topics, DataSource ds, EventBus eb, EventBusMessageBroker mb) {
ensureNotStarted();
setupDatabase(ds);
initBrokers(ds);
registerSubscriptions(topics, eb, mb);
schedulePeriodicChecks(topics);
collectCloseables();
logger.atInfo().log("Consumer client started successfully");
}
private void ensureNotStarted() {
if (tb != null) throw new IllegalStateException("Already started");
logger.atInfo().log("Starting consumer client");
}
private void setupDatabase(DataSource ds) {
new DatabaseSetup(ds).setupClient();
}
private void initBrokers(DataSource ds) {
seb = new SystemEventBroker(asyncExecutor, ot);
tb = new TransactionalBroker(ds, asyncExecutor, ot, seb);
pb = new PersistentBroker<>(tb, ds, seb);
}
private void registerSubscriptions(Set<String> topics, EventBus eb, EventBusMessageBroker mb) {
var catchupClient = new EventBusCatchupClient(eb);
for (var topic : topics) {
mb.subscribeToEventBus(topic, pb);
seb.publish(SystemEvent.FetchLatest.withTopic(topic));
}
seb.subscribe(new CatchupService(ds, catchupClient, seb));
seb.subscribe(new UnprocessedSubmitter(seb, ds, new UnprocessedEventFinder(), tb, batchSize));
}
private void schedulePeriodicChecks(Set<String> topics) {
asyncExecutor.scheduleAtFixedRate(() -> {
seb.publish(SystemEvent.UnprocessedCheckRequired);
topics.forEach(t -> seb.publish(SystemEvent.FetchLatest.withTopic(t)));
}, 30, 30, TimeUnit.SECONDS);
}
private void collectCloseables() {
closeables = List.of(pb, seb, tb);
}
```
This preserves exactly the same behavior but makes each step self-documenting and much easier to read.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java
Outdated
Show resolved
Hide resolved
| public void subscribeToEventBus(String topic, MessageSubscriber<Event> subscriber) { | ||
| logger.atInfo() | ||
| .addArgument(topic) | ||
| .log("Subscribing to topic: {}"); | ||
|
|
||
| String eventBusAddress = "events." + topic; | ||
|
|
||
| MessageConsumer<Event> consumer = eventBus.consumer(eventBusAddress); | ||
| consumer.handler(message -> { | ||
| Event event = message.body(); |
There was a problem hiding this comment.
issue (bug_risk): Overwriting existing consumer for a topic may cause resource leaks.
When adding a new consumer for a topic, ensure any existing consumer is properly unregistered to prevent resource leaks.
| }); | ||
|
|
||
| String eventsJson = future.get(timeoutSeconds, TimeUnit.SECONDS); | ||
| List<Event> events = Json.decodeValue(eventsJson, List.class); |
There was a problem hiding this comment.
issue (bug_risk): Deserialization may not produce a List as expected.
Json.decodeValue with List.class returns a List of LinkedHashMap, not Event objects. Use a TypeReference, such as new TypeReference<List>() {}, to ensure proper deserialization.
vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java
Outdated
Show resolved
Hide resolved
vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java
Outdated
Show resolved
Hide resolved
| for (AutoCloseable c : closeables) { | ||
| try { | ||
| c.close(); | ||
| } catch (Exception e) { | ||
| logger.atWarn() |
There was a problem hiding this comment.
issue (bug_risk): NullPointerException risk if closeables is not initialized.
If close() is invoked before start(), closeables may be null and cause a NullPointerException. To prevent this, initialize closeables to an empty list or add a null check in close().
| @@ -85,11 +94,24 @@ public DatabaseSetup(String jdbcUrl, String username, String password) { | |||
| * @throws RuntimeException if database operations fail | |||
| */ | |||
| public DatabaseSetup setupAll(Set<String> topics) { | |||
There was a problem hiding this comment.
issue (complexity): Consider refactoring to centralize schema creation and unify constructors to eliminate duplication and null-checks.
Here are two small refactorings that will remove the duplication and the “null-check” complexity without changing any public API or behavior:
- Collapse all schema-creation into the single entry point (
setupAll) so thatsetupClient/setupServerno longer need to each callcreateSchemaIfNotExists().
public DatabaseSetup setupAll(Set<String> topics) {
createSchemaIfNotExists(); // only here
setupClient();
setupServer(topics);
setupDebezium();
return this;
}
public DatabaseSetup setupClient() {
createMessagesTableIfNotExists();
createContiguousHwmTableIfNotExists();
return this;
}
public DatabaseSetup setupServer(Set<String> topics) {
topics.forEach(this::createTableIfNotExists);
return this;
}
public DatabaseSetup setupDebezium() {
clearOldSlots();
return this;
}- Unify your constructors so you always hold exactly one
DataSource(and drop the JDBC fields + null‐check ingetConnection()):
// primary ctor
public DatabaseSetup(DataSource ds) {
this.ds = ds;
}
// convenience ctors chain to primary
public DatabaseSetup(PostEventConfig cfg) {
this(createPool(cfg));
}
public DatabaseSetup(String jdbcUrl, String username, String password) {
this(createPool(new PostEventConfig(jdbcUrl, username, password)));
}
private Connection getConnection() throws SQLException {
return ds.getConnection();
}This way you:
- Call
createSchemaIfNotExists()exactly once. - Eliminate the
if(ds!=null)…branch. - Keep all four public API methods intact.
vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java
Outdated
Show resolved
Hide resolved
| this.batchSize = batchSize; | ||
| } | ||
|
|
||
| public void start(Set<String> topics, DataSource ds,EventBus eb, EventBusMessageBroker mb) { |
There was a problem hiding this comment.
issue (complexity): Consider refactoring the start method by extracting logical blocks into well-named helper methods to improve readability and maintainability.
You can dramatically simplify start(...) by extracting each block into a well-named helper. For example:
public void start(Set<String> topics, DataSource ds, EventBus eb, EventBusMessageBroker mb) {
ensureNotStarted();
setupDatabase(ds);
initBrokers(ds);
registerSubscriptions(topics, eb, mb);
schedulePeriodicChecks(topics);
collectCloseables();
logger.atInfo().log("Consumer client started successfully");
}
private void ensureNotStarted() {
if (tb != null) throw new IllegalStateException("Already started");
logger.atInfo().log("Starting consumer client");
}
private void setupDatabase(DataSource ds) {
new DatabaseSetup(ds).setupClient();
}
private void initBrokers(DataSource ds) {
seb = new SystemEventBroker(asyncExecutor, ot);
tb = new TransactionalBroker(ds, asyncExecutor, ot, seb);
pb = new PersistentBroker<>(tb, ds, seb);
}
private void registerSubscriptions(Set<String> topics, EventBus eb, EventBusMessageBroker mb) {
var catchupClient = new EventBusCatchupClient(eb);
for (var topic : topics) {
mb.subscribeToEventBus(topic, pb);
seb.publish(SystemEvent.FetchLatest.withTopic(topic));
}
seb.subscribe(new CatchupService(ds, catchupClient, seb));
seb.subscribe(new UnprocessedSubmitter(seb, ds, new UnprocessedEventFinder(), tb, batchSize));
}
private void schedulePeriodicChecks(Set<String> topics) {
asyncExecutor.scheduleAtFixedRate(() -> {
seb.publish(SystemEvent.UnprocessedCheckRequired);
topics.forEach(t -> seb.publish(SystemEvent.FetchLatest.withTopic(t)));
}, 30, 30, TimeUnit.SECONDS);
}
private void collectCloseables() {
closeables = List.of(pb, seb, tb);
}This preserves exactly the same behavior but makes each step self-documenting and much easier to read.
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Summary by Sourcery
Add a new Vert.x submodule to enable event publishing, catchup, and transactional consumption over the Vert.x EventBus, along with related database setup refactoring, build configuration, documentation, and an example.
New Features:
Enhancements:
Build:
Documentation:
Tests: