Skip to content

Commit 145a212

Browse files
authored
Return generated idn on publish (#61)
1 parent fc83410 commit 145a212

8 files changed

Lines changed: 26 additions & 16 deletions

File tree

core/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ compileJava {
99
}
1010

1111
group = 'com.p14n'
12-
version = '1.3.3-SNAPSHOT'
12+
version = '1.3.4-SNAPSHOT'
1313

1414
repositories {
1515
mavenCentral()

core/src/main/java/com/p14n/postevent/Publisher.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.sql.Connection;
88
import java.sql.PreparedStatement;
99
import java.sql.SQLException;
10+
import java.sql.Statement;
1011

1112
import static com.p14n.postevent.db.SQL.setEventOnStatement;
1213

@@ -66,21 +67,26 @@ private Publisher() {
6667
* @throws IllegalArgumentException if the topic is null, empty, or contains
6768
* invalid characters
6869
*/
69-
public static void publish(Event event, Connection connection, String topic) throws SQLException {
70+
public static Long publish(Event event, Connection connection, String topic) throws SQLException {
7071
if (topic == null || topic.trim().isEmpty()) {
7172
throw new IllegalArgumentException("Topic name cannot be null or empty");
7273
}
7374
if (!topic.matches("^[a-z_]+$")) {
7475
throw new IllegalArgumentException("Topic name must contain only lowercase letters and underscores");
7576
}
7677

77-
String sql = String.format("INSERT INTO postevent.%s (%s) VALUES (%s)",
78+
String sql = String.format("INSERT INTO postevent.%s (%s) VALUES (%s) RETURNING idn",
7879
topic, SQL.CORE_COLS, SQL.CORE_PH);
7980

80-
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
81+
try (PreparedStatement stmt = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
8182
setEventOnStatement(stmt, event);
8283
stmt.executeUpdate();
84+
var rs = stmt.getGeneratedKeys();
85+
if(rs.next()){
86+
return (Long)rs.getObject(1);
87+
}
8388
}
89+
return null;
8490
}
8591

8692
/**
@@ -94,13 +100,13 @@ public static void publish(Event event, Connection connection, String topic) thr
94100
* @throws IllegalArgumentException if the topic is null, empty, or contains
95101
* invalid characters
96102
*/
97-
public static void publish(Event event, DataSource ds, String topic) throws SQLException {
103+
public static Long publish(Event event, DataSource ds, String topic) throws SQLException {
98104
if (topic == null || topic.trim().isEmpty() || !topic.matches("[a-z_]+")) {
99105
throw new IllegalArgumentException("Invalid topic name: must be non-null, non-empty, and only contain lowercase letters and underscores.");
100106
}
101107

102108
try (Connection c = ds.getConnection()) {
103-
publish(event, c, topic);
109+
return publish(event, c, topic);
104110
}
105111
}
106112
}

core/src/main/java/com/p14n/postevent/data/Event.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,7 @@ public static Event create(String id, String source, String type, String datacon
111111

112112
return new Event(id, source, type, datacontenttype, dataschema, subject, data, time, idn, topic, traceparent);
113113
}
114+
public Event withIdn(Long idn){
115+
return new Event(id, source, type, datacontenttype, dataschema, subject, data, time, idn, topic, traceparent);
116+
}
114117
}

debezium/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ compileJava {
99
}
1010

1111
group = 'com.p14n'
12-
version = '1.3.3-SNAPSHOT'
12+
version = '1.3.4-SNAPSHOT'
1313

1414
repositories {
1515
mavenCentral()

grpc/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ compileJava {
1515
}
1616

1717
group = 'com.p14n'
18-
version = '1.3.3-SNAPSHOT'
18+
version = '1.3.4-SNAPSHOT'
1919

2020
repositories {
2121
mavenCentral()
@@ -32,7 +32,7 @@ dependencies {
3232
implementation 'io.grpc:grpc-stub:1.53.0'
3333
implementation 'io.grpc:grpc-api:1.53.0'
3434

35-
implementation 'javax.annotation:javax.annotation-api:1.3.3-SNAPSHOT'
35+
implementation 'javax.annotation:javax.annotation-api:1.3.2'
3636

3737
// For code generation
3838
implementation 'com.google.protobuf:protobuf-java:3.21.7'

vertx/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ compileJava {
1515
}
1616

1717
group = 'com.p14n'
18-
version = '1.3.3-SNAPSHOT'
18+
version = '1.3.4-SNAPSHOT'
1919

2020

2121
repositories {

vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,12 @@ public void publish(String topic, Event event) {
108108

109109
executor.submit(() -> {
110110
try {
111-
Publisher.publish(event, dataSource, topic);
111+
Long idn = Publisher.publish(event, dataSource, topic);
112112

113113
// Then, publish to EventBus for real-time distribution
114114
String eventBusAddress = "events." + topic;
115-
eventBus.publish(eventBusAddress, event);
115+
116+
eventBus.publish(eventBusAddress, event.withIdn(idn));
116117

117118
logger.atDebug()
118119
.addArgument(topic)
@@ -150,11 +151,11 @@ public void publish(String topic, TransactionalEvent event) {
150151

151152
try {
152153

153-
Publisher.publish(event.event(), event.connection(), topic);
154+
Long idn = Publisher.publish(event.event(), event.connection(), topic);
154155

155156
// Then, publish to EventBus for real-time distribution
156157
String eventBusAddress = "events." + topic;
157-
eventBus.publish(eventBusAddress, event.event());
158+
eventBus.publish(eventBusAddress, event.event().withIdn(idn));
158159

159160
logger.atDebug()
160161
.addArgument(topic)

vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static void start(DataSource ds) throws IOException, InterruptedException
4343
"text",
4444
null,
4545
"test",
46-
"hello".getBytes(), Instant.now(),1L ,"order",null));
46+
"hello".getBytes(), Instant.now(),null ,"order",null));
4747

4848
client.subscribe("order", message -> {
4949
System.out.println("Got message");
@@ -56,7 +56,7 @@ public static void start(DataSource ds) throws IOException, InterruptedException
5656
"text",
5757
null,
5858
"test",
59-
"hello".getBytes(), Instant.now(),2L ,"order",null));
59+
"hello".getBytes(), Instant.now(),null ,"order",null));
6060

6161
latch.await(10, TimeUnit.SECONDS);
6262

0 commit comments

Comments
 (0)