Skip to content

Commit e574077

Browse files
committed
fix
1 parent 8e8335a commit e574077

3 files changed

Lines changed: 43 additions & 38 deletions

File tree

backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,6 @@ object VeloxBroadcastBuildSideCache
4848
VeloxBackendSettings.GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT
4949
)
5050

51-
// Track released pointers to prevent double free
52-
private val releasedPointers = ConcurrentHashMap.newKeySet[Long]()
53-
5451
// Used to ensure that each bhj hash table is built only once.
5552
// key: hashtable id, value is hashtable backend pointer(long to string).
5653
private val buildSideRelationCache: Cache[String, BroadcastHashTable] =
@@ -98,9 +95,9 @@ object VeloxBroadcastBuildSideCache
9895
def cleanAll(): Unit = buildSideRelationCache.invalidateAll()
9996

10097
override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = {
101-
// Use ConcurrentHashMap.add() which returns false if already present
102-
// This ensures only one thread can successfully mark the pointer as released
103-
if (releasedPointers.add(value.pointer)) {
98+
// Synchronize on the value object to ensure only one thread at a time can release this specific hash table
99+
// This prevents concurrent calls to clearHashTable which can corrupt memory pool state
100+
value.synchronized {
104101
logWarning(s"Remove bhj $key = ${value.pointer}")
105102
if (value.relation != null) {
106103
value.relation match {
@@ -111,9 +108,9 @@ object VeloxBroadcastBuildSideCache
111108
}
112109
}
113110

111+
// clearHashTable must be called under synchronization to prevent
112+
// concurrent access to the same hash table's memory pool
114113
HashJoinBuilder.clearHashTable(value.pointer)
115-
} else {
116-
logWarning(s"Skip already released bhj $key = ${value.pointer}")
117114
}
118115
}
119116
}

backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -204,21 +204,25 @@ case class ColumnarBuildSideRelation(
204204
}.toArray
205205

206206
// Build the hash table
207-
hashTableData = HashJoinBuilder
208-
.nativeBuild(
209-
broadcastContext.buildHashTableId,
210-
batchArray.toArray,
211-
joinKeys,
212-
broadcastContext.substraitJoinType.ordinal(),
213-
broadcastContext.hasMixedFiltCondition,
214-
broadcastContext.isExistenceJoin,
215-
SubstraitUtil.toNameStruct(newOutput).toByteArray,
216-
broadcastContext.isNullAwareAntiJoin,
217-
broadcastContext.bloomFilterPushdownSize,
218-
broadcastContext.broadcastHashTableBuildThreads
219-
)
220-
221-
jniWrapper.close(serializeHandle)
207+
try {
208+
hashTableData = HashJoinBuilder
209+
.nativeBuild(
210+
broadcastContext.buildHashTableId,
211+
batchArray.toArray,
212+
joinKeys,
213+
broadcastContext.substraitJoinType.ordinal(),
214+
broadcastContext.hasMixedFiltCondition,
215+
broadcastContext.isExistenceJoin,
216+
SubstraitUtil.toNameStruct(newOutput).toByteArray,
217+
broadcastContext.isNullAwareAntiJoin,
218+
broadcastContext.bloomFilterPushdownSize,
219+
broadcastContext.broadcastHashTableBuildThreads
220+
)
221+
} finally {
222+
// Release batch handles after building hash table
223+
batchArray.foreach(ColumnarBatches.release)
224+
jniWrapper.close(serializeHandle)
225+
}
222226
(hashTableData, this)
223227
} else {
224228
(HashJoinBuilder.cloneHashTable(hashTableData), null)

backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -174,21 +174,25 @@ class UnsafeColumnarBuildSideRelation(
174174
}.toArray
175175

176176
// Build the hash table
177-
hashTableData = HashJoinBuilder
178-
.nativeBuild(
179-
broadcastContext.buildHashTableId,
180-
batchArray.toArray,
181-
joinKeys,
182-
broadcastContext.substraitJoinType.ordinal(),
183-
broadcastContext.hasMixedFiltCondition,
184-
broadcastContext.isExistenceJoin,
185-
SubstraitUtil.toNameStruct(newOutput).toByteArray,
186-
broadcastContext.isNullAwareAntiJoin,
187-
broadcastContext.bloomFilterPushdownSize,
188-
broadcastContext.broadcastHashTableBuildThreads
189-
)
190-
191-
jniWrapper.close(serializeHandle)
177+
try {
178+
hashTableData = HashJoinBuilder
179+
.nativeBuild(
180+
broadcastContext.buildHashTableId,
181+
batchArray.toArray,
182+
joinKeys,
183+
broadcastContext.substraitJoinType.ordinal(),
184+
broadcastContext.hasMixedFiltCondition,
185+
broadcastContext.isExistenceJoin,
186+
SubstraitUtil.toNameStruct(newOutput).toByteArray,
187+
broadcastContext.isNullAwareAntiJoin,
188+
broadcastContext.bloomFilterPushdownSize,
189+
broadcastContext.broadcastHashTableBuildThreads
190+
)
191+
} finally {
192+
// Release batch handles after building hash table
193+
batchArray.foreach(ColumnarBatches.release)
194+
jniWrapper.close(serializeHandle)
195+
}
192196
(hashTableData, this)
193197
} else {
194198
(HashJoinBuilder.cloneHashTable(hashTableData), null)

0 commit comments

Comments
 (0)