From a1827f7eea5b13ec873bfc8aada7a29b3186d543 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Thu, 26 Mar 2026 11:00:11 +0000 Subject: [PATCH 1/5] Remove the synchronized lock in VeloxBroadcastBuildSideCache --- .../VeloxBroadcastBuildSideCache.scala | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index 2705f3b34cbf..b9ecb1d05570 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -57,7 +57,7 @@ object VeloxBroadcastBuildSideCache def getOrBuildBroadcastHashTable( broadcast: Broadcast[BuildSideRelation], - broadcastContext: BroadcastHashJoinContext): BroadcastHashTable = synchronized { + broadcastContext: BroadcastHashJoinContext): BroadcastHashTable = { buildSideRelationCache .get( @@ -77,14 +77,13 @@ object VeloxBroadcastBuildSideCache } /** This is callback from c++ backend. */ - def get(broadcastHashtableId: String): Long = - synchronized { - Option(buildSideRelationCache.getIfPresent(broadcastHashtableId)) - .map(_.pointer) - .getOrElse(0) - } + def get(broadcastHashtableId: String): Long = { + Option(buildSideRelationCache.getIfPresent(broadcastHashtableId)) + .map(_.pointer) + .getOrElse(0) + } - def invalidateBroadcastHashtable(broadcastHashtableId: String): Unit = synchronized { + def invalidateBroadcastHashtable(broadcastHashtableId: String): Unit = { // Cleanup operations on the backend are idempotent. buildSideRelationCache.invalidate(broadcastHashtableId) } @@ -95,18 +94,16 @@ object VeloxBroadcastBuildSideCache def cleanAll(): Unit = buildSideRelationCache.invalidateAll() override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = { - synchronized { - logWarning(s"Remove bhj $key = ${value.pointer}") - if (value.relation != null) { - value.relation match { - case columnar: ColumnarBuildSideRelation => - columnar.reset() - case unsafe: UnsafeColumnarBuildSideRelation => - unsafe.reset() - } + logWarning(s"Remove bhj $key = ${value.pointer}") + if (value.relation != null) { + value.relation match { + case columnar: ColumnarBuildSideRelation => + columnar.reset() + case unsafe: UnsafeColumnarBuildSideRelation => + unsafe.reset() } - - HashJoinBuilder.clearHashTable(value.pointer) } + + HashJoinBuilder.clearHashTable(value.pointer) } } From 98121a4d33853ee7b8ef6222db87cd74bda40e0a Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Thu, 26 Mar 2026 13:45:44 +0000 Subject: [PATCH 2/5] fix --- .../VeloxBroadcastBuildSideCache.scala | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index b9ecb1d05570..f1c10c3fe407 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener} +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit case class BroadcastHashTable(pointer: Long, relation: BuildSideRelation) @@ -47,6 +48,9 @@ object VeloxBroadcastBuildSideCache VeloxBackendSettings.GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT ) + // Track released pointers to prevent double free + private val releasedPointers = ConcurrentHashMap.newKeySet[Long]() + // Use for controlling to build bhj hash table once. // key: hashtable id, value is hashtable backend pointer(long to string). private val buildSideRelationCache: Cache[String, BroadcastHashTable] = @@ -94,16 +98,22 @@ object VeloxBroadcastBuildSideCache def cleanAll(): Unit = buildSideRelationCache.invalidateAll() override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = { - logWarning(s"Remove bhj $key = ${value.pointer}") - if (value.relation != null) { - value.relation match { - case columnar: ColumnarBuildSideRelation => - columnar.reset() - case unsafe: UnsafeColumnarBuildSideRelation => - unsafe.reset() + // Use ConcurrentHashMap.add() which returns false if already present + // This ensures only one thread can successfully mark the pointer as released + if (releasedPointers.add(value.pointer)) { + logWarning(s"Remove bhj $key = ${value.pointer}") + if (value.relation != null) { + value.relation match { + case columnar: ColumnarBuildSideRelation => + columnar.reset() + case unsafe: UnsafeColumnarBuildSideRelation => + unsafe.reset() + } } - } - HashJoinBuilder.clearHashTable(value.pointer) + HashJoinBuilder.clearHashTable(value.pointer) + } else { + logWarning(s"Skip already released bhj $key = ${value.pointer}") + } } } From 7c6496db0450ed21c3f43c5c6f060e73839cedd3 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Thu, 26 Mar 2026 15:09:57 +0000 Subject: [PATCH 3/5] fix --- .../execution/VeloxBroadcastBuildSideCache.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index f1c10c3fe407..1d2b15f04448 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener} -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit case class BroadcastHashTable(pointer: Long, relation: BuildSideRelation) @@ -48,9 +47,6 @@ object VeloxBroadcastBuildSideCache VeloxBackendSettings.GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT ) - // Track released pointers to prevent double free - private val releasedPointers = ConcurrentHashMap.newKeySet[Long]() - // Use for controlling to build bhj hash table once. // key: hashtable id, value is hashtable backend pointer(long to string). private val buildSideRelationCache: Cache[String, BroadcastHashTable] = @@ -98,9 +94,9 @@ object VeloxBroadcastBuildSideCache def cleanAll(): Unit = buildSideRelationCache.invalidateAll() override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = { - // Use ConcurrentHashMap.add() which returns false if already present - // This ensures only one thread can successfully mark the pointer as released - if (releasedPointers.add(value.pointer)) { + // Synchronize on the value object to ensure only one thread can release this specific hash table + // This prevents concurrent calls to clearHashTable which can corrupt memory pool state + value.synchronized { logWarning(s"Remove bhj $key = ${value.pointer}") if (value.relation != null) { value.relation match { @@ -111,9 +107,9 @@ object VeloxBroadcastBuildSideCache } } + // clearHashTable must be called under synchronization to prevent + // concurrent access to the same hash table's memory pool HashJoinBuilder.clearHashTable(value.pointer) - } else { - logWarning(s"Skip already released bhj $key = ${value.pointer}") } } } From 435738a19b6e1b92f2b0567726290953e0993c71 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Thu, 26 Mar 2026 15:27:23 +0000 Subject: [PATCH 4/5] fix --- .../gluten/execution/VeloxBroadcastBuildSideCache.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index 1d2b15f04448..fab4a1f1577a 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -94,8 +94,9 @@ object VeloxBroadcastBuildSideCache def cleanAll(): Unit = buildSideRelationCache.invalidateAll() override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = { - // Synchronize on the value object to ensure only one thread can release this specific hash table - // This prevents concurrent calls to clearHashTable which can corrupt memory pool state + // Synchronize on the value object to ensure only one thread can release this specific + // hash table This prevents concurrent calls to clearHashTable which can corrupt + // memory pool state. value.synchronized { logWarning(s"Remove bhj $key = ${value.pointer}") if (value.relation != null) { From 35de5e3e47faae23f0a5d72da5bc50c5ae40f1f9 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Thu, 26 Mar 2026 16:08:44 +0000 Subject: [PATCH 5/5] fix --- .../execution/VeloxBroadcastBuildSideCache.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index fab4a1f1577a..b3039ad861ce 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -93,11 +93,11 @@ object VeloxBroadcastBuildSideCache def cleanAll(): Unit = buildSideRelationCache.invalidateAll() - override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = { - // Synchronize on the value object to ensure only one thread can release this specific - // hash table This prevents concurrent calls to clearHashTable which can corrupt - // memory pool state. - value.synchronized { + override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = + VeloxBroadcastBuildSideCache.synchronized { + // Serialize all hash table cleanup operations to prevent concurrent + // clearHashTable calls that can corrupt the shared memory pool in + // Velox backend logWarning(s"Remove bhj $key = ${value.pointer}") if (value.relation != null) { value.relation match { @@ -108,9 +108,6 @@ object VeloxBroadcastBuildSideCache } } - // clearHashTable must be called under synchronization to prevent - // concurrent access to the same hash table's memory pool HashJoinBuilder.clearHashTable(value.pointer) } - } }