From c084ef93410aeba0d3bdb951cbc5ead6e856b093 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 2 Apr 2026 14:19:55 +0800 Subject: [PATCH 1/2] fix --- .../pipe/coordinator/task/PipeTaskCoordinator.java | 14 ++------------ .../subscription/SubscriptionCoordinator.java | 10 ++-------- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index f48c64c3ea258..ee1ccd2a8fba4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -82,19 +82,9 @@ public AtomicReference lock() { /** * Unlock the pipe task coordinator. Calling this method will clear the pipe task info holder, * which means that the holder will be null after calling this method. - * - * @return {@code true} if successfully unlocked, {@code false} if current thread is not holding - * the lock. */ - public boolean unlock() { - try { - pipeTaskCoordinatorLock.unlock(); - return true; - } catch (IllegalMonitorStateException ignored) { - // This is thrown if unlock() is called without lock() called first. - LOGGER.warn("This thread is not holding the lock."); - return false; - } + public void unlock() { + pipeTaskCoordinatorLock.unlock(); } public boolean isLocked() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java index 28c596de7ba68..ac88f31f92128 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java @@ -105,14 +105,8 @@ public boolean unlock() { subscriptionInfoHolder = null; } - try { - coordinatorLock.unlock(); - return true; - } catch (IllegalMonitorStateException ignored) { - // This is thrown if unlock() is called without lock() called first. - LOGGER.warn("This thread is not holding the lock."); - return false; - } + coordinatorLock.unlock(); + return true; } public boolean isLocked() { From c80536a2cf1e00c6bbbc233b15849d8af65f4317 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 2 Apr 2026 14:25:38 +0800 Subject: [PATCH 2/2] Update PipeTaskCoordinatorLock.java --- .../task/PipeTaskCoordinatorLock.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index 12b9261900491..b86c556f20df8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -22,24 +22,29 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; /** - * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to + * {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task coordinator. It is used to * ensure that only one thread can execute the pipe task coordinator at the same time. + * + *

Uses {@link Semaphore} instead of {@link java.util.concurrent.locks.ReentrantLock} to support + * cross-thread acquire/release, which is required by the procedure recovery mechanism: locks may be + * acquired on the StateMachineUpdater thread during {@code restoreLock()} and released on a + * ProcedureCoreWorker thread after execution. */ public class PipeTaskCoordinatorLock { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class); - private final ReentrantLock lock = new ReentrantLock(); + private final Semaphore semaphore = new Semaphore(1); public void lock() { LOGGER.debug( "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName()); try { - lock.lockInterruptibly(); + semaphore.acquire(); LOGGER.debug( "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); } catch (final InterruptedException e) { @@ -54,7 +59,7 @@ public boolean tryLock() { try { LOGGER.debug( "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName()); - if (lock.tryLock(10, TimeUnit.SECONDS)) { + if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) { LOGGER.debug( "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName()); return true; @@ -74,12 +79,12 @@ public boolean tryLock() { } public void unlock() { - lock.unlock(); + semaphore.release(); LOGGER.debug( "PipeTaskCoordinator lock released by thread {}", Thread.currentThread().getName()); } public boolean isLocked() { - return lock.isLocked(); + return semaphore.availablePermits() == 0; } }