Skip to content

Commit 21f05a6

Browse files
Vertx (#54)
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
1 parent 4a77358 commit 21f05a6

19 files changed

Lines changed: 1367 additions & 36 deletions

File tree

app/src/main/java/com/p14n/postevent/App.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.p14n.postevent.data.ConfigData;
2222
import com.p14n.postevent.data.Event;
2323
import com.p14n.postevent.db.DatabaseSetup;
24-
24+
import com.p14n.postevent.db.PoolSetup;
2525
import com.p14n.postevent.telemetry.OpenTelemetryFunctions;
2626
import io.opentelemetry.api.OpenTelemetry;
2727
import io.opentelemetry.api.trace.Tracer;
@@ -119,7 +119,7 @@ private static void run(String affinity, String[] write, String[] read, String d
119119
RemotePersistentConsumer cc = null;
120120

121121
var ot = Opentelemetry.create("postevent");
122-
var ds = JdbcTelemetry.create(ot).wrap(DatabaseSetup.createPool(cfg));
122+
var ds = JdbcTelemetry.create(ot).wrap(PoolSetup.createPool(cfg));
123123

124124
try {
125125
if (write.length > 0) {

build.gradle.ref

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
import com.vanniktech.maven.publish.SonatypeHost
2+
import com.vanniktech.maven.publish.JavaLibrary
3+
import com.vanniktech.maven.publish.JavadocJar
4+
5+
plugins {
6+
id 'java'
7+
id 'com.adarshr.test-logger' version '4.0.0'
8+
id 'com.google.protobuf' version '0.9.2'
9+
id "com.vanniktech.maven.publish" version "0.31.0"
10+
}
11+
12+
compileJava {
13+
sourceCompatibility = 21
14+
targetCompatibility = 21
15+
}
16+
17+
group = 'com.p14n'
18+
version = '1.0.1-SNAPSHOT'
19+
20+
repositories {
21+
mavenCentral()
22+
}
23+
24+
dependencies {
25+
implementation 'io.debezium:debezium-api:3.0.1.Final'
26+
implementation ('io.debezium:debezium-embedded:3.0.1.Final') {
27+
exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet'
28+
exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2'
29+
exclude group: 'org.eclipse.jetty'
30+
}
31+
implementation 'io.debezium:debezium-connector-postgres:3.0.1.Final'
32+
implementation 'io.debezium:debezium-storage-jdbc:3.0.1.Final'
33+
implementation 'org.slf4j:slf4j-api:2.0.9'
34+
implementation 'com.zaxxer:HikariCP:6.2.1'
35+
36+
constraints {
37+
implementation 'com.google.guava:guava:32.0.0-jre'
38+
}
39+
40+
// gRPC dependencies
41+
implementation 'io.grpc:grpc-netty-shaded:1.53.0'
42+
implementation 'io.grpc:grpc-protobuf:1.53.0'
43+
implementation 'io.grpc:grpc-stub:1.53.0'
44+
implementation 'io.grpc:grpc-api:1.53.0'
45+
46+
implementation 'javax.annotation:javax.annotation-api:1.3.2'
47+
48+
// For code generation
49+
implementation 'com.google.protobuf:protobuf-java:3.21.7'
50+
51+
testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0')
52+
testImplementation 'io.zonky.test:embedded-postgres:2.0.7'
53+
testImplementation 'net.jqwik:jqwik:1.8.2'
54+
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.0'
55+
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.0'
56+
testRuntimeOnly 'ch.qos.logback:logback-classic:1.4.11'
57+
testImplementation 'org.mockito:mockito-core:3.12.4'
58+
59+
// OpenTelemetry core dependencies
60+
implementation 'io.opentelemetry:opentelemetry-api:1.32.0'
61+
62+
// Instrumentation for GRPC
63+
implementation 'io.opentelemetry.instrumentation:opentelemetry-grpc-1.6:1.32.0-alpha'
64+
65+
}
66+
67+
testlogger {
68+
theme 'standard'
69+
showExceptions true
70+
showStackTraces true
71+
showFullStackTraces false
72+
showCauses true
73+
slowThreshold 2000
74+
showSummary true
75+
showSimpleNames false
76+
showPassed true
77+
showSkipped true
78+
showFailed true
79+
showStandardStreams false
80+
showPassedStandardStreams true
81+
showSkippedStandardStreams true
82+
showFailedStandardStreams true
83+
}
84+
85+
test {
86+
useJUnitPlatform {
87+
includeEngines 'jqwik', 'junit-jupiter'
88+
}
89+
maxHeapSize = "1G"
90+
minHeapSize = "512M"
91+
maxParallelForks = 1
92+
failFast = true
93+
testLogging.showStandardStreams = true
94+
}
95+
96+
task fastTest( type: Test ) {
97+
useJUnitPlatform {
98+
includeEngines 'junit-jupiter'
99+
exclude '**/dst/**'
100+
101+
}
102+
}
103+
104+
jar {
105+
manifest {
106+
}
107+
}
108+
109+
// Configure Protobuf plugin
110+
protobuf {
111+
protoc {
112+
artifact = 'com.google.protobuf:protoc:3.21.7'
113+
}
114+
plugins {
115+
grpc {
116+
artifact = 'io.grpc:protoc-gen-grpc-java:1.53.0'
117+
}
118+
}
119+
generateProtoTasks {
120+
all()*.plugins {
121+
grpc {}
122+
}
123+
}
124+
}
125+
126+
sourceSets {
127+
main {
128+
java {
129+
srcDirs 'build/generated/source/proto/main/grpc'
130+
srcDirs 'build/generated/source/proto/main/java'
131+
}
132+
}
133+
}
134+
135+
tasks.withType(Jar) {
136+
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
137+
}
138+
139+
javadoc {
140+
exclude "**/grpc/**"
141+
source = sourceSets.main.allJava
142+
}
143+
144+
mavenPublishing {
145+
146+
configure(new JavaLibrary(new JavadocJar.Javadoc(), true))
147+
148+
publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true)
149+
150+
signAllPublications()
151+
152+
coordinates("com.p14n", "postevent", version)
153+
154+
pom {
155+
name = "Postevent"
156+
description = 'A reliable event publishing and consumption system using PostgreSQL and gRPC'
157+
inceptionYear = "2025"
158+
url = "https://github.com/p14n/postevent/"
159+
licenses {
160+
license {
161+
name = 'MIT License'
162+
url = 'https://opensource.org/licenses/MIT'
163+
}
164+
}
165+
developers {
166+
developer {
167+
id = 'p14n'
168+
name = 'Dean Chapman'
169+
email = 'dean@p14n.com'
170+
}
171+
}
172+
scm {
173+
connection = 'scm:git:git://github.com/p14n/postevent.git'
174+
developerConnection = 'scm:git:ssh://github.com:p14n/postevent.git'
175+
url = 'https://github.com/p14n/postevent'
176+
}
177+
}
178+
}

core/build.gradle

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,8 @@ repositories {
1717

1818
dependencies {
1919
// Core database and connection pooling
20-
implementation 'com.zaxxer:HikariCP:6.2.1'
2120
implementation 'org.slf4j:slf4j-api:2.0.9'
2221

23-
// Guava for utilities
24-
implementation 'com.google.guava:guava:32.0.0-jre'
25-
2622
// OpenTelemetry core dependencies
2723
implementation 'io.opentelemetry:opentelemetry-api:1.32.0'
2824

core/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
import java.util.ArrayList;
44
import java.util.List;
55
import java.util.concurrent.*;
6+
import java.util.concurrent.atomic.AtomicLong;
67

7-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
8+
import static java.lang.String.format;
89

910
/**
1011
* Default implementation of {@link AsyncExecutor} that provides configurable
@@ -49,6 +50,19 @@ public DefaultExecutor(int scheduledSize, int fixedSize) {
4950
this.es = createFixedExecutorService(fixedSize);
5051
}
5152

53+
protected ThreadFactory createNamedFactory(String nameFormat,ThreadFactory backingFactory){
54+
AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;
55+
return runnable -> {
56+
Thread thread = backingFactory.newThread(runnable);
57+
if (nameFormat != null) {
58+
thread.setName(format(nameFormat, count.getAndIncrement()));
59+
}
60+
return thread;
61+
};
62+
}
63+
protected ThreadFactory createNamedFactory(String nameFormat) {
64+
return createNamedFactory(nameFormat,Executors.defaultThreadFactory());
65+
}
5266
/**
5367
* Creates a fixed-size thread pool with named threads.
5468
*
@@ -57,7 +71,7 @@ public DefaultExecutor(int scheduledSize, int fixedSize) {
5771
*/
5872
protected ExecutorService createFixedExecutorService(int size) {
5973
return Executors.newFixedThreadPool(size,
60-
new ThreadFactoryBuilder().setNameFormat("post-event-fixed-%d").build());
74+
createNamedFactory("post-event-fixed-%d"));
6175
}
6276

6377
/**
@@ -67,8 +81,7 @@ protected ExecutorService createFixedExecutorService(int size) {
6781
*/
6882
protected ExecutorService createVirtualExecutorService() {
6983
return Executors.newThreadPerTaskExecutor(
70-
new ThreadFactoryBuilder().setThreadFactory(Thread.ofVirtual().factory())
71-
.setNameFormat("post-event-virtual-%d").build());
84+
createNamedFactory("post-event-virtual-%d",Thread.ofVirtual().factory()));
7285
}
7386

7487
/**
@@ -79,7 +92,7 @@ protected ExecutorService createVirtualExecutorService() {
7992
*/
8093
protected ScheduledExecutorService createScheduledExecutorService(int size) {
8194
return Executors.newScheduledThreadPool(size,
82-
new ThreadFactoryBuilder().setNameFormat("post-event-scheduled-%d").build());
95+
createNamedFactory("post-event-scheduled-%d"));
8396
}
8497

8598
@Override

core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.p14n.postevent.db;
22

33
import com.p14n.postevent.data.PostEventConfig;
4-
import com.zaxxer.hikari.HikariDataSource;
54

65
import javax.sql.DataSource;
76

@@ -35,16 +34,14 @@
3534
* Example usage:
3635
* </p>
3736
*
38-
* <pre>{@code
37+
* <pre>
38+
* {@code
3939
* PostEventConfig config = // initialize configuration
4040
* DatabaseSetup setup = new DatabaseSetup(config);
4141
*
4242
* // Setup all required tables for given topics
4343
* setup.setupAll(Set.of("orders", "inventory"));
4444
*
45-
* // Create connection pool
46-
* DataSource pool = DatabaseSetup.createPool(config);
47-
* }</pre>
4845
*/
4946
public class DatabaseSetup {
5047
private static final Logger logger = LoggerFactory.getLogger(DatabaseSetup.class);
@@ -53,6 +50,8 @@ public class DatabaseSetup {
5350
private final String username;
5451
private final String password;
5552

53+
private final DataSource ds;
54+
5655
/**
5756
* Creates a new DatabaseSetup instance using configuration from
5857
* PostEventConfig.
@@ -74,6 +73,17 @@ public DatabaseSetup(String jdbcUrl, String username, String password) {
7473
this.jdbcUrl = jdbcUrl;
7574
this.username = username;
7675
this.password = password;
76+
this.ds = null;
77+
}
78+
79+
public DatabaseSetup(DataSource ds) {
80+
if (ds == null) {
81+
throw new IllegalArgumentException("DataSource must not be null");
82+
}
83+
this.jdbcUrl = null;
84+
this.username = null;
85+
this.password = null;
86+
this.ds = ds;
7787
}
7888

7989
/**
@@ -85,11 +95,27 @@ public DatabaseSetup(String jdbcUrl, String username, String password) {
8595
* @throws RuntimeException if database operations fail
8696
*/
8797
public DatabaseSetup setupAll(Set<String> topics) {
98+
setupClient();
99+
setupServer(topics);
100+
setupDebezium();
101+
return this;
102+
}
103+
104+
public DatabaseSetup setupDebezium() {
105+
clearOldSlots();
106+
return this;
107+
}
108+
109+
public DatabaseSetup setupServer(Set<String> topics) {
110+
createSchemaIfNotExists();
111+
topics.stream().forEach(this::createTableIfNotExists);
112+
return this;
113+
}
114+
115+
public DatabaseSetup setupClient() {
88116
createSchemaIfNotExists();
89117
createMessagesTableIfNotExists();
90118
createContiguousHwmTableIfNotExists();
91-
topics.stream().forEach(this::createTableIfNotExists);
92-
clearOldSlots();
93119
return this;
94120
}
95121

@@ -287,21 +313,9 @@ topic_name VARCHAR(255) PRIMARY KEY,
287313
* @throws SQLException if connection fails
288314
*/
289315
private Connection getConnection() throws SQLException {
316+
if (ds != null)
317+
return ds.getConnection();
290318
return DriverManager.getConnection(jdbcUrl, username, password);
291319
}
292320

293-
/**
294-
* Creates and configures a connection pool using HikariCP.
295-
*
296-
* @param cfg Configuration containing database connection details
297-
* @return Configured DataSource
298-
*/
299-
public static DataSource createPool(PostEventConfig cfg) {
300-
HikariDataSource ds = new HikariDataSource();
301-
ds.setJdbcUrl(cfg.jdbcUrl());
302-
ds.setUsername(cfg.dbUser());
303-
ds.setPassword(cfg.dbPassword());
304-
return ds;
305-
}
306-
307321
}

debezium/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ dependencies {
4242
constraints {
4343
implementation 'com.google.guava:guava:32.0.0-jre'
4444
}
45+
46+
// Database connection pooling
47+
implementation 'com.zaxxer:HikariCP:5.0.1'
4548

4649
// Test dependencies
4750
testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0')

0 commit comments

Comments
 (0)