diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 0000000..59f44e6 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,17 @@ +--- +name: Feature Request +about: Suggest a new feature or improvement for the Python SDK for Zerobus. +title: "[FEATURE] " +labels: '' +assignees: '' + +--- + +**Problem Statement** +A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] + +**Proposed Solution** +A clear and concise description of what you want to happen. + +**Additional Context** +Add any other context, references or screenshots about the feature request here. diff --git a/.github/ISSUE_TEMPLATE/issue.md b/.github/ISSUE_TEMPLATE/issue.md new file mode 100644 index 0000000..a16ca54 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/issue.md @@ -0,0 +1,30 @@ +--- +name: SDK Issue +about: Use this to report an issue with the Python SDK for Zerobus. +title: "[ISSUE] " +labels: '' +assignees: '' + +--- + +**Description** +A clear and concise description of what the bug is. + +**Reproduction** +A minimal code sample demonstrating the bug. + +**Expected behavior** +A clear and concise description of what you expected to happen. + +**Is it a regression?** +Did this work in a previous version of the SDK? If so, which versions did you try? + +**Debug Logs** +The SDK logs helpful debugging information when debug logging is enabled. Set the log level to debug by adding `logging.basicConfig(level=logging.DEBUG)` to your program, and include the logs here. + +**Other Information** + - OS: [e.g. macOS] + - Version: [e.g. 0.1.0] + +**Additional context** +Add any other context about the problem here. diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..4ab397c --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,28 @@ +## What changes are proposed in this pull request? 
+ +Provide the readers and reviewers with the information they need to understand +this PR in a comprehensive manner. + +Specifically, try to answer the two following questions: + +- **WHAT** changes are being made in the PR? This should be a summary of the + major changes to allow the reader to quickly understand the PR without having + to look at the code. +- **WHY** are these changes needed? This should provide the context that the + reader might be missing. For example, were there any decisions behind the + change that are not reflected in the code itself? + +The "why part" is the most important of the two as it usually cannot be +inferred from the code itself. A well-written PR description will help future +developers (including your future self) to know how to interact and update your +code. + +## How is this tested? + +Describe any tests you have done; especially if the tests are not part of +the unit tests (e.g. local tests). + +**ALWAYS ANSWER THIS QUESTION:** Answer with "N/A" if tests are not applicable +to your PR (e.g. if the PR only modifies comments). Do not be afraid of +answering "Not tested" if the PR has not been tested. Being clear about what +has been done and not done provides important context to the reviewers. 
\ No newline at end of file diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..daec318 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: + - package-ecosystem: "maven" + directory: "/" + schedule: + interval: "daily" diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml new file mode 100644 index 0000000..33b9ac8 --- /dev/null +++ b/.github/workflows/push.yml @@ -0,0 +1,31 @@ +name: build + +on: + pull_request: + types: [opened, synchronize] + merge_group: + types: [checks_requested] + +jobs: + fmt: + runs-on: + group: databricks-protected-runner-group + labels: linux-ubuntu-latest + steps: + - name: Set up JDK 11 + uses: actions/setup-java@v1 + with: + java-version: 11 + + - name: Checkout + uses: actions/checkout@v4 + + - name: Cache Maven packages + uses: actions/cache@v4 + with: + path: ~/.m2 + key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2 + + - name: Check formatting + run: mvn --errors spotless:check diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..6813a4e --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,237 @@ +# Contributing to Zerobus SDK for Python + +We happily welcome contributions to the Zerobus SDK for Python. We use [GitHub Issues](https://github.com/databricks/zerobus-sdk-py/issues) to track community reported issues and [GitHub Pull Requests](https://github.com/databricks/zerobus-sdk-py/pulls) for accepting changes. + +Contributions are licensed on a license-in/license-out basis. + +## Communication + +Before starting work on a major feature, please open a GitHub issue. We will make sure no one else is already working on it and that it is aligned with the goals of the project. + +A "major feature" is defined as any change that is > 100 LOC altered (not including tests), or changes any user-facing behavior. 
+ +We will use the GitHub issue to discuss the feature and come to agreement. This is to prevent your time being wasted, as well as ours. The GitHub review process for major features is also important so that organizations with commit access can come to agreement on design. + +If it is appropriate to write a design document, the document must be hosted either in the GitHub tracking issue, or linked to from the issue and hosted in a world-readable location. + +Small patches and bug fixes don't need prior communication. + +## Development Setup + +### Prerequisites + +- Python 3.7 or higher +- Git +- pip + +### Setting Up Your Development Environment + +1. **Clone the repository:** + ```bash + git clone https://github.com/databricks/zerobus-sdk-py.git + cd zerobus-sdk-py + ``` + +2. **Create and activate a virtual environment:** + ```bash + make dev + ``` + + This will: + - Create a virtual environment in `.venv` + - Install the package in development mode with all dev dependencies + +3. **Activate the virtual environment:** + ```bash + source .venv/bin/activate # On Windows: .venv\Scripts\activate + ``` + +## Coding Style + +Code style is enforced by a formatter check in your pull request. We use [Black](https://github.com/psf/black) to format our code. Run `make fmt` to ensure your code is properly formatted prior to raising a pull request. + +### Running the Formatter + +Format your code before committing: + +```bash +make fmt +``` + +This runs: +- **Black**: Code formatting +- **autoflake**: Remove unused imports +- **isort**: Sort imports + +### Running Linters + +Check your code for issues: + +```bash +make lint +``` + +This runs: +- **pycodestyle**: Style guide enforcement +- **autoflake**: Check for unused imports + +## Pull Request Process + +1. **Create a feature branch:** + ```bash + git checkout -b feature/your-feature-name + ``` + +2. 
**Make your changes:** + - Write clear, concise commit messages + - Follow existing code style + - Update documentation as needed + +3. **Format your code:** + ```bash + make fmt + ``` + +4. **Commit your changes:** + ```bash + git add . + git commit -m "Add feature: description of your changes" + ``` + +5. **Push to your fork:** + ```bash + git push origin feature/your-feature-name + ``` + +6. **Create a Pull Request:** + - Provide a clear description of changes + - Reference any related issues + - Ensure all CI checks pass + +## Signed Commits + +This repo requires all contributors to sign their commits. To configure this, you can follow [Github's documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits) to create a GPG key, upload it to your Github account, and configure your git client to sign commits. + +## Developer Certificate of Origin + +To contribute to this repository, you must sign off your commits to certify that you have the right to contribute the code and that it complies with the open source license. The rules are pretty simple, if you can certify the content of [DCO](./DCO), then simply add a "Signed-off-by" line to your commit message to certify your compliance. Please use your real name as pseudonymous/anonymous contributions are not accepted. 
+ +``` +Signed-off-by: Joe Smith +``` + +If you set your `user.name` and `user.email` git configs, you can sign your commit automatically with `git commit -s`: + +```bash +git commit -s -m "Your commit message" +``` + +## Code Review Guidelines + +When reviewing code: + +- Check for adherence to code style +- Look for potential edge cases +- Consider performance implications +- Ensure documentation is updated + +## Commit Message Guidelines + +Follow these conventions for commit messages: + +- Use present tense: "Add feature" not "Added feature" +- Use imperative mood: "Fix bug" not "Fixes bug" +- First line should be 50 characters or less +- Reference issues: "Fix #123: Description of fix" + +Example: +``` +Add async stream creation example + +- Add async_example.py demonstrating non-blocking ingestion +- Update README with async API documentation + +Fixes #42 +``` + +## Documentation + +### Updating Documentation + +- Update docstrings for all public APIs +- Use Google-style docstrings +- Include examples in docstrings where helpful +- Update README.md for user-facing changes +- Update examples/ for new features + +Example docstring: +```python +def ingest_record(self, record) -> RecordAcknowledgment: + """ + Submits a single record for ingestion into the stream. + + This method may block if the maximum number of in-flight records + has been reached. + + Args: + record: The Protobuf message object to be ingested. + + Returns: + RecordAcknowledgment: An object to wait on for the server's acknowledgment. + + Raises: + ZerobusException: If the stream is not in a valid state for ingestion. + + Example: + >>> record = AirQuality(device_name="sensor-1", temp=25) + >>> ack = stream.ingest_record(record) + >>> ack.wait_for_ack() + """ +``` + +## Continuous Integration + +All pull requests must pass CI checks: + +- **fmt**: Runs formatting checks (black, autoflake, isort) + +The formatting check runs `make dev fmt` and then checks for any git differences. 
If there are differences, the check will fail. + +You can view CI results in the GitHub Actions tab of the pull request. + +## Makefile Targets + +Available make targets: + +- `make dev` - Set up development environment +- `make install` - Install package +- `make build` - Build wheel package +- `make fmt` - Format code with black, autoflake, and isort +- `make lint` - Run linting with pycodestyle +- `make clean` - Remove build artifacts +- `make help` - Show available targets + +## Versioning + +We follow [Semantic Versioning](https://semver.org/): + +- **MAJOR**: Incompatible API changes +- **MINOR**: Backwards-compatible functionality additions +- **PATCH**: Backwards-compatible bug fixes + +## Getting Help + +- **Issues**: Open an issue on GitHub for bugs or feature requests +- **Discussions**: Use GitHub Discussions for questions +- **Documentation**: Check the README and examples/ + +## Package Name + +The package is published on PyPI as `databricks-zerobus-ingest-sdk`. + +## Code of Conduct + +- Be respectful and inclusive +- Welcome newcomers +- Focus on constructive feedback +- Follow the [Python Community Code of Conduct](https://www.python.org/psf/conduct/) diff --git a/DCO b/DCO new file mode 100644 index 0000000..d4f11df --- /dev/null +++ b/DCO @@ -0,0 +1,25 @@ +Developer's Certificate of Origin 1.1 + +By making a contribution to this project, I certify that: + +(a) The contribution was created in whole or in part by me and I + have the right to submit it under the open source license + indicated in the file; or + +(b) The contribution is based upon previous work that, to the best + of my knowledge, is covered under an appropriate open source + license and I have the right under that license to submit that + work with modifications, whether created in whole or in part + by me, under the same open source license (unless I am + permitted to submit under a different license), as indicated + in the file; or + +(c) The contribution was provided directly to 
me by some other + person who certified (a), (b) or (c) and I have not modified + it. + +(d) I understand and agree that this project and the contribution + are public and that a record of the contribution (including all + personal information I submit with it, including my sign-off) is + maintained indefinitely and may be redistributed consistent with + this project or the open source license(s) involved. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ee8b304 --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +fmt: + mvn spotless:apply diff --git a/README.md b/README.md index e63d6f7..ac11dd7 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,5 @@ # Databricks Zerobus Ingest SDK for Java -[![Maven Central](https://img.shields.io/badge/maven--central-0.1.0--SNAPSHOT-blue)](https://central.sonatype.com/) -[![Java](https://img.shields.io/badge/java-8%2B-blue)](https://www.oracle.com/java/) - The Databricks Zerobus Ingest SDK for Java provides a high-performance client for ingesting data directly into Databricks Delta tables using the Zerobus streaming protocol. 
## Features @@ -17,7 +14,7 @@ The Databricks Zerobus Ingest SDK for Java provides a high-performance client fo ### Runtime Requirements -- **Java**: 8 or higher +- **Java**: 8 or higher - [Download Java](https://adoptium.net/) - **Databricks workspace** with Zerobus access enabled ### Dependencies @@ -27,18 +24,19 @@ The Databricks Zerobus Ingest SDK for Java provides a high-performance client fo - Includes `slf4j-simple` for logging out of the box **When using the regular JAR**: -- `protobuf-java` 3.24.0 -- `grpc-netty-shaded` 1.58.0 -- `grpc-protobuf` 1.58.0 -- `grpc-stub` 1.58.0 -- `javax.annotation-api` 1.3.2 -- `slf4j-api` 1.7.36 -- `slf4j-simple` 1.7.36 (or substitute your own SLF4J implementation like `logback-classic` 1.2.11) +- [`protobuf-java` 3.24.0](https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java/3.24.0) +- [`grpc-netty-shaded` 1.58.0](https://mvnrepository.com/artifact/io.grpc/grpc-netty-shaded/1.58.0) +- [`grpc-protobuf` 1.58.0](https://mvnrepository.com/artifact/io.grpc/grpc-protobuf/1.58.0) +- [`grpc-stub` 1.58.0](https://mvnrepository.com/artifact/io.grpc/grpc-stub/1.58.0) +- [`javax.annotation-api` 1.3.2](https://mvnrepository.com/artifact/javax.annotation/javax.annotation-api/1.3.2) +- [`slf4j-api` 1.7.36](https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.7.36) +- [`slf4j-simple` 1.7.36](https://mvnrepository.com/artifact/org.slf4j/slf4j-simple/1.7.36) (or substitute your own SLF4J implementation like [`logback-classic` 1.2.11](https://mvnrepository.com/artifact/ch.qos.logback/logback-classic/1.2.11)) ### Build Requirements (only for building from source) -- **Maven**: 3.6 or higher -- **Protocol Buffers Compiler** (`protoc`): 3.24.0 (for compiling your own `.proto` schemas) +- **Java**: 8 or higher - [Download Java](https://adoptium.net/) +- **Maven**: 3.6 or higher - [Download Maven](https://maven.apache.org/download.cgi) +- **Protocol Buffers Compiler** (`protoc`): 24.4 - [Download 
protoc](https://github.com/protocolbuffers/protobuf/releases/tag/v24.4) (for compiling your own `.proto` schemas) ## Quick Start User Guide @@ -114,11 +112,11 @@ mvn clean package This generates two JAR files in the `target/` directory: -- **Regular JAR**: `zerobus-ingest-sdk-java-0.1.0-SNAPSHOT.jar` (144KB) +- **Regular JAR**: `databricks-zerobus-ingest-sdk-0.1.0.jar` (144KB) - Contains only the SDK classes - Requires all dependencies on the classpath -- **Fat JAR**: `zerobus-ingest-sdk-java-0.1.0-SNAPSHOT-jar-with-dependencies.jar` (18MB) +- **Fat JAR**: `databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar` (18MB) - Contains SDK classes plus all dependencies bundled - Self-contained, easier to deploy @@ -149,14 +147,14 @@ my-zerobus-app/ │ └── proto/ │ └── record.proto └── lib/ - └── zerobus-ingest-sdk-java-0.1.0-SNAPSHOT-jar-with-dependencies.jar + └── databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar ``` Copy the fat JAR to your project: ```bash mkdir lib -cp ../zerobus-sdk-java/target/zerobus-ingest-sdk-java-0.1.0-SNAPSHOT-jar-with-dependencies.jar lib/ +cp ../zerobus-sdk-java/target/databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar lib/ ``` #### 3. 
Define Your Protocol Buffer Schema @@ -219,7 +217,7 @@ public class ZerobusClient { tableProperties, clientId, clientSecret - ); + ).join(); try { // Ingest records @@ -230,8 +228,7 @@ public class ZerobusClient { .setHumidity(50 + (i % 40)) .build(); - IngestRecordResult result = stream.ingestRecord(record); - result.getWriteCompleted().join(); // Wait for durability + stream.ingestRecord(record).join(); // Wait for durability System.out.println("Ingested record " + (i + 1)); } @@ -282,7 +279,7 @@ ZerobusStream stream = sdk.createStream( tableProperties, clientId, clientSecret -); +).join(); try { for (int i = 0; i < 1000; i++) { @@ -292,8 +289,7 @@ try { .setHumidity(50 + i % 40) .build(); - IngestRecordResult result = stream.ingestRecord(record); - result.getWriteCompleted().join(); // Wait for durability + stream.ingestRecord(record).join(); // Wait for durability } } finally { stream.close(); @@ -317,7 +313,7 @@ ZerobusStream stream = sdk.createStream( clientId, clientSecret, options -); +).join(); List> futures = new ArrayList<>(); @@ -329,8 +325,7 @@ try { .setHumidity(50 + i % 40) .build(); - IngestRecordResult result = stream.ingestRecord(record); - futures.add(result.getWriteCompleted()); + futures.add(stream.ingestRecord(record)); } // Flush and wait for all records @@ -458,30 +453,30 @@ ZerobusSdk(String serverEndpoint, String unityCatalogEndpoint) **Methods:** ```java - ZerobusStream createStream( + CompletableFuture> createStream( TableProperties tableProperties, String clientId, String clientSecret, StreamConfigurationOptions options -) throws ZerobusException +) ``` -Creates a new ingestion stream with custom configuration. +Creates a new ingestion stream with custom configuration. Returns a CompletableFuture that completes when the stream is ready. 
```java - ZerobusStream createStream( + CompletableFuture> createStream( TableProperties tableProperties, String clientId, String clientSecret -) throws ZerobusException +) ``` -Creates a new ingestion stream with default configuration. +Creates a new ingestion stream with default configuration. Returns a CompletableFuture that completes when the stream is ready. ```java - ZerobusStream recreateStream( + CompletableFuture> recreateStream( ZerobusStream stream -) throws ZerobusException +) ``` -Recreates a failed stream, resending unacknowledged records. +Recreates a failed stream, resending unacknowledged records. Returns a CompletableFuture that completes when the stream is ready. --- @@ -492,9 +487,9 @@ Represents an active ingestion stream. **Methods:** ```java -IngestRecordResult ingestRecord(RecordType record) +CompletableFuture ingestRecord(RecordType record) throws ZerobusException ``` -Ingests a single record into the stream. Returns futures for tracking ingestion progress. +Ingests a single record into the stream. Returns a future that completes when the record is durably written to storage. ```java void flush() throws ZerobusException @@ -624,24 +619,6 @@ Builds and returns the `StreamConfigurationOptions` instance. --- -### IngestRecordResult - -Result of an asynchronous record ingestion operation. - -**Methods:** - -```java -CompletableFuture getRecordAccepted() -``` -Returns a future that completes when the SDK accepts the record for processing (fast). - -```java -CompletableFuture getWriteCompleted() -``` -Returns a future that completes when the record is durably written to storage (slower). - ---- - ### IngestRecordResponse Server acknowledgment response containing durability information. 
diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000..54ee112 --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,4 @@ +## Reporting a Vulnerability + +We appreciate any security concerns brought to our attention and encourage you to notify us of any potential vulnerabilities discovered in our systems or products. +If you believe you have found a security vulnerability, please report it to us at [security@databricks.com](mailto:security@databricks.com). diff --git a/examples/README.md b/examples/README.md index b9b85b9..183bf69 100644 --- a/examples/README.md +++ b/examples/README.md @@ -21,10 +21,10 @@ Demonstrates synchronous record ingestion where each record is waited for before **Run:** ```bash -javac -cp "../target/zerobus-ingest-sdk-java-0.1.0-SNAPSHOT-jar-with-dependencies.jar" \ +javac -cp "../target/databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar" \ src/main/java/com/databricks/zerobus/examples/BlockingIngestionExample.java -java -cp "../target/zerobus-ingest-sdk-java-0.1.0-SNAPSHOT-jar-with-dependencies.jar:src/main/java" \ +java -cp "../target/databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar:src/main/java" \ com.databricks.zerobus.examples.BlockingIngestionExample ``` @@ -47,10 +47,10 @@ Demonstrates asynchronous record ingestion for maximum throughput. 
**Run:** ```bash -javac -cp "../target/zerobus-ingest-sdk-java-0.1.0-SNAPSHOT-jar-with-dependencies.jar" \ +javac -cp "../target/databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar" \ src/main/java/com/databricks/zerobus/examples/NonBlockingIngestionExample.java -java -cp "../target/zerobus-ingest-sdk-java-0.1.0-SNAPSHOT-jar-with-dependencies.jar:src/main/java" \ +java -cp "../target/databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar:src/main/java" \ com.databricks.zerobus.examples.NonBlockingIngestionExample ``` diff --git a/examples/src/main/java/com/databricks/zerobus/examples/BlockingIngestionExample.java b/examples/src/main/java/com/databricks/zerobus/examples/BlockingIngestionExample.java index 0345da8..7762dbb 100644 --- a/examples/src/main/java/com/databricks/zerobus/examples/BlockingIngestionExample.java +++ b/examples/src/main/java/com/databricks/zerobus/examples/BlockingIngestionExample.java @@ -47,7 +47,7 @@ public static void main(String[] args) { tableProperties, CLIENT_ID, CLIENT_SECRET - ); + ).join(); System.out.println("✓ Stream created: " + stream.getStreamId()); // Step 4: Ingest records synchronously @@ -65,13 +65,7 @@ public static void main(String[] args) { .build(); // Ingest and wait for durability - IngestRecordResult result = stream.ingestRecord(record); - - // Wait for SDK to accept the record - result.getRecordAccepted().join(); - - // Wait for record to be durably written - result.getWriteCompleted().join(); + stream.ingestRecord(record).join(); successCount++; diff --git a/examples/src/main/java/com/databricks/zerobus/examples/NonBlockingIngestionExample.java b/examples/src/main/java/com/databricks/zerobus/examples/NonBlockingIngestionExample.java index 4ffdbc0..3d0c305 100644 --- a/examples/src/main/java/com/databricks/zerobus/examples/NonBlockingIngestionExample.java +++ b/examples/src/main/java/com/databricks/zerobus/examples/NonBlockingIngestionExample.java @@ -61,7 +61,7 @@ public static void main(String[] 
args) { CLIENT_ID, CLIENT_SECRET, options - ); + ).join(); System.out.println("✓ Stream created: " + stream.getStreamId()); // Step 5: Ingest records asynchronously @@ -78,14 +78,8 @@ public static void main(String[] args) { .setHumidity(50 + (i % 40)) .build(); - // Ingest record (non-blocking) - IngestRecordResult result = stream.ingestRecord(record); - - // Wait for SDK to accept (this is fast, just queue check) - result.getRecordAccepted().join(); - - // Collect futures to wait for durability later - futures.add(result.getWriteCompleted()); + // Ingest record and collect future for durability later + futures.add(stream.ingestRecord(record)); // Progress indicator if ((i + 1) % 10000 == 0) { diff --git a/pom.xml b/pom.xml index 889cd57..908e47a 100644 --- a/pom.xml +++ b/pom.xml @@ -1,23 +1,18 @@ - + 4.0.0 - com.databricks - zerobus-ingest-sdk-java - 0.1.0-SNAPSHOT + databricks-zerobus-ingest-sdk + 0.1.0 jar - Zerobus Ingest SDK for Java Databricks Zerobus Ingest SDK for Java - Direct ingestion to Delta tables - 1.8 1.8 UTF-8 - @@ -25,7 +20,6 @@ protobuf-java 3.24.0 - io.grpc @@ -42,21 +36,18 @@ grpc-stub 1.58.0 - javax.annotation javax.annotation-api 1.3.2 - org.slf4j slf4j-api 1.7.36 - org.slf4j @@ -64,15 +55,7 @@ 1.7.36 - - - - kr.motd.maven - os-maven-plugin - 1.7.1 - - @@ -93,7 +76,6 @@ - org.apache.maven.plugins maven-compiler-plugin @@ -103,13 +85,40 @@ 1.8 - + + com.diffplug.spotless + spotless-maven-plugin + + 2.30.0 + + + + + + + + + + + pom.xml + + + false + false + + + true + true + true + + + + org.apache.maven.plugins maven-jar-plugin 3.3.0 - org.apache.maven.plugins @@ -117,10 +126,10 @@ 3.5.1 - package shade + package true jar-with-dependencies @@ -149,5 +158,12 @@ + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + diff --git a/src/main/java/com/databricks/zerobus/BackgroundTask.java b/src/main/java/com/databricks/zerobus/BackgroundTask.java index bfd144a..2080d99 100644 --- a/src/main/java/com/databricks/zerobus/BackgroundTask.java +++ 
b/src/main/java/com/databricks/zerobus/BackgroundTask.java @@ -1,121 +1,118 @@ package com.databricks.zerobus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * A background task that runs repeatedly until cancelled. - * Handles task execution, error handling, and cancellation. + * A background task that runs repeatedly until cancelled. Handles task execution, error handling, + * and cancellation. */ class BackgroundTask { - private static final Logger logger = LoggerFactory.getLogger(BackgroundTask.class); + private static final Logger logger = LoggerFactory.getLogger(BackgroundTask.class); - private final Consumer> task; - private final Consumer taskFailureHandler; - private final long delayMillis; - private final ExecutorService executor; + private final Consumer> task; + private final Consumer taskFailureHandler; + private final long delayMillis; + private final ExecutorService executor; - private final AtomicBoolean isActive = new AtomicBoolean(false); - private CompletableFuture cancellationToken; + private final AtomicBoolean isActive = new AtomicBoolean(false); + private CompletableFuture cancellationToken; - /** - * Creates a new background task. - * - * @param task The task to run repeatedly. Takes a cancellation token as parameter. - * @param taskFailureHandler Handler for task failures - * @param delayMillis Delay between task iterations in milliseconds - * @param executor The executor to run the task on - */ - BackgroundTask( - Consumer> task, - Consumer taskFailureHandler, - long delayMillis, - ExecutorService executor) { - this.task = task; - this.taskFailureHandler = taskFailureHandler; - this.delayMillis = delayMillis; - this.executor = executor; - } + /** + * Creates a new background task. 
+ * + * @param task The task to run repeatedly. Takes a cancellation token as parameter. + * @param taskFailureHandler Handler for task failures + * @param delayMillis Delay between task iterations in milliseconds + * @param executor The executor to run the task on + */ + BackgroundTask( + Consumer> task, + Consumer taskFailureHandler, + long delayMillis, + ExecutorService executor) { + this.task = task; + this.taskFailureHandler = taskFailureHandler; + this.delayMillis = delayMillis; + this.executor = executor; + } - /** - * Creates a new background task with no delay. - * - * @param task The task to run repeatedly - * @param taskFailureHandler Handler for task failures - * @param executor The executor to run the task on - */ - BackgroundTask( - Consumer> task, - Consumer taskFailureHandler, - ExecutorService executor) { - this(task, taskFailureHandler, 0, executor); - } + /** + * Creates a new background task with no delay. + * + * @param task The task to run repeatedly + * @param taskFailureHandler Handler for task failures + * @param executor The executor to run the task on + */ + BackgroundTask( + Consumer> task, + Consumer taskFailureHandler, + ExecutorService executor) { + this(task, taskFailureHandler, 0, executor); + } - /** - * Starts the background task. - * - * @throws IllegalStateException if the task is already running - */ - void start() { - boolean alreadyRunning = !isActive.compareAndSet(false, true); + /** + * Starts the background task. 
+ * + * @throws IllegalStateException if the task is already running + */ + void start() { + boolean alreadyRunning = !isActive.compareAndSet(false, true); - if (alreadyRunning) { - throw new IllegalStateException("Background task is already running"); - } + if (alreadyRunning) { + throw new IllegalStateException("Background task is already running"); + } - cancellationToken = new CompletableFuture<>(); + cancellationToken = new CompletableFuture<>(); - CompletableFuture.runAsync(() -> { - try { - while (!cancellationToken.isDone()) { - try { - task.accept(cancellationToken); + CompletableFuture.runAsync( + () -> { + try { + while (!cancellationToken.isDone()) { + try { + task.accept(cancellationToken); - if (delayMillis > 0) { - Thread.sleep(delayMillis); - } - } catch (Throwable e) { - taskFailureHandler.accept(e); - } - } - } catch (Throwable e) { - logger.error("Background task failed", e); - } finally { - synchronized (this) { - isActive.set(false); - this.notifyAll(); + if (delayMillis > 0) { + Thread.sleep(delayMillis); } + } catch (Throwable e) { + taskFailureHandler.accept(e); + } } - }, executor); - } + } catch (Throwable e) { + logger.error("Background task failed", e); + } finally { + synchronized (this) { + isActive.set(false); + this.notifyAll(); + } + } + }, + executor); + } - /** - * Cancels the background task. - */ - void cancel() { - if (cancellationToken != null) { - cancellationToken.complete(null); - } + /** Cancels the background task. */ + void cancel() { + if (cancellationToken != null) { + cancellationToken.complete(null); } + } - /** - * Waits until the background task has stopped. - */ - void waitUntilStopped() { - synchronized (this) { - while (isActive.get()) { - try { - this.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } + /** Waits until the background task has stopped. 
*/ + void waitUntilStopped() { + synchronized (this) { + while (isActive.get()) { + try { + this.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; } + } } + } } diff --git a/src/main/java/com/databricks/zerobus/IngestRecordResult.java b/src/main/java/com/databricks/zerobus/IngestRecordResult.java deleted file mode 100644 index 46993b6..0000000 --- a/src/main/java/com/databricks/zerobus/IngestRecordResult.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.databricks.zerobus; - -import java.util.concurrent.CompletableFuture; - -/** - * Result of an asynchronous record ingestion operation. - * - *

This class provides two futures that track different stages of the ingestion process: - *

    - *
  • recordAccepted: Completes when the SDK has queued the record internally
  • - *
  • writeCompleted: Completes when the server has durably written the record
  • - *
- * - *

If either future fails, the record may not have been successfully ingested. However, - * due to network timing, it's possible the server received and stored the record even if - * the acknowledgment was lost. - * - *

Example usage: - *

{@code
- * IngestRecordResult result = stream.ingestRecord(myRecord);
- *
- * // Wait for SDK to accept the record (fast)
- * result.getRecordAccepted().join();
- *
- * // Wait for durable storage (slower)
- * result.getWriteCompleted().join();
- * }
- */ -public class IngestRecordResult { - private final CompletableFuture recordAccepted; - private final CompletableFuture writeCompleted; - - /** - * Creates a new IngestRecordResult. - * - * @param recordAccepted Future that completes when the SDK accepts the record - * @param writeCompleted Future that completes when the record is durably written - */ - public IngestRecordResult(CompletableFuture recordAccepted, CompletableFuture writeCompleted) { - this.recordAccepted = recordAccepted; - this.writeCompleted = writeCompleted; - } - - /** - * Returns a future that completes when the SDK accepts the record for processing. - * - *

This typically completes quickly, only waiting for queue space availability. - * Completion indicates the record is queued but not yet sent to the server. - * - * @return Future that completes when the record is accepted by the SDK - */ - public CompletableFuture getRecordAccepted() { - return recordAccepted; - } - - /** - * Returns a future that completes when the record is durably written to storage. - * - *

This completes after the server acknowledges successful storage. This is the - * stronger guarantee and may take longer depending on server load and network latency. - * - * @return Future that completes when the record is durably stored - */ - public CompletableFuture getWriteCompleted() { - return writeCompleted; - } -} diff --git a/src/main/java/com/databricks/zerobus/NonRetriableException.java b/src/main/java/com/databricks/zerobus/NonRetriableException.java index f5d9a12..bb032a3 100644 --- a/src/main/java/com/databricks/zerobus/NonRetriableException.java +++ b/src/main/java/com/databricks/zerobus/NonRetriableException.java @@ -1,27 +1,27 @@ package com.databricks.zerobus; /** - * An exception that indicates a non-retriable error has occurred. - * This is used to signal that stream creation or recovery should not be retried. + * An exception that indicates a non-retriable error has occurred. This is used to signal that + * stream creation or recovery should not be retried. */ public class NonRetriableException extends ZerobusException { - /** - * Constructs a new NonRetriableException with the specified detail message. - * - * @param message the detail message - */ - public NonRetriableException(String message) { - super(message); - } + /** + * Constructs a new NonRetriableException with the specified detail message. + * + * @param message the detail message + */ + public NonRetriableException(String message) { + super(message); + } - /** - * Constructs a new NonRetriableException with the specified detail message and cause. - * - * @param message the detail message - * @param cause the cause of the exception - */ - public NonRetriableException(String message, Throwable cause) { - super(message, cause); - } + /** + * Constructs a new NonRetriableException with the specified detail message and cause. 
+ * + * @param message the detail message + * @param cause the cause of the exception + */ + public NonRetriableException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/src/main/java/com/databricks/zerobus/StreamState.java b/src/main/java/com/databricks/zerobus/StreamState.java index c40c806..8cf5bd6 100644 --- a/src/main/java/com/databricks/zerobus/StreamState.java +++ b/src/main/java/com/databricks/zerobus/StreamState.java @@ -4,6 +4,7 @@ * Represents the lifecycle state of a ZerobusStream. * *

State transitions typically follow this pattern: + * *

  * UNINITIALIZED → OPENED → FLUSHING → CLOSED
  *                    ↓
diff --git a/src/main/java/com/databricks/zerobus/TableProperties.java b/src/main/java/com/databricks/zerobus/TableProperties.java
index e6ecb90..41d4bae 100644
--- a/src/main/java/com/databricks/zerobus/TableProperties.java
+++ b/src/main/java/com/databricks/zerobus/TableProperties.java
@@ -9,55 +9,54 @@
  * @param  The type of records to be ingested (must extend Message).
  */
 public class TableProperties {
-    private final String tableName;
-    private final Message defaultInstance;
+  private final String tableName;
+  private final Message defaultInstance;
 
-    /**
-     * Creates a new TableProperties instance.
-     *
-     * @param tableName The name of the table to ingest records into.
-     * @param defaultInstance The default instance of the record type (used to get the descriptor).
-     */
-    public TableProperties(String tableName, RecordType defaultInstance) {
-        this.tableName = tableName;
-        this.defaultInstance = defaultInstance;
-    }
+  /**
+   * Creates a new TableProperties instance.
+   *
+   * @param tableName The name of the table to ingest records into.
+   * @param defaultInstance The default instance of the record type (used to get the descriptor).
+   */
+  public TableProperties(String tableName, RecordType defaultInstance) {
+    this.tableName = tableName;
+    this.defaultInstance = defaultInstance;
+  }
 
-    /**
-     * Returns the table name.
-     *
-     * @return the table name
-     */
-    public String getTableName() {
-        return tableName;
-    }
+  /**
+   * Returns the table name.
+   *
+   * @return the table name
+   */
+  public String getTableName() {
+    return tableName;
+  }
 
-    /**
-     * Returns the default instance.
-     *
-     * @return the default instance
-     */
-    public Message getDefaultInstance() {
-        return defaultInstance;
-    }
+  /**
+   * Returns the default instance.
+   *
+   * @return the default instance
+   */
+  public Message getDefaultInstance() {
+    return defaultInstance;
+  }
 
-    /**
-     * Gets the descriptor proto for the record type.
-     *
-     * @return the descriptor proto
-     */
-    Descriptors.Descriptor getDescriptor() {
-        return defaultInstance.getDescriptorForType();
-    }
+  /**
+   * Gets the descriptor proto for the record type.
+   *
+   * @return the descriptor proto
+   */
+  Descriptors.Descriptor getDescriptor() {
+    return defaultInstance.getDescriptorForType();
+  }
 
-    /**
-     * Gets the DescriptorProto for the record type.
-     * This is used to send the schema to the server.
-     *
-     * @return the DescriptorProto
-     */
-    com.google.protobuf.DescriptorProtos.DescriptorProto getDescriptorProto() {
-        Descriptors.Descriptor descriptor = getDescriptor();
-        return descriptor.toProto();
-    }
+  /**
+   * Gets the DescriptorProto for the record type. This is used to send the schema to the server.
+   *
+   * @return the DescriptorProto
+   */
+  com.google.protobuf.DescriptorProtos.DescriptorProto getDescriptorProto() {
+    Descriptors.Descriptor descriptor = getDescriptor();
+    return descriptor.toProto();
+  }
 }
diff --git a/src/main/java/com/databricks/zerobus/TokenFactory.java b/src/main/java/com/databricks/zerobus/TokenFactory.java
index a8b6827..e6a9c57 100644
--- a/src/main/java/com/databricks/zerobus/TokenFactory.java
+++ b/src/main/java/com/databricks/zerobus/TokenFactory.java
@@ -14,141 +14,149 @@
 /**
  * Factory for obtaining OAuth 2.0 access tokens with Unity Catalog privileges.
  *
- * 

This class uses the OAuth 2.0 client credentials flow with authorization details - * to request tokens scoped to specific Unity Catalog resources. The generated tokens - * include privileges for catalog, schema, and table access required for ingestion. + *

This class uses the OAuth 2.0 client credentials flow with authorization details to request + * tokens scoped to specific Unity Catalog resources. The generated tokens include privileges for + * catalog, schema, and table access required for ingestion. */ public class TokenFactory { - /** - * Obtains an OAuth token with Unity Catalog privileges for the specified table. - * - *

The token request includes authorization details that grant: - *

    - *
  • USE CATALOG on the table's catalog
  • - *
  • USE SCHEMA on the table's schema
  • - *
  • SELECT and MODIFY on the target table
  • - *
- * - * @param tableName The fully qualified table name (catalog.schema.table) - * @param workspaceId The Databricks workspace ID - * @param workspaceUrl The Unity Catalog endpoint URL - * @param clientId The OAuth client ID - * @param clientSecret The OAuth client secret - * @return The OAuth access token (JWT) - * @throws NonRetriableException if the token request fails or table name is invalid - */ - public static String getZerobusToken( - String tableName, - String workspaceId, - String workspaceUrl, - String clientId, - String clientSecret) throws NonRetriableException { - - // Parse and validate the three-part table name - String[] threePartTableName = tableName.split("\\."); - if (threePartTableName.length != 3) { - throw new NonRetriableException( - "Table name '" + tableName + "' must be in the format of catalog.schema.table"); - } + /** + * Obtains an OAuth token with Unity Catalog privileges for the specified table. + * + *

The token request includes authorization details that grant: + * + *

    + *
  • USE CATALOG on the table's catalog + *
  • USE SCHEMA on the table's schema + *
  • SELECT and MODIFY on the target table + *
+ * + * @param tableName The fully qualified table name (catalog.schema.table) + * @param workspaceId The Databricks workspace ID + * @param workspaceUrl The Unity Catalog endpoint URL + * @param clientId The OAuth client ID + * @param clientSecret The OAuth client secret + * @return The OAuth access token (JWT) + * @throws NonRetriableException if the token request fails or table name is invalid + */ + public static String getZerobusToken( + String tableName, + String workspaceId, + String workspaceUrl, + String clientId, + String clientSecret) + throws NonRetriableException { + + // Parse and validate the three-part table name + String[] threePartTableName = tableName.split("\\."); + if (threePartTableName.length != 3) { + throw new NonRetriableException( + "Table name '" + tableName + "' must be in the format of catalog.schema.table"); + } - String catalogName = threePartTableName[0]; - String schemaName = threePartTableName[1]; - String tableNameOnly = threePartTableName[2]; - - // Build authorization details using the RAR (RFC 9396) format. - // Newlines are required for proper JWT claim formatting. 
- String authorizationDetails = String.format( - "[\n" + - " {\n" + - " \"type\": \"unity_catalog_privileges\",\n" + - " \"privileges\": [\"USE CATALOG\"],\n" + - " \"object_type\": \"CATALOG\",\n" + - " \"object_full_path\": \"%s\"\n" + - " },\n" + - " {\n" + - " \"type\": \"unity_catalog_privileges\",\n" + - " \"privileges\": [\"USE SCHEMA\"],\n" + - " \"object_type\": \"SCHEMA\",\n" + - " \"object_full_path\": \"%s.%s\"\n" + - " },\n" + - " {\n" + - " \"type\": \"unity_catalog_privileges\",\n" + - " \"privileges\": [\"SELECT\", \"MODIFY\"],\n" + - " \"object_type\": \"TABLE\",\n" + - " \"object_full_path\": \"%s.%s.%s\"\n" + - " }\n" + - "]", - catalogName, - catalogName, schemaName, - catalogName, schemaName, tableNameOnly); - - String urlString = workspaceUrl + "/oidc/v1/token"; - - try { - // Build OAuth 2.0 client credentials request with Unity Catalog authorization details - String formData = "grant_type=client_credentials" + - "&scope=all-apis" + - "&resource=api://databricks/workspaces/" + workspaceId + "/zerobusDirectWriteApi" + - "&authorization_details=" + URLEncoder.encode(authorizationDetails, "UTF-8"); - - // Encode credentials for HTTP Basic authentication - String credentials = Base64.getEncoder() - .encodeToString((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8)); - - HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection(); - connection.setRequestMethod("POST"); - connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); - connection.setRequestProperty("Authorization", "Basic " + credentials); - connection.setDoOutput(true); - - OutputStreamWriter writer = new OutputStreamWriter( - connection.getOutputStream(), StandardCharsets.UTF_8); - writer.write(formData); - writer.close(); - - int responseCode = connection.getResponseCode(); - - if (responseCode != 200) { - String errorBody = "No error details available"; - if (connection.getErrorStream() != null) { - BufferedReader 
errorReader = new BufferedReader( - new InputStreamReader(connection.getErrorStream(), StandardCharsets.UTF_8)); - StringBuilder errorBuilder = new StringBuilder(); - String line; - while ((line = errorReader.readLine()) != null) { - errorBuilder.append(line).append("\n"); - } - errorReader.close(); - errorBody = errorBuilder.toString(); - } - throw new NonRetriableException( - "OAuth request failed with status " + responseCode + ": " + errorBody); - } - - BufferedReader reader = new BufferedReader( - new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8)); - StringBuilder responseBody = new StringBuilder(); - String line; - while ((line = reader.readLine()) != null) { - responseBody.append(line).append("\n"); - } - reader.close(); - - // Extract access token using regex to avoid dependency on a JSON library. - // Pattern matches: "access_token": "value" with flexible whitespace. - Pattern accessTokenPattern = Pattern.compile("\"access_token\"\\s*:\\s*\"([^\"]+)\""); - Matcher matcher = accessTokenPattern.matcher(responseBody.toString()); - - if (matcher.find()) { - return matcher.group(1); - } else { - throw new NonRetriableException("No access token received from OAuth response"); - } - } catch (NonRetriableException e) { - throw e; - } catch (Exception e) { - throw new NonRetriableException("Unexpected error getting OAuth token: " + e.getMessage(), e); + String catalogName = threePartTableName[0]; + String schemaName = threePartTableName[1]; + String tableNameOnly = threePartTableName[2]; + + // Build authorization details using the RAR (RFC 9396) format. + // Newlines are required for proper JWT claim formatting. 
+ String authorizationDetails = + String.format( + "[\n" + + " {\n" + + " \"type\": \"unity_catalog_privileges\",\n" + + " \"privileges\": [\"USE CATALOG\"],\n" + + " \"object_type\": \"CATALOG\",\n" + + " \"object_full_path\": \"%s\"\n" + + " },\n" + + " {\n" + + " \"type\": \"unity_catalog_privileges\",\n" + + " \"privileges\": [\"USE SCHEMA\"],\n" + + " \"object_type\": \"SCHEMA\",\n" + + " \"object_full_path\": \"%s.%s\"\n" + + " },\n" + + " {\n" + + " \"type\": \"unity_catalog_privileges\",\n" + + " \"privileges\": [\"SELECT\", \"MODIFY\"],\n" + + " \"object_type\": \"TABLE\",\n" + + " \"object_full_path\": \"%s.%s.%s\"\n" + + " }\n" + + "]", + catalogName, catalogName, schemaName, catalogName, schemaName, tableNameOnly); + + String urlString = workspaceUrl + "/oidc/v1/token"; + + try { + // Build OAuth 2.0 client credentials request with Unity Catalog authorization details + String formData = + "grant_type=client_credentials" + + "&scope=all-apis" + + "&resource=api://databricks/workspaces/" + + workspaceId + + "/zerobusDirectWriteApi" + + "&authorization_details=" + + URLEncoder.encode(authorizationDetails, "UTF-8"); + + // Encode credentials for HTTP Basic authentication + String credentials = + Base64.getEncoder() + .encodeToString((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8)); + + HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + connection.setRequestProperty("Authorization", "Basic " + credentials); + connection.setDoOutput(true); + + OutputStreamWriter writer = + new OutputStreamWriter(connection.getOutputStream(), StandardCharsets.UTF_8); + writer.write(formData); + writer.close(); + + int responseCode = connection.getResponseCode(); + + if (responseCode != 200) { + String errorBody = "No error details available"; + if (connection.getErrorStream() != null) { + 
BufferedReader errorReader = + new BufferedReader( + new InputStreamReader(connection.getErrorStream(), StandardCharsets.UTF_8)); + StringBuilder errorBuilder = new StringBuilder(); + String line; + while ((line = errorReader.readLine()) != null) { + errorBuilder.append(line).append("\n"); + } + errorReader.close(); + errorBody = errorBuilder.toString(); } + throw new NonRetriableException( + "OAuth request failed with status " + responseCode + ": " + errorBody); + } + + BufferedReader reader = + new BufferedReader( + new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8)); + StringBuilder responseBody = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + responseBody.append(line).append("\n"); + } + reader.close(); + + // Extract access token using regex to avoid dependency on a JSON library. + // Pattern matches: "access_token": "value" with flexible whitespace. + Pattern accessTokenPattern = Pattern.compile("\"access_token\"\\s*:\\s*\"([^\"]+)\""); + Matcher matcher = accessTokenPattern.matcher(responseBody.toString()); + + if (matcher.find()) { + return matcher.group(1); + } else { + throw new NonRetriableException("No access token received from OAuth response"); + } + } catch (NonRetriableException e) { + throw e; + } catch (Exception e) { + throw new NonRetriableException("Unexpected error getting OAuth token: " + e.getMessage(), e); } + } } diff --git a/src/main/java/com/databricks/zerobus/ZerobusException.java b/src/main/java/com/databricks/zerobus/ZerobusException.java index 9bc0687..0aede36 100644 --- a/src/main/java/com/databricks/zerobus/ZerobusException.java +++ b/src/main/java/com/databricks/zerobus/ZerobusException.java @@ -1,27 +1,27 @@ package com.databricks.zerobus; /** - * Base exception class for all Ingest API related errors. - * This allows clients to catch all Ingest API specific exceptions with a single catch block. + * Base exception class for all Ingest API related errors. 
This allows clients to catch all Ingest + * API specific exceptions with a single catch block. */ public class ZerobusException extends Exception { - /** - * Constructs a new ZerobusException with the specified detail message. - * - * @param message the detail message - */ - public ZerobusException(String message) { - super(message); - } + /** + * Constructs a new ZerobusException with the specified detail message. + * + * @param message the detail message + */ + public ZerobusException(String message) { + super(message); + } - /** - * Constructs a new ZerobusException with the specified detail message and cause. - * - * @param message the detail message - * @param cause the cause of the exception - */ - public ZerobusException(String message, Throwable cause) { - super(message, cause); - } + /** + * Constructs a new ZerobusException with the specified detail message and cause. + * + * @param message the detail message + * @param cause the cause of the exception + */ + public ZerobusException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/src/main/java/com/databricks/zerobus/ZerobusSdk.java b/src/main/java/com/databricks/zerobus/ZerobusSdk.java index b1a7b9b..5d312f2 100644 --- a/src/main/java/com/databricks/zerobus/ZerobusSdk.java +++ b/src/main/java/com/databricks/zerobus/ZerobusSdk.java @@ -3,9 +3,6 @@ import com.google.protobuf.Message; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Iterator; import java.util.Random; import java.util.concurrent.CompletableFuture; @@ -13,6 +10,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The main entry point for the Zerobus SDK. @@ -22,6 +21,7 @@ * operations. * *

Example usage: + * *

{@code
  * ZerobusSdk sdk = new ZerobusSdk(
  *     "server-endpoint.databricks.com",
@@ -33,7 +33,7 @@
  *     clientId,
  *     clientSecret,
  *     options
- * );
+ * ).join();
  * }
* * @see ZerobusStream @@ -84,8 +84,8 @@ void setStubFactory(ZerobusSdkStubFactory stubFactory) { * *

The workspace ID is the first component of the endpoint hostname. * - *

Example: {@code 1234567890123456.zerobus.us-west-2.cloud.databricks.com} - * returns {@code 1234567890123456} + *

Example: {@code 1234567890123456.zerobus.us-west-2.cloud.databricks.com} returns {@code + * 1234567890123456} * * @param endpoint The server endpoint (may include protocol prefix) * @return The extracted workspace ID @@ -108,25 +108,26 @@ private static String extractWorkspaceId(String endpoint) { /** * Creates an executor service for stream operations. * - *

The executor uses daemon threads to avoid preventing JVM shutdown. - * Each thread is named with a unique instance ID for debugging purposes. + *

The executor uses daemon threads to avoid preventing JVM shutdown. Each thread is named with + * a unique instance ID for debugging purposes. * * @return A new ExecutorService configured for stream operations */ private static ExecutorService createStreamExecutor() { long instanceId = 1000000000L + Math.abs(RANDOM.nextLong() % 9000000000L); - ThreadFactory daemonThreadFactory = new ThreadFactory() { - private final AtomicInteger counter = new AtomicInteger(0); + ThreadFactory daemonThreadFactory = + new ThreadFactory() { + private final AtomicInteger counter = new AtomicInteger(0); - @Override - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable); - thread.setDaemon(true); - thread.setName(THREAD_NAME_PREFIX + instanceId + "-" + counter.getAndIncrement()); - return thread; - } - }; + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable); + thread.setDaemon(true); + thread.setName(THREAD_NAME_PREFIX + instanceId + "-" + counter.getAndIncrement()); + return thread; + } + }; return Executors.newFixedThreadPool(STREAM_EXECUTOR_THREAD_POOL_SIZE, daemonThreadFactory); } @@ -144,115 +145,32 @@ public Thread newThread(Runnable runnable) { * @param options Configuration options for the stream including timeouts, retry settings, and * callback functions. * @param The type of records to be ingested (must extend Message). - * @return A ZerobusStream when the stream is ready. - * @throws ZerobusException If creating the stream fails. + * @return A CompletableFuture that completes with the ZerobusStream when the stream is ready. 
*/ - public ZerobusStream createStream( + public CompletableFuture> createStream( TableProperties tableProperties, String clientId, String clientSecret, - StreamConfigurationOptions options) - throws ZerobusException { - ExecutorService streamExecutor = createStreamExecutor(); - try { - return createStreamImpl( - tableProperties, clientId, clientSecret, options, streamExecutor, streamExecutor) - .join(); - } catch (java.util.concurrent.CompletionException e) { - // Unwrap CompletionException to get the original cause - Throwable cause = e.getCause(); - if (cause instanceof ZerobusException) { - throw (ZerobusException) cause; - } - throw new ZerobusException("Stream creation failed: " + cause.getMessage(), cause); - } - } + StreamConfigurationOptions options) { - /** - * Creates a new gRPC stream for ingesting records into a table with default options. - * - * @param tableProperties Configuration for the target table including table name and record type - * information. - * @param clientId The OAuth client ID for authentication. - * @param clientSecret The OAuth client secret for authentication. - * @param The type of records to be ingested (must extend Message). - * @return A ZerobusStream when the stream is ready. - * @throws ZerobusException If creating the stream fails. - */ - public ZerobusStream createStream( - TableProperties tableProperties, String clientId, String clientSecret) - throws ZerobusException { - return this.createStream(tableProperties, clientId, clientSecret, DEFAULT_OPTIONS); - } - - /** - * Recreate stream from a failed stream. - * - *

Uses the same table properties and stream options as the failed stream. It will also ingest - * all unacknowledged records from the failed stream. - * - * @param stream The stream to be recreated. - * @param The type of records to be ingested (must extend Message). - * @return A ZerobusStream when the stream is ready. - * @throws ZerobusException If recreating the stream fails. - */ - public ZerobusStream recreateStream( - ZerobusStream stream) throws ZerobusException { ExecutorService streamExecutor = createStreamExecutor(); - try { - return recreateStreamImpl(stream, streamExecutor, streamExecutor) - .join(); - } catch (java.util.concurrent.CompletionException e) { - // Unwrap CompletionException to get the original cause - Throwable cause = e.getCause(); - if (cause instanceof ZerobusException) { - throw (ZerobusException) cause; - } - throw new ZerobusException("Stream recreation failed: " + cause.getMessage(), cause); - } - } - - /** - * Create a stream in the Zerobus service. - * Returns ZerobusStream object which can be used to send messages to the stream. 
- * - * @param tableProperties The table properties - * @param clientId The OAuth client ID for authentication - * @param clientSecret The OAuth client secret for authentication - * @param options The stream configuration options - * @param streamReservedExecutor The executor reserved for stream operations - * @param ec The executor for async operations - * @param The record type - * @return A CompletableFuture that completes with the ZerobusStream - */ - private CompletableFuture> createStreamImpl( - TableProperties tableProperties, - String clientId, - String clientSecret, - StreamConfigurationOptions options, - ExecutorService streamReservedExecutor, - ExecutorService ec) { - CompletableFuture> resultFuture = new CompletableFuture<>(); try { logger.debug("Creating stream for table: " + tableProperties.getTableName()); // Generate authentication token - String token = TokenFactory.getZerobusToken( - tableProperties.getTableName(), - workspaceId, - unityCatalogEndpoint, - clientId, - clientSecret); + String token = + TokenFactory.getZerobusToken( + tableProperties.getTableName(), + workspaceId, + unityCatalogEndpoint, + clientId, + clientSecret); // Create gRPC stub with authentication ZerobusGrpc.ZerobusStub stub = - stubFactory.createStub( - serverEndpoint, - true, - tableProperties.getTableName(), - token); + stubFactory.createStub(serverEndpoint, true, tableProperties.getTableName(), token); ZerobusStream stream = new ZerobusStream<>( @@ -265,8 +183,8 @@ private CompletableFuture clientId, clientSecret, options, - streamReservedExecutor, - ec); + streamExecutor, + streamExecutor); stream .initialize() @@ -303,41 +221,55 @@ private CompletableFuture return resultFuture; } + /** + * Creates a new gRPC stream for ingesting records into a table with default options. + * + * @param tableProperties Configuration for the target table including table name and record type + * information. + * @param clientId The OAuth client ID for authentication. 
+ * @param clientSecret The OAuth client secret for authentication. + * @param The type of records to be ingested (must extend Message). + * @return A CompletableFuture that completes with the ZerobusStream when the stream is ready. + */ + public CompletableFuture> createStream( + TableProperties tableProperties, String clientId, String clientSecret) { + return this.createStream(tableProperties, clientId, clientSecret, DEFAULT_OPTIONS); + } + /** * Recreate stream from a failed stream. * - * @param failedStream The failed stream - * @param streamReservedExecutor The executor reserved for stream operations - * @param ec The executor for async operations - * @param The record type - * @return A CompletableFuture that completes with the new ZerobusStream + *

Uses the same table properties and stream options as the failed stream. It will also ingest + * all unacknowledged records from the failed stream. + * + * @param failedStream The stream to be recreated. + * @param The type of records to be ingested (must extend Message). + * @return A CompletableFuture that completes with the new ZerobusStream when the stream is ready. */ - private CompletableFuture> - recreateStreamImpl( - ZerobusStream failedStream, - ExecutorService streamReservedExecutor, - ExecutorService ec) { + public CompletableFuture> recreateStream( + ZerobusStream failedStream) { CompletableFuture> resultFuture = new CompletableFuture<>(); - createStreamImpl( + createStream( failedStream.getTableProperties(), failedStream.getClientId(), failedStream.getClientSecret(), - failedStream.getOptions(), - streamReservedExecutor, - ec) + failedStream.getOptions()) .whenComplete( (stream, error) -> { if (error == null) { // ingest unacked records Iterator unackedRecords = failedStream.getUnackedRecords(); - while (unackedRecords.hasNext()) { - stream.ingestRecord(unackedRecords.next()); + try { + while (unackedRecords.hasNext()) { + stream.ingestRecord(unackedRecords.next()); + } + resultFuture.complete(stream); + } catch (ZerobusException e) { + resultFuture.completeExceptionally(e); } - - resultFuture.complete(stream); } else { resultFuture.completeExceptionally(error); } diff --git a/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java b/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java index 48eb6ad..9c9650a 100644 --- a/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java +++ b/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java @@ -8,14 +8,13 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; - import java.util.concurrent.TimeUnit; /** * Factory for creating Zerobus gRPC stubs with proper configuration. * - *

This factory handles the creation of gRPC channels and stubs with - * appropriate settings for long-lived streaming connections. + *

This factory handles the creation of gRPC channels and stubs with appropriate settings for + * long-lived streaming connections. */ class ZerobusSdkStubFactory { @@ -34,8 +33,8 @@ class ZerobusSdkStubFactory { /** * Creates a new managed gRPC channel. * - *

The channel is configured for long-lived streaming with appropriate - * keep-alive settings and message size limits. + *

The channel is configured for long-lived streaming with appropriate keep-alive settings and + * message size limits. * * @param endpoint The endpoint URL (may include protocol prefix) * @param useTls Whether to use TLS encryption @@ -44,8 +43,8 @@ class ZerobusSdkStubFactory { ManagedChannel createGrpcChannel(String endpoint, boolean useTls) { EndpointInfo endpointInfo = parseEndpoint(endpoint, useTls); - NettyChannelBuilder builder = NettyChannelBuilder - .forAddress(endpointInfo.host, endpointInfo.port); + NettyChannelBuilder builder = + NettyChannelBuilder.forAddress(endpointInfo.host, endpointInfo.port); // Configure TLS or plaintext if (useTls) { @@ -66,11 +65,11 @@ ManagedChannel createGrpcChannel(String endpoint, boolean useTls) { /** * Creates a new Zerobus gRPC stub with authentication. * - *

The stub is configured with an interceptor that adds authentication - * headers to all outgoing requests. + *

The stub is configured with an interceptor that adds authentication headers to all outgoing + * requests. * - *

Note: Currently creates a new channel for each stub. Consider - * reusing channels across multiple streams for better resource utilization. + *

Note: Currently creates a new channel for each stub. Consider reusing channels across + * multiple streams for better resource utilization. * * @param endpoint The endpoint URL * @param useTls Whether to use TLS encryption @@ -79,10 +78,7 @@ ManagedChannel createGrpcChannel(String endpoint, boolean useTls) { * @return A configured ZerobusStub */ ZerobusGrpc.ZerobusStub createStub( - String endpoint, - boolean useTls, - String tableName, - String token) { + String endpoint, boolean useTls, String tableName, String token) { ManagedChannel channel = createGrpcChannel(endpoint, useTls); ClientInterceptor authInterceptor = new AuthenticationInterceptor(token, tableName); Channel interceptedChannel = io.grpc.ClientInterceptors.intercept(channel, authInterceptor); @@ -117,16 +113,15 @@ private EndpointInfo parseEndpoint(String endpoint, boolean useTls) { // Split host and port String[] parts = cleanEndpoint.split(":", 2); String host = parts[0]; - int port = parts.length > 1 - ? Integer.parseInt(parts[1]) - : (useTls ? DEFAULT_TLS_PORT : DEFAULT_PLAINTEXT_PORT); + int port = + parts.length > 1 + ? Integer.parseInt(parts[1]) + : (useTls ? DEFAULT_TLS_PORT : DEFAULT_PLAINTEXT_PORT); return new EndpointInfo(host, port); } - /** - * Container for parsed endpoint information. - */ + /** Container for parsed endpoint information. */ private static class EndpointInfo { final String host; final int port; @@ -142,9 +137,10 @@ private static class EndpointInfo { * gRPC client interceptor that adds authentication headers to requests. * *

This interceptor attaches the following headers to all outgoing requests: + * *

    - *
  • Authorization: Bearer token
  • - *
  • x-databricks-zerobus-table-name: table name
  • + *
  • Authorization: Bearer token + *
  • x-databricks-zerobus-table-name: table name *
*/ class AuthenticationInterceptor implements ClientInterceptor { @@ -171,9 +167,7 @@ class AuthenticationInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( - MethodDescriptor method, - CallOptions callOptions, - Channel next) { + MethodDescriptor method, CallOptions callOptions, Channel next) { return new io.grpc.ForwardingClientCall.SimpleForwardingClientCall( next.newCall(method, callOptions)) { @Override diff --git a/src/main/java/com/databricks/zerobus/ZerobusStream.java b/src/main/java/com/databricks/zerobus/ZerobusStream.java index ce5e20d..2657d2d 100644 --- a/src/main/java/com/databricks/zerobus/ZerobusStream.java +++ b/src/main/java/com/databricks/zerobus/ZerobusStream.java @@ -1,38 +1,27 @@ package com.databricks.zerobus; +import com.databricks.zerobus.ZerobusGrpc.ZerobusStub; +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; -import io.grpc.stub.StreamObserver; -import com.google.protobuf.ByteString; -import com.google.protobuf.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import 
com.databricks.zerobus.ZerobusGrpc.ZerobusStub; -import com.databricks.zerobus.EphemeralStreamRequest; -import com.databricks.zerobus.EphemeralStreamResponse; -import com.databricks.zerobus.CreateIngestStreamRequest; -import com.databricks.zerobus.IngestRecordRequest; -import com.databricks.zerobus.RecordType; -import com.databricks.zerobus.IngestRecordResponse; - -/** - * Types of stream failures that can occur during ingestion. - */ +/** Types of stream failures that can occur during ingestion. */ enum StreamFailureType { /** Unknown failure type */ UNKNOWN, @@ -44,9 +33,7 @@ enum StreamFailureType { SERVER_UNRESPONSIVE } -/** - * Tracks stream failure counts and types for recovery decisions. - */ +/** Tracks stream failure counts and types for recovery decisions. */ class StreamFailureInfo { private StreamFailureType _failureType = StreamFailureType.UNKNOWN; private int _failureCounts = 0; @@ -77,9 +64,8 @@ synchronized StreamFailureType getFailureType() { } /** - * Utility for classifying gRPC errors as retriable or non-retriable. - * Non-retriable errors indicate issues that cannot be resolved by retrying - * (e.g., invalid credentials, missing resources). + * Utility for classifying gRPC errors as retriable or non-retriable. Non-retriable errors indicate + * issues that cannot be resolved by retrying (e.g., invalid credentials, missing resources). */ class GrpcErrorHandling { private static final Set NON_RETRIABLE_CODES = new HashSet<>(); @@ -113,7 +99,8 @@ class Record { final ByteString protoEncodedRecord; final CompletableFuture ackPromise; - Record(long offsetId, T record, ByteString protoEncodedRecord, CompletableFuture ackPromise) { + Record( + long offsetId, T record, ByteString protoEncodedRecord, CompletableFuture ackPromise) { this.offsetId = offsetId; this.record = record; this.protoEncodedRecord = protoEncodedRecord; @@ -122,15 +109,19 @@ class Record { } /** - * Zerobus stream for ingesting records into a table. 
- * Should be created using ZerobusSdk.createStream. + * Zerobus stream for ingesting records into a table. Should be created using + * ZerobusSdk.createStream. */ public class ZerobusStream { private static final Logger logger = LoggerFactory.getLogger(ZerobusStream.class); - // implicit ec: ExecutionContext - this is the ExecutionContext that client provides to run async operations (e.g.create stream async result processing) - // zerobusStreamExecutor: ExecutionContext - This is used only for futures like timeout counter / stream recovery / stream unresponsiveness detection, so we don't block threads from customer's ExecutionContext - // We have to use a separate executor (bounded) to make sure stream progress is not blocked + // implicit ec: ExecutionContext - this is the ExecutionContext that client provides to run async + // operations (e.g.create stream async result processing) + // zerobusStreamExecutor: ExecutionContext - This is used only for futures like timeout counter / + // stream recovery / stream unresponsiveness detection, so we don't block threads from customer's + // ExecutionContext + // We have to use a separate executor (bounded) to make sure stream progress is + // not blocked private static final int CREATE_STREAM_TIMEOUT_MS = 15000; @@ -151,7 +142,8 @@ public class ZerobusStream { private Optional> stream = Optional.empty(); private Optional> streamCreatedEvent = Optional.empty(); - // Sending records is asynchronus task which consumes records from recordsQueuedForSending and sends them to the server + // Sending records is asynchronus task which consumes records from recordsQueuedForSending and + // sends them to the server private final ArrayBlockingQueue recordsQueuedForSending; // Here we store records which are not yet acknowledged by the server @@ -239,30 +231,35 @@ private synchronized void setState(StreamState newState) { logger.debug("Stream state changed to " + newState); } - private CompletableFuture runWithTimeout(long timeoutMs, 
java.util.function.Supplier> getFuture) { + private CompletableFuture runWithTimeout( + long timeoutMs, java.util.function.Supplier> getFuture) { AtomicBoolean done = new AtomicBoolean(false); CompletableFuture future = getFuture.get(); - future.whenComplete((result, error) -> { - synchronized (done) { - done.set(true); - done.notifyAll(); - } - }); - - CompletableFuture timeoutFuture = CompletableFuture.runAsync(() -> { - synchronized (done) { - try { - done.wait(timeoutMs); - if (!done.get()) { - throw new RuntimeException(new TimeoutException("Operation timed out!")); + future.whenComplete( + (result, error) -> { + synchronized (done) { + done.set(true); + done.notifyAll(); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - }, zerobusStreamExecutor); + }); + + CompletableFuture timeoutFuture = + CompletableFuture.runAsync( + () -> { + synchronized (done) { + try { + done.wait(timeoutMs); + if (!done.get()) { + throw new RuntimeException(new TimeoutException("Operation timed out!")); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + }, + zerobusStreamExecutor); return CompletableFuture.anyOf(future, timeoutFuture).thenApply(result -> null); } @@ -270,15 +267,16 @@ private CompletableFuture runWithTimeout(long timeoutMs, java.util.functio /** * Retries an operation with exponential backoff until success or max retries reached. * - *

This method uses recursion through the RetryHelper inner class to avoid blocking - * the caller thread. Each retry is scheduled asynchronously on the stream executor. + *

This method uses recursion through the RetryHelper inner class to avoid blocking the caller + * thread. Each retry is scheduled asynchronously on the stream executor. * * @param maxRetries Maximum number of retry attempts * @param context Context string for logging * @param f Supplier that provides the operation to retry * @return CompletableFuture that completes with the operation result or error */ - private CompletableFuture runWithRetries(long maxRetries, String context, java.util.function.Supplier> f) { + private CompletableFuture runWithRetries( + long maxRetries, String context, java.util.function.Supplier> f) { CompletableFuture resultPromise = new CompletableFuture<>(); int backoffMs = options.recovery() ? options.recoveryBackoffMs() : 0; @@ -287,30 +285,35 @@ class RetryHelper { void tryNext(int attempt) { logger.debug("[" + context + "] Running attempt ... "); - f.get().whenComplete((response, error) -> { - if (error == null) { - resultPromise.complete(response); - } else if (error instanceof NonRetriableException || error.getCause() instanceof NonRetriableException) { - // Non-retriable errors should fail immediately without retrying - resultPromise.completeExceptionally(error); - } else { - if (attempt < maxRetries - 1) { - // Schedule next retry after backoff period - CompletableFuture.runAsync(() -> { - logger.debug("[" + context + "] Retrying in " + backoffMs + " ms ... 
"); - try { - Thread.sleep(backoffMs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - tryNext(attempt + 1); - }, zerobusStreamExecutor); - } else { - // Exhausted all retries - resultPromise.completeExceptionally(error); - } - } - }); + f.get() + .whenComplete( + (response, error) -> { + if (error == null) { + resultPromise.complete(response); + } else if (error instanceof NonRetriableException + || error.getCause() instanceof NonRetriableException) { + // Non-retriable errors should fail immediately without retrying + resultPromise.completeExceptionally(error); + } else { + if (attempt < maxRetries - 1) { + // Schedule next retry after backoff period + CompletableFuture.runAsync( + () -> { + logger.debug("[" + context + "] Retrying in " + backoffMs + " ms ... "); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + tryNext(attempt + 1); + }, + zerobusStreamExecutor); + } else { + // Exhausted all retries + resultPromise.completeExceptionally(error); + } + } + }); } } @@ -344,101 +347,113 @@ private CompletableFuture createStream() { stream = Optional.empty(); streamCreatedEvent = Optional.empty(); - runWithTimeout(timeoutMs, () -> { - CompletableFuture createStreamTry = new CompletableFuture<>(); - - // Generate a fresh token for this stream creation attempt - try { - String token = TokenFactory.getZerobusToken( - tableProperties.getTableName(), - workspaceId, - unityCatalogEndpoint, - clientId, - clientSecret); - - // Create a new stub with the fresh token - stub = stubFactory.createStub( - serverEndpoint, - true, - tableProperties.getTableName(), - token); - - logger.debug("Generated new token and created stub for stream"); - } catch (NonRetriableException e) { - createStreamTry.completeExceptionally(e); - return createStreamTry; - } + runWithTimeout( + timeoutMs, + () -> { + CompletableFuture createStreamTry = new CompletableFuture<>(); - // Create the gRPC stream 
with the new stub - streamCreatedEvent = Optional.of(new CompletableFuture<>()); - stream = Optional.of( - (ClientCallStreamObserver) stub.ephemeralStream(ackReceiver) - ); - - logger.debug("Creating ephemeral stream for table " + tableProperties.getTableName()); - - // Create the initial request - EphemeralStreamRequest createStreamRequest = EphemeralStreamRequest.newBuilder() - .setCreateStream( - CreateIngestStreamRequest.newBuilder() - .setTableName(tableProperties.getTableName()) - .setDescriptorProto(ByteString.copyFrom(descriptorProto.toByteArray())) - .setRecordType(com.databricks.zerobus.RecordType.PROTO) - .build() - ) - .build(); - - // Send the CreateStreamRequest - try { - sendMessage(createStreamRequest); - } catch (Exception exception) { - failStream(exception); - createStreamTry.completeExceptionally(exception); - return createStreamTry; - } + // Generate a fresh token for this stream creation attempt + try { + String token = + TokenFactory.getZerobusToken( + tableProperties.getTableName(), + workspaceId, + unityCatalogEndpoint, + clientId, + clientSecret); + + // Create a new stub with the fresh token + stub = + stubFactory.createStub( + serverEndpoint, true, tableProperties.getTableName(), token); + + logger.debug("Generated new token and created stub for stream"); + } catch (NonRetriableException e) { + createStreamTry.completeExceptionally(e); + return createStreamTry; + } - streamCreatedEvent.get().whenComplete((id, e) -> { - if (e == null) { - streamId = Optional.of(id); - recordsSenderTask.start(); - createStreamTry.complete(null); - } else if (e instanceof ZerobusException) { - failStream(e); - streamId = Optional.empty(); - streamCreatedEvent = Optional.empty(); - stream = Optional.empty(); - createStreamTry.completeExceptionally(e); - } else { - failStream(e); - streamId = Optional.empty(); - streamCreatedEvent = Optional.empty(); - stream = Optional.empty(); - createStreamTry.completeExceptionally(new ZerobusException(e.getMessage(), e)); 
- } - }); + // Create the gRPC stream with the new stub + streamCreatedEvent = Optional.of(new CompletableFuture<>()); + stream = + Optional.of( + (ClientCallStreamObserver) + stub.ephemeralStream(ackReceiver)); + + logger.debug("Creating ephemeral stream for table " + tableProperties.getTableName()); + + // Create the initial request + EphemeralStreamRequest createStreamRequest = + EphemeralStreamRequest.newBuilder() + .setCreateStream( + CreateIngestStreamRequest.newBuilder() + .setTableName(tableProperties.getTableName()) + .setDescriptorProto( + ByteString.copyFrom(descriptorProto.toByteArray())) + .setRecordType(com.databricks.zerobus.RecordType.PROTO) + .build()) + .build(); + + // Send the CreateStreamRequest + try { + sendMessage(createStreamRequest); + } catch (Exception exception) { + failStream(exception); + createStreamTry.completeExceptionally(exception); + return createStreamTry; + } - return createStreamTry; - }).whenComplete((result, e) -> { - if (e == null) { - createStreamDone.complete(null); - } else { - failStream(e); - Throwable ex; - if (e instanceof StatusRuntimeException) { - Status.Code code = ((StatusRuntimeException) e).getStatus().getCode(); - if (GrpcErrorHandling.isNonRetriable(code)) { - ex = new NonRetriableException("Non-retriable gRPC error during stream creation: " + e.getMessage(), e); - } else { - ex = new ZerobusException("Stream creation failed: " + e.getMessage(), e); - } - } else if (e instanceof NonRetriableException) { - ex = new NonRetriableException("Stream creation failed: " + e.getMessage(), e); - } else { - ex = new ZerobusException("Stream creation failed: " + e.getMessage(), e); - } - createStreamDone.completeExceptionally(ex); - } - }); + streamCreatedEvent + .get() + .whenComplete( + (id, e) -> { + if (e == null) { + streamId = Optional.of(id); + recordsSenderTask.start(); + createStreamTry.complete(null); + } else if (e instanceof ZerobusException) { + failStream(e); + streamId = Optional.empty(); + 
streamCreatedEvent = Optional.empty(); + stream = Optional.empty(); + createStreamTry.completeExceptionally(e); + } else { + failStream(e); + streamId = Optional.empty(); + streamCreatedEvent = Optional.empty(); + stream = Optional.empty(); + createStreamTry.completeExceptionally( + new ZerobusException(e.getMessage(), e)); + } + }); + + return createStreamTry; + }) + .whenComplete( + (result, e) -> { + if (e == null) { + createStreamDone.complete(null); + } else { + failStream(e); + Throwable ex; + if (e instanceof StatusRuntimeException) { + Status.Code code = ((StatusRuntimeException) e).getStatus().getCode(); + if (GrpcErrorHandling.isNonRetriable(code)) { + ex = + new NonRetriableException( + "Non-retriable gRPC error during stream creation: " + e.getMessage(), + e); + } else { + ex = new ZerobusException("Stream creation failed: " + e.getMessage(), e); + } + } else if (e instanceof NonRetriableException) { + ex = new NonRetriableException("Stream creation failed: " + e.getMessage(), e); + } else { + ex = new ZerobusException("Stream creation failed: " + e.getMessage(), e); + } + createStreamDone.completeExceptionally(ex); + } + }); return createStreamDone; } @@ -450,30 +465,32 @@ CompletableFuture initialize() { if (state != StreamState.UNINITIALIZED) { logger.error("Stream cannot be initialized/opened more than once"); initializeDone.completeExceptionally( - new ZerobusException("Stream cannot be initialized/opened more than once") - ); + new ZerobusException("Stream cannot be initialized/opened more than once")); return initializeDone; } } int retries = options.recovery() ? 
options.recoveryRetries() : 1; - runWithRetries(retries, "CreateStream", () -> createStream()).whenComplete((result, e) -> { - if (e == null) { - setState(StreamState.OPENED); - serverUnresponsivenessDetectionTask.start(); - logger.info("Stream created successfully with id " + streamId.get()); - initializeDone.complete(null); - } else { - setState(StreamState.FAILED); - logger.error("Failed to create stream: ", e); - if (e instanceof ZerobusException) { - initializeDone.completeExceptionally(e); - } else { - initializeDone.completeExceptionally(new ZerobusException("Stream creation failed: " + e.getMessage(), e)); - } - } - }); + runWithRetries(retries, "CreateStream", () -> createStream()) + .whenComplete( + (result, e) -> { + if (e == null) { + setState(StreamState.OPENED); + serverUnresponsivenessDetectionTask.start(); + logger.info("Stream created successfully with id " + streamId.get()); + initializeDone.complete(null); + } else { + setState(StreamState.FAILED); + logger.error("Failed to create stream: ", e); + if (e instanceof ZerobusException) { + initializeDone.completeExceptionally(e); + } else { + initializeDone.completeExceptionally( + new ZerobusException("Stream creation failed: " + e.getMessage(), e)); + } + } + }); return initializeDone; } @@ -481,7 +498,8 @@ CompletableFuture initialize() { /** * Closes the stream and cleans up resources. 
* - * @param hardFailure If true, marks stream as FAILED and saves unacked records for potential retry + * @param hardFailure If true, marks stream as FAILED and saves unacked records for potential + * retry * @param exception The exception that caused the failure (if any) */ private void closeStream(boolean hardFailure, Optional exception) { @@ -515,7 +533,8 @@ private void closeStream(boolean hardFailure, Optional excepti try { Record record = inflightRecords.take(); unackedRecordsAfterStreamFailure.add(record); - record.ackPromise.completeExceptionally(exception.orElse(new ZerobusException("Stream failed"))); + record.ackPromise.completeExceptionally( + exception.orElse(new ZerobusException("Stream failed"))); this.notifyAll(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -539,8 +558,10 @@ private void closeStream(boolean hardFailure, Optional excepti } } - private CompletableFuture closeStreamAsync(boolean hardFailure, Optional exception) { - return CompletableFuture.runAsync(() -> closeStream(hardFailure, exception), zerobusStreamExecutor); + private CompletableFuture closeStreamAsync( + boolean hardFailure, Optional exception) { + return CompletableFuture.runAsync( + () -> closeStream(hardFailure, exception), zerobusStreamExecutor); } private void enqueueRecordsForResending() { @@ -559,14 +580,14 @@ private void enqueueRecordsForResending() { record.offsetId = offsetId; - EphemeralStreamRequest recordRequest = EphemeralStreamRequest.newBuilder() - .setIngestRecord( - IngestRecordRequest.newBuilder() - .setOffsetId(offsetId) - .setProtoEncodedRecord(record.protoEncodedRecord) - .build() - ) - .build(); + EphemeralStreamRequest recordRequest = + EphemeralStreamRequest.newBuilder() + .setIngestRecord( + IngestRecordRequest.newBuilder() + .setOffsetId(offsetId) + .setProtoEncodedRecord(record.protoEncodedRecord) + .build()) + .build(); try { recordsQueuedForSending.put(recordRequest); @@ -582,10 +603,11 @@ private void 
enqueueRecordsForResending() { * Attempts to recover a failed stream by recreating it and resending unacked records. * *

This method: + * *

    - *
  1. Closes the current stream without marking it as hard failure
  2. - *
  3. Creates a new stream with the same configuration
  4. - *
  5. Re-enqueues all unacknowledged records for sending
  6. + *
  7. Closes the current stream without marking it as hard failure + *
  8. Creates a new stream with the same configuration + *
  9. Re-enqueues all unacknowledged records for sending *
* * @return CompletableFuture that completes when recovery succeeds or fails @@ -593,55 +615,79 @@ private void enqueueRecordsForResending() { private CompletableFuture recoverStream() { CompletableFuture recoverStreamDone = new CompletableFuture<>(); - CompletableFuture.runAsync(() -> { - if (!options.recovery()) { - logger.debug("Stream recovery is disabled"); - recoverStreamDone.completeExceptionally(new ZerobusException("Stream recovery is disabled")); - } else { - logger.warn("Stream broken! Running stream recovery for stream id '" + streamId.orElse("unknown") + "' ... "); - - // Close the broken stream but don't mark as hard failure since we're attempting recovery - closeStream(false, Optional.empty()); - - synchronized (this) { - int retries = options.recoveryRetries(); - // Reduce remaining retries based on consecutive failures of the same type - int leftRetries = Math.max(0, retries - streamFailureInfo.getFailureCounts() + 1); - - if (leftRetries == 0) { - logger.debug("Stream recovery failed: Run out of retries"); - recoverStreamDone.completeExceptionally(new ZerobusException("Stream recovery failed")); - return; - } - - logger.debug("Stream recovery: Running with " + leftRetries + " / " + retries + " retries left"); - - runWithRetries(leftRetries, "RecoverStream", () -> { - CompletableFuture recoverStreamTry = new CompletableFuture<>(); - - createStream().whenComplete((result, e) -> { - if (e != null) { - logger.debug("Stream recovery: Failed to create stream: " + e.getMessage()); - recoverStreamTry.completeExceptionally(e); - } else { - enqueueRecordsForResending(); - recoverStreamTry.complete(null); + CompletableFuture.runAsync( + () -> { + if (!options.recovery()) { + logger.debug("Stream recovery is disabled"); + recoverStreamDone.completeExceptionally( + new ZerobusException("Stream recovery is disabled")); + } else { + logger.warn( + "Stream broken! Running stream recovery for stream id '" + + streamId.orElse("unknown") + + "' ... 
"); + + // Close the broken stream but don't mark as hard failure since we're attempting + // recovery + closeStream(false, Optional.empty()); + + synchronized (this) { + int retries = options.recoveryRetries(); + // Reduce remaining retries based on consecutive failures of the same type + int leftRetries = Math.max(0, retries - streamFailureInfo.getFailureCounts() + 1); + + if (leftRetries == 0) { + logger.debug("Stream recovery failed: Run out of retries"); + recoverStreamDone.completeExceptionally( + new ZerobusException("Stream recovery failed")); + return; } - }); - return recoverStreamTry; - }).whenComplete((result, e) -> { - if (e == null) { - logger.info("Stream recovery completed successfully. New stream id: " + streamId.get()); - recoverStreamDone.complete(null); - } else { - logger.error("Stream recovery failed: " + e.getMessage(), e); - recoverStreamDone.completeExceptionally(e); + logger.debug( + "Stream recovery: Running with " + + leftRetries + + " / " + + retries + + " retries left"); + + runWithRetries( + leftRetries, + "RecoverStream", + () -> { + CompletableFuture recoverStreamTry = new CompletableFuture<>(); + + createStream() + .whenComplete( + (result, e) -> { + if (e != null) { + logger.debug( + "Stream recovery: Failed to create stream: " + + e.getMessage()); + recoverStreamTry.completeExceptionally(e); + } else { + enqueueRecordsForResending(); + recoverStreamTry.complete(null); + } + }); + + return recoverStreamTry; + }) + .whenComplete( + (result, e) -> { + if (e == null) { + logger.info( + "Stream recovery completed successfully. 
New stream id: " + + streamId.get()); + recoverStreamDone.complete(null); + } else { + logger.error("Stream recovery failed: " + e.getMessage(), e); + recoverStreamDone.completeExceptionally(e); + } + }); } - }); - } - } - }, zerobusStreamExecutor); + } + }, + zerobusStreamExecutor); return recoverStreamDone; } @@ -661,8 +707,9 @@ private void handleStreamFailed(StreamFailureType streamFailureType, Optional Stream failed during creation // FAILED -> Stream already failed (don't handle it twice) // RECOVERING -> Stream is recovering from a failure, no action needed @@ -675,7 +722,8 @@ private void handleStreamFailed(StreamFailureType streamFailureType, Optional { - if (e == null) { - setState(StreamState.OPENED); - logger.info("Stream recovered successfully with id " + streamId.get()); - } else { - logger.error("Stream recovery failed", e); - closeStream(true, exception); - } - }); + recoverStream() + .whenComplete( + (result, e) -> { + if (e == null) { + setState(StreamState.OPENED); + logger.info("Stream recovered successfully with id " + streamId.get()); + } else { + logger.error("Stream recovery failed", e); + closeStream(true, exception); + } + }); } } - private CompletableFuture handleStreamFailedAsync(StreamFailureType streamFailureType, Optional error) { - return CompletableFuture.runAsync(() -> handleStreamFailed(streamFailureType, error), zerobusStreamExecutor); + private CompletableFuture handleStreamFailedAsync( + StreamFailureType streamFailureType, Optional error) { + return CompletableFuture.runAsync( + () -> handleStreamFailed(streamFailureType, error), zerobusStreamExecutor); } // Task that checks if server is responsive (time it takes for server to ack a record) @@ -716,88 +768,93 @@ private CompletableFuture handleStreamFailedAsync(StreamFailureType stream private BackgroundTask serverUnresponsivenessDetectionTask; private void initServerUnresponsivenessDetectionTask() { - serverUnresponsivenessDetectionTask = new BackgroundTask( - 
cancellationToken -> { - long taskIterationStartTime = System.currentTimeMillis(); - synchronized (ZerobusStream.this) { - switch (state) { - case UNINITIALIZED: - case CLOSED: - case FAILED: - break; - - case RECOVERING: - logger.debug("Server unresponsiveness detection task: Waiting for stream to finish recovering"); - try { - ZerobusStream.this.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - break; + serverUnresponsivenessDetectionTask = + new BackgroundTask( + cancellationToken -> { + long taskIterationStartTime = System.currentTimeMillis(); + synchronized (ZerobusStream.this) { + switch (state) { + case UNINITIALIZED: + case CLOSED: + case FAILED: + break; - case OPENED: - case FLUSHING: - if (inflightRecords.isEmpty()) { - logger.debug("Server unresponsiveness detection task: Waiting for some records to be ingested"); - try { - ZerobusStream.this.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } else { - // STREAM IS OPENED OR FLUSHING AND THERE ARE RECORDS IN THE QUEUE - CHECK IF SERVER IS RESPONSIVE - long latestRespondedOffsetIdBefore = latestRespondedOffsetId; - boolean serverResponsive = false; - boolean serverResponsiveTimeout = false; - - while (!serverResponsive && !serverResponsiveTimeout) { - if (latestRespondedOffsetIdBefore != latestRespondedOffsetId) { - serverResponsive = true; - } else { - long remainingTime = options.serverLackOfAckTimeoutMs() - (System.currentTimeMillis() - taskIterationStartTime); - - if (remainingTime <= 0) { - // We don't want to block here, since this potentially can close the stream, which will wait for this task to finish (deadlock) - handleStreamFailedAsync( - StreamFailureType.SERVER_UNRESPONSIVE, - Optional.of(new ZerobusException("Server is unresponsive")) - ); - serverResponsiveTimeout = true; - } else { + case RECOVERING: + logger.debug( + "Server unresponsiveness detection task: Waiting for stream to finish recovering"); try { - 
ZerobusStream.this.wait(remainingTime); - if (cancellationToken.isDone()) { - // In case of a stream close, break the loop so that it doesn't hang waiting for the timeout. - serverResponsive = true; - } + ZerobusStream.this.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - } + break; + + case OPENED: + case FLUSHING: + if (inflightRecords.isEmpty()) { + logger.debug( + "Server unresponsiveness detection task: Waiting for some records to be ingested"); + try { + ZerobusStream.this.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + // STREAM IS OPENED OR FLUSHING AND THERE ARE RECORDS IN THE QUEUE - CHECK IF + // SERVER IS RESPONSIVE + long latestRespondedOffsetIdBefore = latestRespondedOffsetId; + boolean serverResponsive = false; + boolean serverResponsiveTimeout = false; + + while (!serverResponsive && !serverResponsiveTimeout) { + if (latestRespondedOffsetIdBefore != latestRespondedOffsetId) { + serverResponsive = true; + } else { + long remainingTime = + options.serverLackOfAckTimeoutMs() + - (System.currentTimeMillis() - taskIterationStartTime); + + if (remainingTime <= 0) { + // We don't want to block here, since this potentially can close the + // stream, which will wait for this task to finish (deadlock) + handleStreamFailedAsync( + StreamFailureType.SERVER_UNRESPONSIVE, + Optional.of(new ZerobusException("Server is unresponsive"))); + serverResponsiveTimeout = true; + } else { + try { + ZerobusStream.this.wait(remainingTime); + if (cancellationToken.isDone()) { + // In case of a stream close, break the loop so that it doesn't hang + // waiting for the timeout. 
+ serverResponsive = true; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + } + break; } } - } - break; - } - } - }, - error -> { - // This should never happen (task won't throw any errors), but if it does, we need to handle it - // and it probably won't be recoverable - logger.error("Server unresponsiveness detection task failed: " + error.getMessage(), error); - - closeStreamAsync( - true, - Optional.of( - new ZerobusException( - "Server unresponsiveness detection task failed: " + error.getMessage(), - error - ) - ) - ); - }, - zerobusStreamExecutor - ); + }, + error -> { + // This should never happen (task won't throw any errors), but if it does, we need to + // handle it + // and it probably won't be recoverable + logger.error( + "Server unresponsiveness detection task failed: " + error.getMessage(), error); + + closeStreamAsync( + true, + Optional.of( + new ZerobusException( + "Server unresponsiveness detection task failed: " + error.getMessage(), + error))); + }, + zerobusStreamExecutor); } // Task that consumes records from recordsQueuedForSending and sends them to the server @@ -805,250 +862,269 @@ private void initServerUnresponsivenessDetectionTask() { private BackgroundTask recordsSenderTask; private void initRecordsSenderTask() { - recordsSenderTask = new BackgroundTask( - cancellationToken -> { - // Check if there are records to send - Optional recordRequest; - synchronized (ZerobusStream.this) { - switch (state) { - case OPENED: - case FLUSHING: - if (recordsQueuedForSending.isEmpty()) { - try { - ZerobusStream.this.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - recordRequest = Optional.empty(); - } else { - try { - recordRequest = Optional.of(recordsQueuedForSending.take()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - recordRequest = Optional.empty(); - } - } - break; - case CLOSED: - if (recordsQueuedForSending.isEmpty()) { - 
recordRequest = Optional.empty(); - } else { - try { - recordRequest = Optional.of(recordsQueuedForSending.take()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - recordRequest = Optional.empty(); - } - } - break; - default: - recordRequest = Optional.empty(); - break; - } - } - - // If we have a record, wait for stream to be ready and send it - if (recordRequest.isPresent()) { - if (stream.isPresent()) { - ClientCallStreamObserver strm = stream.get(); - // Wait for stream to be ready - synchronized (ZerobusStream.this) { - while (!strm.isReady() && !cancellationToken.isDone()) { - try { - ZerobusStream.this.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - } - if (!cancellationToken.isDone()) { - // Send the record - try { - sendMessage(recordRequest.get()); - streamFailureInfo.resetFailure(StreamFailureType.SENDING_MESSAGE); - } catch (Exception ex) { - logger.error("Error while sending record: " + ex.getMessage(), ex); - - // Use async to avoid deadlock: handleStreamFailed() may call closeStream() - // which waits for this task to stop. - handleStreamFailedAsync(StreamFailureType.SENDING_MESSAGE, Optional.of(ex)); - - // Wait for state change before continuing. This prevents repeatedly attempting - // to send the next record which would likely fail with the same error. - // The task will be restarted after recovery (or shut down if recovery fails). 
+ recordsSenderTask = + new BackgroundTask( + cancellationToken -> { + // Check if there are records to send + Optional recordRequest; synchronized (ZerobusStream.this) { - while ((state == StreamState.OPENED || state == StreamState.FLUSHING) && !cancellationToken.isDone()) { - try { - ZerobusStream.this.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + switch (state) { + case OPENED: + case FLUSHING: + if (recordsQueuedForSending.isEmpty()) { + try { + ZerobusStream.this.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + recordRequest = Optional.empty(); + } else { + try { + recordRequest = Optional.of(recordsQueuedForSending.take()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + recordRequest = Optional.empty(); + } + } + break; + case CLOSED: + if (recordsQueuedForSending.isEmpty()) { + recordRequest = Optional.empty(); + } else { + try { + recordRequest = Optional.of(recordsQueuedForSending.take()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + recordRequest = Optional.empty(); + } + } + break; + default: + recordRequest = Optional.empty(); break; + } + } + + // If we have a record, wait for stream to be ready and send it + if (recordRequest.isPresent()) { + if (stream.isPresent()) { + ClientCallStreamObserver strm = stream.get(); + // Wait for stream to be ready + synchronized (ZerobusStream.this) { + while (!strm.isReady() && !cancellationToken.isDone()) { + try { + ZerobusStream.this.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + if (!cancellationToken.isDone()) { + // Send the record + try { + sendMessage(recordRequest.get()); + streamFailureInfo.resetFailure(StreamFailureType.SENDING_MESSAGE); + } catch (Exception ex) { + logger.error("Error while sending record: " + ex.getMessage(), ex); + + // Use async to avoid deadlock: handleStreamFailed() may call 
closeStream() + // which waits for this task to stop. + handleStreamFailedAsync(StreamFailureType.SENDING_MESSAGE, Optional.of(ex)); + + // Wait for state change before continuing. This prevents repeatedly + // attempting + // to send the next record which would likely fail with the same error. + // The task will be restarted after recovery (or shut down if recovery fails). + synchronized (ZerobusStream.this) { + while ((state == StreamState.OPENED || state == StreamState.FLUSHING) + && !cancellationToken.isDone()) { + try { + ZerobusStream.this.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + } } } } - } - } - } - } - // No record available, continue to next iteration - }, - error -> { - // This should never happen (task won't throw any errors), but if it does, we need to handle it - // and it probably won't be recoverable - logger.error("Records sender task failed: " + error.getMessage(), error); - - closeStreamAsync( - true, - Optional.of( - new ZerobusException( - "Records sender task failed: " + error.getMessage(), - error - ) - ) - ); - }, - zerobusStreamExecutor - ); + // No record available, continue to next iteration + }, + error -> { + // This should never happen (task won't throw any errors), but if it does, we need to + // handle it + // and it probably won't be recoverable + logger.error("Records sender task failed: " + error.getMessage(), error); + + closeStreamAsync( + true, + Optional.of( + new ZerobusException( + "Records sender task failed: " + error.getMessage(), error))); + }, + zerobusStreamExecutor); } private ClientResponseObserver ackReceiver; private void initAckReceiver() { - ackReceiver = new ClientResponseObserver() { - // Track state for the receiver - private Optional ackReceiverStreamId = Optional.empty(); - - @Override - public void beforeStart(ClientCallStreamObserver requestStream) { - requestStream.setOnReadyHandler(() -> { - synchronized (ZerobusStream.this) { - 
ZerobusStream.this.notifyAll(); + ackReceiver = + new ClientResponseObserver() { + // Track state for the receiver + private Optional ackReceiverStreamId = Optional.empty(); + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.setOnReadyHandler( + () -> { + synchronized (ZerobusStream.this) { + ZerobusStream.this.notifyAll(); + } + }); } - }); - } - - @Override - public void onNext(EphemeralStreamResponse response) { - switch (response.getPayloadCase()) { - // *** Create stream response *** - case CREATE_STREAM_RESPONSE: - ackReceiverStreamId = Optional.of( - response.getCreateStreamResponse().getStreamId().isEmpty() ? - null : response.getCreateStreamResponse().getStreamId() - ); - if (!ackReceiverStreamId.isPresent() || ackReceiverStreamId.get() == null) { - throw new RuntimeException(new ZerobusException("Invalid response from server: stream id is missing")); - } - logger.debug("Stream created with id " + ackReceiverStreamId.get()); - streamCreatedEvent.get().complete(ackReceiverStreamId.get()); - break; - // *** Ingest record response (durability ack) *** - case INGEST_RECORD_RESPONSE: - String streamIdForReceiver = ackReceiverStreamId.orElseThrow(() -> - new RuntimeException(new ZerobusException("Invalid response from server: expected stream id but got record ack")) - ); - long ackedOffsetId = response.getIngestRecordResponse().getDurabilityAckUpToOffset(); - logger.debug("Acked offset " + ackedOffsetId); - - synchronized (ZerobusStream.this) { + @Override + public void onNext(EphemeralStreamResponse response) { + switch (response.getPayloadCase()) { + // *** Create stream response *** + case CREATE_STREAM_RESPONSE: + ackReceiverStreamId = + Optional.of( + response.getCreateStreamResponse().getStreamId().isEmpty() + ? 
null + : response.getCreateStreamResponse().getStreamId()); + if (!ackReceiverStreamId.isPresent() || ackReceiverStreamId.get() == null) { + throw new RuntimeException( + new ZerobusException("Invalid response from server: stream id is missing")); + } + logger.debug("Stream created with id " + ackReceiverStreamId.get()); + streamCreatedEvent.get().complete(ackReceiverStreamId.get()); + break; - // Edge case: Stream was recovered/recreated while ack was in flight. - // Ignore stale acks from old stream to avoid incorrectly completing promises. - if (!streamId.isPresent() || !streamIdForReceiver.equals(streamId.get())) { - return; - } + // *** Ingest record response (durability ack) *** + case INGEST_RECORD_RESPONSE: + String streamIdForReceiver = + ackReceiverStreamId.orElseThrow( + () -> + new RuntimeException( + new ZerobusException( + "Invalid response from server: expected stream id but got record ack"))); + long ackedOffsetId = + response.getIngestRecordResponse().getDurabilityAckUpToOffset(); + logger.debug("Acked offset " + ackedOffsetId); + + synchronized (ZerobusStream.this) { + + // Edge case: Stream was recovered/recreated while ack was in flight. + // Ignore stale acks from old stream to avoid incorrectly completing promises. + if (!streamId.isPresent() || !streamIdForReceiver.equals(streamId.get())) { + return; + } - // Receiving an ack proves the server is responsive and connection is healthy - streamFailureInfo.resetFailure(StreamFailureType.SERVER_CLOSED_STREAM); - streamFailureInfo.resetFailure(StreamFailureType.SERVER_UNRESPONSIVE); + // Receiving an ack proves the server is responsive and connection is healthy + streamFailureInfo.resetFailure(StreamFailureType.SERVER_CLOSED_STREAM); + streamFailureInfo.resetFailure(StreamFailureType.SERVER_UNRESPONSIVE); + + latestRespondedOffsetId = Math.max(latestRespondedOffsetId, ackedOffsetId); + + // Complete promises for all records up to and including the acked offset. 
+ // Server guarantees durability for all records <= ackedOffsetId. + boolean processingDone = false; + while (!processingDone) { + if (inflightRecords.isEmpty()) { + processingDone = true; + } else { + Record record = inflightRecords.peek(); + + if (record.offsetId > ackedOffsetId) { + // This record hasn't been acked yet + processingDone = true; + } else { + record.ackPromise.complete(null); + try { + inflightRecords.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + } - latestRespondedOffsetId = Math.max(latestRespondedOffsetId, ackedOffsetId); + ZerobusStream.this.notifyAll(); + } - // Complete promises for all records up to and including the acked offset. - // Server guarantees durability for all records <= ackedOffsetId. - boolean processingDone = false; - while (!processingDone) { - if (inflightRecords.isEmpty()) { - processingDone = true; - } else { - Record record = inflightRecords.peek(); + // Invoke user callback asynchronously to avoid blocking the gRPC receiver thread. + // Exceptions in user code should not affect stream operation. 
+ if (options.ackCallback().isPresent()) { + CompletableFuture.runAsync( + () -> { + options.ackCallback().get().accept(response.getIngestRecordResponse()); + }, + ec) + .exceptionally( + e -> { + logger.error( + "Exception in async ack_callback for offset " + + response + .getIngestRecordResponse() + .getDurabilityAckUpToOffset(), + e); + return null; + }); + } + break; - if (record.offsetId > ackedOffsetId) { - // This record hasn't been acked yet - processingDone = true; - } else { - record.ackPromise.complete(null); - try { - inflightRecords.take(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } + // *** Close stream signal *** + case CLOSE_STREAM_SIGNAL: + if (options.recovery()) { + double durationMs = 0.0; + if (response.getCloseStreamSignal().hasDuration()) { + durationMs = + response.getCloseStreamSignal().getDuration().getSeconds() * 1000.0 + + response.getCloseStreamSignal().getDuration().getNanos() / 1000000.0; } + logger.info( + String.format( + "Server will close the stream in %.3fms. Triggering stream recovery.", + durationMs)); + handleStreamFailed(StreamFailureType.SERVER_CLOSED_STREAM, Optional.empty()); } - } + break; - ZerobusStream.this.notifyAll(); + // *** Unknown response *** + default: + throw new RuntimeException(new ZerobusException("Invalid response from server")); } + } - // Invoke user callback asynchronously to avoid blocking the gRPC receiver thread. - // Exceptions in user code should not affect stream operation. 
- if (options.ackCallback().isPresent()) { - CompletableFuture.runAsync(() -> { - options.ackCallback().get().accept(response.getIngestRecordResponse()); - }, ec).exceptionally(e -> { - logger.error( - "Exception in async ack_callback for offset " + response.getIngestRecordResponse().getDurabilityAckUpToOffset(), - e - ); - return null; - }); - } - break; - - // *** Close stream signal *** - case CLOSE_STREAM_SIGNAL: - if (options.recovery()) { - double durationMs = 0.0; - if (response.getCloseStreamSignal().hasDuration()) { - durationMs = response.getCloseStreamSignal().getDuration().getSeconds() * 1000.0 + - response.getCloseStreamSignal().getDuration().getNanos() / 1000000.0; + @Override + public void onError(Throwable t) { + Optional error = Optional.of(t); + + if (t instanceof StatusRuntimeException) { + Status.Code code = ((StatusRuntimeException) t).getStatus().getCode(); + if (GrpcErrorHandling.isNonRetriable(code)) { + error = + Optional.of( + new NonRetriableException( + "Non-retriable gRPC error: " + ((StatusRuntimeException) t).getStatus(), + t)); } - logger.info(String.format("Server will close the stream in %.3fms. 
Triggering stream recovery.", durationMs)); - handleStreamFailed(StreamFailureType.SERVER_CLOSED_STREAM, Optional.empty()); } - break; - // *** Unknown response *** - default: - throw new RuntimeException(new ZerobusException("Invalid response from server")); - } - } - - @Override - public void onError(Throwable t) { - Optional error = Optional.of(t); - - if (t instanceof StatusRuntimeException) { - Status.Code code = ((StatusRuntimeException) t).getStatus().getCode(); - if (GrpcErrorHandling.isNonRetriable(code)) { - error = Optional.of( - new NonRetriableException("Non-retriable gRPC error: " + ((StatusRuntimeException) t).getStatus(), t) - ); + handleStreamFailed(StreamFailureType.SERVER_CLOSED_STREAM, error); } - } - handleStreamFailed(StreamFailureType.SERVER_CLOSED_STREAM, error); - } - - @Override - public void onCompleted() { - logger.debug("Server called close on the stream"); - handleStreamFailed(StreamFailureType.SERVER_CLOSED_STREAM, Optional.empty()); - } - }; + @Override + public void onCompleted() { + logger.debug("Server called close on the stream"); + handleStreamFailed(StreamFailureType.SERVER_CLOSED_STREAM, Optional.empty()); + } + }; } private void sendMessage(EphemeralStreamRequest message) throws Exception { @@ -1059,95 +1135,93 @@ private void sendMessage(EphemeralStreamRequest message) throws Exception { * Ingests a record into the stream. * * @param record The record to ingest. - * @return An IngestRecordResult containing two futures: - * - recordAccepted: completes when the SDK accepts and queues the record for processing - * - writeCompleted: completes when the server acknowledges the record has been durably stored - * If either future raises an exception, the record most probably was not acknowledged, - * but it is also possible that the server acknowledged the record but the response was lost. - * In this case client should decide whether to retry the record or not. 
+ * @return A CompletableFuture that completes when the server acknowledges the record has been + * durably stored. If the future raises an exception, the record most probably was not + * acknowledged, but it is also possible that the server acknowledged the record but the + * response was lost. In this case client should decide whether to retry the record or not. + * @throws ZerobusException if the stream is not in a valid state for ingestion */ - public IngestRecordResult ingestRecord(RecordType record) { - CompletableFuture enqueuePromise = new CompletableFuture<>(); + public CompletableFuture ingestRecord(RecordType record) throws ZerobusException { CompletableFuture durabilityPromise = new CompletableFuture<>(); - CompletableFuture.runAsync(() -> { - synchronized (this) { - // Wait until there is space in the queue - boolean recordQueueFull = true; - while (recordQueueFull) { - switch (state) { - case RECOVERING: - case FLUSHING: - logger.debug("Ingest record: Waiting for stream " + streamId.orElse("") + " to finish recovering/flushing"); + synchronized (this) { + // Wait until there is space in the queue + boolean recordQueueFull = true; + while (recordQueueFull) { + switch (state) { + case RECOVERING: + case FLUSHING: + logger.debug( + "Ingest record: Waiting for stream " + + streamId.orElse("") + + " to finish recovering/flushing"); + try { + this.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + durabilityPromise.completeExceptionally( + new ZerobusException("Interrupted while waiting for stream", e)); + return durabilityPromise; + } + break; + case FAILED: + case CLOSED: + case UNINITIALIZED: + logger.error( + "Cannot ingest record when stream is closed or not opened for stream ID " + + streamId.orElse("unknown")); + throw new ZerobusException( + "Cannot ingest record when stream is closed or not opened for stream ID " + + streamId.orElse("unknown")); + case OPENED: + if (inflightRecords.remainingCapacity() > 0) { + 
recordQueueFull = false; + } else { + logger.debug("Ingest record: Waiting for space in the queue"); try { this.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - break; - case FAILED: - case CLOSED: - case UNINITIALIZED: - logger.error("Cannot ingest record when stream is closed or not opened for stream ID " + streamId.orElse("unknown")); - throw new RuntimeException(new ZerobusException( - "Cannot ingest record when stream is closed or not opened for stream ID " + streamId.orElse("unknown") - )); - case OPENED: - if (inflightRecords.remainingCapacity() > 0) { - recordQueueFull = false; - } else { - logger.debug("Ingest record: Waiting for space in the queue"); - try { - this.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } + durabilityPromise.completeExceptionally( + new ZerobusException("Interrupted while waiting for space in queue", e)); + return durabilityPromise; } - break; - } + } + break; } + } - ByteString protoEncodedRecord = ByteString.copyFrom(record.toByteArray()); - lastSentOffsetId += 1; - long offsetId = lastSentOffsetId; + ByteString protoEncodedRecord = ByteString.copyFrom(record.toByteArray()); + lastSentOffsetId += 1; + long offsetId = lastSentOffsetId; - try { - inflightRecords.put(new Record<>(offsetId, record, protoEncodedRecord, durabilityPromise)); + try { + inflightRecords.put(new Record<>(offsetId, record, protoEncodedRecord, durabilityPromise)); - recordsQueuedForSending.put( + recordsQueuedForSending.put( EphemeralStreamRequest.newBuilder() - .setIngestRecord( - IngestRecordRequest.newBuilder() - .setOffsetId(offsetId) - .setProtoEncodedRecord(protoEncodedRecord) - .build() - ) - .build() - ); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - - this.notifyAll(); - } - }, zerobusStreamExecutor).whenComplete((result, ex) -> { - if (ex == 
null) { - enqueuePromise.complete(null); - } else { - enqueuePromise.completeExceptionally(ex); - durabilityPromise.completeExceptionally(ex); + .setIngestRecord( + IngestRecordRequest.newBuilder() + .setOffsetId(offsetId) + .setProtoEncodedRecord(protoEncodedRecord) + .build()) + .build()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + durabilityPromise.completeExceptionally( + new ZerobusException("Interrupted while enqueuing record", e)); + return durabilityPromise; } - }); - return new IngestRecordResult(enqueuePromise, durabilityPromise); + this.notifyAll(); + } + + return durabilityPromise; } /** - * Flushes the stream, waiting for all queued records to be acknowledged by the server. - * The stream doesn't close after flushing. + * Flushes the stream, waiting for all queued records to be acknowledged by the server. The stream + * doesn't close after flushing. * * @throws ZerobusException If the stream is not opened. */ @@ -1186,7 +1260,8 @@ public void flush() throws ZerobusException { if (inflightRecords.isEmpty()) { recordsFlushed = true; } else { - long remainingTime = options.flushTimeoutMs() - (System.currentTimeMillis() - startTime); + long remainingTime = + options.flushTimeoutMs() - (System.currentTimeMillis() - startTime); if (remainingTime <= 0) { logger.error("Flushing stream timed out"); @@ -1220,8 +1295,8 @@ public void flush() throws ZerobusException { } /** - * Closes the stream, while first flushing all queued records. - * Once a stream is closed, it cannot be reopened. + * Closes the stream, while first flushing all queued records. Once a stream is closed, it cannot + * be reopened. * * @throws ZerobusException If the stream is not opened. */