diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java new file mode 100644 index 000000000000..d4be999d59f6 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import java.util.List; + +/** + * Lock information. + */ +public final class OmLockInfo { + /** + * lock info provider. + */ + public interface LockInfoProvider { + default LockLevel getLevel() { + return LockLevel.NONE; + } + } + + /** + * volume lock info. + */ + public static class VolumeLockInfo implements LockInfoProvider { + private final String volumeName; + private final LockAction action; + + public VolumeLockInfo(String volumeName, LockAction volLockType) { + this.volumeName = volumeName; + this.action = volLockType; + } + + public String getVolumeName() { + return volumeName; + } + + public LockAction getAction() { + return action; + } + + @Override + public LockLevel getLevel() { + return LockLevel.VOLUME; + } + } + + /** + * bucket lock info. + */ + public static class BucketLockInfo implements LockInfoProvider { + private final VolumeLockInfo volumeLockInfo; + private final String bucketName; + private final LockAction action; + + public BucketLockInfo(String bucketName, LockAction action) { + this.volumeLockInfo = null; + this.bucketName = bucketName; + this.action = action; + } + + public BucketLockInfo(String volumeName, LockAction volLockType, String bucketName, LockAction action) { + this.volumeLockInfo = new VolumeLockInfo(volumeName, volLockType); + this.bucketName = bucketName; + this.action = action; + } + + public VolumeLockInfo getVolumeLockInfo() { + return volumeLockInfo; + } + + public String getBucketName() { + return bucketName; + } + + public LockAction getAction() { + return action; + } + + @Override + public LockLevel getLevel() { + return LockLevel.BUCKET; + } + } + + /** + * key lock info. + */ + public static class KeyLockInfo implements LockInfoProvider { + private final BucketLockInfo bucketLockInfo; + private final String key; + private final LockAction action; + + public KeyLockInfo(String bucketName, LockAction bucketAction, String keyName, LockAction keyAction) { + this.bucketLockInfo = new BucketLockInfo(bucketName, bucketAction); + this.key = keyName; + this.action = keyAction; + } + + public BucketLockInfo getBucketLockInfo() { + return bucketLockInfo; + } + + public String getKey() { + return key; + } + + public LockAction getAction() { + return action; + } + + @Override + public LockLevel getLevel() { + return LockLevel.KEY; + } + } + + /** + * multiple keys lock info. + */ + public static class MultiKeyLockInfo implements LockInfoProvider { + private final BucketLockInfo bucketLockInfo; + private final List keyList; + private final LockAction action; + + public MultiKeyLockInfo( + String bucketName, LockAction bucketAction, List keyList, LockAction keyAction) { + this.bucketLockInfo = new BucketLockInfo(bucketName, bucketAction); + this.keyList = keyList; + this.action = keyAction; + } + + public BucketLockInfo getBucketLockInfo() { + return bucketLockInfo; + } + + public List getKeyList() { + return keyList; + } + + public LockAction getAction() { + return action; + } + + @Override + public LockLevel getLevel() { + return LockLevel.MULTI_KEY; + } + } + + /** + * way the lock should be taken. + */ + public enum LockAction { + NONE, READ, WRITE + } + + /** + * lock stage level, like volume, bucket, key. + */ + public enum LockLevel { + NONE, + VOLUME, + BUCKET, + KEY, + MULTI_KEY + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java new file mode 100644 index 000000000000..956ffc10e0ce --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.locks.Lock; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.util.Time; +import org.apache.ratis.util.function.CheckedBiFunction; +import org.apache.ratis.util.function.CheckedFunction; + +/** + * Manage locking of volume, bucket, keys and others. + */ +public class OmLockOperation { + private static final long LOCK_TIMEOUT_DEFAULT = 10 * 60 * 1000; + + private final WrappedStripedLock keyLocking; + private final WrappedStripedLock bucketLocking; + private final WrappedStripedLock volumeLocking; + + public OmLockOperation() { + keyLocking = new WrappedStripedLock(102400, LOCK_TIMEOUT_DEFAULT, false); + bucketLocking = new WrappedStripedLock(1024, LOCK_TIMEOUT_DEFAULT, false); + volumeLocking = new WrappedStripedLock(1024, LOCK_TIMEOUT_DEFAULT, false); + } + + public OmLockObject lock(OmLockInfo.LockInfoProvider lockInfo) throws IOException { + OmLockObject omLockObject = new OmLockObject(lockInfo); + long startTime = Time.monotonicNowNanos(); + try { + OmLockInfo.LockLevel level = lockInfo.getLevel(); + switch (level) { + case VOLUME: + volumeLock((OmLockInfo.VolumeLockInfo) lockInfo, omLockObject); + break; + case BUCKET: + bucketLock((OmLockInfo.BucketLockInfo) lockInfo, omLockObject); + break; + case KEY: + keyLock((OmLockInfo.KeyLockInfo) lockInfo, omLockObject); + break; + case MULTI_KEY: + multiKeyLock((OmLockInfo.MultiKeyLockInfo) lockInfo, omLockObject); + break; + default: + throw new OMException("Unsupported lock level", OMException.ResultCodes.INTERNAL_ERROR); + } + lockStatsBegin(omLockObject.getLockStats(), Time.monotonicNowNanos(), startTime); + } catch (IOException e) { + unlock(omLockObject); + throw e; + } + return omLockObject; + } + + private void volumeLock(OmLockInfo.VolumeLockInfo lockInfo, OmLockObject omLockObject) throws IOException { + List locks = omLockObject.getLocks(); + if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { + locks.add(getLock(volumeLocking::readLock, lockInfo.getVolumeName())); + } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { + locks.add(getLock(volumeLocking::writeLock, lockInfo.getVolumeName())); + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + } + } + + private void bucketLock(OmLockInfo.BucketLockInfo lockInfo, OmLockObject omLockObject) throws IOException { + if (null != lockInfo.getVolumeLockInfo()) { + volumeLock(lockInfo.getVolumeLockInfo(), omLockObject); + } + List locks = omLockObject.getLocks(); + if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { + locks.add(getLock(bucketLocking::readLock, lockInfo.getBucketName())); + } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { + locks.add(getLock(bucketLocking::writeLock, lockInfo.getBucketName())); + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + } + } + + private void keyLock(OmLockInfo.KeyLockInfo lockInfo, OmLockObject omLockObject) throws IOException { + bucketLock(lockInfo.getBucketLockInfo(), omLockObject); + List locks = omLockObject.getLocks(); + if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { + locks.add(getLock(keyLocking::readLock, lockInfo.getKey())); + } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { + locks.add(getLock(keyLocking::writeLock, lockInfo.getKey())); + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + } + } + + private void multiKeyLock(OmLockInfo.MultiKeyLockInfo lockInfo, OmLockObject omLockObject) throws IOException { + bucketLock(lockInfo.getBucketLockInfo(), omLockObject); + List locks = omLockObject.getLocks(); + if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { + getLock(keyLocking::readLock, lockInfo.getKeyList(), locks); + } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { + getLock(keyLocking::writeLock, lockInfo.getKeyList(), locks); + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + } + } + + private static Lock getLock( + CheckedFunction lockFunction, String name) throws OMException { + try { + Lock lockObj = lockFunction.apply(name); + if (lockObj == null) { + throw new OMException("Unable to get lock for " + name + ", timeout occurred", OMException.ResultCodes.TIMEOUT); + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("waiting for lock is interrupted for " + name, OMException.ResultCodes.INTERNAL_ERROR); + } + } + + private static void getLock( + CheckedBiFunction, List, Boolean, InterruptedException> lockFunction, + List lockKeys, List lockList) throws OMException { + try { + if (!lockFunction.apply(lockKeys, lockList)) { + throw new OMException("Unable to get locks, timeout occurred", OMException.ResultCodes.TIMEOUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("waiting for locks is interrupted", OMException.ResultCodes.INTERNAL_ERROR); + } + } + + public void unlock(OmLockObject lockObject) { + Collections.reverse(lockObject.getLocks()); + lockObject.getLocks().forEach(Lock::unlock); + lockStatsEnd(lockObject.getLockStats(), lockObject.getLockStatType()); + lockObject.getLocks().clear(); + } + + private static void lockStatsBegin(OmLockStats lockStats, long endTime, long startTime) { + lockStats.add(endTime - startTime, OmLockStats.Type.WAIT); + lockStats.setLockStartTime(endTime); + } + + private static void lockStatsEnd(OmLockStats lockStats, OmLockStats.Type type) { + if (lockStats.getLockStartTime() > 0) { + lockStats.add(Time.monotonicNowNanos() - lockStats.getLockStartTime(), type); + } + } + + /** + * Lock information. + */ + public static class OmLockObject { + private final OmLockInfo.LockInfoProvider lockInfoProvider; + private final List locks = new ArrayList<>(); + private final OmLockStats lockStats = new OmLockStats(); + private OmLockStats.Type lockStatType = OmLockStats.Type.READ; + + public OmLockObject(OmLockInfo.LockInfoProvider lockInfoProvider) { + this.lockInfoProvider = lockInfoProvider; + } + + public List getLocks() { + return locks; + } + + public OmLockStats getLockStats() { + return lockStats; + } + + public OmLockStats.Type getLockStatType() { + return lockStatType; + } + + public void setLockStatType(OmLockStats.Type lockStatType) { + this.lockStatType = lockStatType; + } + + public OmLockInfo.LockInfoProvider getLockInfoProvider() { + return lockInfoProvider; + } + } + + /** + * lock stats. + */ + public static class OmLockStats { + private long lockStartTime; + private long waitLockNanos; + private long readLockNanos; + private long writeLockNanos; + + public long getLockStartTime() { + return lockStartTime; + } + + public void setLockStartTime(long lockStartTime) { + this.lockStartTime = lockStartTime; + } + + public long getWaitLockNanos() { + return waitLockNanos; + } + + public long getReadLockNanos() { + return readLockNanos; + } + + public long getWriteLockNanos() { + return writeLockNanos; + } + + void add(long timeNanos, Type type) { + switch (type) { + case WAIT: + waitLockNanos += timeNanos; + break; + case READ: + readLockNanos += timeNanos; + break; + case WRITE: + writeLockNanos += timeNanos; + break; + default: + } + } + + /** + * lock time stat type. + */ + public enum Type { + WAIT, + READ, + WRITE + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java new file mode 100644 index 000000000000..b60e60c92d2e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import com.google.common.util.concurrent.Striped; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.hadoop.hdds.utils.SimpleStriped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Locking class that wraps Striped locking with timeout and logging. It also provides ordering while multiple locks. + */ +public class WrappedStripedLock { + private static final Logger LOG = LoggerFactory.getLogger(WrappedStripedLock.class); + private final Striped fileStripedLock; + private final long lockTimeout; + + public WrappedStripedLock(int stripLockSize, long timeout, boolean fair) { + fileStripedLock = SimpleStriped.readWriteLock(stripLockSize, fair); + lockTimeout = timeout; + } + + /** + * lock the list of keys in order. + * Sample code for lock and unlock handling: + * + * try { + * if (!wrappedStripedLock.lock(keyList, locks)) { + * // timeout occurred, release lock if any in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * // perform business logic + * } finally { + * // to be released in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * + * + * @param keyList key list which needs to be locked + * @param locks successful lock object returned which will be used to release lock + * @return boolean true if success, else false + * @throws InterruptedException exception on interrupt + */ + public boolean writeLock(List keyList, List locks) throws InterruptedException { + try { + Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); + for (ReadWriteLock rwLock : readWriteLocks) { + Lock lockObj = rwLock.writeLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Write lock for keys are failed for the instance {} after wait of {}ms, read lock info: {}", this, + lockTimeout, rwLock.readLock()); + return false; + } + locks.add(lockObj); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Write lock for keys are interrupted for the instance {}", this); + throw e; + } + return true; + } + + /** + * lock single key. + * @param key object for which lock to be taken + * @return lock object to be used to release lock, null if unable to take lock due to timeout + * @throws InterruptedException exception on interrupt + */ + public Lock writeLock(String key) throws InterruptedException { + LOG.debug("Key {} is locked for instance {} {}", key, this, fileStripedLock.get(key)); + try { + Lock lockObj = fileStripedLock.get(key).writeLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Write lock for the key is failed for the instance {} after wait of {}ms, read lock info: {}", this, + lockTimeout, fileStripedLock.get(key).readLock()); + return null; + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Write lock for the key is interrupted for the instance {}", this); + throw e; + } + } + + /** + * lock the list of keys in order. + * Sample code for lock and unlock handling: + * + * try { + * if (!wrappedStripedLock.lock(keyList, locks)) { + * // timeout occurred, release lock if any in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * // perform business logic + * } finally { + * // to be released in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * + * + * @param keyList key list which needs to be locked + * @param locks successful lock object returned which will be used to release lock + * @return boolean true if success, else false + * @throws InterruptedException exception on interrupt + */ + public boolean readLock(List keyList, List locks) throws InterruptedException { + try { + Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); + for (ReadWriteLock rwLock : readWriteLocks) { + Lock lockObj = rwLock.readLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Read lock for keys are failed for the instance {} after wait of {}ms, write lock info: {}", this, + lockTimeout, rwLock.writeLock()); + return false; + } + locks.add(lockObj); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Read lock for keys are interrupted for the instance {}", this); + throw e; + } + return true; + } + + /** + * read lock single key. + * @param key object for which lock to be taken + * @return lock object to be used to release lock, null if unable to take lock due to timeout + * @throws InterruptedException exception on interrupt + */ + public Lock readLock(String key) throws InterruptedException { + try { + LOG.debug("Key {} is read locked for instance {} {}", key, this, fileStripedLock.get(key)); + Lock lockObj = fileStripedLock.get(key).readLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Read lock for the key is failed for the instance {} after wait of {}ms, write lock info: {}", this, + lockTimeout, fileStripedLock.get(key).readLock()); + return null; + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Read lock for the key is interrupted for the instance {}", this); + throw e; + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java new file mode 100644 index 000000000000..e840d7df3fd3 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; + +/** + * Test for TestOmLockOperation. + */ +public class TestOmLockOperation { + + @Test + public void testObsLockOprWithParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.KeyLockInfo lockInfo = + new OmLockInfo.KeyLockInfo("bucket", OmLockInfo.LockAction.READ, "testkey", OmLockInfo.LockAction.WRITE); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(2, lockObject.getLocks().size()); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(lockInfo); + omLockOpr.unlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.unlock(lockObject); + rst.get(); + } + + @Test + public void testObsLockOprListKeyRepeated() throws IOException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.MultiKeyLockInfo lockInfo = + new OmLockInfo.MultiKeyLockInfo("bucket", OmLockInfo.LockAction.READ, + Arrays.asList("testkey", "testkey2"), OmLockInfo.LockAction.WRITE); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(3, lockObject.getLocks().size()); + + omLockOpr.unlock(lockObject); + + lockObject = omLockOpr.lock(lockInfo); + assertEquals(3, lockObject.getLocks().size()); + omLockOpr.unlock(lockObject); + } + + @Test + public void testBucketReadLock() throws IOException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.BucketLockInfo lockInfo = new OmLockInfo.BucketLockInfo("bucket", OmLockInfo.LockAction.READ); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + omLockOpr.unlock(lockObject); + } + + @Test + public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.BucketLockInfo lockInfo = new OmLockInfo.BucketLockInfo("bucket", OmLockInfo.LockAction.READ); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + OmLockInfo.BucketLockInfo writeLockInfo = new OmLockInfo.BucketLockInfo("bucket", OmLockInfo.LockAction.WRITE); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); + omLockOpr.unlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.unlock(lockObject); + rst.get(); + } + + @Test + public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.VolumeLockInfo lockInfo = new OmLockInfo.VolumeLockInfo("vol1", OmLockInfo.LockAction.READ); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + OmLockInfo.VolumeLockInfo writeLockInfo = new OmLockInfo.VolumeLockInfo("vol1", OmLockInfo.LockAction.WRITE); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); + omLockOpr.unlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.unlock(lockObject); + rst.get(); + } + + @Test + public void testVolWriteWithVolBucketRWParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.VolumeLockInfo lockInfo = new OmLockInfo.VolumeLockInfo("vol1", OmLockInfo.LockAction.WRITE); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + OmLockInfo.BucketLockInfo writeLockInfo = new OmLockInfo.BucketLockInfo("vol1", + OmLockInfo.LockAction.READ, "buck1", OmLockInfo.LockAction.WRITE); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); + omLockOpr.unlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.unlock(lockObject); + rst.get(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java new file mode 100644 index 000000000000..4492014b29d7 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import org.junit.jupiter.api.Test; + +/** + * Test for WrappedStripedLock. + */ +public class TestWrappedStripedLock { + @Test + public void testWriteLock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = wrappedStripedLock.writeLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testWriteThenReadLock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = wrappedStripedLock.writeLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testReadThenWriteLock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = wrappedStripedLock.readLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testLockListOrderSame() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + List locks = new ArrayList<>(); + wrappedStripedLock.writeLock(Arrays.asList("test", "test1"), locks); + locks.forEach(Lock::unlock); + List lockReverseOrder = new ArrayList<>(); + wrappedStripedLock.writeLock(Arrays.asList("test1", "test2"), lockReverseOrder); + lockReverseOrder.forEach(Lock::unlock); + + assertEquals(locks.get(0), lockReverseOrder.get(0)); + assertEquals(locks.get(1), lockReverseOrder.get(1)); + } + + @Test + public void testReadLockListOrderSame() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + List locks = new ArrayList<>(); + wrappedStripedLock.readLock(Arrays.asList("test", "test1"), locks); + locks.forEach(Lock::unlock); + List lockReverseOrder = new ArrayList<>(); + wrappedStripedLock.readLock(Arrays.asList("test1", "test2"), lockReverseOrder); + lockReverseOrder.forEach(Lock::unlock); + + assertEquals(locks.get(0), lockReverseOrder.get(0)); + assertEquals(locks.get(1), lockReverseOrder.get(1)); + } + + @Test + public void testLockListFailureOnRelock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + List locks = new ArrayList<>(); + wrappedStripedLock.writeLock(Arrays.asList("test", "test1"), locks); + + // test write lock failure + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + // test read lock failure + rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.readLock("test1")); + assertNull(out[0]); + }); + rst.join(); + + locks.forEach(Lock::unlock); + + // verify if lock is success after unlock + Lock lock = wrappedStripedLock.readLock("test"); + lock.unlock(); + } +}