Skip to content

Commit 28c4e02

Browse files
authored
[To dev/1.3] Fix the deadlock at ConfigNode PipeTaskCoordinatorLock (#17233) (#17424)
* fix * Update PipeTaskCoordinatorLock.java
1 parent 03cc1bd commit 28c4e02

File tree

3 files changed

+16
-27
lines changed

3 files changed

+16
-27
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,9 @@ public AtomicReference<PipeTaskInfo> lock() {
8282
/**
8383
* Unlock the pipe task coordinator. Calling this method will clear the pipe task info holder,
8484
* which means that the holder will be null after calling this method.
85-
*
86-
* @return {@code true} if successfully unlocked, {@code false} if current thread is not holding
87-
* the lock.
8885
*/
89-
public boolean unlock() {
90-
try {
91-
pipeTaskCoordinatorLock.unlock();
92-
return true;
93-
} catch (IllegalMonitorStateException ignored) {
94-
// This is thrown if unlock() is called without lock() called first.
95-
LOGGER.warn("This thread is not holding the lock.");
96-
return false;
97-
}
86+
public void unlock() {
87+
pipeTaskCoordinatorLock.unlock();
9888
}
9989

10090
public boolean isLocked() {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,29 @@
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
2424

25+
import java.util.concurrent.Semaphore;
2526
import java.util.concurrent.TimeUnit;
26-
import java.util.concurrent.locks.ReentrantLock;
2727

2828
/**
29-
* {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to
29+
* {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task coordinator. It is used to
3030
* ensure that only one thread can execute the pipe task coordinator at the same time.
31+
*
32+
* <p>Uses {@link Semaphore} instead of {@link java.util.concurrent.locks.ReentrantLock} to support
33+
* cross-thread acquire/release, which is required by the procedure recovery mechanism: locks may be
34+
* acquired on the StateMachineUpdater thread during {@code restoreLock()} and released on a
35+
* ProcedureCoreWorker thread after execution.
3136
*/
3237
public class PipeTaskCoordinatorLock {
3338

3439
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
3540

36-
private final ReentrantLock lock = new ReentrantLock();
41+
private final Semaphore semaphore = new Semaphore(1);
3742

3843
public void lock() {
3944
LOGGER.debug(
4045
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
4146
try {
42-
lock.lockInterruptibly();
47+
semaphore.acquire();
4348
LOGGER.debug(
4449
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
4550
} catch (final InterruptedException e) {
@@ -54,7 +59,7 @@ public boolean tryLock() {
5459
try {
5560
LOGGER.debug(
5661
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
57-
if (lock.tryLock(10, TimeUnit.SECONDS)) {
62+
if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
5863
LOGGER.debug(
5964
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
6065
return true;
@@ -74,12 +79,12 @@ public boolean tryLock() {
7479
}
7580

7681
public void unlock() {
77-
lock.unlock();
82+
semaphore.release();
7883
LOGGER.debug(
7984
"PipeTaskCoordinator lock released by thread {}", Thread.currentThread().getName());
8085
}
8186

8287
public boolean isLocked() {
83-
return lock.isLocked();
88+
return semaphore.availablePermits() == 0;
8489
}
8590
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,8 @@ public boolean unlock() {
105105
subscriptionInfoHolder = null;
106106
}
107107

108-
try {
109-
coordinatorLock.unlock();
110-
return true;
111-
} catch (IllegalMonitorStateException ignored) {
112-
// This is thrown if unlock() is called without lock() called first.
113-
LOGGER.warn("This thread is not holding the lock.");
114-
return false;
115-
}
108+
coordinatorLock.unlock();
109+
return true;
116110
}
117111

118112
public boolean isLocked() {

0 commit comments

Comments
 (0)