
Commit ca41bf2

Merge branch 'main' into java-arrow
2 parents b1ce6cb + 58b57c4 commit ca41bf2

26 files changed

Lines changed: 2281 additions & 42 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -11,6 +11,9 @@
 .bsp/
 .bazelbsp/
 
+# Tools
+pyrefly.toml
+
 # OS
 .DS_Store
 Thumbs.db

python/NEXT_CHANGELOG.md

Lines changed: 20 additions & 0 deletions
@@ -6,6 +6,16 @@
 
 ### New Features and Improvements
 
+- **Arrow Flight Support (Experimental)**: Added support for ingesting `pyarrow.RecordBatch` and `pyarrow.Table` objects via the Arrow Flight protocol
+  - **Note**: Arrow Flight is not yet supported by default on the Zerobus server side.
+- New `ZerobusArrowStream` class (sync in `zerobus.sdk.sync`, async in `zerobus.sdk.aio`) with `ingest_batch()`, `wait_for_offset()`, `flush()`, `close()`, and `get_unacked_batches()` methods
+- New `ArrowStreamConfigurationOptions` for configuring Arrow streams (max in-flight batches, recovery, timeouts)
+- New `create_arrow_stream()` and `recreate_arrow_stream()` methods on both sync and async `ZerobusSdk`
+- Accepts both `pyarrow.RecordBatch` and `pyarrow.Table` (Tables are combined into a single batch internally)
+- Arrow is opt-in: install via `pip install databricks-zerobus-ingest-sdk[arrow]` (requires `pyarrow>=14.0.0`)
+- Arrow types are gated behind the `_core.arrow` submodule and are not loaded unless pyarrow is installed
+- Available from both `zerobus.sdk.sync` and `zerobus.sdk.aio`, and re-exported from the top-level `zerobus` package
+
 ### Bug Fixes
 
 - Fixed proto generation tool to skip reserved field numbers 19000-19999 for tables with more than 19000 columns
@@ -14,9 +24,19 @@
 
 ### Internal Changes
 
+- Bumped Rust SDK dependency to v1.0.1 with the `arrow-flight` feature
+- Added `arrow-ipc`, `arrow-schema`, and `arrow-array` (v56.2.0) Rust dependencies for IPC serialization
+- Added PyO3 arrow module (`arrow.rs`) with `ArrowStreamConfigurationOptions`, `ZerobusArrowStream`, and `AsyncZerobusArrowStream` pyclasses
+- Added Python-side serialization helpers in `zerobus.sdk.shared.arrow` (`_serialize_schema`, `_serialize_batch`, `_deserialize_batch`)
+
 ### Breaking Changes
 
 ### Deprecations
 
 ### API Changes
 
+- Added `create_arrow_stream(table_name, schema, client_id, client_secret, options=None, headers_provider=None)` to sync and async `ZerobusSdk`
+- Added `recreate_arrow_stream(old_stream)` to sync and async `ZerobusSdk`
+- Added `ZerobusArrowStream` class (sync and async variants) with methods `ingest_batch()`, `wait_for_offset()`, `flush()`, `close()`, `get_unacked_batches()` and properties `is_closed`, `table_name`
+- Added `ArrowStreamConfigurationOptions` class with fields `max_inflight_batches`, `recovery`, `recovery_timeout_ms`, `recovery_backoff_ms`, `recovery_retries`, `server_lack_of_ack_timeout_ms`, `flush_timeout_ms`, `connection_timeout_ms`
+- Added optional dependency `pyarrow>=14.0.0` via `pip install databricks-zerobus-ingest-sdk[arrow]`
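
The changelog bullets above describe the new surface area in prose; below is a minimal sketch of the synchronous variant, assembled from the signatures in the API Changes list. It is an illustration, not code from this commit: the endpoints, table name, and credentials are placeholders, and the sync `ZerobusSdk` constructor is assumed to mirror the async one shown in the example file further down.

    import pyarrow as pa

    from zerobus.sdk.sync import ZerobusSdk
    from zerobus.sdk.shared.arrow import ArrowStreamConfigurationOptions

    schema = pa.schema([("device_name", pa.large_utf8()), ("temp", pa.int32())])

    # Placeholder endpoints and credentials - substitute your own.
    sdk = ZerobusSdk(
        "https://your-shard-id.zerobus.region.cloud.databricks.com",
        "https://your-workspace.cloud.databricks.com",
    )
    options = ArrowStreamConfigurationOptions(max_inflight_batches=10, recovery=True)
    stream = sdk.create_arrow_stream(
        "catalog.schema.table", schema, "client-id", "client-secret", options
    )

    # Both RecordBatch and Table are accepted; a Table is combined into one batch.
    batch = pa.record_batch({"device_name": ["sensor-0"], "temp": [21]}, schema=schema)
    offset = stream.ingest_batch(batch)
    stream.wait_for_offset(offset)  # blocks until the server acknowledges this offset
    stream.flush()
    stream.close()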
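
The Internal Changes mention serialization helpers in `zerobus.sdk.shared.arrow`, but their bodies are not part of the diff shown here. As a rough sketch of what such helpers typically look like (the actual implementations may differ), a RecordBatch can be round-tripped through pyarrow's public IPC API:

    import io

    import pyarrow as pa
    import pyarrow.ipc as ipc

    def serialize_batch(batch: pa.RecordBatch) -> bytes:
        # Write the batch as an Arrow IPC stream into an in-memory buffer.
        sink = io.BytesIO()
        with ipc.new_stream(sink, batch.schema) as writer:
            writer.write_batch(batch)
        return sink.getvalue()

    def deserialize_batch(data: bytes) -> pa.RecordBatch:
        # Read the first batch back out of the IPC stream.
        with ipc.open_stream(io.BytesIO(data)) as reader:
            return reader.read_next_batch()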
Lines changed: 203 additions & 0 deletions
@@ -0,0 +1,203 @@
"""
Asynchronous Ingestion Example - Arrow Flight Mode

This example demonstrates record ingestion using the asynchronous API with Arrow Flight.

Record Type Mode: Arrow (RecordBatch)
- Records are sent as pyarrow RecordBatches
- Uses Arrow Flight protocol for columnar data transfer
- Best for structured/columnar data, DataFrames, Parquet workflows

Requirements:
    pip install databricks-zerobus-ingest-sdk[arrow]

Note: Arrow Flight support is experimental and not yet supported for production use.
"""

import asyncio
import logging
import os
import time

import pyarrow as pa

from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared.arrow import ArrowStreamConfigurationOptions

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


# Configuration - update these with your values
SERVER_ENDPOINT = os.getenv(
    "ZEROBUS_SERVER_ENDPOINT",
    "https://your-shard-id.zerobus.region.cloud.databricks.com",
)
UNITY_CATALOG_ENDPOINT = os.getenv("DATABRICKS_WORKSPACE_URL", "https://your-workspace.cloud.databricks.com")
TABLE_NAME = os.getenv("ZEROBUS_TABLE_NAME", "catalog.schema.table")

# For OAuth authentication
CLIENT_ID = os.getenv("DATABRICKS_CLIENT_ID", "your-oauth-client-id")
CLIENT_SECRET = os.getenv("DATABRICKS_CLIENT_SECRET", "your-oauth-client-secret")

# Number of batches to ingest
NUM_BATCHES = 10
ROWS_PER_BATCH = 100

# Define the Arrow schema
SCHEMA = pa.schema(
    [
        ("device_name", pa.large_utf8()),
        ("temp", pa.int32()),
        ("humidity", pa.int64()),
    ]
)


def create_sample_batch(batch_index):
    """
    Creates a sample RecordBatch with air quality data.

    Returns a pyarrow.RecordBatch with ROWS_PER_BATCH rows.
    """
    return pa.record_batch(
        {
            "device_name": [f"sensor-{(batch_index * ROWS_PER_BATCH + i) % 10}" for i in range(ROWS_PER_BATCH)],
            "temp": [20 + ((batch_index * ROWS_PER_BATCH + i) % 15) for i in range(ROWS_PER_BATCH)],
            "humidity": [50 + ((batch_index * ROWS_PER_BATCH + i) % 40) for i in range(ROWS_PER_BATCH)],
        },
        schema=SCHEMA,
    )


async def main():
    print("Starting asynchronous ingestion example (Arrow Flight Mode)...")
    print("=" * 60)

    # Check if credentials are configured
    if CLIENT_ID == "your-oauth-client-id" or CLIENT_SECRET == "your-oauth-client-secret":
        logger.error("Please set DATABRICKS_CLIENT_ID and DATABRICKS_CLIENT_SECRET environment variables")
        return

    if SERVER_ENDPOINT == "https://your-shard-id.zerobus.region.cloud.databricks.com":
        logger.error("Please set ZEROBUS_SERVER_ENDPOINT environment variable")
        return

    if TABLE_NAME == "catalog.schema.table":
        logger.error("Please set ZEROBUS_TABLE_NAME environment variable")
        return

    try:
        # Step 1: Initialize the SDK
        sdk = ZerobusSdk(SERVER_ENDPOINT, UNITY_CATALOG_ENDPOINT)
        logger.info("SDK initialized")

        # Step 2: Configure arrow stream options (all optional, shown with defaults)
        options = ArrowStreamConfigurationOptions(
            max_inflight_batches=10,
            recovery=True,
            recovery_timeout_ms=15000,
            recovery_backoff_ms=2000,
            recovery_retries=3,
        )
        logger.info("Arrow stream configuration created")

        # Step 3: Create an Arrow Flight stream
        #
        # Pass a pyarrow.Schema - the SDK handles serialization internally.
        # The SDK automatically:
        #   - Includes authorization header with OAuth token
        #   - Includes x-databricks-zerobus-table-name header
        stream = await sdk.create_arrow_stream(TABLE_NAME, SCHEMA, CLIENT_ID, CLIENT_SECRET, options)
        logger.info(f"Arrow stream created for table: {stream.table_name}")

        # Step 4: Ingest Arrow RecordBatches asynchronously
        logger.info(f"\nIngesting {NUM_BATCHES} batches of {ROWS_PER_BATCH} rows each...")
        start_time = time.time()
        total_rows = 0

        try:
            # ========================================================================
            # Ingest RecordBatches - each call returns an offset
            # ========================================================================
            offsets = []
            for i in range(NUM_BATCHES):
                batch = create_sample_batch(i)
                offset = await stream.ingest_batch(batch)
                offsets.append(offset)
                total_rows += batch.num_rows
                logger.info(f"  Batch {i + 1}: {batch.num_rows} rows, offset: {offset}")

            # ========================================================================
            # You can also ingest a pyarrow.Table directly.
            # The SDK converts it to a single RecordBatch internally.
            # ========================================================================
            table = pa.table(
                {
                    "device_name": [f"sensor-table-{i}" for i in range(50)],
                    "temp": list(range(20, 70)),
                    "humidity": list(range(50, 100)),
                },
                schema=SCHEMA,
            )
            offset = await stream.ingest_batch(table)
            offsets.append(offset)
            total_rows += table.num_rows
            logger.info(f"  Table ingested: {table.num_rows} rows, offset: {offset}")

            submit_end_time = time.time()
            submit_duration = submit_end_time - start_time
            logger.info(f"\nAll batches submitted in {submit_duration:.2f} seconds")

            # ========================================================================
            # Wait for the last offset to be acknowledged
            # ========================================================================
            logger.info(f"Waiting for offset {offsets[-1]} to be acknowledged...")
            await stream.wait_for_offset(offsets[-1])
            logger.info(f"  Offset {offsets[-1]} acknowledged")

            # Step 5: Flush and close the stream
            logger.info("\nFlushing stream...")
            await stream.flush()
            logger.info("Stream flushed")

            end_time = time.time()
            total_duration = end_time - start_time
            rows_per_second = total_rows / total_duration

            await stream.close()
            logger.info("Stream closed")

            # Print summary
            print("\n" + "=" * 60)
            print("Ingestion Summary:")
            print(f"  Total batches: {NUM_BATCHES + 1}")
            print(f"  Total rows: {total_rows}")
            print(f"  Submit time: {submit_duration:.2f} seconds")
            print(f"  Total time: {total_duration:.2f} seconds")
            print(f"  Throughput: {rows_per_second:.2f} rows/sec")
            print("  Record type: Arrow Flight (RecordBatch)")
            print("=" * 60)

        except Exception as e:
            logger.error(f"\nError during ingestion: {e}")

            # On failure, you can retrieve unacked batches for retry
            if stream.is_closed:
                unacked = await stream.get_unacked_batches()
                if unacked:
                    logger.info(f"  {len(unacked)} unacked batches available for retry")
                    for i, batch in enumerate(unacked):
                        logger.info(f"    Batch {i}: {batch.num_rows} rows, schema: {batch.schema}")

            await stream.close()
            raise

    except Exception as e:
        logger.error(f"\nFailed to initialize stream: {e}")
        raise


if __name__ == "__main__":
    asyncio.run(main())
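
The error handler in the example only inspects the unacked batches. The changelog also adds `recreate_arrow_stream(old_stream)`; a hedged sketch of pairing the two is below. This commit does not show whether `recreate_arrow_stream()` replays unacked batches on its own, so the manual re-ingest loop is an assumption; drop it if the SDK handles replay itself.

    # Hedged sketch; the recreate_arrow_stream() signature is from the API Changes list.
    async def recover_stream(sdk, old_stream):
        unacked = await old_stream.get_unacked_batches()  # batches never acknowledged
        new_stream = await sdk.recreate_arrow_stream(old_stream)
        for batch in unacked:
            # ASSUMPTION: the recreated stream starts empty, so we re-ingest manually.
            await new_stream.ingest_batch(batch)
        await new_stream.flush()
        return new_stream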

python/examples/async_example_json.py

Lines changed: 4 additions & 1 deletion
@@ -41,7 +41,10 @@
 
 # Configuration - update these with your values
 # For AWS:
-SERVER_ENDPOINT = os.getenv("ZEROBUS_SERVER_ENDPOINT", "https://your-shard-id.zerobus.region.cloud.databricks.com")
+SERVER_ENDPOINT = os.getenv(
+    "ZEROBUS_SERVER_ENDPOINT",
+    "https://your-shard-id.zerobus.region.cloud.databricks.com",
+)
 UNITY_CATALOG_ENDPOINT = os.getenv("DATABRICKS_WORKSPACE_URL", "https://your-workspace.cloud.databricks.com")
 # For Azure:
 # SERVER_ENDPOINT = os.getenv(

python/examples/async_example_proto.py

Lines changed: 4 additions & 1 deletion
@@ -38,7 +38,10 @@
 
 # Configuration - update these with your values
 # For AWS:
-SERVER_ENDPOINT = os.getenv("ZEROBUS_SERVER_ENDPOINT", "https://your-shard-id.zerobus.region.cloud.databricks.com")
+SERVER_ENDPOINT = os.getenv(
+    "ZEROBUS_SERVER_ENDPOINT",
+    "https://your-shard-id.zerobus.region.cloud.databricks.com",
+)
 UNITY_CATALOG_ENDPOINT = os.getenv("DATABRICKS_WORKSPACE_URL", "https://your-workspace.cloud.databricks.com")
 # For Azure:
 # SERVER_ENDPOINT = os.getenv(
