Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,9 @@ public AtomicReference<PipeTaskInfo> lock() {
/**
* Unlock the pipe task coordinator. Calling this method will clear the pipe task info holder,
* which means that the holder will be null after calling this method.
*
* @return {@code true} if successfully unlocked, {@code false} if current thread is not holding
* the lock.
*/
public boolean unlock() {
try {
pipeTaskCoordinatorLock.unlock();
return true;
} catch (IllegalMonitorStateException ignored) {
// This is thrown if unlock() is called without lock() called first.
LOGGER.warn("This thread is not holding the lock.");
return false;
}
public void unlock() {
pipeTaskCoordinatorLock.unlock();
}

public boolean isLocked() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,29 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to
* {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task coordinator. It is used to
* ensure that only one thread can execute the pipe task coordinator at the same time.
*
* <p>Uses {@link Semaphore} instead of {@link java.util.concurrent.locks.ReentrantLock} to support
* cross-thread acquire/release, which is required by the procedure recovery mechanism: locks may be
* acquired on the StateMachineUpdater thread during {@code restoreLock()} and released on a
* ProcedureCoreWorker thread after execution.
*/
public class PipeTaskCoordinatorLock {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);

private final ReentrantLock lock = new ReentrantLock();
private final Semaphore semaphore = new Semaphore(1);

public void lock() {
LOGGER.debug(
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
try {
lock.lockInterruptibly();
semaphore.acquire();
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
} catch (final InterruptedException e) {
Expand All @@ -54,7 +59,7 @@ public boolean tryLock() {
try {
LOGGER.debug(
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
if (lock.tryLock(10, TimeUnit.SECONDS)) {
if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
return true;
Expand All @@ -74,12 +79,12 @@ public boolean tryLock() {
}

public void unlock() {
lock.unlock();
semaphore.release();
LOGGER.debug(
"PipeTaskCoordinator lock released by thread {}", Thread.currentThread().getName());
}

public boolean isLocked() {
return lock.isLocked();
return semaphore.availablePermits() == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,8 @@ public boolean unlock() {
subscriptionInfoHolder = null;
}

try {
coordinatorLock.unlock();
return true;
} catch (IllegalMonitorStateException ignored) {
// This is thrown if unlock() is called without lock() called first.
LOGGER.warn("This thread is not holding the lock.");
return false;
}
coordinatorLock.unlock();
return true;
}

public boolean isLocked() {
Expand Down
Loading