diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index 2705f3b34cbf..b3039ad861ce 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -57,7 +57,7 @@ object VeloxBroadcastBuildSideCache def getOrBuildBroadcastHashTable( broadcast: Broadcast[BuildSideRelation], - broadcastContext: BroadcastHashJoinContext): BroadcastHashTable = synchronized { + broadcastContext: BroadcastHashJoinContext): BroadcastHashTable = { buildSideRelationCache .get( @@ -77,14 +77,13 @@ object VeloxBroadcastBuildSideCache } /** This is callback from c++ backend. */ - def get(broadcastHashtableId: String): Long = - synchronized { - Option(buildSideRelationCache.getIfPresent(broadcastHashtableId)) - .map(_.pointer) - .getOrElse(0) - } + def get(broadcastHashtableId: String): Long = { + Option(buildSideRelationCache.getIfPresent(broadcastHashtableId)) + .map(_.pointer) + .getOrElse(0) + } - def invalidateBroadcastHashtable(broadcastHashtableId: String): Unit = synchronized { + def invalidateBroadcastHashtable(broadcastHashtableId: String): Unit = { // Cleanup operations on the backend are idempotent. buildSideRelationCache.invalidate(broadcastHashtableId) } @@ -94,8 +93,11 @@ object VeloxBroadcastBuildSideCache def cleanAll(): Unit = buildSideRelationCache.invalidateAll() - override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = { - synchronized { + override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = + VeloxBroadcastBuildSideCache.synchronized { + // Serialize all hash table cleanup operations to prevent concurrent + // clearHashTable calls that can corrupt the shared memory pool in + // Velox backend logWarning(s"Remove bhj $key = ${value.pointer}") if (value.relation != null) { value.relation match { @@ -108,5 +110,4 @@ object VeloxBroadcastBuildSideCache HashJoinBuilder.clearHashTable(value.pointer) } - } }