|
| 1 | +package com.databricks.zerobus.examples.arrow; |
| 2 | + |
import com.databricks.zerobus.*;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.LargeVarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
| 15 | + |
| 16 | +/** |
| 17 | + * Arrow Flight ingestion example. |
| 18 | + * |
| 19 | + * <p>Demonstrates ingesting columnar data using Apache Arrow record batches via the Arrow Flight |
| 20 | + * protocol. This provides high-performance ingestion for large datasets. |
| 21 | + * |
| 22 | + * <p>Prerequisites: |
| 23 | + * |
| 24 | + * <ul> |
| 25 | + * <li>A Delta table with columns: device_name (STRING), temp (INT), humidity (BIGINT) |
| 26 | + * <li>Apache Arrow Java libraries on the classpath (arrow-vector, arrow-memory-netty) |
| 27 | + * </ul> |
| 28 | + * |
| 29 | + * <p>Run with: {@code java -cp <classpath> com.databricks.zerobus.examples.arrow.ArrowIngestionExample} |
| 30 | + */ |
| 31 | +public class ArrowIngestionExample { |
| 32 | + |
| 33 | + public static void main(String[] args) throws Exception { |
| 34 | + String serverEndpoint = System.getenv("ZEROBUS_SERVER_ENDPOINT"); |
| 35 | + String workspaceUrl = System.getenv("DATABRICKS_WORKSPACE_URL"); |
| 36 | + String tableName = System.getenv("ZEROBUS_TABLE_NAME"); |
| 37 | + String clientId = System.getenv("DATABRICKS_CLIENT_ID"); |
| 38 | + String clientSecret = System.getenv("DATABRICKS_CLIENT_SECRET"); |
| 39 | + |
| 40 | + if (serverEndpoint == null |
| 41 | + || workspaceUrl == null |
| 42 | + || tableName == null |
| 43 | + || clientId == null |
| 44 | + || clientSecret == null) { |
| 45 | + System.err.println("Error: Required environment variables not set."); |
| 46 | + System.err.println( |
| 47 | + "Set: ZEROBUS_SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL, ZEROBUS_TABLE_NAME,"); |
| 48 | + System.err.println(" DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET"); |
| 49 | + System.exit(1); |
| 50 | + } |
| 51 | + |
| 52 | + System.out.println("=== Arrow Flight Ingestion Example ===\n"); |
| 53 | + |
| 54 | + // Define the Arrow schema matching the Delta table |
| 55 | + Schema schema = |
| 56 | + new Schema( |
| 57 | + Arrays.asList( |
| 58 | + Field.nullable("device_name", ArrowType.LargeUtf8.INSTANCE), |
| 59 | + Field.nullable("temp", new ArrowType.Int(32, true)), |
| 60 | + Field.nullable("humidity", new ArrowType.Int(64, true)))); |
| 61 | + |
| 62 | + try (BufferAllocator allocator = new RootAllocator(); |
| 63 | + ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl)) { |
| 64 | + |
| 65 | + // === Single batch ingestion === |
| 66 | + System.out.println("--- Single Batch Ingestion ---"); |
| 67 | + |
| 68 | + ZerobusArrowStream stream = |
| 69 | + sdk.createArrowStream(tableName, schema, clientId, clientSecret).join(); |
| 70 | + |
| 71 | + try { |
| 72 | + try (VectorSchemaRoot batch = VectorSchemaRoot.create(schema, allocator)) { |
| 73 | + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); |
| 74 | + IntVector tempVector = (IntVector) batch.getVector("temp"); |
| 75 | + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); |
| 76 | + |
| 77 | + int rowCount = 5; |
| 78 | + batch.allocateNew(); |
| 79 | + for (int i = 0; i < rowCount; i++) { |
| 80 | + nameVector.setSafe(i, ("arrow-device-" + i).getBytes()); |
| 81 | + tempVector.setSafe(i, 20 + i); |
| 82 | + humidityVector.setSafe(i, 50 + i); |
| 83 | + } |
| 84 | + batch.setRowCount(rowCount); |
| 85 | + |
| 86 | + long offset = stream.ingestBatch(batch).get(); |
| 87 | + stream.waitForOffset(offset); |
| 88 | + System.out.println( |
| 89 | + " " + rowCount + " rows ingested and acknowledged (offset: " + offset + ")"); |
| 90 | + } |
| 91 | + |
| 92 | + // === Multiple batch ingestion === |
| 93 | + System.out.println("\n--- Multiple Batch Ingestion ---"); |
| 94 | + |
| 95 | + long lastOffset = -1; |
| 96 | + for (int batchNum = 0; batchNum < 3; batchNum++) { |
| 97 | + try (VectorSchemaRoot batch = VectorSchemaRoot.create(schema, allocator)) { |
| 98 | + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); |
| 99 | + IntVector tempVector = (IntVector) batch.getVector("temp"); |
| 100 | + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); |
| 101 | + |
| 102 | + int rowCount = 10; |
| 103 | + batch.allocateNew(); |
| 104 | + for (int i = 0; i < rowCount; i++) { |
| 105 | + nameVector.setSafe(i, ("arrow-batch-" + batchNum + "-row-" + i).getBytes()); |
| 106 | + tempVector.setSafe(i, 30 + i); |
| 107 | + humidityVector.setSafe(i, 60 + i); |
| 108 | + } |
| 109 | + batch.setRowCount(rowCount); |
| 110 | + |
| 111 | + lastOffset = stream.ingestBatch(batch).get(); |
| 112 | + } |
| 113 | + } |
| 114 | + stream.flush(); |
| 115 | + System.out.println(" 3 batches (30 rows total) ingested and flushed"); |
| 116 | + |
| 117 | + // === Custom options === |
| 118 | + System.out.println("\n--- Custom Options ---"); |
| 119 | + |
| 120 | + ArrowStreamConfigurationOptions customOptions = |
| 121 | + ArrowStreamConfigurationOptions.builder() |
| 122 | + .setMaxInflightBatches(2000) |
| 123 | + .setFlushTimeoutMs(600000) |
| 124 | + .setRecovery(true) |
| 125 | + .setRecoveryRetries(5) |
| 126 | + .build(); |
| 127 | + System.out.println( |
| 128 | + " maxInflightBatches: " + customOptions.maxInflightBatches()); |
| 129 | + System.out.println(" flushTimeoutMs: " + customOptions.flushTimeoutMs()); |
| 130 | + System.out.println(" recoveryRetries: " + customOptions.recoveryRetries()); |
| 131 | + |
| 132 | + } finally { |
| 133 | + stream.close(); |
| 134 | + } |
| 135 | + |
| 136 | + // === Demonstrate getUnackedBatches and recreateArrowStream === |
| 137 | + System.out.println("\n--- Unacked Batches (after close) ---"); |
| 138 | + |
| 139 | + List<byte[]> unackedBatches = stream.getUnackedBatches(); |
| 140 | + System.out.println(" Unacked batches: " + unackedBatches.size()); |
| 141 | + System.out.println(" (Expected 0 after successful flush/close)"); |
| 142 | + |
| 143 | + System.out.println("\n--- Recreate Arrow Stream ---"); |
| 144 | + |
| 145 | + ZerobusArrowStream newStream = sdk.recreateArrowStream(stream).join(); |
| 146 | + try { |
| 147 | + try (VectorSchemaRoot batch = VectorSchemaRoot.create(schema, allocator)) { |
| 148 | + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); |
| 149 | + IntVector tempVector = (IntVector) batch.getVector("temp"); |
| 150 | + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); |
| 151 | + |
| 152 | + batch.allocateNew(); |
| 153 | + nameVector.setSafe(0, "arrow-recreated".getBytes()); |
| 154 | + tempVector.setSafe(0, 99); |
| 155 | + humidityVector.setSafe(0, 99); |
| 156 | + batch.setRowCount(1); |
| 157 | + |
| 158 | + long offset = newStream.ingestBatch(batch).get(); |
| 159 | + newStream.waitForOffset(offset); |
| 160 | + System.out.println(" 1 row ingested on recreated stream (offset: " + offset + ")"); |
| 161 | + } |
| 162 | + } finally { |
| 163 | + newStream.close(); |
| 164 | + } |
| 165 | + |
| 166 | + System.out.println("\n=== Arrow Flight Example Complete ==="); |
| 167 | + } |
| 168 | + } |
| 169 | +} |
0 commit comments