diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java new file mode 100644 index 000000000000..17a287ffb09a --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockInfo.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import org.apache.hadoop.ozone.OzoneConsts; + +/** + * Lock information. + */ +public final class OmLockInfo { + private final LockInfo volumeLock; + private final LockInfo bucketLock; + private final Set keyLocks; + + private OmLockInfo(Builder builder) { + volumeLock = builder.volumeLock; + bucketLock = builder.bucketLock; + keyLocks = builder.keyLocks; + } + + public Optional getVolumeLock() { + return Optional.ofNullable(volumeLock); + } + + public Optional getBucketLock() { + return Optional.ofNullable(bucketLock); + } + + public Optional> getKeyLocks() { + return Optional.ofNullable(keyLocks); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + if (volumeLock != null) { + sb.append("Volume:").append(volumeLock); + } + if (bucketLock != null) { + sb.append("Bucket:").append(bucketLock); + } + if (keyLocks != null) { + sb.append("Keys:").append(keyLocks); + } + return sb.toString(); + } + + /** + * Builds an {@link OmLockInfo} object with optional volume, bucket or key locks. + */ + public static final class Builder { + private LockInfo volumeLock; + private LockInfo bucketLock; + private Set keyLocks; + + public Builder() { + } + + public Builder addVolumeReadLock(String volume) { + volumeLock = LockInfo.writeLockInfo(volume); + return this; + } + + public Builder addVolumeWriteLock(String volume) { + volumeLock = LockInfo.readLockInfo(volume); + return this; + } + + public Builder addBucketReadLock(String volume, String bucket) { + bucketLock = LockInfo.readLockInfo(joinStrings(volume, bucket)); + return this; + } + + public Builder addBucketWriteLock(String volume, String bucket) { + bucketLock = LockInfo.writeLockInfo(joinStrings(volume, bucket)); + return this; + } + + // Currently there is no use case for key level read locks. + public Builder addKeyWriteLock(String volume, String bucket, String key) { + // Lazy init keys. + if (keyLocks == null) { + keyLocks = new HashSet<>(); + } + keyLocks.add(LockInfo.writeLockInfo(joinStrings(volume, bucket, key))); + return this; + } + + private String joinStrings(String... parts) { + return String.join(OzoneConsts.OZONE_URI_DELIMITER, parts); + } + + public OmLockInfo build() { + return new OmLockInfo(this); + } + } + + /** + * This class provides specifications about a lock's requirements. + */ + public static final class LockInfo implements Comparable { + private final String name; + private final boolean isWriteLock; + + private LockInfo(String name, boolean isWriteLock) { + this.name = name; + this.isWriteLock = isWriteLock; + } + + public static LockInfo writeLockInfo(String key) { + return new LockInfo(key, true); + } + + public static LockInfo readLockInfo(String key) { + return new LockInfo(key, false); + } + + public String getName() { + return name; + } + + public boolean isWriteLock() { + return isWriteLock; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LockInfo)) { + return false; + } + LockInfo lockInfo = (LockInfo) o; + return isWriteLock == lockInfo.isWriteLock && Objects.equals(name, lockInfo.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, isWriteLock); + } + + @Override + public int compareTo(LockInfo other) { + return Integer.compare(hashCode(), other.hashCode()); + } + + @Override + public String toString() { + return "LockInfo{" + "name=" + name + ", isWriteLock=" + isWriteLock + '}'; + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java new file mode 100644 index 000000000000..71d551093566 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmRequestGatekeeper.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import com.google.common.util.concurrent.Striped; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.Stack; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.hadoop.hdds.utils.SimpleStriped; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.util.Time; + +/** + * Manage locking of volume, bucket, keys and others. + */ +public class OmRequestGatekeeper { + private static final long LOCK_TIMEOUT_DEFAULT = 10 * 60 * 1000; + private static final int NUM_VOLUME_STRIPES = 1024; + private static final int NUM_BUCKET_STRIPES = 1024; + private static final int NUM_KEY_STRIPES = 4096; + + private final Striped volumeLocks; + private final Striped bucketLocks; + private final Striped keyLocks; + + public OmRequestGatekeeper() { + volumeLocks = SimpleStriped.readWriteLock(NUM_VOLUME_STRIPES, false); + bucketLocks = SimpleStriped.readWriteLock(NUM_BUCKET_STRIPES, false); + keyLocks = SimpleStriped.readWriteLock(NUM_KEY_STRIPES, false); + } + + public OmLockObject lock(OmLockInfo lockInfo) throws IOException { + OmLockObject omLockObject = new OmLockObject(lockInfo); + long startTime = Time.monotonicNowNanos(); + Optional optionalVolumeLock = lockInfo.getVolumeLock(); + Optional optionalBucketLock = lockInfo.getBucketLock(); + Optional> optionalKeyLocks = lockInfo.getKeyLocks(); + List locks = new ArrayList<>(); + + if (optionalVolumeLock.isPresent()) { + OmLockInfo.LockInfo volumeLockInfo = optionalVolumeLock.get(); + if (volumeLockInfo.isWriteLock()) { + omLockObject.setReadStatsType(false); + locks.add(volumeLocks.get(volumeLockInfo.getName()).writeLock()); + } else { + locks.add(volumeLocks.get(volumeLockInfo.getName()).readLock()); + } + } + + if (optionalBucketLock.isPresent()) { + OmLockInfo.LockInfo bucketLockInfo = optionalBucketLock.get(); + if (bucketLockInfo.isWriteLock()) { + omLockObject.setReadStatsType(false); + locks.add(bucketLocks.get(bucketLockInfo.getName()).writeLock()); + } else { + locks.add(bucketLocks.get(bucketLockInfo.getName()).readLock()); + } + } + + if (optionalKeyLocks.isPresent()) { + for (ReadWriteLock keyLock: keyLocks.bulkGet(optionalKeyLocks.get())) { + omLockObject.setReadStatsType(false); + locks.add(keyLock.writeLock()); + } + } + + try { + acquireLocks(locks, omLockObject.getLocks()); + lockStatsBegin(omLockObject.getLockStats(), Time.monotonicNowNanos(), startTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("Waiting for locks is interrupted, " + lockInfo, OMException.ResultCodes.INTERNAL_ERROR); + } catch (TimeoutException e) { + throw new OMException("Timeout occurred for locks " + lockInfo, OMException.ResultCodes.TIMEOUT); + } + return omLockObject; + } + + private void acquireLocks(List locks, Stack acquiredLocks) throws TimeoutException, InterruptedException { + for (Lock lock: locks) { + if (lock.tryLock(LOCK_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS)) { + try { + acquiredLocks.add(lock); + } catch (Throwable e) { + // We acquired this lock but were unable to add it to our acquired locks list. + lock.unlock(); + releaseLocks(acquiredLocks); + throw e; + } + } else { + releaseLocks(acquiredLocks); + throw new TimeoutException("Failed to acquire lock after the given timeout."); + } + } + } + + private void releaseLocks(Stack locks) { + while (!locks.empty()) { + locks.pop().unlock(); + } + } + + private static void lockStatsBegin(OmLockStats lockStats, long endTime, long startTime) { + lockStats.addWaitLockNanos(endTime - startTime); + lockStats.setLockStartTime(endTime); + } + + private static void lockStatsEnd(OmLockStats lockStats, boolean readStatsType) { + if (lockStats.getLockStartTime() > 0) { + if (readStatsType) { + lockStats.addReadLockNanos(Time.monotonicNowNanos() - lockStats.getLockStartTime()); + } else { + lockStats.addWriteLockNanos(Time.monotonicNowNanos() - lockStats.getLockStartTime()); + } + } + } + + /** + * Lock information after taking locks, and to be used to release locks. + */ + public static class OmLockObject implements AutoCloseable { + private final OmLockInfo omLockInfo; + private final Stack locks = new Stack<>(); + private final OmLockStats lockStats = new OmLockStats(); + private boolean readStatsType = true; + + public OmLockObject(OmLockInfo lockInfoProvider) { + this.omLockInfo = lockInfoProvider; + } + + public Stack getLocks() { + return locks; + } + + public OmLockStats getLockStats() { + return lockStats; + } + + public void setReadStatsType(boolean readStatsType) { + this.readStatsType = readStatsType; + } + + public boolean getReadStatsType() { + return readStatsType; + } + + public OmLockInfo getOmLockInfo() { + return omLockInfo; + } + + @Override + public void close() throws IOException { + while (!locks.empty()) { + locks.pop().unlock(); + } + lockStatsEnd(lockStats, readStatsType); + } + } + + /** + * lock stats. + */ + public static class OmLockStats { + private long lockStartTime; + private long waitLockNanos; + private long readLockNanos; + private long writeLockNanos; + + public long getLockStartTime() { + return lockStartTime; + } + + public void setLockStartTime(long lockStartTime) { + this.lockStartTime = lockStartTime; + } + + public long getWaitLockNanos() { + return waitLockNanos; + } + + public long getReadLockNanos() { + return readLockNanos; + } + + public long getWriteLockNanos() { + return writeLockNanos; + } + + void addWaitLockNanos(long timeNanos) { + waitLockNanos += timeNanos; + } + + void addReadLockNanos(long timeNanos) { + readLockNanos += timeNanos; + } + + void addWriteLockNanos(long timeNanos) { + writeLockNanos += timeNanos; + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java new file mode 100644 index 000000000000..1cdd87daa24d --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestGatekeeper.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; + +/** + * Test for OmRequestGatekeeper. + */ +public class TestRequestGatekeeper { + + @Test + public void testObsLockOprWithParallelLock() throws IOException, ExecutionException, InterruptedException { + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder() + .addBucketReadLock("vol", "bucket") + .addKeyWriteLock("vol", "bucket", "testkey").build(); + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(2, lockObject.getLocks().size()); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try (OmRequestGatekeeper.OmLockObject ignored = omLockOpr.lock(lockInfo)) { + return true; + } catch (IOException e) { + fail("should not throw exception"); + } + return false; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + lockObject.close(); + rst.get(); + } + + @Test + public void testObsLockOprListKeyRepeated() throws IOException { + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder() + .addBucketReadLock("vol", "bucket") + .addKeyWriteLock("vol", "bucket", "testkey") + .addKeyWriteLock("vol", "bucket", "testkey2").build(); + try (OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo)) { + assertEquals(3, lockObject.getLocks().size()); + } + + try (OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo)) { + assertEquals(3, lockObject.getLocks().size()); + } + } + + @Test + public void testBucketReadLock() throws IOException { + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder().addBucketReadLock("vol", "bucket").build(); + try (OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo)) { + assertEquals(1, lockObject.getLocks().size()); + } + } + + @Test + public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder().addBucketReadLock("vol", "bucket").build(); + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + OmLockInfo writeLockInfo = new OmLockInfo.Builder().addBucketWriteLock("vol", "bucket").build(); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try (OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo)) { + return true; + } catch (IOException e) { + fail("should not throw exception"); + } + return false; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + lockObject.close(); + rst.get(); + } + + @Test + public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder().addVolumeReadLock("vol").build(); + + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + OmLockInfo writeLockInfo = new OmLockInfo.Builder().addVolumeWriteLock("vol").build(); + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try (OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo)) { + return true; + } catch (IOException e) { + fail("should not throw exception"); + } + return false; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + lockObject.close(); + rst.get(); + } + + @Test + public void testVolWriteWithVolBucketRWParallelLock() throws IOException, ExecutionException, InterruptedException { + OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper(); + OmLockInfo lockInfo = new OmLockInfo.Builder().addVolumeWriteLock("vol").build(); + + OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo); + assertEquals(1, lockObject.getLocks().size()); + + OmLockInfo writeLockInfo = new OmLockInfo.Builder().addVolumeReadLock("vol") + .addBucketWriteLock("vol", "buck1").build(); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try (OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo)) { + return true; + } catch (IOException e) { + fail("should not throw exception"); + } + return false; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + lockObject.close(); + rst.get(); + } +}