Introduce OutputBufferManagerRegistry for extensible multi-manager support#1843
Open
karthikeyann wants to merge 188 commits into
Open
Introduce OutputBufferManagerRegistry for extensible multi-manager support#1843karthikeyann wants to merge 188 commits into
karthikeyann wants to merge 188 commits into
Conversation
Co-authored-by: Daniel Bauer <dnb@zurich.ibm.com> Co-authored-by: Sean Rooney <sro@zurich.ibm.com>
Besides some minor structural improvements, the main change in this PR are better logs. The existing logs are pretty useless in a multi-driver situation as they don't specify in which pipeline they occurred. Since the multi-driver-bug won't be the last, I think we should keep these logs for the moment.
Update formatting.
UCXX/UCX is not stream-aware, so without synchronizing the CUDA stream after contiguous_split(), data could be sent via tagSend before the GPU kernels have finished writing to the buffers. The single-partition path already had this sync. Add it to the multi-partition path as well. Review: @wence- comment #1
The DataAndMetadata struct has a stream field that is used in onData() to create the PackedTableWithStream, but the stream was never stored after being obtained from the pool. This meant onData() would use an uninitialized stream view. Set ptr->stream = stream immediately after obtaining the stream from the global pool, before the allocation try/catch. Review: @wence- comment #2
In publish(), the future was obtained from retrievedPromise outside the entryLock scope, creating a race with the consumer. Move get_future() inside the entryLock scope. In poll() and waitFor(), entry fields (data, atEnd, retrievedPromise) were accessed after releasing the entryMutex, creating a race where another thread could modify the entry between the lock release and the field access. Extend the entryLock scope to cover the entire retrieval operation including set_value(). Review: @pentschev comment #5
The CUDA stream used to produce data was lost during intra-node transfer. The source would allocate a new stream from the pool instead of using the producer's stream, breaking stream ordering guarantees. Add IntraNodeTransferResult struct to bundle data + stream + atEnd. Add stream parameter to publish() and stream field to IntraNodeTransferEntry. Update poll()/waitFor() to return IntraNodeTransferResult. Add [[nodiscard]] to publish/poll/waitFor. In CudfExchangeServer, pass rmm::cuda_stream_default to publish() since data is already synchronized from the producer's stream.sync(). In CudfExchangeSource, use result->stream (the producer's stream) in onIntraNodeData instead of getting a new stream from the pool. Review: @wence- comments #3 + #4, @pentschev facebookincubator#12
EndpointRef::onClose is a UCX callback. It was calling Communicator::removeEndpointRef which calls endpoint->closeBlocking(). closeBlocking() internally progresses the UCX worker, which is not allowed from within a callback and can deadlock. Add deferEndpointCleanup() that pushes the endpoint to a work queue. Process the deferred cleanup queue in the main run() loop before the normal work queue. Replace the direct removeEndpointRef call in the onClose callback with deferEndpointCleanup. Review: @pentschev comment #8
worker_->flush() only applies to RMA (Remote Memory Access) operations like ucp_put/ucp_get. This code only uses tag send/recv and active messages, so the flush call is unnecessary. Review: @pentschev comment #7
There was no validation that the serialized metadata size fits in kMetaBufSize. If cudfMetadata was larger than expected, serialize() would write past the end of the buffer. Add VELOX_CHECK_LE to fail fast with a clear error message instead of silently corrupting memory. Review: @pentschev comment facebookincubator#10
The 4KB metadata buffer limit (kMetaBufSize) was too small for tables with many columns, where cudf metadata alone can exceed 4KB. Wide tables would silently truncate metadata, causing data corruption. Rename kMetaBufSize to kMaxMetaBufSize, increase to 1MB (enough for ~10,000+ columns). Add kMetaHeaderSize constant. Change serialize() to allocate exact size needed instead of fixed buffer, reducing memory waste for small metadata. Add const to getSerializedSize(). Update CudfExchangeSource to use kMaxMetaBufSize for the receive buffer. Review: @pentschev comment #9
strncpy does not null-terminate the destination buffer when the source string length equals or exceeds the destination size. If taskId was exactly 256 chars or longer, the buffer would not be null-terminated, leading to buffer overread when the server reads the string. Copy at most sizeof-1 bytes and explicitly null-terminate. Review: @pentschev comment facebookincubator#11
The old code checked handshakePtr != nullptr after reinterpret_cast, which is meaningless (reinterpret_cast of a non-null pointer never returns null). The real risk is the buffer being too small for the cast, which would cause reading past buffer bounds. Add null check on buffer itself, and validate buffer->getSize() >= sizeof(HandshakeMsg) BEFORE the reinterpret_cast. Remove the old meaningless null check after the cast. Review: @pentschev comment facebookincubator#17
Use sizeof(variable) instead of sizeof(type) throughout the metadata serialization/deserialization code and protocol messages. This is safer because if the type of a variable changes, sizeof automatically picks up the new size. In CudfExchangeProtocol.h: - getSerializedSize(): sizeof(kMagicNumber), sizeof(totalSize), sizeof(cudfSize), sizeof(dataSizeBytes), sizeof(remainingBytes[0]) - serialize(): sizeof(kMagicNumber), sizeof(totalSize), sizeof(cudfSize), sizeof(dataSizeBytes), sizeof(numRemaining), sizeof(remainingBytes[0]) - deserializeMetadataMsg(): sizeof(magicNumber), sizeof(totalSize), sizeof(metaSize), sizeof(record.dataSizeBytes), sizeof(numRemaining), sizeof(record.remainingBytes[0]) In Acceptor.cpp: sizeof(*response) instead of sizeof(HandshakeResponse) In CudfExchangeSource.cpp: sizeof(*handshakeReq) instead of sizeof(HandshakeMsg), sizeof(*responseBuffer) instead of sizeof(HandshakeResponse) Review: @pentschev comment facebookincubator#13
The intra-node detection was comparing listener bind addresses, which failed when both VMs bind to 0.0.0.0:18013 — causing all inter-VM transfers to incorrectly use the process-local IntraNodeTransferRegistry instead of UCXX, resulting in infinite polling loops and hangs. Fix: Use server-side peer IP verification instead of client-reported listener addresses. The server now: 1. Captures the peer's actual IP from the connection request 2. Enumerates all local IPs using getifaddrs() 3. Checks if peer IP is in the local IP set Changes: - EndpointRef: Store peer IP from connection - NetUtil: Add getLocalIpAddresses() using getifaddrs()/freeifaddrs() - Acceptor: Server-side intra-node check using peer IP - CudfExchangeServer: Simplified to take bool isIntraNodeTransfer - HandshakeMsg: Remove unused sourceListenerIp/Port fields - CudfExchangeSource: Remove listener address population from handshake
CudfExchangeQueue determines completion when numCompleted_ == numSources_. Each source must enqueue(nullptr) for numCompleted_ to increment. Multiple code paths transition a source to Done without delivering the end marker, causing the consumer to hang forever. Add deliverEndMarker() with atomic CAS (endMarkerDelivered_) to guarantee exactly-once nullptr delivery. Add registered_ guard to prevent spurious numCompleted_ increments for sources created after client close. Updated all 16 code paths: close(), 6 callback early-returns, 7 error paths, and 2 normal atEnd paths. Standardized ordering to always call deliverEndMarker() before setState(Done). Bug: Consumer hangs when any source fails to deliver nullptr end marker
CudfOutputQueueManager::getData() creates a stub queue when called before the task exists. If the producer fails before initializeTask() or noMoreData(), the getData callback is orphaned — it never fires. The CudfExchangeServer is stuck in WaitingForDataFromQueue forever, never sends an end marker, and the consumer hangs. Rewrite CudfOutputQueue::terminate() to fire all pending getData callbacks with nullptr end markers and release outstanding producer promises. Follows the same pattern as checkIfDone(): collect under lock, notify outside lock. Safe against double-fire via enqueueBack() deduplication, getAndClearNotify() null-callback return, and notify() null guard. Bug: Consumer hangs when producer task fails before noMoreData()
EndpointRef::onClose() ran in the UCX progress thread and iterated communicators_ while addCommElem/removeCommElem could modify it from the main thread — a data race. Additionally, close() called from within the iteration could trigger removeCommElem, causing double-erase and iterator invalidation. Fix: Add commMutex_ to protect communicators_. Rewrite onClose() to only defer (no iteration — all work pushed to deferEndpointCleanup). Add closeAndDrainCommunicators() using swap-and-drain pattern: swap the set into a local copy under the lock, then iterate the copy without the lock. This eliminates data races, re-entrancy, and progress-thread blocking. Bug: Data race on communicators_ between UCX callback and main thread
When DDL column names differ from Parquet file column names (e.g., lowercase "_col_0" in DDL vs uppercase "_COL_0" in Parquet, or completely different names), the cuDF connector would return empty results because cuDF requires exact column name matches. This fix reads Parquet metadata to get actual column names and builds an index-based mapping from logical (DDL) names to physical (Parquet) names. The mapping assumes column order is preserved between DDL and Parquet schemas. Changes: - Add buildColumnNameMapping() to read Parquet schema and build mapping - Add mapToPhysicalColumnNames() for O(1) name translation - Apply mapping to column projection in set_columns() - Apply mapping to filter expressions in experimental reader path - Graceful fallback to original names if metadata read fails Limitations: - Only handles top-level columns (nested columns not supported) - Requires column order to match between DDL and Parquet
…ication Implement CudfTopNRowNumber operator that accelerates the common CDC deduplication pattern: ROW_NUMBER() OVER (PARTITION BY keys ORDER BY timestamp DESC NULLS FIRST) WHERE rn = 1 Algorithm for N=1: 1. Sort by (partition_keys, sort_keys) using cudf::stable_sorted_order() 2. Use cudf::unique() with KEEP_FIRST on partition keys Key features: - Only supports limit=1 with row_number function (other cases fail) - Handles all sort orders (ASC/DESC, NULLS FIRST/LAST) - Supports optional row_number output column - Type checking rejects unsupported complex types (ARRAY, MAP, ROW) Files added: - CudfTopNRowNumber.h/cpp: Operator implementation - TopNRowNumberTest.cpp: 15 comprehensive test cases Files modified: - ToCudf.cpp: Integration with operator replacement - CMakeLists.txt: Build configuration
Co-authored-by: Daniel Bauer <dnb@zurich.ibm.com> Co-authored-by: Sean Rooney <sro@zurich.ibm.com> Co-authored-by: Zoltan Arnold Nagy <nag@zurich.ibm.com> Co-authored-by: Luis Garcés-Eruce <lga@zurich.ibm.com>
(cherry picked from commit 79ac017) (cherry picked from commit 0f7869da0bfb1b4a5db0bd72cb8b847eeee85398) (cherry picked from commit 0727a8c2cef50c935789507e6cc1ad905c79defc)
(cherry picked from commit b9599ce711203ebd86ba3da6bc0275547ca5a439) (cherry picked from commit 351b7fd7cb38a5ddb17f1dc32834922601381a3c)
Pass the output node's kind (partitioned/broadcast) to CudfOutputQueueManager::initializeTask() and forward Task::updateOutputBuffers() calls to the cudf queue manager so that broadcast destination count updates reach the cudf exchange layer.
OperatorAdapters.cpp: - Exchange, MergeExchange and PartitionedOutput adapters now check the TransportType annotation on the plan node. Only nodes annotated with kUcx are replaced with cudf/HybridExchange operators; HTTP transport preserves the original Velox operators. - Remove ExchangeClientFacade indirection; store CudfExchangeClient instances directly in a weak_ptr map keyed by (taskId, pipelineId) with stale-entry cleanup. - Add const_cast documentation for the released Exchange operator. - Add keepOperator/canRunOnGPU interaction docs for the HTTP path. ToCudf.h: - Move TaskPipelineKey (renamed from TaskPlanNodeKey) and CudfExchangeClientMap type alias here so they are shared across translation units. - Replace ExchangeClientFacade include with CudfExchangeClient. ToCudf.cpp: - Remove stale ExchangeClientFacade include.
(cherry picked from commit 2221bd3149f6da98ed5acc35f86c0b3f1f8e17e3)
Add test coverage for CAST(date AS varchar) on the cuDF GPU path: - 6 unit tests (castDateToVarchar*) covering normal dates, epoch, year 0001 (SAP null date), pre-epoch, NULL propagation, and try_cast - 1 end-to-end test (castDateToVarcharQuery1Pattern) verifying the full Query 1 CASE WHEN + CAST + string equality pipeline with mixed dates and NULLs Tests currently fail as expected — the DATE→VARCHAR cast implementation (Patch 2) has not been added yet.
Extend the cuDF expression evaluator to handle CAST(date AS varchar) using cudf::strings::from_timestamps with "%Y-%m-%d" format, matching Presto's ISO 8601 output. Changes: - ExpressionEvaluator.cpp: Add CastMode::kDateToString to CastFunction, extend canEvaluate to accept DATE→VARCHAR for both cast and try_cast - AstExpressionUtils.h: Allow cast types unsupported in pure AST (e.g. DATE→VARCHAR) to fall through to the precompute path instead of hard-failing, enabling the FunctionExpression path to handle them
Add test coverage for date_format() on the cuDF GPU path: - 11 unit tests covering supported format specifiers (%Y, %m, %d, %H, %i, %s, %h, %I, %f, %j, %y, %T, %%), NULL propagation, and the SAP YYYYMMDD use case - 1 end-to-end test (dateFormatQuery2Pattern) verifying the full Query 2 CASE WHEN + date_format + CAST(date AS timestamp) + string equality pipeline Tests currently fail as expected — the date_format GPU implementation (Patch 1) has not been added yet.
Implement Presto's date_format() function on the cuDF GPU path using cudf::strings::from_timestamps with Presto-to-cuDF format translation. Changes: - Add prestoToCudfDateFormat() translator supporting %Y, %y, %m, %d, %H, %I, %S, %f, %j, %i->%M, %s->%S, %h->%I, %T->%H:%M:%S, %% - Add DateFormatFunction class using from_timestamps with translated format and empty names column - Register date_format with (timestamp, constant varchar) -> varchar - Add canEvaluate validation to fall back to CPU for unsupported format specifiers (name-dependent %M/%W/%a/%b/%p/%r, unpadded %c/%e/%k/%l, ISO week %v/%x) - Fix tests: year 0001 can't be used with date_format because CAST(date AS timestamp) overflows nanosecond int64 range (~1677-2262)
Extend the cuDF expression evaluator to handle CAST(integer AS varchar) for all integer types (TINYINT, SMALLINT, INTEGER, BIGINT) using cudf::strings::from_integers. This is needed for SAP time columns (ERZET, LFUHR, TDUHR, etc.) stored as INTEGER in INT tables and cast to VARCHAR in view definitions. Changes: - ExpressionEvaluator.cpp: Add CastMode::kIntToString, extend canEvaluate for all integer types to VARCHAR - FilterProjectTest.cpp: Add 8 tests covering int32, int64, int16, int8, negative values, zero, NULL propagation, and try_cast
… PartitionedOutputNode Test serialize/deserialize for kHttp and kUcx transport types, and verify backward compatibility: missing transportType field defaults to kHttp.
1a2f63f to
84fa81e
Compare
84fa81e to
0ac4367
Compare
0ac4367 to
42bd4a2
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Add an abstract IOutputBufferManager interface and a static OutputBufferManagerRegistry to decouple Task from concrete buffer manager implementations.
Key changes: