[Rust] Zero-copy ingestion pipeline #147

@teodordelibasic-db

Description

The Rust SDK's record ingestion pipeline has unnecessary copying overhead that affects all downstream language SDKs (TypeScript, Java, Python, Go). This task covers the Rust SDK changes needed to eliminate that overhead and expose new APIs that FFI layers can adopt in follow-up work.

Proto/JSON path: 2 deep copies per record in the landing zone

Every record is deep-copied twice before reaching gRPC:

  1. LandingZone::observe() (landing_zone.rs:149) clones the entire Box<IngestRequest> (including all record bytes) to keep a copy in observed_items for retry.
  2. The sender task's tokio::select! (lib.rs:1817) clones the observed item again due to ownership semantics.

Vec<u8> (proto) and String (JSON) both perform O(n) deep copies on .clone(). For a 1 MB protobuf record this is 2 MB of unnecessary allocation + memcpy per send.
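A minimal stdlib sketch of the cost difference, using `Arc<[u8]>` as a stand-in for a refcounted buffer (`bytes::Bytes` clones the same way): a deep copy gets its own allocation, while a refcounted clone shares the backing buffer.

```rust
use std::sync::Arc;

fn main() {
    // A 1 MB record held as Vec<u8>: .clone() allocates a fresh buffer
    // and memcpys every byte, so the copy has a distinct backing pointer.
    let record: Vec<u8> = vec![0u8; 1 << 20];
    let deep = record.clone();
    assert_ne!(record.as_ptr(), deep.as_ptr());

    // A refcounted buffer (Arc<[u8]> here, standing in for bytes::Bytes):
    // .clone() bumps an atomic counter and shares the same allocation.
    let shared: Arc<[u8]> = Arc::from(record);
    let cheap = Arc::clone(&shared);
    assert_eq!(shared.as_ptr(), cheap.as_ptr());
}
```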

Arrow path: double IPC serialization

All FFI layers serialize Arrow RecordBatches to IPC bytes to cross the language boundary. The Rust SDK deserializes back to RecordBatch, then FlightDataEncoderBuilder (arrow_stream.rs:431) re-serializes to IPC for the Flight gRPC protocol:

Language RecordBatch → IPC serialize → [FFI] → IPC deserialize → Rust RecordBatch
  → FlightDataEncoderBuilder → IPC RE-SERIALIZE → gRPC

This is documented as a TODO in the Python bindings (python/rust/src/arrow.rs:272). The same pattern exists in Go (rust/ffi/src/lib.rs:337), Java (rust/jni/src/arrow_stream.rs:107), and TypeScript (typescript/src/lib.rs:1432).

No efficient API for FFI layers to pass owned byte buffers

The current public API accepts impl Into<EncodedRecord> which resolves to Vec<u8> or String. FFI layers that already have byte data must copy into these types (e.g., Go to_vec() from pinned slice, TypeScript buffer.to_vec() from NAPI Buffer, Java convert_byte_array() from JNI). There is no way to pass a reference-counted Bytes buffer that avoids this copy and also avoids the subsequent landing zone clones.
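Concretely, the forced copy looks like this today (a stdlib sketch: the foreign buffer arrives across FFI as a borrowed slice, and `to_vec()` is the only way to produce the owning `Vec<u8>` the current API requires):

```rust
fn main() {
    // The FFI layer sees the caller's record only as a borrowed slice
    // (pinned Go slice, NAPI Buffer, JNI byte[] region, ...).
    let foreign_buffer: &[u8] = &[0u8; 4096];

    // To satisfy `impl Into<EncodedRecord>` it must materialize an
    // owning Vec<u8>: an O(n) allocate + memcpy per record.
    let owned: Vec<u8> = foreign_buffer.to_vec();
    assert_ne!(foreign_buffer.as_ptr(), owned.as_ptr());
    assert_eq!(owned.len(), foreign_buffer.len());
}
```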

Proposed Solution

1. Add bytes crate dependency and use Bytes-based internal record types

Create crate-internal record types that use bytes::Bytes instead of Vec<u8>/String:

#[derive(Clone)] // O(1) via Bytes refcount
pub(crate) struct InternalBatch {
    pub(crate) records: SmallVec<[Bytes; 1]>,
    pub(crate) record_type: RecordType,
}

Convert from existing public types at the ingestion boundary:

  • Bytes::from(Vec<u8>) — takes ownership of the Vec's allocation (pointer handoff, no memcpy).
  • Bytes::from(String) — same, via into_bytes().
  • Bytes::clone() — atomic refcount increment, O(1) regardless of data size.
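The ownership-handoff claim in the bullets above can be checked directly for the stdlib half of the story: `String::into_bytes()` (the step `Bytes::from(String)` goes through) reuses the heap allocation, so the data pointer survives the conversion.

```rust
fn main() {
    let json = String::from("{\"id\": 42}");
    let before = json.as_ptr();

    // String -> Vec<u8> is a pure ownership transfer: no new allocation,
    // no memcpy; the same heap buffer is reused.
    let bytes: Vec<u8> = json.into_bytes();
    assert_eq!(before, bytes.as_ptr());
}
```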

Use InternalBatch in IngestRequest and throughout the internal pipeline. The landing zone clones and sender task clones become O(1).

All existing public types (EncodedRecord, EncodedBatch, ProtoBytes, JsonString, etc.) and methods remain unchanged.

2. Configure prost to generate Bytes for proto bytes fields

Update build.rs:

tonic_build::configure()
    .bytes([
        "databricks.zerobus.IngestRecordRequest",
        "databricks.zerobus.ProtoEncodedRecordBatch",
        "databricks.zerobus.CreateIngestStreamRequest",
    ])
    .compile_protos(&["zerobus_service.proto"], &["."])
    .unwrap();

This makes proto bytes fields use bytes::Bytes in the generated code. Proto records then flow from ingestion through prost encoding to gRPC with zero deep copies. The generated types are pub(crate) — invisible to users.

3. New ingest_ipc_batch() on ZerobusArrowStream

Add a method that accepts raw Arrow IPC bytes, bypassing the deserialize→re-serialize round-trip:

impl ZerobusArrowStream {
    pub async fn ingest_ipc_batch(&self, ipc_bytes: Bytes) -> ZerobusResult<OffsetId> { ... }
}

Internally, introduce a dual payload type:

#[derive(Clone)]
pub(crate) enum ArrowPayload {
    Ipc(Bytes),          // From FFI callers. Clone is O(1).
    Batch(RecordBatch),  // From Rust callers. Clone is O(1) via Arc arrays.
}

For the Ipc variant, construct FlightData directly from the IPC bytes without reconstructing a RecordBatch. The row count for ack matching is extracted from the IPC message header (metadata read, not full deserialization).

The existing ingest_batch(RecordBatch) stays unchanged. Recovery: full replay uses stored Bytes (O(1) clone). Partial-batch slicing (rare) falls back to deserialize → slice → re-serialize.
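A runnable sketch of the dual-payload idea, with stand-in types (`Arc<[u8]>` in place of `bytes::Bytes`, a toy batch with `Arc`'d columns in place of Arrow's `RecordBatch`), showing that cloning either variant for retry/replay only bumps refcounts:

```rust
use std::sync::Arc;

// Stand-ins, purely illustrative.
type IpcBytes = Arc<[u8]>;

#[derive(Clone)]
struct ToyRecordBatch {
    columns: Vec<Arc<Vec<i64>>>, // Arrow arrays are Arc'd internally
}

#[derive(Clone)]
enum ArrowPayload {
    Ipc(IpcBytes),         // from FFI callers: already-serialized IPC bytes
    Batch(ToyRecordBatch), // from Rust callers
}

fn main() {
    let ipc = ArrowPayload::Ipc(Arc::from(vec![1u8, 2, 3]));
    let replay = ipc.clone(); // O(1): refcount bump, no byte copy
    if let (ArrowPayload::Ipc(a), ArrowPayload::Ipc(b)) = (&ipc, &replay) {
        assert_eq!(a.as_ptr(), b.as_ptr()); // same backing buffer
    }

    let batch = ArrowPayload::Batch(ToyRecordBatch {
        columns: vec![Arc::new(vec![1, 2, 3])],
    });
    let replay = batch.clone(); // O(1) per column: clones Arc handles only
    if let (ArrowPayload::Batch(a), ArrowPayload::Batch(b)) = (&batch, &replay) {
        assert!(Arc::ptr_eq(&a.columns[0], &b.columns[0]));
    }
}
```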

4. New Bytes-accepting API for proto/JSON

impl ZerobusStream {
    pub async fn ingest_proto_bytes(&self, data: Bytes) -> ZerobusResult<OffsetId> { ... }
    pub async fn ingest_json_bytes(&self, data: Bytes) -> ZerobusResult<OffsetId> { ... }
    pub async fn ingest_proto_bytes_batch(&self, data: Vec<Bytes>) -> ZerobusResult<Option<OffsetId>> { ... }
    pub async fn ingest_json_bytes_batch(&self, data: Vec<Bytes>) -> ZerobusResult<Option<OffsetId>> { ... }
}

These skip the Vec<u8> → EncodedRecord → InternalBatch conversion chain entirely. FFI layers can pass Bytes constructed from their native buffer types directly into the zero-copy pipeline.
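A sketch of the call shape an FFI layer would use (stub types only: `Arc<[u8]>` stands in for `bytes::Bytes`, and the real `ZerobusStream` methods are async and return `ZerobusResult<OffsetId>`). The point is that the stream stores the handle, not a copy:

```rust
use std::cell::RefCell;
use std::sync::Arc;

type Bytes = Arc<[u8]>; // stand-in for bytes::Bytes

// Stub illustrating the call shape only.
struct StubStream {
    ingested: RefCell<Vec<Bytes>>,
}

impl StubStream {
    fn ingest_proto_bytes(&self, data: Bytes) -> u64 {
        self.ingested.borrow_mut().push(data); // stored by handle, no copy
        self.ingested.borrow().len() as u64
    }
}

fn main() {
    // The FFI layer converts its native buffer once, then hands over handles.
    let native: Vec<u8> = vec![0xAB; 1024];
    let buf: Bytes = Arc::from(native);
    let stream = StubStream { ingested: Default::default() };
    let offset = stream.ingest_proto_bytes(buf.clone());
    assert_eq!(offset, 1);
    // The stored record shares its buffer with the caller's handle.
    assert!(Arc::ptr_eq(&stream.ingested.borrow()[0], &buf));
}
```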

Expected impact

Proto/JSON hot path:

| Stage | Before | After |
| --- | --- | --- |
| LandingZone observe() | O(n) deep copy | O(1) refcount |
| Sender select! clone | O(n) deep copy | O(1) refcount |
| prost encoding (proto) | move into Vec<u8> field | move into Bytes field |
| Total deep copies | 2 per record | 0 |

Arrow hot path:

| Stage | Before | After |
| --- | --- | --- |
| IPC → RecordBatch parse | full deserialization | skipped |
| Pending batch clone | O(1) Arc clone | O(1) Bytes clone |
| RecordBatch → Flight IPC | full serialization | skipped (IPC forwarded) |
| IPC encode/decode passes | 2 | 0 |

Backward compatibility

All changes are non-breaking:

  • Existing public types and methods are unchanged.
  • New methods (ingest_ipc_batch, ingest_proto_bytes, ingest_json_bytes) are additive.
  • Internal types (InternalBatch, ArrowPayload, prost config) are pub(crate).

Additional Context

The bytes crate is already a transitive dependency via tonic/hyper/prost. Adding it directly adds no new code to the binary.

Follow-up tasks after this lands:

  • TypeScript SDK: Update NAPI bindings to call ingest_proto_bytes/ingest_json_bytes/ingest_ipc_batch, cache JSON.stringify reference, explore Bytes::from_owner for NAPI Buffer zero-copy.
  • Python SDK: Update PyO3 bindings to call ingest_ipc_batch, resolving the TODO at arrow.rs:272.
  • Java SDK: Update JNI bindings to call ingest_ipc_batch and ingest_proto_bytes.
  • Go SDK: Update CGO FFI to call ingest_ipc_batch and ingest_proto_bytes.
  • All SDKs (future): Arrow C Data Interface for true zero-copy Arrow FFI (eliminates IPC serialization on the language side entirely).

Metadata

Labels: enhancement (New feature or request)