Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @p14n
28 changes: 28 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Publish package to the Maven Central Repository
on:
release:
types: [created]
jobs:
publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Java
uses: actions/setup-java@v3
with:
java-version: '21'
distribution: 'temurin'

- name: Validate Gradle wrapper
uses: gradle/wrapper-validation-action@v1

- name: Publish package
uses: gradle/gradle-build-action@v2
with:
arguments: publishToMavenCentral
env:
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.MAVEN_USERNAME }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.MAVEN_PASSWORD }}
ORG_GRADLE_PROJECT_signingInMemoryKey: ${{ secrets.GPG_PRIVATE_KEY }}
ORG_GRADLE_PROJECT_signingInMemoryKeyId: ${{ secrets.GPG_KEY_ID }}
ORG_GRADLE_PROJECT_signingInMemoryKeyPassword: ${{ secrets.GPG_PASSPHRASE }}
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,4 @@ bin
**/terraform.tfstate
**/terraform.tfstate.backup
app/infra/tf/terraform.tfvars
gradle.properties
10 changes: 4 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

[![Build Status](https://github.com/p14n/postevent/workflows/Test%20PR/badge.svg)](https://github.com/p14n/postevent/actions)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE)
[![Latest Release](https://img.shields.io/github/v/release/yourusername/postevent)](https://github.com/yourusername/postevent/releases)
[![Latest Release](https://img.shields.io/github/v/release/p14n/postevent)](https://github.com/yourusername/p14n/releases)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (typo): Incorrect link URL for "Latest Release" badge.

The link points to .../yourusername/p14n/releases but should likely point to .../p14n/postevent/releases to match the badge source repository and the project's standard repository URL.

Suggested change
[![Latest Release](https://img.shields.io/github/v/release/p14n/postevent)](https://github.com/yourusername/p14n/releases)
[![Latest Release](https://img.shields.io/github/v/release/p14n/postevent)](https://github.com/p14n/postevent/releases)


A reliable event publishing and consumption system using PostgreSQL and gRPC, following the CloudEvents specification.

Expand Down Expand Up @@ -36,15 +36,15 @@ A reliable event publishing and consumption system using PostgreSQL and gRPC, fo

#### Gradle
```groovy
implementation 'com.p14n:postevent:1.0.0'
implementation 'com.p14n:postevent:1.0.0-SNAPSHOT'
```

#### Maven
```xml
<dependency>
<groupId>com.p14n</groupId>
<artifactId>postevent</artifactId>
<version>1.0.0</version>
<version>1.0.0-SNAPSHOT</version>
</dependency>
```

Expand Down Expand Up @@ -155,8 +155,6 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file
## Support

- Create an [Issue](https://github.com/yourusername/postevent/issues)
- Join our [Discord/Slack] community
- Email: support@yourdomain.com

## Acknowledgments

Expand All @@ -166,4 +164,4 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file

## Project Status

Active development, production-ready.
Active development.
61 changes: 52 additions & 9 deletions library/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import com.vanniktech.maven.publish.SonatypeHost
import com.vanniktech.maven.publish.JavaLibrary
import com.vanniktech.maven.publish.JavadocJar

plugins {
id 'java'
id 'com.adarshr.test-logger' version '4.0.0'
id 'com.google.protobuf' version '0.9.2'
id "com.vanniktech.maven.publish" version "0.31.0"
}

compileJava {
Expand All @@ -10,7 +15,7 @@ compileJava {
}

group = 'com.p14n'
version = '1.0-SNAPSHOT'
version = '1.0.0-SNAPSHOT'

repositories {
mavenCentral()
Expand Down Expand Up @@ -57,8 +62,6 @@ dependencies {
// Instrumentation for GRPC
implementation 'io.opentelemetry.instrumentation:opentelemetry-grpc-1.6:1.32.0-alpha'

// Logging integration
//implementation 'io.opentelemetry:opentelemetry-extension-logging:1.32.0'
}

testlogger {
Expand Down Expand Up @@ -89,6 +92,7 @@ test {
failFast = true
testLogging.showStandardStreams = true
}

task fastTest( type: Test ) {
useJUnitPlatform {
includeEngines 'junit-jupiter'
Expand All @@ -102,11 +106,6 @@ jar {
}
}

task runHelloWorld(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'com.p14n.postevent.HelloWorld'
}

// Configure Protobuf plugin
protobuf {
protoc {
Expand All @@ -124,7 +123,6 @@ protobuf {
}
}

// Make sure the generated code is included in the source sets
sourceSets {
main {
java {
Expand All @@ -133,3 +131,48 @@ sourceSets {
}
}
}

tasks.withType(Jar) {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}

javadoc {
exclude "**/grpc/**"
source = sourceSets.main.allJava
}

mavenPublishing {

configure(new JavaLibrary(new JavadocJar.Javadoc(), true))

publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true)

signAllPublications()

coordinates("com.p14n", "postevent", version)

pom {
name = "Postevent"
description = 'A reliable event publishing and consumption system using PostgreSQL and gRPC'
inceptionYear = "2025"
url = "https://github.com/p14n/postevent/"
licenses {
license {
name = 'MIT License'
url = 'https://opensource.org/licenses/MIT'
}
}
developers {
developer {
id = 'p14n'
name = 'Dean Chapman'
email = 'dean@p14n.com'
}
}
scm {
connection = 'scm:git:git://github.com/p14n/postevent.git'
developerConnection = 'scm:git:ssh://github.com:p14n/postevent.git'
url = 'https://github.com/p14n/postevent'
}
}
}
79 changes: 76 additions & 3 deletions library/src/main/java/com/p14n/postevent/ConsumerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import com.p14n.postevent.broker.AsyncExecutor;
import com.p14n.postevent.broker.DefaultExecutor;
import com.p14n.postevent.broker.EventMessageBroker;
import com.p14n.postevent.broker.grpc.MessageBrokerGrpcServer;
import com.p14n.postevent.broker.remote.MessageBrokerGrpcServer;
import com.p14n.postevent.catchup.CatchupServer;
import com.p14n.postevent.catchup.grpc.CatchupGrpcServer;
import com.p14n.postevent.catchup.remote.CatchupGrpcServer;
import com.p14n.postevent.data.ConfigData;

import io.grpc.Server;
Expand All @@ -21,6 +21,38 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A server that manages event consumption and distribution.
* ConsumerServer handles incoming events, manages their persistence,
* and coordinates event distribution to subscribers.
*
* <p>
* This server can be configured to handle multiple topics and supports
* both local and remote consumers through gRPC connections.
* </p>
*
* <p>
* Key features:
* </p>
* <ul>
* <li>Event persistence using a configured DataSource</li>
* <li>Asynchronous event processing via AsyncExecutor</li>
* <li>gRPC-based remote event distribution</li>
* <li>OpenTelemetry integration for monitoring and tracing</li>
* <li>Catchup functionality for missed events</li>
* </ul>
*
* <p>
* Example usage:
* </p>
*
* <pre>{@code
* var config = new ConfigData("myapp", Set.of("orders"), "localhost", 5432,
* "postgres", "postgres", "postgres", 10);
* var server = new ConsumerServer(dataSource, config, OpenTelemetry.noop());
* server.start(8080);
* }</pre>
*/
public class ConsumerServer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ConsumerServer.class);

Expand All @@ -31,25 +63,56 @@ public class ConsumerServer implements AutoCloseable {
private AsyncExecutor asyncExecutor;
OpenTelemetry ot;

/**
* Creates a new ConsumerServer instance with default executor configuration.
*
* @param ds The datasource for event persistence
* @param cfg The configuration for the consumer server
* @param ot The OpenTelemetry instance for monitoring and tracing
*/
public ConsumerServer(DataSource ds, ConfigData cfg, OpenTelemetry ot) {
this(ds, cfg, new DefaultExecutor(2), ot);
}

/**
* Creates a new ConsumerServer instance with custom executor configuration.
*
* @param ds The datasource for event persistence
* @param cfg The configuration for the consumer server
* @param asyncExecutor The executor for handling asynchronous operations
* @param ot The OpenTelemetry instance for monitoring and tracing
*/
public ConsumerServer(DataSource ds, ConfigData cfg, AsyncExecutor asyncExecutor, OpenTelemetry ot) {
this.ds = ds;
this.cfg = cfg;
this.asyncExecutor = asyncExecutor;
this.ot = ot;
}

/**
* Starts the consumer server on the specified port.
* Initializes the event broker, local consumer, and gRPC services.
*
* @param port The port number to listen on
* @throws IOException If the server fails to start
* @throws InterruptedException If the server startup is interrupted
*/
public void start(int port) throws IOException, InterruptedException {
start(ServerBuilder.forPort(port));
}

/**
* Starts the consumer server with a custom server builder configuration.
* Initializes the event broker, local consumer, and gRPC services.
*
* @param sb The server builder to use for configuration
* @throws IOException If the server fails to start
* @throws InterruptedException If the server startup is interrupted
*/
public void start(ServerBuilder<?> sb) throws IOException, InterruptedException {
logger.atInfo().log("Starting consumer server");

var mb = new EventMessageBroker(asyncExecutor, ot,"consumer_server");
var mb = new EventMessageBroker(asyncExecutor, ot, "consumer_server");
var lc = new LocalConsumer<>(cfg, mb);
var grpcServer = new MessageBrokerGrpcServer(mb);
var catchupServer = new CatchupServer(ds);
Expand All @@ -75,6 +138,10 @@ public void start(ServerBuilder<?> sb) throws IOException, InterruptedException
closeables = List.of(lc, mb, asyncExecutor);
}

/**
* Stops the consumer server and releases all resources.
* Shuts down the gRPC server and closes all managed resources.
*/
public void stop() {
logger.atInfo().log("Stopping consumer server");

Expand All @@ -93,6 +160,12 @@ public void stop() {
logger.atInfo().log("Consumer server stopped");
}

/**
* Stops the consumer server and releases all resources.
* Implementation of AutoCloseable interface.
*
* @throws Exception If an error occurs during shutdown
*/
@Override
public void close() throws Exception {
stop();
Expand Down
Loading
Loading