From 302e9a663a980c460aea650540962fa58165edd7 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Thu, 27 Mar 2025 14:14:16 +0530 Subject: [PATCH 1/3] HDDS-12356. granular locking framework --- .../hadoop/ozone/om/lock/OmLockInfo.java | 181 +++++++++++++ .../hadoop/ozone/om/lock/OmLockOperation.java | 252 ++++++++++++++++++ .../ozone/om/lock/WrappedStripedLock.java | 177 ++++++++++++ .../ozone/om/lock/TestOmLockOperation.java | 194 ++++++++++++++ .../ozone/om/lock/TestWrappedStripedLock.java | 139 ++++++++++ 5 files changed, 943 insertions(+) create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java new file mode 100644 index 000000000000..d4be999d59f6 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import java.util.List; + +/** + * Lock information. + */ +public final class OmLockInfo { + /** + * lock info provider. + */ + public interface LockInfoProvider { + default LockLevel getLevel() { + return LockLevel.NONE; + } + } + + /** + * volume lock info. + */ + public static class VolumeLockInfo implements LockInfoProvider { + private final String volumeName; + private final LockAction action; + + public VolumeLockInfo(String volumeName, LockAction volLockType) { + this.volumeName = volumeName; + this.action = volLockType; + } + + public String getVolumeName() { + return volumeName; + } + + public LockAction getAction() { + return action; + } + + @Override + public LockLevel getLevel() { + return LockLevel.VOLUME; + } + } + + /** + * bucket lock info. 
+ */ + public static class BucketLockInfo implements LockInfoProvider { + private final VolumeLockInfo volumeLockInfo; + private final String bucketName; + private final LockAction action; + + public BucketLockInfo(String bucketName, LockAction action) { + this.volumeLockInfo = null; + this.bucketName = bucketName; + this.action = action; + } + + public BucketLockInfo(String volumeName, LockAction volLockType, String bucketName, LockAction action) { + this.volumeLockInfo = new VolumeLockInfo(volumeName, volLockType); + this.bucketName = bucketName; + this.action = action; + } + + public VolumeLockInfo getVolumeLockInfo() { + return volumeLockInfo; + } + + public String getBucketName() { + return bucketName; + } + + public LockAction getAction() { + return action; + } + + @Override + public LockLevel getLevel() { + return LockLevel.BUCKET; + } + } + + /** + * key lock info. + */ + public static class KeyLockInfo implements LockInfoProvider { + private final BucketLockInfo bucketLockInfo; + private final String key; + private final LockAction action; + + public KeyLockInfo(String bucketName, LockAction bucketAction, String keyName, LockAction keyAction) { + this.bucketLockInfo = new BucketLockInfo(bucketName, bucketAction); + this.key = keyName; + this.action = keyAction; + } + + public BucketLockInfo getBucketLockInfo() { + return bucketLockInfo; + } + + public String getKey() { + return key; + } + + public LockAction getAction() { + return action; + } + + @Override + public LockLevel getLevel() { + return LockLevel.KEY; + } + } + + /** + * multiple keys lock info. 
+ */ + public static class MultiKeyLockInfo implements LockInfoProvider { + private final BucketLockInfo bucketLockInfo; + private final List keyList; + private final LockAction action; + + public MultiKeyLockInfo( + String bucketName, LockAction bucketAction, List keyList, LockAction keyAction) { + this.bucketLockInfo = new BucketLockInfo(bucketName, bucketAction); + this.keyList = keyList; + this.action = keyAction; + } + + public BucketLockInfo getBucketLockInfo() { + return bucketLockInfo; + } + + public List getKeyList() { + return keyList; + } + + public LockAction getAction() { + return action; + } + + @Override + public LockLevel getLevel() { + return LockLevel.MULTI_KEY; + } + } + + /** + * way the lock should be taken. + */ + public enum LockAction { + NONE, READ, WRITE + } + + /** + * lock stage level, like volume, bucket, key. + */ + public enum LockLevel { + NONE, + VOLUME, + BUCKET, + KEY, + MULTI_KEY + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java new file mode 100644 index 000000000000..956ffc10e0ce --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.locks.Lock; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.util.Time; +import org.apache.ratis.util.function.CheckedBiFunction; +import org.apache.ratis.util.function.CheckedFunction; + +/** + * Manage locking of volume, bucket, keys and others. + */ +public class OmLockOperation { + private static final long LOCK_TIMEOUT_DEFAULT = 10 * 60 * 1000; + + private final WrappedStripedLock keyLocking; + private final WrappedStripedLock bucketLocking; + private final WrappedStripedLock volumeLocking; + + public OmLockOperation() { + keyLocking = new WrappedStripedLock(102400, LOCK_TIMEOUT_DEFAULT, false); + bucketLocking = new WrappedStripedLock(1024, LOCK_TIMEOUT_DEFAULT, false); + volumeLocking = new WrappedStripedLock(1024, LOCK_TIMEOUT_DEFAULT, false); + } + + public OmLockObject lock(OmLockInfo.LockInfoProvider lockInfo) throws IOException { + OmLockObject omLockObject = new OmLockObject(lockInfo); + long startTime = Time.monotonicNowNanos(); + try { + OmLockInfo.LockLevel level = lockInfo.getLevel(); + switch (level) { + case VOLUME: + volumeLock((OmLockInfo.VolumeLockInfo) lockInfo, omLockObject); + break; + case BUCKET: + bucketLock((OmLockInfo.BucketLockInfo) lockInfo, omLockObject); + break; + case KEY: + keyLock((OmLockInfo.KeyLockInfo) lockInfo, omLockObject); + break; + case MULTI_KEY: + 
multiKeyLock((OmLockInfo.MultiKeyLockInfo) lockInfo, omLockObject); + break; + default: + throw new OMException("Unsupported lock level", OMException.ResultCodes.INTERNAL_ERROR); + } + lockStatsBegin(omLockObject.getLockStats(), Time.monotonicNowNanos(), startTime); + } catch (IOException e) { + unlock(omLockObject); + throw e; + } + return omLockObject; + } + + private void volumeLock(OmLockInfo.VolumeLockInfo lockInfo, OmLockObject omLockObject) throws IOException { + List locks = omLockObject.getLocks(); + if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { + locks.add(getLock(volumeLocking::readLock, lockInfo.getVolumeName())); + } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { + locks.add(getLock(volumeLocking::writeLock, lockInfo.getVolumeName())); + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + } + } + + private void bucketLock(OmLockInfo.BucketLockInfo lockInfo, OmLockObject omLockObject) throws IOException { + if (null != lockInfo.getVolumeLockInfo()) { + volumeLock(lockInfo.getVolumeLockInfo(), omLockObject); + } + List locks = omLockObject.getLocks(); + if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { + locks.add(getLock(bucketLocking::readLock, lockInfo.getBucketName())); + } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { + locks.add(getLock(bucketLocking::writeLock, lockInfo.getBucketName())); + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + } + } + + private void keyLock(OmLockInfo.KeyLockInfo lockInfo, OmLockObject omLockObject) throws IOException { + bucketLock(lockInfo.getBucketLockInfo(), omLockObject); + List locks = omLockObject.getLocks(); + if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { + locks.add(getLock(keyLocking::readLock, lockInfo.getKey())); + } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { + locks.add(getLock(keyLocking::writeLock, lockInfo.getKey())); + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + } + } + + private void 
multiKeyLock(OmLockInfo.MultiKeyLockInfo lockInfo, OmLockObject omLockObject) throws IOException { + bucketLock(lockInfo.getBucketLockInfo(), omLockObject); + List locks = omLockObject.getLocks(); + if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { + getLock(keyLocking::readLock, lockInfo.getKeyList(), locks); + } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { + getLock(keyLocking::writeLock, lockInfo.getKeyList(), locks); + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + } + } + + private static Lock getLock( + CheckedFunction lockFunction, String name) throws OMException { + try { + Lock lockObj = lockFunction.apply(name); + if (lockObj == null) { + throw new OMException("Unable to get lock for " + name + ", timeout occurred", OMException.ResultCodes.TIMEOUT); + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("waiting for lock is interrupted for " + name, OMException.ResultCodes.INTERNAL_ERROR); + } + } + + private static void getLock( + CheckedBiFunction, List, Boolean, InterruptedException> lockFunction, + List lockKeys, List lockList) throws OMException { + try { + if (!lockFunction.apply(lockKeys, lockList)) { + throw new OMException("Unable to get locks, timeout occurred", OMException.ResultCodes.TIMEOUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("waiting for locks is interrupted", OMException.ResultCodes.INTERNAL_ERROR); + } + } + + public void unlock(OmLockObject lockObject) { + Collections.reverse(lockObject.getLocks()); + lockObject.getLocks().forEach(Lock::unlock); + lockStatsEnd(lockObject.getLockStats(), lockObject.getLockStatType()); + lockObject.getLocks().clear(); + } + + private static void lockStatsBegin(OmLockStats lockStats, long endTime, long startTime) { + lockStats.add(endTime - startTime, OmLockStats.Type.WAIT); + lockStats.setLockStartTime(endTime); + } + + private static void 
lockStatsEnd(OmLockStats lockStats, OmLockStats.Type type) { + if (lockStats.getLockStartTime() > 0) { + lockStats.add(Time.monotonicNowNanos() - lockStats.getLockStartTime(), type); + } + } + + /** + * Lock information. + */ + public static class OmLockObject { + private final OmLockInfo.LockInfoProvider lockInfoProvider; + private final List locks = new ArrayList<>(); + private final OmLockStats lockStats = new OmLockStats(); + private OmLockStats.Type lockStatType = OmLockStats.Type.READ; + + public OmLockObject(OmLockInfo.LockInfoProvider lockInfoProvider) { + this.lockInfoProvider = lockInfoProvider; + } + + public List getLocks() { + return locks; + } + + public OmLockStats getLockStats() { + return lockStats; + } + + public OmLockStats.Type getLockStatType() { + return lockStatType; + } + + public void setLockStatType(OmLockStats.Type lockStatType) { + this.lockStatType = lockStatType; + } + + public OmLockInfo.LockInfoProvider getLockInfoProvider() { + return lockInfoProvider; + } + } + + /** + * lock stats. + */ + public static class OmLockStats { + private long lockStartTime; + private long waitLockNanos; + private long readLockNanos; + private long writeLockNanos; + + public long getLockStartTime() { + return lockStartTime; + } + + public void setLockStartTime(long lockStartTime) { + this.lockStartTime = lockStartTime; + } + + public long getWaitLockNanos() { + return waitLockNanos; + } + + public long getReadLockNanos() { + return readLockNanos; + } + + public long getWriteLockNanos() { + return writeLockNanos; + } + + void add(long timeNanos, Type type) { + switch (type) { + case WAIT: + waitLockNanos += timeNanos; + break; + case READ: + readLockNanos += timeNanos; + break; + case WRITE: + writeLockNanos += timeNanos; + break; + default: + } + } + + /** + * lock time stat type. 
+ */ + public enum Type { + WAIT, + READ, + WRITE + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java new file mode 100644 index 000000000000..b60e60c92d2e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import com.google.common.util.concurrent.Striped; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.hadoop.hdds.utils.SimpleStriped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Locking class that wraps Striped locking with timeout and logging. It also provides ordering while multiple locks. 
+ */ +public class WrappedStripedLock { + private static final Logger LOG = LoggerFactory.getLogger(WrappedStripedLock.class); + private final Striped fileStripedLock; + private final long lockTimeout; + + public WrappedStripedLock(int stripLockSize, long timeout, boolean fair) { + fileStripedLock = SimpleStriped.readWriteLock(stripLockSize, fair); + lockTimeout = timeout; + } + + /** + * lock the list of keys in order. + * Sample code for lock and unlock handling: + * + * try { + * if (!wrappedStripedLock.lock(keyList, locks)) { + * // timeout occurred, release lock if any in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * // perform business logic + * } finally { + * // to be released in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * + * + * @param keyList key list which needs to be locked + * @param locks successful lock object returned which will be used to release lock + * @return boolean true if success, else false + * @throws InterruptedException exception on interrupt + */ + public boolean writeLock(List keyList, List locks) throws InterruptedException { + try { + Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); + for (ReadWriteLock rwLock : readWriteLocks) { + Lock lockObj = rwLock.writeLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Write lock for keys are failed for the instance {} after wait of {}ms, read lock info: {}", this, + lockTimeout, rwLock.readLock()); + return false; + } + locks.add(lockObj); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Write lock for keys are interrupted for the instance {}", this); + throw e; + } + return true; + } + + /** + * lock single key. 
+ * @param key object for which lock to be taken + * @return lock object to be used to release lock, null if unable to take lock due to timeout + * @throws InterruptedException exception on interrupt + */ + public Lock writeLock(String key) throws InterruptedException { + LOG.debug("Key {} is locked for instance {} {}", key, this, fileStripedLock.get(key)); + try { + Lock lockObj = fileStripedLock.get(key).writeLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Write lock for the key is failed for the instance {} after wait of {}ms, read lock info: {}", this, + lockTimeout, fileStripedLock.get(key).readLock()); + return null; + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Write lock for the key is interrupted for the instance {}", this); + throw e; + } + } + + /** + * lock the list of keys in order. + * Sample code for lock and unlock handling: + * + * try { + * if (!wrappedStripedLock.lock(keyList, locks)) { + * // timeout occurred, release lock if any in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * // perform business logic + * } finally { + * // to be released in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * + * + * @param keyList key list which needs to be locked + * @param locks successful lock object returned which will be used to release lock + * @return boolean true if success, else false + * @throws InterruptedException exception on interrupt + */ + public boolean readLock(List keyList, List locks) throws InterruptedException { + try { + Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); + for (ReadWriteLock rwLock : readWriteLocks) { + Lock lockObj = rwLock.readLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Read lock for keys are failed for the instance {} after wait of {}ms, write lock info: {}", 
this, + lockTimeout, rwLock.writeLock()); + return false; + } + locks.add(lockObj); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Read lock for keys are interrupted for the instance {}", this); + throw e; + } + return true; + } + + /** + * read lock single key. + * @param key object for which lock to be taken + * @return lock object to be used to release lock, null if unable to take lock due to timeout + * @throws InterruptedException exception on interrupt + */ + public Lock readLock(String key) throws InterruptedException { + try { + LOG.debug("Key {} is read locked for instance {} {}", key, this, fileStripedLock.get(key)); + Lock lockObj = fileStripedLock.get(key).readLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Read lock for the key is failed for the instance {} after wait of {}ms, write lock info: {}", this, + lockTimeout, fileStripedLock.get(key).readLock()); + return null; + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Read lock for the key is interrupted for the instance {}", this); + throw e; + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java new file mode 100644 index 000000000000..e840d7df3fd3 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; + +/** + * Test for TestOmLockOperation. + */ +public class TestOmLockOperation { + + @Test + public void testObsLockOprWithParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.KeyLockInfo lockInfo = + new OmLockInfo.KeyLockInfo("bucket", OmLockInfo.LockAction.READ, "testkey", OmLockInfo.LockAction.WRITE); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(2, lockObject.getLocks().size()); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(lockInfo); + omLockOpr.unlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + 
omLockOpr.unlock(lockObject); + rst.get(); + } + + @Test + public void testObsLockOprListKeyRepeated() throws IOException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.MultiKeyLockInfo lockInfo = + new OmLockInfo.MultiKeyLockInfo("bucket", OmLockInfo.LockAction.READ, + Arrays.asList("testkey", "testkey2"), OmLockInfo.LockAction.WRITE); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(3, lockObject.getLocks().size()); + + omLockOpr.unlock(lockObject); + + lockObject = omLockOpr.lock(lockInfo); + assertEquals(3, lockObject.getLocks().size()); + omLockOpr.unlock(lockObject); + } + + @Test + public void testBucketReadLock() throws IOException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.BucketLockInfo lockInfo = new OmLockInfo.BucketLockInfo("bucket", OmLockInfo.LockAction.READ); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + omLockOpr.unlock(lockObject); + } + + @Test + public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.BucketLockInfo lockInfo = new OmLockInfo.BucketLockInfo("bucket", OmLockInfo.LockAction.READ); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + OmLockInfo.BucketLockInfo writeLockInfo = new OmLockInfo.BucketLockInfo("bucket", OmLockInfo.LockAction.WRITE); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); + omLockOpr.unlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch 
(TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.unlock(lockObject); + rst.get(); + } + + @Test + public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.VolumeLockInfo lockInfo = new OmLockInfo.VolumeLockInfo("vol1", OmLockInfo.LockAction.READ); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + OmLockInfo.VolumeLockInfo writeLockInfo = new OmLockInfo.VolumeLockInfo("vol1", OmLockInfo.LockAction.WRITE); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); + omLockOpr.unlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.unlock(lockObject); + rst.get(); + } + + @Test + public void testVolWriteWithVolBucketRWParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOperation omLockOpr = new OmLockOperation(); + OmLockInfo.VolumeLockInfo lockInfo = new OmLockInfo.VolumeLockInfo("vol1", OmLockInfo.LockAction.WRITE); + OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + OmLockInfo.BucketLockInfo writeLockInfo = new OmLockInfo.BucketLockInfo("vol1", + OmLockInfo.LockAction.READ, "buck1", OmLockInfo.LockAction.WRITE); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); + 
omLockOpr.unlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.unlock(lockObject); + rst.get(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java new file mode 100644 index 000000000000..4492014b29d7 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.lock; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import org.junit.jupiter.api.Test; + +/** + * Test for WrappedStripedLock. + */ +public class TestWrappedStripedLock { + @Test + public void testWriteLock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = wrappedStripedLock.writeLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testWriteThenReadLock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = wrappedStripedLock.writeLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testReadThenWriteLock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = wrappedStripedLock.readLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = 
wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testLockListOrderSame() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + List locks = new ArrayList<>(); + wrappedStripedLock.writeLock(Arrays.asList("test", "test1"), locks); + locks.forEach(Lock::unlock); + List lockReverseOrder = new ArrayList<>(); + wrappedStripedLock.writeLock(Arrays.asList("test1", "test2"), lockReverseOrder); + lockReverseOrder.forEach(Lock::unlock); + + assertEquals(locks.get(0), lockReverseOrder.get(0)); + assertEquals(locks.get(1), lockReverseOrder.get(1)); + } + + @Test + public void testReadLockListOrderSame() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + List locks = new ArrayList<>(); + wrappedStripedLock.readLock(Arrays.asList("test", "test1"), locks); + locks.forEach(Lock::unlock); + List lockReverseOrder = new ArrayList<>(); + wrappedStripedLock.readLock(Arrays.asList("test1", "test2"), lockReverseOrder); + lockReverseOrder.forEach(Lock::unlock); + + assertEquals(locks.get(0), lockReverseOrder.get(0)); + assertEquals(locks.get(1), lockReverseOrder.get(1)); + } + + @Test + public void testLockListFailureOnRelock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + List locks = new ArrayList<>(); + wrappedStripedLock.writeLock(Arrays.asList("test", "test1"), locks); + + // test write lock failure + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + // test read lock failure + rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.readLock("test1")); + assertNull(out[0]); + }); + rst.join(); + + 
locks.forEach(Lock::unlock); + + // verify if lock is success after unlock + Lock lock = wrappedStripedLock.readLock("test"); + lock.unlock(); + } +} From dcc371b6b5661f974025969b5c7c91dd42fd2595 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Wed, 2 Apr 2025 18:39:21 +0530 Subject: [PATCH 2/3] fix review comments --- .../hadoop/ozone/om/lock/OmLockInfo.java | 199 ++++++-------- .../hadoop/ozone/om/lock/OmLockOperation.java | 252 ------------------ .../ozone/om/lock/OmRequestGatekeeper.java | 242 +++++++++++++++++ .../ozone/om/lock/WrappedStripedLock.java | 177 ------------ ...ration.java => TestRequestGatekeeper.java} | 76 +++--- .../ozone/om/lock/TestWrappedStripedLock.java | 139 ---------- 6 files changed, 366 insertions(+), 719 deletions(-) delete mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java delete mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java rename hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/{TestOmLockOperation.java => TestRequestGatekeeper.java} (61%) delete mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java index d4be999d59f6..54cad22daca4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java @@ -17,165 +17,136 @@ package org.apache.hadoop.ozone.om.lock; -import java.util.List; +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import 
org.apache.hadoop.ozone.OzoneConsts; /** * Lock information. */ public final class OmLockInfo { - /** - * lock info provider. - */ - public interface LockInfoProvider { - default LockLevel getLevel() { - return LockLevel.NONE; - } + private final LockInfo volumeLock; + private final LockInfo bucketLock; + private final Set keyLocks; + + private OmLockInfo(Builder builder) { + volumeLock = builder.volumeLock; + bucketLock = builder.bucketLock; + keyLocks = builder.keyLocks; } - /** - * volume lock info. - */ - public static class VolumeLockInfo implements LockInfoProvider { - private final String volumeName; - private final LockAction action; - - public VolumeLockInfo(String volumeName, LockAction volLockType) { - this.volumeName = volumeName; - this.action = volLockType; - } - - public String getVolumeName() { - return volumeName; - } + public Optional getVolumeLock() { + return Optional.ofNullable(volumeLock); + } - public LockAction getAction() { - return action; - } + public Optional getBucketLock() { + return Optional.ofNullable(bucketLock); + } - @Override - public LockLevel getLevel() { - return LockLevel.VOLUME; - } + public Optional> getKeyLocks() { + return Optional.ofNullable(keyLocks); } /** - * bucket lock info. + * Builds an {@link OmLockInfo} object with optional volume, bucket or key locks. 
*/ - public static class BucketLockInfo implements LockInfoProvider { - private final VolumeLockInfo volumeLockInfo; - private final String bucketName; - private final LockAction action; - - public BucketLockInfo(String bucketName, LockAction action) { - this.volumeLockInfo = null; - this.bucketName = bucketName; - this.action = action; - } + public static final class Builder { + private LockInfo volumeLock; + private LockInfo bucketLock; + private Set keyLocks; - public BucketLockInfo(String volumeName, LockAction volLockType, String bucketName, LockAction action) { - this.volumeLockInfo = new VolumeLockInfo(volumeName, volLockType); - this.bucketName = bucketName; - this.action = action; + public Builder() { } - public VolumeLockInfo getVolumeLockInfo() { - return volumeLockInfo; + public Builder addVolumeReadLock(String volume) { + volumeLock = LockInfo.readLockInfo(volume); // shared (read) lock on the volume + return this; } - public String getBucketName() { - return bucketName; + public Builder addVolumeWriteLock(String volume) { + volumeLock = LockInfo.writeLockInfo(volume); // exclusive (write) lock on the volume + return this; } - public LockAction getAction() { - return action; + public Builder addBucketReadLock(String volume, String bucket) { + bucketLock = LockInfo.readLockInfo(joinStrings(volume, bucket)); + return this; } - @Override - public LockLevel getLevel() { - return LockLevel.BUCKET; + public Builder addBucketWriteLock(String volume, String bucket) { + bucketLock = LockInfo.writeLockInfo(joinStrings(volume, bucket)); + return this; } - } - /** - * key lock info. 
- */ - public static class KeyLockInfo implements LockInfoProvider { - private final BucketLockInfo bucketLockInfo; - private final String key; - private final LockAction action; - - public KeyLockInfo(String bucketName, LockAction bucketAction, String keyName, LockAction keyAction) { - this.bucketLockInfo = new BucketLockInfo(bucketName, bucketAction); - this.key = keyName; - this.action = keyAction; - } - - public BucketLockInfo getBucketLockInfo() { - return bucketLockInfo; + // Currently there is no use case for key level read locks. + public Builder addKeyWriteLock(String volume, String bucket, String key) { + // Lazy init keys. + if (keyLocks == null) { + keyLocks = new HashSet<>(); + } + keyLocks.add(LockInfo.writeLockInfo(joinStrings(volume, bucket, key))); + return this; } - public String getKey() { - return key; + private String joinStrings(String... parts) { + return String.join(OzoneConsts.OZONE_URI_DELIMITER, parts); } - public LockAction getAction() { - return action; - } - - @Override - public LockLevel getLevel() { - return LockLevel.KEY; + public OmLockInfo build() { + return new OmLockInfo(this); } } /** - * multiple keys lock info. + * This class provides specifications about a lock's requirements. 
*/ - public static class MultiKeyLockInfo implements LockInfoProvider { - private final BucketLockInfo bucketLockInfo; - private final List keyList; - private final LockAction action; + public static final class LockInfo implements Comparable { + private final String name; + private final boolean isWriteLock; + + private LockInfo(String name, boolean isWriteLock) { + this.name = name; + this.isWriteLock = isWriteLock; + } - public MultiKeyLockInfo( - String bucketName, LockAction bucketAction, List keyList, LockAction keyAction) { - this.bucketLockInfo = new BucketLockInfo(bucketName, bucketAction); - this.keyList = keyList; - this.action = keyAction; + public static LockInfo writeLockInfo(String key) { + return new LockInfo(key, true); } - public BucketLockInfo getBucketLockInfo() { - return bucketLockInfo; + public static LockInfo readLockInfo(String key) { + return new LockInfo(key, false); } - public List getKeyList() { - return keyList; + public String getName() { + return name; } - public LockAction getAction() { - return action; + public boolean isWriteLock() { + return isWriteLock; } @Override - public LockLevel getLevel() { - return LockLevel.MULTI_KEY; + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LockInfo)) { + return false; + } + LockInfo lockInfo = (LockInfo) o; + return isWriteLock == lockInfo.isWriteLock && Objects.equals(name, lockInfo.name); } - } - /** - * way the lock should be taken. - */ - public enum LockAction { - NONE, READ, WRITE - } + @Override + public int hashCode() { + return Objects.hash(name, isWriteLock); + } - /** - * lock stage level, like volume, bucket, key. 
- */ - public enum LockLevel { - NONE, - VOLUME, - BUCKET, - KEY, - MULTI_KEY + @Override + public int compareTo(LockInfo other) { + return Integer.compare(hashCode(), other.hashCode()); + } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java deleted file mode 100644 index 956ffc10e0ce..000000000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOperation.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.om.lock; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.locks.Lock; -import org.apache.hadoop.ozone.om.exceptions.OMException; -import org.apache.hadoop.util.Time; -import org.apache.ratis.util.function.CheckedBiFunction; -import org.apache.ratis.util.function.CheckedFunction; - -/** - * Manage locking of volume, bucket, keys and others. 
- */ -public class OmLockOperation { - private static final long LOCK_TIMEOUT_DEFAULT = 10 * 60 * 1000; - - private final WrappedStripedLock keyLocking; - private final WrappedStripedLock bucketLocking; - private final WrappedStripedLock volumeLocking; - - public OmLockOperation() { - keyLocking = new WrappedStripedLock(102400, LOCK_TIMEOUT_DEFAULT, false); - bucketLocking = new WrappedStripedLock(1024, LOCK_TIMEOUT_DEFAULT, false); - volumeLocking = new WrappedStripedLock(1024, LOCK_TIMEOUT_DEFAULT, false); - } - - public OmLockObject lock(OmLockInfo.LockInfoProvider lockInfo) throws IOException { - OmLockObject omLockObject = new OmLockObject(lockInfo); - long startTime = Time.monotonicNowNanos(); - try { - OmLockInfo.LockLevel level = lockInfo.getLevel(); - switch (level) { - case VOLUME: - volumeLock((OmLockInfo.VolumeLockInfo) lockInfo, omLockObject); - break; - case BUCKET: - bucketLock((OmLockInfo.BucketLockInfo) lockInfo, omLockObject); - break; - case KEY: - keyLock((OmLockInfo.KeyLockInfo) lockInfo, omLockObject); - break; - case MULTI_KEY: - multiKeyLock((OmLockInfo.MultiKeyLockInfo) lockInfo, omLockObject); - break; - default: - throw new OMException("Unsupported lock level", OMException.ResultCodes.INTERNAL_ERROR); - } - lockStatsBegin(omLockObject.getLockStats(), Time.monotonicNowNanos(), startTime); - } catch (IOException e) { - unlock(omLockObject); - throw e; - } - return omLockObject; - } - - private void volumeLock(OmLockInfo.VolumeLockInfo lockInfo, OmLockObject omLockObject) throws IOException { - List locks = omLockObject.getLocks(); - if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { - locks.add(getLock(volumeLocking::readLock, lockInfo.getVolumeName())); - } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { - locks.add(getLock(volumeLocking::writeLock, lockInfo.getVolumeName())); - omLockObject.setLockStatType(OmLockStats.Type.WRITE); - } - } - - private void bucketLock(OmLockInfo.BucketLockInfo lockInfo, OmLockObject 
omLockObject) throws IOException { - if (null != lockInfo.getVolumeLockInfo()) { - volumeLock(lockInfo.getVolumeLockInfo(), omLockObject); - } - List locks = omLockObject.getLocks(); - if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { - locks.add(getLock(bucketLocking::readLock, lockInfo.getBucketName())); - } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { - locks.add(getLock(bucketLocking::writeLock, lockInfo.getBucketName())); - omLockObject.setLockStatType(OmLockStats.Type.WRITE); - } - } - - private void keyLock(OmLockInfo.KeyLockInfo lockInfo, OmLockObject omLockObject) throws IOException { - bucketLock(lockInfo.getBucketLockInfo(), omLockObject); - List locks = omLockObject.getLocks(); - if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { - locks.add(getLock(keyLocking::readLock, lockInfo.getKey())); - } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { - locks.add(getLock(keyLocking::writeLock, lockInfo.getKey())); - omLockObject.setLockStatType(OmLockStats.Type.WRITE); - } - } - - private void multiKeyLock(OmLockInfo.MultiKeyLockInfo lockInfo, OmLockObject omLockObject) throws IOException { - bucketLock(lockInfo.getBucketLockInfo(), omLockObject); - List locks = omLockObject.getLocks(); - if (lockInfo.getAction() == OmLockInfo.LockAction.READ) { - getLock(keyLocking::readLock, lockInfo.getKeyList(), locks); - } else if (lockInfo.getAction() == OmLockInfo.LockAction.WRITE) { - getLock(keyLocking::writeLock, lockInfo.getKeyList(), locks); - omLockObject.setLockStatType(OmLockStats.Type.WRITE); - } - } - - private static Lock getLock( - CheckedFunction lockFunction, String name) throws OMException { - try { - Lock lockObj = lockFunction.apply(name); - if (lockObj == null) { - throw new OMException("Unable to get lock for " + name + ", timeout occurred", OMException.ResultCodes.TIMEOUT); - } - return lockObj; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new OMException("waiting 
for lock is interrupted for " + name, OMException.ResultCodes.INTERNAL_ERROR); - } - } - - private static void getLock( - CheckedBiFunction, List, Boolean, InterruptedException> lockFunction, - List lockKeys, List lockList) throws OMException { - try { - if (!lockFunction.apply(lockKeys, lockList)) { - throw new OMException("Unable to get locks, timeout occurred", OMException.ResultCodes.TIMEOUT); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new OMException("waiting for locks is interrupted", OMException.ResultCodes.INTERNAL_ERROR); - } - } - - public void unlock(OmLockObject lockObject) { - Collections.reverse(lockObject.getLocks()); - lockObject.getLocks().forEach(Lock::unlock); - lockStatsEnd(lockObject.getLockStats(), lockObject.getLockStatType()); - lockObject.getLocks().clear(); - } - - private static void lockStatsBegin(OmLockStats lockStats, long endTime, long startTime) { - lockStats.add(endTime - startTime, OmLockStats.Type.WAIT); - lockStats.setLockStartTime(endTime); - } - - private static void lockStatsEnd(OmLockStats lockStats, OmLockStats.Type type) { - if (lockStats.getLockStartTime() > 0) { - lockStats.add(Time.monotonicNowNanos() - lockStats.getLockStartTime(), type); - } - } - - /** - * Lock information. 
- */ - public static class OmLockObject { - private final OmLockInfo.LockInfoProvider lockInfoProvider; - private final List locks = new ArrayList<>(); - private final OmLockStats lockStats = new OmLockStats(); - private OmLockStats.Type lockStatType = OmLockStats.Type.READ; - - public OmLockObject(OmLockInfo.LockInfoProvider lockInfoProvider) { - this.lockInfoProvider = lockInfoProvider; - } - - public List getLocks() { - return locks; - } - - public OmLockStats getLockStats() { - return lockStats; - } - - public OmLockStats.Type getLockStatType() { - return lockStatType; - } - - public void setLockStatType(OmLockStats.Type lockStatType) { - this.lockStatType = lockStatType; - } - - public OmLockInfo.LockInfoProvider getLockInfoProvider() { - return lockInfoProvider; - } - } - - /** - * lock stats. - */ - public static class OmLockStats { - private long lockStartTime; - private long waitLockNanos; - private long readLockNanos; - private long writeLockNanos; - - public long getLockStartTime() { - return lockStartTime; - } - - public void setLockStartTime(long lockStartTime) { - this.lockStartTime = lockStartTime; - } - - public long getWaitLockNanos() { - return waitLockNanos; - } - - public long getReadLockNanos() { - return readLockNanos; - } - - public long getWriteLockNanos() { - return writeLockNanos; - } - - void add(long timeNanos, Type type) { - switch (type) { - case WAIT: - waitLockNanos += timeNanos; - break; - case READ: - readLockNanos += timeNanos; - break; - case WRITE: - writeLockNanos += timeNanos; - break; - default: - } - } - - /** - * lock time stat type. 
- */ - public enum Type { - WAIT, - READ, - WRITE - } - } -} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java new file mode 100644 index 000000000000..cd45fa9e5096 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import com.google.common.util.concurrent.Striped; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.hadoop.hdds.utils.SimpleStriped; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.util.Time; + +/** + * Manage locking of volume, bucket, keys and others. 
+ */ +public class OmRequestGatekeeper { + private static final long LOCK_TIMEOUT_DEFAULT = 10 * 60 * 1000; + private static final int NUM_VOLUME_STRIPES = 1024; + private static final int NUM_BUCKET_STRIPES = 1024; + private static final int NUM_KEY_STRIPES = 4096; + + private final Striped volumeLocks; + private final Striped bucketLocks; + private final Striped keyLocks; + + public OmRequestGatekeeper() { + volumeLocks = SimpleStriped.readWriteLock(NUM_VOLUME_STRIPES, false); + bucketLocks = SimpleStriped.readWriteLock(NUM_BUCKET_STRIPES, false); + keyLocks = SimpleStriped.readWriteLock(NUM_KEY_STRIPES, false); + } + + public OmLockObject lock(OmLockInfo lockInfo) throws IOException { + OmLockObject omLockObject = new OmLockObject(lockInfo); + List locks = omLockObject.getLocks(); + long startTime = Time.monotonicNowNanos(); + Optional optionalVolumeLock = lockInfo.getVolumeLock(); + Optional optionalBucketLock = lockInfo.getBucketLock(); + Optional> optionalKeyLocks = lockInfo.getKeyLocks(); + + if (optionalVolumeLock.isPresent()) { + OmLockInfo.LockInfo volumeLockInfo = optionalVolumeLock.get(); + if (volumeLockInfo.isWriteLock()) { + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + locks.add(volumeLocks.get(volumeLockInfo.getName()).writeLock()); + } else { + locks.add(volumeLocks.get(volumeLockInfo.getName()).readLock()); + } + } + + if (optionalBucketLock.isPresent()) { + OmLockInfo.LockInfo bucketLockInfo = optionalBucketLock.get(); + if (bucketLockInfo.isWriteLock()) { + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + locks.add(bucketLocks.get(bucketLockInfo.getName()).writeLock()); + } else { + locks.add(bucketLocks.get(bucketLockInfo.getName()).readLock()); + } + } + + if (optionalKeyLocks.isPresent()) { + for (ReadWriteLock keyLock: keyLocks.bulkGet(optionalKeyLocks.get())) { + omLockObject.setLockStatType(OmLockStats.Type.WRITE); + locks.add(keyLock.writeLock()); + } + } + + try { + acquireLocks(locks); + 
lockStatsBegin(omLockObject.getLockStats(), Time.monotonicNowNanos(), startTime); + } catch (InterruptedException e) { + locks.clear(); + Thread.currentThread().interrupt(); + throw new OMException("waiting for locks is interrupted", OMException.ResultCodes.INTERNAL_ERROR); + } catch (TimeoutException e) { + locks.clear(); + throw new OMException("Unable to get locks, timeout occurred", OMException.ResultCodes.TIMEOUT); + } + return omLockObject; + } + + /* + Optional: If we want more diagnostic info on the type of lock that failed to be acquired (volume, bucket, or key), + We can make the parameter a list of objects that wrap the Lock with information about its type. + Note that logging the specific volume, bucket or keys this lock was trying to acquire is not helpful and + misleading because collisions within the stripe lock might mean we are blocked on a request for a completely + different part of the namespace. + Obtaining the thread ID that we were waiting on would be more useful, but there is no easy way to do that. + */ + private void acquireLocks(List locks) throws TimeoutException, InterruptedException { + List acquiredLocks = new ArrayList<>(locks.size()); + for (Lock lock: locks) { + if (lock.tryLock(LOCK_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS)) { + try { + acquiredLocks.add(lock); + } catch (Throwable e) { + // We acquired this lock but were unable to add it to our acquired locks list. 
+ lock.unlock(); + releaseLocks(acquiredLocks); + throw e; + } + } else { + releaseLocks(acquiredLocks); + throw new TimeoutException("Failed to acquire lock after the given timeout."); + } + } + } + + public void unlock(OmLockObject lockObject) { + releaseLocks(lockObject.getLocks()); + lockStatsEnd(lockObject.getLockStats(), lockObject.getLockStatType()); + lockObject.getLocks().clear(); + } + + private void releaseLocks(List locks) { + ListIterator reverseIterator = locks.listIterator(locks.size()); + while (reverseIterator.hasPrevious()) { + Lock lock = reverseIterator.previous(); + lock.unlock(); + } + } + + private static void lockStatsBegin(OmLockStats lockStats, long endTime, long startTime) { + lockStats.add(endTime - startTime, OmLockStats.Type.WAIT); + lockStats.setLockStartTime(endTime); + } + + private static void lockStatsEnd(OmLockStats lockStats, OmLockStats.Type type) { + if (lockStats.getLockStartTime() > 0) { + lockStats.add(Time.monotonicNowNanos() - lockStats.getLockStartTime(), type); + } + } + + /** + * Lock information after taking locks. + */ + public static class OmLockObject { + private final OmLockInfo omLockInfo; + private final List locks = new ArrayList<>(); + private final OmLockStats lockStats = new OmLockStats(); + private OmLockStats.Type lockStatType = OmLockStats.Type.READ; + + public OmLockObject(OmLockInfo lockInfoProvider) { + this.omLockInfo = lockInfoProvider; + } + + public List getLocks() { + return locks; + } + + public OmLockStats getLockStats() { + return lockStats; + } + + public OmLockStats.Type getLockStatType() { + return lockStatType; + } + + public void setLockStatType(OmLockStats.Type lockStatType) { + this.lockStatType = lockStatType; + } + + public OmLockInfo getOmLockInfo() { + return omLockInfo; + } + } + + /** + * lock stats. 
+ */ + public static class OmLockStats { + private long lockStartTime; + private long waitLockNanos; + private long readLockNanos; + private long writeLockNanos; + + public long getLockStartTime() { + return lockStartTime; + } + + public void setLockStartTime(long lockStartTime) { + this.lockStartTime = lockStartTime; + } + + public long getWaitLockNanos() { + return waitLockNanos; + } + + public long getReadLockNanos() { + return readLockNanos; + } + + public long getWriteLockNanos() { + return writeLockNanos; + } + + void add(long timeNanos, Type type) { + switch (type) { + case WAIT: + waitLockNanos += timeNanos; + break; + case READ: + readLockNanos += timeNanos; + break; + case WRITE: + writeLockNanos += timeNanos; + break; + default: + } + } + + /** + * lock time stat type. + */ + public enum Type { + WAIT, + READ, + WRITE + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java deleted file mode 100644 index b60e60c92d2e..000000000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.om.lock; - -import com.google.common.util.concurrent.Striped; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import org.apache.hadoop.hdds.utils.SimpleStriped; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Locking class that wraps Striped locking with timeout and logging. It also provides ordering while multiple locks. - */ -public class WrappedStripedLock { - private static final Logger LOG = LoggerFactory.getLogger(WrappedStripedLock.class); - private final Striped fileStripedLock; - private final long lockTimeout; - - public WrappedStripedLock(int stripLockSize, long timeout, boolean fair) { - fileStripedLock = SimpleStriped.readWriteLock(stripLockSize, fair); - lockTimeout = timeout; - } - - /** - * lock the list of keys in order. 
- * Sample code for lock and unlock handling: - * - * try { - * if (!wrappedStripedLock.lock(keyList, locks)) { - * // timeout occurred, release lock if any in reverse order - * Collections.reverse(locks); - * locks.forEach(Lock::unlock); - * } - * // perform business logic - * } finally { - * // to be released in reverse order - * Collections.reverse(locks); - * locks.forEach(Lock::unlock); - * } - * - * - * @param keyList key list which needs to be locked - * @param locks successful lock object returned which will be used to release lock - * @return boolean true if success, else false - * @throws InterruptedException exception on interrupt - */ - public boolean writeLock(List keyList, List locks) throws InterruptedException { - try { - Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); - for (ReadWriteLock rwLock : readWriteLocks) { - Lock lockObj = rwLock.writeLock(); - boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); - if (!b) { - LOG.error("Write lock for keys are failed for the instance {} after wait of {}ms, read lock info: {}", this, - lockTimeout, rwLock.readLock()); - return false; - } - locks.add(lockObj); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Write lock for keys are interrupted for the instance {}", this); - throw e; - } - return true; - } - - /** - * lock single key. 
- * @param key object for which lock to be taken - * @return lock object to be used to release lock, null if unable to take lock due to timeout - * @throws InterruptedException exception on interrupt - */ - public Lock writeLock(String key) throws InterruptedException { - LOG.debug("Key {} is locked for instance {} {}", key, this, fileStripedLock.get(key)); - try { - Lock lockObj = fileStripedLock.get(key).writeLock(); - boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); - if (!b) { - LOG.error("Write lock for the key is failed for the instance {} after wait of {}ms, read lock info: {}", this, - lockTimeout, fileStripedLock.get(key).readLock()); - return null; - } - return lockObj; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Write lock for the key is interrupted for the instance {}", this); - throw e; - } - } - - /** - * lock the list of keys in order. - * Sample code for lock and unlock handling: - * - * try { - * if (!wrappedStripedLock.lock(keyList, locks)) { - * // timeout occurred, release lock if any in reverse order - * Collections.reverse(locks); - * locks.forEach(Lock::unlock); - * } - * // perform business logic - * } finally { - * // to be released in reverse order - * Collections.reverse(locks); - * locks.forEach(Lock::unlock); - * } - * - * - * @param keyList key list which needs to be locked - * @param locks successful lock object returned which will be used to release lock - * @return boolean true if success, else false - * @throws InterruptedException exception on interrupt - */ - public boolean readLock(List keyList, List locks) throws InterruptedException { - try { - Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); - for (ReadWriteLock rwLock : readWriteLocks) { - Lock lockObj = rwLock.readLock(); - boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); - if (!b) { - LOG.error("Read lock for keys are failed for the instance {} after wait of {}ms, write lock info: {}", 
this, - lockTimeout, rwLock.writeLock()); - return false; - } - locks.add(lockObj); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Read lock for keys are interrupted for the instance {}", this); - throw e; - } - return true; - } - - /** - * read lock single key. - * @param key object for which lock to be taken - * @return lock object to be used to release lock, null if unable to take lock due to timeout - * @throws InterruptedException exception on interrupt - */ - public Lock readLock(String key) throws InterruptedException { - try { - LOG.debug("Key {} is read locked for instance {} {}", key, this, fileStripedLock.get(key)); - Lock lockObj = fileStripedLock.get(key).readLock(); - boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); - if (!b) { - LOG.error("Read lock for the key is failed for the instance {} after wait of {}ms, write lock info: {}", this, - lockTimeout, fileStripedLock.get(key).readLock()); - return null; - } - return lockObj; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Read lock for the key is interrupted for the instance {}", this); - throw e; - } - } -} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java similarity index 61% rename from hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java rename to hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java index e840d7df3fd3..5ce80ad28bfb 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOperation.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java @@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; 
-import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -30,21 +29,22 @@ import org.junit.jupiter.api.Test; /** - * Test for TestOmLockOperation. + * Test for OmRequestGatekeeper. */ -public class TestOmLockOperation { +public class TestRequestGatekeeper { @Test public void testObsLockOprWithParallelLock() throws IOException, ExecutionException, InterruptedException { - OmLockOperation omLockOpr = new OmLockOperation(); - OmLockInfo.KeyLockInfo lockInfo = - new OmLockInfo.KeyLockInfo("bucket", OmLockInfo.LockAction.READ, "testkey", OmLockInfo.LockAction.WRITE); - OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder() + .addBucketReadLock("vol", "bucket") + .addKeyWriteLock("vol", "bucket", "testkey").build(); + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); assertEquals(2, lockObject.getLocks().size()); - CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { try { - OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(lockInfo); + OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(lockInfo); omLockOpr.unlock(lockInfoAgain); return lockInfoAgain; } catch (IOException e) { @@ -68,11 +68,12 @@ public void testObsLockOprWithParallelLock() throws IOException, ExecutionExcept @Test public void testObsLockOprListKeyRepeated() throws IOException { - OmLockOperation omLockOpr = new OmLockOperation(); - OmLockInfo.MultiKeyLockInfo lockInfo = - new OmLockInfo.MultiKeyLockInfo("bucket", OmLockInfo.LockAction.READ, - Arrays.asList("testkey", "testkey2"), OmLockInfo.LockAction.WRITE); - OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = 
new OmLockInfo.Builder() + .addBucketReadLock("vol", "bucket") + .addKeyWriteLock("vol", "bucket", "testkey") + .addKeyWriteLock("vol", "bucket", "testkey2").build(); + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); assertEquals(3, lockObject.getLocks().size()); omLockOpr.unlock(lockObject); @@ -84,9 +85,9 @@ public void testObsLockOprListKeyRepeated() throws IOException { @Test public void testBucketReadLock() throws IOException { - OmLockOperation omLockOpr = new OmLockOperation(); - OmLockInfo.BucketLockInfo lockInfo = new OmLockInfo.BucketLockInfo("bucket", OmLockInfo.LockAction.READ); - OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder().addBucketReadLock("vol", "bucket").build(); + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); assertEquals(1, lockObject.getLocks().size()); omLockOpr.unlock(lockObject); @@ -94,16 +95,16 @@ public void testBucketReadLock() throws IOException { @Test public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { - OmLockOperation omLockOpr = new OmLockOperation(); - OmLockInfo.BucketLockInfo lockInfo = new OmLockInfo.BucketLockInfo("bucket", OmLockInfo.LockAction.READ); - OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder().addBucketReadLock("vol", "bucket").build(); + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); assertEquals(1, lockObject.getLocks().size()); - OmLockInfo.BucketLockInfo writeLockInfo = new OmLockInfo.BucketLockInfo("bucket", OmLockInfo.LockAction.WRITE); + OmLockInfo writeLockInfo = new OmLockInfo.Builder().addBucketWriteLock("vol", "bucket").build(); - CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + CompletableFuture rst = 
CompletableFuture.supplyAsync(() -> { try { - OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); + OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); omLockOpr.unlock(lockInfoAgain); return lockInfoAgain; } catch (IOException e) { @@ -127,16 +128,16 @@ public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionE @Test public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { - OmLockOperation omLockOpr = new OmLockOperation(); - OmLockInfo.VolumeLockInfo lockInfo = new OmLockInfo.VolumeLockInfo("vol1", OmLockInfo.LockAction.READ); - OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); - assertEquals(1, lockObject.getLocks().size()); + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder().addVolumeReadLock("vol").build(); - OmLockInfo.VolumeLockInfo writeLockInfo = new OmLockInfo.VolumeLockInfo("vol1", OmLockInfo.LockAction.WRITE); + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); - CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + OmLockInfo writeLockInfo = new OmLockInfo.Builder().addVolumeWriteLock("vol").build(); + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { try { - OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); + OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); omLockOpr.unlock(lockInfoAgain); return lockInfoAgain; } catch (IOException e) { @@ -160,17 +161,18 @@ public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionE @Test public void testVolWriteWithVolBucketRWParallelLock() throws IOException, ExecutionException, InterruptedException { - OmLockOperation omLockOpr = new OmLockOperation(); - OmLockInfo.VolumeLockInfo lockInfo = new OmLockInfo.VolumeLockInfo("vol1", 
OmLockInfo.LockAction.WRITE); - OmLockOperation.OmLockObject lockObject = omLockOpr.lock(lockInfo); + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder().addVolumeWriteLock("vol").build(); + + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); assertEquals(1, lockObject.getLocks().size()); - OmLockInfo.BucketLockInfo writeLockInfo = new OmLockInfo.BucketLockInfo("vol1", - OmLockInfo.LockAction.READ, "buck1", OmLockInfo.LockAction.WRITE); + OmLockInfo writeLockInfo = new OmLockInfo.Builder().addVolumeReadLock("vol") + .addBucketWriteLock("vol", "buck1").build(); - CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { try { - OmLockOperation.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); + OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); omLockOpr.unlock(lockInfoAgain); return lockInfoAgain; } catch (IOException e) { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java deleted file mode 100644 index 4492014b29d7..000000000000 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.om.lock; - -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.Lock; -import org.junit.jupiter.api.Test; - -/** - * Test for WrappedStripedLock. - */ -public class TestWrappedStripedLock { - @Test - public void testWriteLock() throws InterruptedException { - WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); - - // check if same lock is tried to be taken, it will throw exception with timeout - Lock lock = wrappedStripedLock.writeLock("test"); - CompletableFuture rst = CompletableFuture.runAsync(() -> { - Lock[] out = new Lock[1]; - assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); - assertNull(out[0]); - }); - rst.join(); - - lock.unlock(); - } - - @Test - public void testWriteThenReadLock() throws InterruptedException { - WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); - - // check if same lock is tried to be taken, it will throw exception with timeout - Lock lock = wrappedStripedLock.writeLock("test"); - CompletableFuture rst = CompletableFuture.runAsync(() -> { - Lock[] out = new Lock[1]; - assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); - assertNull(out[0]); - }); - rst.join(); - - lock.unlock(); - } - - 
@Test - public void testReadThenWriteLock() throws InterruptedException { - WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); - - // check if same lock is tried to be taken, it will throw exception with timeout - Lock lock = wrappedStripedLock.readLock("test"); - CompletableFuture rst = CompletableFuture.runAsync(() -> { - Lock[] out = new Lock[1]; - assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); - assertNull(out[0]); - }); - rst.join(); - - lock.unlock(); - } - - @Test - public void testLockListOrderSame() throws InterruptedException { - WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); - List locks = new ArrayList<>(); - wrappedStripedLock.writeLock(Arrays.asList("test", "test1"), locks); - locks.forEach(Lock::unlock); - List lockReverseOrder = new ArrayList<>(); - wrappedStripedLock.writeLock(Arrays.asList("test1", "test2"), lockReverseOrder); - lockReverseOrder.forEach(Lock::unlock); - - assertEquals(locks.get(0), lockReverseOrder.get(0)); - assertEquals(locks.get(1), lockReverseOrder.get(1)); - } - - @Test - public void testReadLockListOrderSame() throws InterruptedException { - WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); - List locks = new ArrayList<>(); - wrappedStripedLock.readLock(Arrays.asList("test", "test1"), locks); - locks.forEach(Lock::unlock); - List lockReverseOrder = new ArrayList<>(); - wrappedStripedLock.readLock(Arrays.asList("test1", "test2"), lockReverseOrder); - lockReverseOrder.forEach(Lock::unlock); - - assertEquals(locks.get(0), lockReverseOrder.get(0)); - assertEquals(locks.get(1), lockReverseOrder.get(1)); - } - - @Test - public void testLockListFailureOnRelock() throws InterruptedException { - WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); - List locks = new ArrayList<>(); - wrappedStripedLock.writeLock(Arrays.asList("test", "test1"), locks); - - // test write lock failure - 
CompletableFuture rst = CompletableFuture.runAsync(() -> { - Lock[] out = new Lock[1]; - assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); - assertNull(out[0]); - }); - rst.join(); - - // test read lock failure - rst = CompletableFuture.runAsync(() -> { - Lock[] out = new Lock[1]; - assertDoesNotThrow(() -> out[0] = wrappedStripedLock.readLock("test1")); - assertNull(out[0]); - }); - rst.join(); - - locks.forEach(Lock::unlock); - - // verify if lock is success after unlock - Lock lock = wrappedStripedLock.readLock("test"); - lock.unlock(); - } -} From 247d0c2d8225f702fed1f83bb5ece29c6f160b99 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Fri, 4 Apr 2025 10:50:28 +0530 Subject: [PATCH 3/3] fix review comments --- .../hadoop/ozone/om/lock/OmLockInfo.java | 20 ++++ .../ozone/om/lock/OmRequestGatekeeper.java | 107 +++++++----------- .../ozone/om/lock/TestRequestGatekeeper.java | 68 +++++------ 3 files changed, 93 insertions(+), 102 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java index 54cad22daca4..17a287ffb09a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java @@ -49,6 +49,21 @@ public Optional> getKeyLocks() { return Optional.ofNullable(keyLocks); } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + if (volumeLock != null) { + sb.append("Volume:").append(volumeLock); + } + if (bucketLock != null) { + sb.append("Bucket:").append(bucketLock); + } + if (keyLocks != null) { + sb.append("Keys:").append(keyLocks); + } + return sb.toString(); + } + /** * Builds an {@link OmLockInfo} object with optional volume, bucket or key locks. 
*/ @@ -148,5 +163,10 @@ public int hashCode() { public int compareTo(LockInfo other) { return Integer.compare(hashCode(), other.hashCode()); } + + @Override + public String toString() { + return "LockInfo{" + "name=" + name + ", isWriteLock=" + isWriteLock + '}'; + } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java index cd45fa9e5096..71d551093566 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java @@ -21,9 +21,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.ListIterator; import java.util.Optional; import java.util.Set; +import java.util.Stack; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; @@ -53,16 +53,16 @@ public OmRequestGatekeeper() { public OmLockObject lock(OmLockInfo lockInfo) throws IOException { OmLockObject omLockObject = new OmLockObject(lockInfo); - List locks = omLockObject.getLocks(); long startTime = Time.monotonicNowNanos(); Optional optionalVolumeLock = lockInfo.getVolumeLock(); Optional optionalBucketLock = lockInfo.getBucketLock(); Optional> optionalKeyLocks = lockInfo.getKeyLocks(); + List locks = new ArrayList<>(); if (optionalVolumeLock.isPresent()) { OmLockInfo.LockInfo volumeLockInfo = optionalVolumeLock.get(); if (volumeLockInfo.isWriteLock()) { - omLockObject.setLockStatType(OmLockStats.Type.WRITE); + omLockObject.setReadStatsType(false); locks.add(volumeLocks.get(volumeLockInfo.getName()).writeLock()); } else { locks.add(volumeLocks.get(volumeLockInfo.getName()).readLock()); @@ -72,7 +72,7 @@ public OmLockObject lock(OmLockInfo lockInfo) throws IOException { if 
(optionalBucketLock.isPresent()) { OmLockInfo.LockInfo bucketLockInfo = optionalBucketLock.get(); if (bucketLockInfo.isWriteLock()) { - omLockObject.setLockStatType(OmLockStats.Type.WRITE); + omLockObject.setReadStatsType(false); locks.add(bucketLocks.get(bucketLockInfo.getName()).writeLock()); } else { locks.add(bucketLocks.get(bucketLockInfo.getName()).readLock()); @@ -81,35 +81,24 @@ public OmLockObject lock(OmLockInfo lockInfo) throws IOException { if (optionalKeyLocks.isPresent()) { for (ReadWriteLock keyLock: keyLocks.bulkGet(optionalKeyLocks.get())) { - omLockObject.setLockStatType(OmLockStats.Type.WRITE); + omLockObject.setReadStatsType(false); locks.add(keyLock.writeLock()); } } try { - acquireLocks(locks); + acquireLocks(locks, omLockObject.getLocks()); lockStatsBegin(omLockObject.getLockStats(), Time.monotonicNowNanos(), startTime); } catch (InterruptedException e) { - locks.clear(); Thread.currentThread().interrupt(); - throw new OMException("waiting for locks is interrupted", OMException.ResultCodes.INTERNAL_ERROR); + throw new OMException("Waiting for locks is interrupted, " + lockInfo, OMException.ResultCodes.INTERNAL_ERROR); } catch (TimeoutException e) { - locks.clear(); - throw new OMException("Unable to get locks, timeout occurred", OMException.ResultCodes.TIMEOUT); + throw new OMException("Timeout occurred for locks " + lockInfo, OMException.ResultCodes.TIMEOUT); } return omLockObject; } - /* - Optional: If we want more diagnostic info on the type of lock that failed to be acquired (volume, bucket, or key), - We can make the parameter a list of objects that wrap the Lock with information about its type. - Note that logging the specific volume, bucket or keys this lock was trying to acquire is not helpful and - misleading because collisions within the stripe lock might mean we are blocked on a request for a completely - different part of the namespace. 
- Obtaining the thread ID that we were waiting on would be more useful, but there is no easy way to do that. - */ - private void acquireLocks(List locks) throws TimeoutException, InterruptedException { - List acquiredLocks = new ArrayList<>(locks.size()); + private void acquireLocks(List locks, Stack acquiredLocks) throws TimeoutException, InterruptedException { for (Lock lock: locks) { if (lock.tryLock(LOCK_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS)) { try { @@ -127,45 +116,41 @@ private void acquireLocks(List locks) throws TimeoutException, Interrupted } } - public void unlock(OmLockObject lockObject) { - releaseLocks(lockObject.getLocks()); - lockStatsEnd(lockObject.getLockStats(), lockObject.getLockStatType()); - lockObject.getLocks().clear(); - } - - private void releaseLocks(List locks) { - ListIterator reverseIterator = locks.listIterator(locks.size()); - while (reverseIterator.hasPrevious()) { - Lock lock = reverseIterator.previous(); - lock.unlock(); + private void releaseLocks(Stack locks) { + while (!locks.empty()) { + locks.pop().unlock(); } } private static void lockStatsBegin(OmLockStats lockStats, long endTime, long startTime) { - lockStats.add(endTime - startTime, OmLockStats.Type.WAIT); + lockStats.addWaitLockNanos(endTime - startTime); lockStats.setLockStartTime(endTime); } - private static void lockStatsEnd(OmLockStats lockStats, OmLockStats.Type type) { + private static void lockStatsEnd(OmLockStats lockStats, boolean readStatsType) { if (lockStats.getLockStartTime() > 0) { - lockStats.add(Time.monotonicNowNanos() - lockStats.getLockStartTime(), type); + if (readStatsType) { + lockStats.addReadLockNanos(Time.monotonicNowNanos() - lockStats.getLockStartTime()); + } else { + lockStats.addWriteLockNanos(Time.monotonicNowNanos() - lockStats.getLockStartTime()); + } } } /** - * Lock information after taking locks. + * Lock information after taking locks, and to be used to release locks. 
*/ - public static class OmLockObject { + public static class OmLockObject implements AutoCloseable { private final OmLockInfo omLockInfo; - private final List locks = new ArrayList<>(); + private final Stack locks = new Stack<>(); private final OmLockStats lockStats = new OmLockStats(); - private OmLockStats.Type lockStatType = OmLockStats.Type.READ; + private boolean readStatsType = true; public OmLockObject(OmLockInfo lockInfoProvider) { this.omLockInfo = lockInfoProvider; } - public List getLocks() { + public Stack getLocks() { return locks; } @@ -173,17 +158,25 @@ public OmLockStats getLockStats() { return lockStats; } - public OmLockStats.Type getLockStatType() { - return lockStatType; + public void setReadStatsType(boolean readStatsType) { + this.readStatsType = readStatsType; } - public void setLockStatType(OmLockStats.Type lockStatType) { - this.lockStatType = lockStatType; + public boolean getReadStatsType() { + return readStatsType; } public OmLockInfo getOmLockInfo() { return omLockInfo; } + + @Override + public void close() throws IOException { + while (!locks.empty()) { + locks.pop().unlock(); + } + lockStatsEnd(lockStats, readStatsType); + } } /** @@ -215,28 +208,16 @@ public long getWriteLockNanos() { return writeLockNanos; } - void add(long timeNanos, Type type) { - switch (type) { - case WAIT: - waitLockNanos += timeNanos; - break; - case READ: - readLockNanos += timeNanos; - break; - case WRITE: - writeLockNanos += timeNanos; - break; - default: - } + void addWaitLockNanos(long timeNanos) { + waitLockNanos += timeNanos; + } + + void addReadLockNanos(long timeNanos) { + readLockNanos += timeNanos; } - /** - * lock time stat type. 
- */ - public enum Type { - WAIT, - READ, - WRITE + void addWriteLockNanos(long timeNanos) { + writeLockNanos += timeNanos; } } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java index 5ce80ad28bfb..1cdd87daa24d 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java @@ -42,15 +42,13 @@ public void testObsLockOprWithParallelLock() throws IOException, ExecutionExcept OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); assertEquals(2, lockObject.getLocks().size()); - CompletableFuture rst = CompletableFuture.supplyAsync(() -> { - try { - OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(lockInfo); - omLockOpr.unlock(lockInfoAgain); - return lockInfoAgain; + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try (OmRequestGatekeeper.OmLockObject ignored = omLockOpr.lock(lockInfo)) { + return true; } catch (IOException e) { fail("should not throw exception"); } - return null; + return false; }); // parallel lock wait should fail as previous lock not released @@ -62,7 +60,7 @@ public void testObsLockOprWithParallelLock() throws IOException, ExecutionExcept } // after unlock, the thread should be able to get lock - omLockOpr.unlock(lockObject); + lockObject.close(); rst.get(); } @@ -73,24 +71,22 @@ public void testObsLockOprListKeyRepeated() throws IOException { .addBucketReadLock("vol", "bucket") .addKeyWriteLock("vol", "bucket", "testkey") .addKeyWriteLock("vol", "bucket", "testkey2").build(); - OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); - assertEquals(3, lockObject.getLocks().size()); - - omLockOpr.unlock(lockObject); + try (OmRequestGatekeeper.OmLockObject lockObject = 
omLockOpr.lock(lockInfo)) { + assertEquals(3, lockObject.getLocks().size()); + } - lockObject = omLockOpr.lock(lockInfo); - assertEquals(3, lockObject.getLocks().size()); - omLockOpr.unlock(lockObject); + try (OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo)) { + assertEquals(3, lockObject.getLocks().size()); + } } @Test public void testBucketReadLock() throws IOException { OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); OmLockInfo lockInfo = new OmLockInfo.Builder().addBucketReadLock("vol", "bucket").build(); - OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); - assertEquals(1, lockObject.getLocks().size()); - - omLockOpr.unlock(lockObject); + try (OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo)) { + assertEquals(1, lockObject.getLocks().size()); + } } @Test @@ -102,15 +98,13 @@ public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionE OmLockInfo writeLockInfo = new OmLockInfo.Builder().addBucketWriteLock("vol", "bucket").build(); - CompletableFuture rst = CompletableFuture.supplyAsync(() -> { - try { - OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); - omLockOpr.unlock(lockInfoAgain); - return lockInfoAgain; + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try (OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo)) { + return true; } catch (IOException e) { fail("should not throw exception"); } - return null; + return false; }); // parallel lock wait should fail as previous lock not released @@ -122,7 +116,7 @@ public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionE } // after unlock, the thread should be able to get lock - omLockOpr.unlock(lockObject); + lockObject.close(); rst.get(); } @@ -135,15 +129,13 @@ public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionE assertEquals(1, lockObject.getLocks().size()); OmLockInfo writeLockInfo = new 
OmLockInfo.Builder().addVolumeWriteLock("vol").build(); - CompletableFuture rst = CompletableFuture.supplyAsync(() -> { - try { - OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); - omLockOpr.unlock(lockInfoAgain); - return lockInfoAgain; + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try (OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo)) { + return true; } catch (IOException e) { fail("should not throw exception"); } - return null; + return false; }); // parallel lock wait should fail as previous lock not released @@ -155,7 +147,7 @@ public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionE } // after unlock, the thread should be able to get lock - omLockOpr.unlock(lockObject); + lockObject.close(); rst.get(); } @@ -170,15 +162,13 @@ public void testVolWriteWithVolBucketRWParallelLock() throws IOException, Execut OmLockInfo writeLockInfo = new OmLockInfo.Builder().addVolumeReadLock("vol") .addBucketWriteLock("vol", "buck1").build(); - CompletableFuture rst = CompletableFuture.supplyAsync(() -> { - try { - OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo); - omLockOpr.unlock(lockInfoAgain); - return lockInfoAgain; + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try (OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo)) { + return true; } catch (IOException e) { fail("should not throw exception"); } - return null; + return false; }); // parallel lock wait should fail as previous lock not released @@ -190,7 +180,7 @@ public void testVolWriteWithVolBucketRWParallelLock() throws IOException, Execut } // after unlock, the thread should be able to get lock - omLockOpr.unlock(lockObject); + lockObject.close(); rst.get(); } }