From bfd16b3cee29448606910a59b867e9b48e386661 Mon Sep 17 00:00:00 2001 From: sychen Date: Mon, 9 Mar 2026 20:05:31 +0800 Subject: [PATCH 1/2] fix --- .../service/deploy/worker/storage/Flusher.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala index 40daca1df68..d0f09262258 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala @@ -19,7 +19,8 @@ package org.apache.celeborn.service.deploy.worker.storage import java.io.IOException import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, TimeUnit} -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray} +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLongArray} +import java.util.function.IntUnaryOperator import scala.util.Random @@ -48,7 +49,7 @@ abstract private[worker] class Flusher( protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount) protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]() protected val workers = new Array[ExecutorService](threadCount) - protected var nextWorkerIndex: Int = 0 + protected val nextWorkerIndex: AtomicInteger = new AtomicInteger(0) val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount) val stopFlag = new AtomicBoolean(false) @@ -104,9 +105,10 @@ abstract private[worker] class Flusher( ThreadPoolSource.registerSource(s"$this", workers) } - def getWorkerIndex: Int = synchronized { - nextWorkerIndex = (nextWorkerIndex + 1) % threadCount - nextWorkerIndex + def getWorkerIndex: Int = { + nextWorkerIndex.updateAndGet(new IntUnaryOperator { + override def applyAsInt(i: Int): Int = (i + 1) % threadCount + }) } def takeBuffer(): CompositeByteBuf = { From a51a4af16e867fc4756de07b1f8f1d24e5fb2c10 Mon Sep 17 00:00:00 2001 From: cxzl25 <3898450+cxzl25@users.noreply.github.com> Date: Thu, 19 Mar 2026 20:15:14 +0800 Subject: [PATCH 2/2] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../celeborn/service/deploy/worker/storage/Flusher.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala index d0f09262258..f5bd4b0d779 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala @@ -50,6 +50,9 @@ abstract private[worker] class Flusher( protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]() protected val workers = new Array[ExecutorService](threadCount) protected val nextWorkerIndex: AtomicInteger = new AtomicInteger(0) + private val workerIndexUpdater: IntUnaryOperator = new IntUnaryOperator { + override def applyAsInt(i: Int): Int = (i + 1) % threadCount + } val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount) val stopFlag = new AtomicBoolean(false) @@ -106,9 +109,7 @@ abstract private[worker] class Flusher( } def getWorkerIndex: Int = { - nextWorkerIndex.updateAndGet(new IntUnaryOperator { - override def applyAsInt(i: Int): Int = (i + 1) % threadCount - }) + nextWorkerIndex.updateAndGet(workerIndexUpdater) } def takeBuffer(): CompositeByteBuf = {