[Java] Add experimental Arrow Flight support #141
Conversation
```java
 */
public long ingestBatch(VectorSchemaRoot batch) throws ZerobusException {
    if (batch == null) {
        throw new ZerobusException("Batch must not be null");
```
how should we handle this case? I think throwing an exception makes sense, but we can make it a no-op too.
For the regular gRPC stream the return type is Optional<Long> and we return an empty option, so I think matching that would be okay.
I think we should handle the case where there are 0 rows in the batch as well and treat it as no-op.
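The no-op behavior discussed above can be sketched as follows. This is illustrative only: a plain `Integer` row count stands in for `VectorSchemaRoot.getRowCount()`, and `doNativeIngest` is a stub in place of the real JNI call.

```java
import java.util.Optional;

public class ArrowIngestSketch {
    // Sketch of the null/empty-batch handling discussed above. The Integer
    // rowCount stands in for VectorSchemaRoot.getRowCount(); names are illustrative.
    static Optional<Long> ingestBatch(Integer rowCount) {
        if (rowCount == null || rowCount == 0) {
            // No-op, matching the regular gRPC stream's Optional.empty() behavior.
            return Optional.empty();
        }
        return Optional.of(doNativeIngest(rowCount));
    }

    // Stand-in for the JNI ingest call; returns a fake offset.
    static long doNativeIngest(int rows) {
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(ingestBatch(null));  // Optional.empty
        System.out.println(ingestBatch(0));     // Optional.empty
        System.out.println(ingestBatch(5));     // Optional[5]
    }
}
```

The empty `Optional` keeps the Arrow path consistent with the regular gRPC stream's return contract instead of throwing for a degenerate but harmless input.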
teodordelibasic-db left a comment
Perhaps we can include some unit tests for scenarios that don't need a mock server or env vars, like null batch rejection and serialization/deserialization.
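For instance, null-batch rejection can be exercised with no server at all. A minimal sketch, using plain assertions rather than a test framework; the exception class and ingest method here are stand-ins modeled on the diff above, not the SDK's real types:

```java
public class NullBatchRejectionTest {
    // Stand-in modeled on the SDK's ZerobusException; illustrative only.
    static class ZerobusException extends Exception {
        ZerobusException(String message) { super(message); }
    }

    // Stand-in for ingestBatch(VectorSchemaRoot) with the null check from the diff.
    static long ingestBatch(Object batch) throws ZerobusException {
        if (batch == null) {
            throw new ZerobusException("Batch must not be null");
        }
        return 0L; // pretend offset for a non-null batch
    }

    public static void main(String[] args) {
        boolean threw = false;
        try {
            ingestBatch(null);
        } catch (ZerobusException e) {
            threw = "Batch must not be null".equals(e.getMessage());
        }
        System.out.println(threw ? "null batch rejected" : "TEST FAILED");
    }
}
```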
```java
 * @throws ZerobusException if an error occurs during close
 */
@Override
public void close() throws ZerobusException {
```
We can investigate this further and fix in a follow-up PR since it exists for the regular stream as well, but an LLM found two issues in `close`:
- Two threads calling `close()` concurrently can both read a non-zero handle, leading to a double `nativeDestroy` (use-after-free). Same risk if `ingestBatch` races with `close`: `ensureOpen()` passes, then the handle gets freed underneath. I don't think this is high priority since we say streams are not supposed to be used by multiple threads.
- If `nativeDestroy` throws, the native resource leaks forever. `nativeHandle` is set to 0, but the underlying native memory allocated by the Rust SDK is never freed.
Something like this should fix it, maybe:

```java
private final AtomicLong nativeHandle = new AtomicLong(0);

public void close() throws ZerobusException {
    // getAndSet makes the close idempotent: only one caller sees a non-zero handle.
    long handle = nativeHandle.getAndSet(0);
    if (handle == 0) return; // already closed, or another thread won the race
    try {
        nativeClose(handle);
        try {
            // Cache unacked batches before the handle is destroyed.
            cachedUnackedBatches = nativeGetUnackedBatches(handle);
        } catch (Exception e) {
            logger.warn("Failed to cache unacked batches: {}", e.getMessage());
            cachedUnackedBatches = new ArrayList<>();
        }
    } finally {
        // Runs even if nativeClose throws, so the native resource is always freed.
        nativeDestroy(handle);
    }
}
```
Yeah, I saw it's the same for regular streams; we mention that neither regular nor Arrow streams are currently thread-safe. I can do this in a follow-up PR.
```java
// Cache unacked batches before destroying the handle (for recreateArrowStream)
try {
    cachedUnackedBatches = nativeGetUnackedBatches(handle);
```
One more similar potential pre-existing issue, not high priority. If `nativeGetUnackedBatches` fails after `nativeClose` succeeds, the batches are gone: `close()` has already flushed and shut down the stream, and the cache is set to an empty list. When the user later calls `recreateArrowStream`, it thinks there's nothing to re-ingest, and data is silently lost. A potential fix is to store the exception and surface it when `getUnackedBatches()` is called on the closed stream:
```java
private volatile Exception cachedUnackedBatchesError;

// In close():
try {
    cachedUnackedBatches = nativeGetUnackedBatches(handle);
} catch (Exception e) {
    logger.warn("Failed to cache unacked batches: {}", e.getMessage());
    cachedUnackedBatchesError = e;
    cachedUnackedBatches = null;
}

// In getUnackedBatches():
public List<byte[]> getUnackedBatches() throws ZerobusException {
    if (nativeHandle.get() == 0) {
        if (cachedUnackedBatchesError != null) {
            throw new ZerobusException(
                "Failed to retrieve unacked batches during close: "
                    + cachedUnackedBatchesError.getMessage(),
                cachedUnackedBatchesError);
        }
        return cachedUnackedBatches != null ? cachedUnackedBatches : new ArrayList<>();
    }
    return nativeGetUnackedBatches(nativeHandle.get());
}
```

```java
 *
 * <p>Package-private for use by {@link ZerobusSdk#createArrowStream}.
 */
static byte[] serializeSchemaToIpc(Schema schema) throws ZerobusException {
```
An improvement could be to reuse the RootAllocator, since it's heavyweight. As it's created only during stream creation, the performance impact is probably negligible. We could either pass a BufferAllocator into this function from the caller or use a static one.
Made it a static allocator used for every create stream, thanks. The other things I'll address in a follow-up, as they apply to the current non-Arrow implementation as well.
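The static-allocator pattern can be illustrated with a stand-in class. Arrow's real `RootAllocator` is not used here; `FakeAllocator` and its instance counter are purely illustrative.

```java
public class AllocatorSharing {
    // Stand-in for Arrow's heavyweight RootAllocator; purely illustrative.
    static final class FakeAllocator {
        static int instancesCreated = 0;
        FakeAllocator() { instancesCreated++; }
    }

    // One shared allocator for all stream creations, instead of one per call.
    private static final FakeAllocator SHARED_ALLOCATOR = new FakeAllocator();

    static FakeAllocator allocatorForStreamCreation() {
        return SHARED_ALLOCATOR;
    }

    public static void main(String[] args) {
        allocatorForStreamCreation();
        allocatorForStreamCreation();
        // Only one allocator was ever constructed, however many streams are created.
        System.out.println(FakeAllocator.instancesCreated); // 1
    }
}
```

In the real SDK the shared instance would presumably be a `RootAllocator` whose lifetime spans the SDK, so per-stream creation avoids paying the allocator's construction and bookkeeping cost each time.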
```java
 * @param maxInflightBatches the maximum number of in-flight batches
 * @return this builder for method chaining
 */
public ArrowStreamConfigurationOptionsBuilder setMaxInflightBatches(int maxInflightBatches) {
```
For both Arrow and regular options we could add validation that some of the values are not, for example, less than 0, and throw if they are. Can be handled in a future PR as well.
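A sketch of such validation, on a hypothetical cut-down builder that mirrors the PR's naming (only the validated setter is shown):

```java
public class BuilderValidationSketch {
    // Hypothetical cut-down builder mirroring the PR's naming; illustrative only.
    static final class ArrowStreamConfigurationOptionsBuilder {
        private int maxInflightBatches;

        ArrowStreamConfigurationOptionsBuilder setMaxInflightBatches(int maxInflightBatches) {
            // Fail fast on nonsensical values instead of surfacing errors later.
            if (maxInflightBatches <= 0) {
                throw new IllegalArgumentException(
                        "maxInflightBatches must be > 0, got " + maxInflightBatches);
            }
            this.maxInflightBatches = maxInflightBatches;
            return this;
        }
    }

    public static void main(String[] args) {
        new ArrowStreamConfigurationOptionsBuilder().setMaxInflightBatches(4); // ok
        try {
            new ArrowStreamConfigurationOptionsBuilder().setMaxInflightBatches(0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing `IllegalArgumentException` at set time keeps the error next to the bad call site rather than deferring it to stream creation.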
Looks good. A couple of potential pre-existing follow-up edge cases found by an LLM; we can handle the null/empty batch case.
Force-pushed from ca41bf2 to dfd33a9.
elenagaljak-db left a comment
Please add some details about Arrow support to the Java README and the example-level README.
What changes are proposed in this pull request?
Adds Arrow Flight ingestion to the Java SDK. The Java side accepts VectorSchemaRoot directly, serializes to IPC bytes internally, and passes them across the JNI boundary where the Rust SDK handles encoding, framing, and transmission over the Arrow Flight gRPC protocol.
How is this tested?
Added Arrow Flight integration tests.