Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void invalidate(UUID key) {
}
omMetrics.decNumSnapshotCacheSize();
}
pendingEvictionQueue.remove(k);
return null;
});
}
Expand Down Expand Up @@ -323,8 +324,13 @@ private UncheckedAutoCloseableSupplier<OMLockDetails> lock(Supplier<OMLockDetail

AtomicReference<OMLockDetails> lockDetails = new AtomicReference<>(emptyLockFunction.get());
if (lockDetails.get().isLockAcquired()) {
if (!cleanupFunction.get()) {
try {
if (!cleanupFunction.get()) {
lockDetails.set(emptyUnlockFunction.get());
}
} catch (Throwable t) {
lockDetails.set(emptyUnlockFunction.get());
throw t;
}
}

Expand Down Expand Up @@ -377,26 +383,26 @@ private synchronized Void cleanup(UUID evictionKey, boolean expectKeyToBePresent
}

dbMap.compute(evictionKey, (k, v) -> {
pendingEvictionQueue.remove(k);
ReferenceCounted<OmSnapshot> result = null;
if (v == null) {
throw new IllegalStateException("SnapshotId '" + k + "' does not exist in cache. The RocksDB " +
LOG.warn("SnapshotId '" + k + "' does not exist in cache. The RocksDB " +
"instance of the Snapshot may not be closed properly.");
}

if (v.getTotalRefCount() > 0) {
} else if (v.getTotalRefCount() > 0) {
LOG.debug("SnapshotId {} is still being referenced ({}), skipping its clean up.", k, v.getTotalRefCount());
return v;
result = v;
} else {
LOG.debug("Closing SnapshotId {}. It is not being referenced anymore.", k);
// Close the instance, which also closes its DB handle.
try {
v.get().close();
} catch (IOException ex) {
throw new IllegalStateException("Error while closing snapshot DB.", ex);
LOG.error("Error while closing snapshot DB.", ex);
return v;
}
omMetrics.decNumSnapshotCacheSize();
return null;
}
pendingEvictionQueue.remove(k);
return result;
});
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_DB_LOCK;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.VOLUME_LOCK;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand All @@ -39,13 +40,16 @@

import com.google.common.cache.CacheLoader;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
Expand Down Expand Up @@ -511,4 +515,147 @@ void testSnapshotOperationsNotBlockedDuringCompaction() throws IOException, Inte
verify(store1, times(1)).compactTable("table2");
verify(store1, times(0)).compactTable("keyTable");
}

/**
 * Reads the private {@code pendingEvictionQueue} field of the given cache through reflection so
 * that tests can inspect which snapshot IDs are currently queued.
 *
 * @param cache the cache instance whose field value is read
 * @return the field value, cast to {@code Set<UUID>}
 * @throws IllegalStateException if the field cannot be located or read reflectively
 */
@SuppressWarnings("unchecked")
private static Set<UUID> getPendingEvictionQueue(SnapshotCache cache) {
  final Object rawQueue;
  try {
    final Field queueField = SnapshotCache.class.getDeclaredField("pendingEvictionQueue");
    // The field is not accessible from the test class by default, so access checks are lifted.
    queueField.setAccessible(true);
    rawQueue = queueField.get(cache);
  } catch (ReflectiveOperationException e) {
    throw new IllegalStateException("Failed to access pendingEvictionQueue via reflection", e);
  }
  // Unchecked cast: the element type cannot be verified at runtime.
  return (Set<UUID>) rawQueue;
}

/**
 * Creates a Mockito mock of {@link IOzoneManagerLock} in which every stubbed acquire and release
 * call on {@code SNAPSHOT_DB_LOCK} returns {@code OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED}.
 *
 * @return the stubbed lock mock
 */
private static IOzoneManagerLock newAcquiringLock() {
  final OMLockDetails acquired = OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
  final IOzoneManagerLock lockMock = mock(IOzoneManagerLock.class);

  // Read lock taken with key arguments.
  when(lockMock.acquireReadLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))).thenReturn(acquired);
  when(lockMock.releaseReadLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))).thenReturn(acquired);

  // Write lock taken with key arguments.
  when(lockMock.acquireWriteLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))).thenReturn(acquired);
  when(lockMock.releaseWriteLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))).thenReturn(acquired);

  // Resource-level write lock (no key arguments).
  when(lockMock.acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK))).thenReturn(acquired);
  when(lockMock.releaseResourceWriteLock(eq(SNAPSHOT_DB_LOCK))).thenReturn(acquired);

  // NOTE(review): the release stubs also report "lock acquired"; confirm this is intentional.
  return lockMock;
}

/**
 * Creates a Mockito mock of {@link OmSnapshot} that reports the given snapshot ID.
 *
 * @param snapshotId the ID returned by {@code getSnapshotID()}; its string form is returned by
 *                   {@code getSnapshotTableKey()}
 * @return the stubbed snapshot mock
 */
private OmSnapshot mockSnapshot(UUID snapshotId) {
  final OmSnapshot snapshotMock = mock(OmSnapshot.class);
  when(snapshotMock.getSnapshotID()).thenReturn(snapshotId);
  when(snapshotMock.getSnapshotTableKey()).thenReturn(snapshotId.toString());
  return snapshotMock;
}

/**
 * Checks that an eviction key left behind after {@code invalidate()} followed by a late handle
 * close is dropped by the next {@code lock()} call, and that this call does not throw.
 */
@Test
@DisplayName("Stale eviction key (invalidate + late close) is cleaned up without throwing")
void testStaleEvictionKeyDuringCleanup() throws IOException {
  final UUID staleKey = UUID.randomUUID();
  snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 0, true, newAcquiringLock());

  // Step 1: hold a reference so the snapshot is present in the cache.
  final UncheckedAutoCloseableSupplier<OmSnapshot> snapshotRef = snapshotCache.get(staleKey);
  assertEquals(1, snapshotCache.size());

  // Step 2: invalidate while the reference is still open; the cache reports itself empty.
  snapshotCache.invalidate(staleKey);
  assertEquals(0, snapshotCache.size());

  // Step 3: closing the reference afterwards puts the key into the pending eviction queue
  // even though the cache no longer holds an entry for it.
  snapshotRef.close();
  assertTrue(getPendingEvictionQueue(snapshotCache).contains(staleKey));

  // Step 4: taking the cache lock runs cleanup; it must tolerate the stale key.
  assertDoesNotThrow(() -> {
    try (UncheckedAutoCloseableSupplier<OMLockDetails> held = snapshotCache.lock()) {
      assertTrue(held.get().isLockAcquired());
    }
  });

  // The stale key must be gone from the queue after cleanup.
  assertFalse(getPendingEvictionQueue(snapshotCache).contains(staleKey));
}

/**
 * Checks that when closing a snapshot fails with an {@link IOException} during cleanup, the
 * snapshot stays in the cache and in the pending eviction queue, and that a later cleanup pass
 * (where close succeeds) removes it and decrements the cache-size metric.
 */
@Test
@DisplayName("Close failure keeps snapshot in eviction queue for retry")
void testCloseFailureRetriesSnapshot() throws Exception {
  final UUID retriedId = UUID.randomUUID();
  snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 0, true, newAcquiringLock());

  // Snapshot mock backed by a store that lists no tables.
  final OmSnapshot flakySnapshot = mockSnapshot(retriedId);
  final OMMetadataManager metadataManagerMock = mock(OMMetadataManager.class);
  final DBStore storeMock = mock(DBStore.class);
  when(flakySnapshot.getMetadataManager()).thenReturn(metadataManagerMock);
  when(metadataManagerMock.getStore()).thenReturn(storeMock);
  when(storeMock.listTables()).thenReturn(new ArrayList<>());

  // close() throws on its first invocation only; afterwards it completes normally.
  final AtomicBoolean closeShouldFail = new AtomicBoolean(true);
  doAnswer(invocation -> {
    if (closeShouldFail.compareAndSet(true, false)) {
      throw new IOException("close failed");
    }
    return null;
  }).when(flakySnapshot).close();

  when(cacheLoader.load(eq(retriedId))).thenReturn(flakySnapshot);

  // Open and immediately release a handle: the entry stays cached with a zero ref count
  // and its ID is queued for eviction.
  try (UncheckedAutoCloseableSupplier<OmSnapshot> ignored = snapshotCache.get(retriedId)) {
    assertEquals(1, snapshotCache.size());
    assertEquals(1, omMetrics.getNumSnapshotCacheSize());
  }
  assertEquals(0L, snapshotCache.getDbMap().get(retriedId).getTotalRefCount());
  assertTrue(getPendingEvictionQueue(snapshotCache).contains(retriedId));

  // Pass 1: close() fails, so nothing may be dropped and the metric is unchanged.
  try (UncheckedAutoCloseableSupplier<OMLockDetails> held = snapshotCache.lock()) {
    assertTrue(held.get().isLockAcquired());
  }
  assertTrue(snapshotCache.getDbMap().containsKey(retriedId));
  assertTrue(getPendingEvictionQueue(snapshotCache).contains(retriedId));
  assertEquals(1, omMetrics.getNumSnapshotCacheSize());

  // Pass 2: close() now succeeds, so the entry, the queued key and the metric are all cleared.
  try (UncheckedAutoCloseableSupplier<OMLockDetails> held = snapshotCache.lock()) {
    assertTrue(held.get().isLockAcquired());
  }
  assertFalse(snapshotCache.getDbMap().containsKey(retriedId));
  assertFalse(getPendingEvictionQueue(snapshotCache).contains(retriedId));
  assertEquals(0, omMetrics.getNumSnapshotCacheSize());
}

/**
 * Checks that when cleanup throws an unchecked exception inside {@code lock()}, the resource
 * write lock that was acquired is released exactly once before the exception propagates.
 */
@Test
@DisplayName("lock supplier releases write lock if cleanup throws an exception")
void testLockSupplierReleasesWriteLockOnCleanupException() throws Exception {
  final IOzoneManagerLock lockMock = newAcquiringLock();
  snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 0, true, lockMock);

  final UUID brokenId = UUID.randomUUID();
  final OmSnapshot brokenSnapshot = mockSnapshot(brokenId);

  // The store behind the snapshot throws an unchecked exception from listTables(); per the
  // scenario under test, cleanup does not catch it.
  final OMMetadataManager metadataManagerMock = mock(OMMetadataManager.class);
  final DBStore storeMock = mock(DBStore.class);
  when(brokenSnapshot.getMetadataManager()).thenReturn(metadataManagerMock);
  when(metadataManagerMock.getStore()).thenReturn(storeMock);
  when(storeMock.listTables()).thenThrow(new RuntimeException("listTables failed"));

  when(cacheLoader.load(eq(brokenId))).thenReturn(brokenSnapshot);

  // Open and release a handle so the snapshot ID lands in the pending eviction queue.
  try (UncheckedAutoCloseableSupplier<OmSnapshot> ignored = snapshotCache.get(brokenId)) {
    assertEquals(1, snapshotCache.size());
  }
  assertTrue(getPendingEvictionQueue(snapshotCache).contains(brokenId));

  // lock() must surface the failure, and the write lock must have been both taken and
  // given back exactly once.
  assertThrows(RuntimeException.class, () -> snapshotCache.lock());
  verify(lockMock, times(1)).acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK));
  verify(lockMock, times(1)).releaseResourceWriteLock(eq(SNAPSHOT_DB_LOCK));
}
}
Loading