This directory contains example applications demonstrating different usage patterns of the Zerobus Ingest SDK for Java.
The examples are organized by stream type and demonstrate both single-record and batch ingestion patterns.
Features demonstrated:
ZerobusProtoStream- Protocol Buffer ingestion with method-level genericsZerobusJsonStream- JSON ingestion with flexible serializationZerobusStream(deprecated) - Legacy Future-based API for backward compatibility
examples/
├── README.md (this file)
├── proto/ (Protocol Buffer examples - ZerobusProtoStream)
│ ├── README.md
│ ├── AirQualityProto.java (generated proto)
│ ├── SingleRecordExample.java
│ └── BatchIngestionExample.java
├── json/ (JSON examples - ZerobusJsonStream)
│ ├── README.md
│ ├── SingleRecordExample.java
│ └── BatchIngestionExample.java
└── legacy/ (Legacy examples - ZerobusStream)
└── LegacyStreamExample.java
| Example | Stream Class | Description |
|---|---|---|
proto/SingleRecordExample |
ZerobusProtoStream |
Single record ingestion (Message + byte[]) |
proto/BatchIngestionExample |
ZerobusProtoStream |
Batch ingestion |
json/SingleRecordExample |
ZerobusJsonStream |
Single record ingestion (Object + String) |
json/BatchIngestionExample |
ZerobusJsonStream |
Batch ingestion |
legacy/LegacyStreamExample |
ZerobusStream |
Legacy Future-based API |
Each example demonstrates: single ingestion + wait, batch ingestion + wait for last, and recreateStream.
ZerobusProtoStream stream = sdk.createProtoStream(
tableName,
MyProto.getDescriptor().toProto(),
clientId,
clientSecret
).join();
// Method-level generics - flexible typing
stream.ingestRecordOffset(myProtoMessage); // Message
stream.ingestRecordOffset(preEncodedBytes); // byte[]
stream.ingestRecordsOffset(listOfMessages); // batch
stream.ingestRecordsOffset(listOfByteArrays); // batchZerobusJsonStream stream = sdk.createJsonStream(
tableName,
clientId,
clientSecret
).join();
// Method-level generics - flexible typing
stream.ingestRecordOffset(object, gson::toJson); // Object + serializer
stream.ingestRecordOffset(jsonString); // String
stream.ingestRecordsOffset(objects, gson::toJson);// batch
stream.ingestRecordsOffset(jsonStrings); // batch@SuppressWarnings("deprecation")
ZerobusStream<MyProto> stream = sdk.createStream(
tableProperties, clientId, clientSecret
).join();
// Class-level generics - fixed type, Future-based
stream.ingestRecord(myProtoMessage).join(); // CompletableFuture<Void>CREATE TABLE <catalog>.default.air_quality (
device_name STRING,
temp INT,
humidity BIGINT
) USING DELTA;Create a service principal with SELECT and MODIFY permissions on the table.
export ZEROBUS_SERVER_ENDPOINT="<workspace-id>.zerobus.<region>.cloud.databricks.com"
export DATABRICKS_WORKSPACE_URL="https://<workspace>.cloud.databricks.com"
export ZEROBUS_TABLE_NAME="<catalog>.<schema>.<table>"
export DATABRICKS_CLIENT_ID="your-client-id"
export DATABRICKS_CLIENT_SECRET="your-client-secret"cd .. # Go to SDK root
mvn package -DskipTestscd examples
# Compile examples
javac -d . -cp "../target/classes:$(cd .. && mvn dependency:build-classpath -q -DincludeScope=runtime -Dmdep.outputFile=/dev/stdout)" \
proto/com/databricks/zerobus/examples/proto/AirQualityProto.java \
proto/SingleRecordExample.java \
proto/BatchIngestionExample.java
# Run single record example
java -cp ".:../target/classes:$(cd .. && mvn dependency:build-classpath -q -DincludeScope=runtime -Dmdep.outputFile=/dev/stdout)" \
com.databricks.zerobus.examples.proto.SingleRecordExample
# Run batch example
java -cp ".:../target/classes:$(cd .. && mvn dependency:build-classpath -q -DincludeScope=runtime -Dmdep.outputFile=/dev/stdout)" \
com.databricks.zerobus.examples.proto.BatchIngestionExamplecd examples
# Compile examples
javac -d . -cp "../target/classes:$(cd .. && mvn dependency:build-classpath -q -DincludeScope=runtime -Dmdep.outputFile=/dev/stdout)" \
json/SingleRecordExample.java \
json/BatchIngestionExample.java
# Run single record example
java -cp ".:../target/classes:$(cd .. && mvn dependency:build-classpath -q -DincludeScope=runtime -Dmdep.outputFile=/dev/stdout)" \
com.databricks.zerobus.examples.json.SingleRecordExample
# Run batch example
java -cp ".:../target/classes:$(cd .. && mvn dependency:build-classpath -q -DincludeScope=runtime -Dmdep.outputFile=/dev/stdout)" \
com.databricks.zerobus.examples.json.BatchIngestionExamplecd examples
# Compile (requires proto for AirQuality)
javac -d . -cp "../target/classes:$(cd .. && mvn dependency:build-classpath -q -DincludeScope=runtime -Dmdep.outputFile=/dev/stdout)" \
proto/com/databricks/zerobus/examples/proto/AirQualityProto.java \
legacy/LegacyStreamExample.java
# Run legacy example
java -cp ".:../target/classes:$(cd .. && mvn dependency:build-classpath -q -DincludeScope=runtime -Dmdep.outputFile=/dev/stdout)" \
com.databricks.zerobus.examples.legacy.LegacyStreamExample| Use Case | Stream Class | Why |
|---|---|---|
| Protocol Buffers (new code) | ZerobusProtoStream |
Method-level generics, batch support |
| JSON (new code) | ZerobusJsonStream |
Clean API, no proto dependency |
Existing code with ZerobusStream |
ZerobusStream |
Backward compatible, migrate later |
| Feature | ZerobusProtoStream | ZerobusJsonStream | ZerobusStream |
|---|---|---|---|
| Generics | Method-level | Method-level | Class-level |
| Return Type | long offset |
long offset |
CompletableFuture |
| Batch Support | Yes | Yes | No |
| Status | Recommended | Recommended | Deprecated |