From fd869d8b6712784b7fc8fe9c64b5e08a4a6a1ca5 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Mon, 4 May 2026 23:02:50 +0800 Subject: [PATCH 1/4] detect 409 for mpu complete and refactor atomic rewrite code Signed-off-by: peterxcli --- .../ozone/client/rpc/OzoneRpcClientTests.java | 31 ++++ .../om/request/key/OMKeyCommitRequest.java | 29 +-- .../key/OMKeyCommitRequestWithFSO.java | 2 +- .../om/request/key/OMKeyCreateRequest.java | 4 +- .../key/OMKeyCreateRequestWithFSO.java | 5 +- .../ozone/om/request/key/OMKeyRequest.java | 167 ++++++++++++++---- .../S3MultipartUploadCompleteRequest.java | 61 +++++-- ...MultipartUploadCompleteRequestWithFSO.java | 20 ++- .../om/request/key/TestOMKeyRequest.java | 45 +++++ .../TestS3MultipartUploadCompleteRequest.java | 92 +++++++++- ...MultipartUploadCompleteRequestWithFSO.java | 29 +++ 11 files changed, 403 insertions(+), 82 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java index d9e04e5eed3a..7512cc01fb33 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java @@ -1547,6 +1547,37 @@ void testRewriteKeyIfMatchSuccess(BucketLayout layout) throws IOException { .getMetadata().get(ETAG)); } + @ParameterizedTest + @EnumSource + void testRewriteKeyIfMatchFailsDueToOutdatedGenerationAtCommit( + BucketLayout layout) throws IOException { + checkFeatureEnable(OzoneManagerVersion.ATOMIC_REWRITE_KEY); + OzoneBucket bucket = createBucket(layout); + OzoneKeyDetails keyDetails = createTestKeyWithETag(bucket); + String etag = keyDetails.getMetadata().get(ETAG); + byte[] overwriteContent = "overwrite".getBytes(UTF_8); + + OzoneOutputStream out = null; + try { + out = bucket.rewriteKeyIfMatch( + keyDetails.getName(), keyDetails.getDataSize(), etag, + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE), + keyDetails.getMetadata(), Collections.emptyMap()); + out.write("rewrite".getBytes(UTF_8)); + + createTestKey(bucket, keyDetails.getName(), overwriteContent); + + OMException e = assertThrows(OMException.class, out::close); + assertEquals(ATOMIC_WRITE_CONFLICT, e.getResult()); + } finally { + if (out != null) { + out.close(); + } + } + + assertKeyContent(bucket, keyDetails.getName(), overwriteContent); + } + @ParameterizedTest @EnumSource void testRewriteKeyIfMatchFailsWithWrongETag(BucketLayout layout) throws IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java index 752397efc7d2..b53fe0aa29e5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java @@ -302,7 +302,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut } } - validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap); + validateAtomicRewriteAtCommit(keyToDelete, omKeyInfo, auditMap); // Optimistic locking validation has passed. Now set the rewrite fields to null so they are // not persisted in the key table. // Combination @@ -617,31 +617,4 @@ public static OMRequest disallowRecovery( return req; } - protected void validateAtomicRewrite(OmKeyInfo existing, OmKeyInfo toCommit, Map auditMap) - throws OMException { - if (toCommit.getExpectedDataGeneration() != null) { - // These values are not passed in the request keyArgs, so add them into the auditMap if they are present - // in the open key entry. - Long expectedGen = toCommit.getExpectedDataGeneration(); - auditMap.put(OzoneConsts.REWRITE_GENERATION, String.valueOf(expectedGen)); - - if (expectedGen == OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) { - if (existing != null) { - throw new OMException("Atomic create-if-not-exists conflicted with an existing key", - OMException.ResultCodes.ATOMIC_WRITE_CONFLICT); - } - } else { - if (existing == null) { - throw new OMException("Atomic rewrite conflicted because the key no longer exists", - OMException.ResultCodes.ATOMIC_WRITE_CONFLICT); - } - if (expectedGen != existing.getUpdateID()) { - throw new OMException("Cannot commit as current generation (" + existing.getUpdateID() + - ") does not match the expected generation to rewrite (" + expectedGen + ")", - OMException.ResultCodes.ATOMIC_WRITE_CONFLICT); - } - } - } - } - } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java index 25b5a4b15d41..3c68a0b9ee31 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java @@ -242,7 +242,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut // creation after the knob turned on. Map oldKeyVersionsToDeleteMap = null; - validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap); + validateAtomicRewriteAtCommit(keyToDelete, omKeyInfo, auditMap); // Optimistic locking validation has passed. Now set the rewrite fields to null so they are // not persisted in the key table. omKeyInfo.setExpectedDataGeneration(null); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java index b82541791b75..553af62d2ef4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java @@ -255,8 +255,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut keyName); OmKeyInfo dbKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()) .getIfExist(dbKeyName); - validateAtomicRewrite(dbKeyInfo, keyArgs); - keyArgs = validateAndRewriteIfMatchAsExpectedGeneration(keyArgs, dbKeyInfo); + keyArgs = resolveConditionalWriteAtAdmission( + dbKeyInfo, keyArgs, auditMap); OmBucketInfo bucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java index 99fabb46de11..b70ff107c150 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java @@ -121,9 +121,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut dbFileInfo = OMFileRequest.getOmKeyInfoFromFileTable(false, omMetadataManager, dbFileKey, keyName); } - validateAtomicRewrite(dbFileInfo, keyArgs); - keyArgs = validateAndRewriteIfMatchAsExpectedGeneration( - keyArgs, dbFileInfo); + keyArgs = resolveConditionalWriteAtAdmission( + dbFileInfo, keyArgs, auditMap); // Check if a file or directory exists with same key name. if (pathInfoFSO.getDirectoryResult() == DIRECTORY_EXISTS) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java index 5bab4de3a4f3..9025bb5f2c6e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java @@ -1308,46 +1308,153 @@ protected void validateEncryptionKeyInfo(OmBucketInfo bucketInfo, KeyArgs keyArg } /** - * Validates atomic rewrite conditions for conditional writes. - *

- * For If-None-Match: * (expectedDataGeneration = EXPECTED_GEN_CREATE_IF_NOT_EXISTS), - * the key must NOT exist. - *

- * For If-Match with a specific generation, the key must exist with matching updateID. + * Resolves conditional write state at request admission time. * - * @param dbKeyInfo the existing key info from the database (null if key doesn't exist) - * @param keyArgs the key arguments containing expected generation - * @throws OMException if validation fails + * This validates expected generations as admission-time preconditions and + * rewrites matching ETags into expected generations for commit-time checks. */ - protected void validateAtomicRewrite(OmKeyInfo dbKeyInfo, KeyArgs keyArgs) - throws OMException { + protected KeyArgs resolveConditionalWriteAtAdmission(OmKeyInfo dbKeyInfo, + KeyArgs keyArgs, Map auditMap) throws OMException { if (keyArgs.hasExpectedDataGeneration()) { - long expectedGen = keyArgs.getExpectedDataGeneration(); - // If expectedGen is EXPECTED_GEN_CREATE_IF_NOT_EXISTS, it means the key MUST NOT exist (If-None-Match) - if (expectedGen == OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) { - if (dbKeyInfo != null) { - throw new OMException("Key already exists", - OMException.ResultCodes.KEY_ALREADY_EXISTS); - } - } else { - // If a key does not exist, or if it exists but the updateID do not match, then fail this request. - if (dbKeyInfo == null) { - throw new OMException("Key not found during expected rewrite", - OMException.ResultCodes.KEY_NOT_FOUND); - } - if (dbKeyInfo.getUpdateID() != expectedGen) { - throw new OMException("Generation mismatch during expected rewrite", - OMException.ResultCodes.KEY_NOT_FOUND); - } + validateAtomicRewriteAtAdmission(dbKeyInfo, keyArgs, auditMap); + return keyArgs; + } + + KeyArgs resolvedKeyArgs = validateAndRewriteIfMatchAsExpectedGeneration( + keyArgs, dbKeyInfo); + if (resolvedKeyArgs.hasExpectedDataGeneration()) { + addRewriteGenerationToAuditMap( + resolvedKeyArgs.getExpectedDataGeneration(), auditMap); + } + return resolvedKeyArgs; + } + + /** + * Validates the condition at request admission time. + * + * This returns precondition-style errors because the client condition is + * already false before the request is admitted for a serialized commit. + */ + protected void validateAtomicRewriteAtAdmission(OmKeyInfo dbKeyInfo, + KeyArgs keyArgs, Map auditMap) throws OMException { + if (!keyArgs.hasExpectedDataGeneration()) { + return; + } + validateExpectedDataGeneration(dbKeyInfo, + keyArgs.getExpectedDataGeneration(), auditMap, + AtomicRewritePhase.ADMISSION); + } + + /** + * Validates an already admitted condition at serialized commit time. + * + * Mismatches here mean the admission snapshot became stale, so callers should + * surface these as atomic write conflicts. + */ + protected void validateAtomicRewriteAtCommit(OmKeyInfo existing, + OmKeyInfo toCommit, Map auditMap) throws OMException { + validateAtomicRewriteAtCommit(existing, + toCommit.getExpectedDataGeneration(), auditMap); + } + + /** + * Validates an already admitted condition at serialized commit time. + * + * This form is for callers that must check the admitted generation before + * building the final key info to commit. + */ + protected void validateAtomicRewriteAtCommit(OmKeyInfo existing, + Long expectedGen, Map auditMap) throws OMException { + if (expectedGen == null) { + return; + } + validateExpectedDataGeneration(existing, expectedGen, auditMap, + AtomicRewritePhase.COMMIT); + } + + private void validateExpectedDataGeneration(OmKeyInfo existing, + long expectedGen, Map auditMap, + AtomicRewritePhase phase) throws OMException { + addRewriteGenerationToAuditMap(expectedGen, auditMap); + + if (expectedGen == OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) { + if (existing != null) { + throw phase.keyAlreadyExists(); + } + } else { + if (existing == null) { + throw phase.keyNotFound(); + } + if (expectedGen != existing.getUpdateID()) { + throw phase.generationMismatch(existing.getUpdateID(), expectedGen); } } } + private void addRewriteGenerationToAuditMap(long expectedGen, + Map auditMap) { + if (auditMap != null) { + auditMap.put(OzoneConsts.REWRITE_GENERATION, + String.valueOf(expectedGen)); + } + } + + private enum AtomicRewritePhase { + ADMISSION { + @Override + OMException keyAlreadyExists() { + return new OMException("Key already exists", + OMException.ResultCodes.KEY_ALREADY_EXISTS); + } + + @Override + OMException keyNotFound() { + return new OMException("Key not found during expected rewrite", + OMException.ResultCodes.KEY_NOT_FOUND); + } + + @Override + OMException generationMismatch(long currentGen, long expectedGen) { + return new OMException("Generation mismatch during expected rewrite", + OMException.ResultCodes.KEY_NOT_FOUND); + } + }, + + COMMIT { + @Override + OMException keyAlreadyExists() { + return new OMException("Atomic create-if-not-exists conflicted with " + + "an existing key", + OMException.ResultCodes.ATOMIC_WRITE_CONFLICT); + } + + @Override + OMException keyNotFound() { + return new OMException("Atomic rewrite conflicted because the key no " + + "longer exists", + OMException.ResultCodes.ATOMIC_WRITE_CONFLICT); + } + + @Override + OMException generationMismatch(long currentGen, long expectedGen) { + return new OMException("Cannot commit as current generation (" + + currentGen + ") does not match the expected generation to " + + "rewrite (" + expectedGen + ")", + OMException.ResultCodes.ATOMIC_WRITE_CONFLICT); + } + }; + + abstract OMException keyAlreadyExists(); + + abstract OMException keyNotFound(); + + abstract OMException generationMismatch(long currentGen, long expectedGen); + } + /** * Validates If-Match ETag condition. *

* This method checks if the existing key's ETag matches the expected ETag. - * Use this for single-phase operations (like MPU complete) where no rewrite is needed. * * @param keyArgs the key arguments containing expected ETag * @param dbKeyInfo the existing key info from the database @@ -1379,7 +1486,7 @@ protected void validateIfMatchETag(KeyArgs keyArgs, OmKeyInfo dbKeyInfo) *

* This method checks if the existing key's ETag matches the expected ETag. * If it matches, the ETag condition is converted to a generation-based condition - * for atomic commit validation in two-phase operations (CreateKey → CommitKey). + * for commit-time atomic rewrite validation. * * @param keyArgs the key arguments containing expected ETag * @param dbKeyInfo the existing key info from the database diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java index 179eb87aabae..0bd58b6ec405 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -129,10 +129,12 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { .setKeyName(keyPath).build(); KeyArgs resolvedArgs = resolveBucketAndCheckKeyAcls(newKeyArgs, ozoneManager, ACLType.WRITE); + KeyArgs conditionalArgs = resolveConditionalWrite( + ozoneManager.getMetadataManager(), resolvedArgs); return getOmRequest().toBuilder().setCompleteMultiPartUploadRequest( multipartUploadCompleteRequest.toBuilder().setKeyArgs( - resolvedArgs)).setUserInfo(getUserInfo()).build(); + conditionalArgs)).setUserInfo(getUserInfo()).build(); } @Override @@ -211,15 +213,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut omBucketInfo.getDefaultReplicationConfig() : null, ozoneManager); - OmMultipartKeyInfo multipartKeyInfoFromArgs = - new OmMultipartKeyInfo.Builder() - .setUploadID(keyArgs.getMultipartUploadID()) - .setCreationTime(keyArgs.getModificationTime()) - .setReplicationConfig(replicationConfig) - .setObjectID(pathInfoFSO.getLeafNodeObjectId()) - .setUpdateID(trxnLogIndex) - .setParentID(pathInfoFSO.getLastKnownParentId()) - .build(); OmKeyInfo keyInfoFromArgs = new OmKeyInfo.Builder() .setVolumeName(volumeName) @@ -263,17 +256,19 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut checkDirectoryAlreadyExists(ozoneManager, omBucketInfo, keyName, omMetadataManager); + // Conditional write validation (If-None-Match / If-Match). + OmKeyInfo existingKeyInfo = omMetadataManager + .getKeyTable(getBucketLayout()).get(dbOzoneKey); + if (keyArgs.hasExpectedDataGeneration()) { + validateAtomicRewriteAtCommit(existingKeyInfo, + keyArgs.getExpectedDataGeneration(), auditMap); + } + if (multipartKeyInfo == null) { throw new OMException( failureMessage(requestedVolume, requestedBucket, keyName), OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); } - - // Conditional write validation (If-None-Match / If-Match). - // BUCKET_LOCK is held, so validation and commit are atomic. - // Only 412 PreconditionFailed is possible; 409 Conflict cannot occur. - OmKeyInfo existingKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey); - validateAtomicRewrite(existingKeyInfo, keyArgs); validateIfMatchETag(keyArgs, existingKeyInfo); if (!partsList.isEmpty()) { @@ -409,6 +404,40 @@ protected OMClientResponse getOmClientResponse(String multipartKey, getBucketLayout(), omBucketInfo, bucketId); } + private KeyArgs resolveConditionalWrite( + OMMetadataManager omMetadataManager, KeyArgs keyArgs) + throws IOException { + if (!keyArgs.hasExpectedETag() + && !keyArgs.hasExpectedDataGeneration()) { + return keyArgs; + } + + String multipartKey = omMetadataManager.getMultipartKey( + keyArgs.getVolumeName(), + keyArgs.getBucketName(), keyArgs.getKeyName(), + keyArgs.getMultipartUploadID()); + if (omMetadataManager.getMultipartInfoTable().get(multipartKey) == null) { + throw new OMException( + failureMessage(keyArgs.getVolumeName(), keyArgs.getBucketName(), + keyArgs.getKeyName()), + OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); + } + + OmKeyInfo existingKeyInfo = getExistingKeyInfo( + omMetadataManager, keyArgs); + return resolveConditionalWriteAtAdmission(existingKeyInfo, keyArgs, null); + } + + protected OmKeyInfo getExistingKeyInfo( + OMMetadataManager omMetadataManager, KeyArgs keyArgs) + throws IOException { + String dbOzoneKey = getDBOzoneKey(omMetadataManager, + keyArgs.getVolumeName(), keyArgs.getBucketName(), + keyArgs.getKeyName()); + return getOmKeyInfoFromKeyTable(dbOzoneKey, keyArgs.getKeyName(), + omMetadataManager); + } + protected void checkDirectoryAlreadyExists(OzoneManager ozoneManager, OmBucketInfo omBucketInfo, String keyName, OMMetadataManager omMetadataManager) throws IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java index a5c8b2703d68..cad6c0c9e3e0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.om.request.s3.multipart; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE; import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS; import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.getParentId; @@ -150,6 +151,24 @@ protected String getDBMultipartOpenKey(String volumeName, String bucketName, return omMetadataManager.getMultipartKeyFSO(volumeName, bucketName, keyName, uploadID); } + @Override + protected OmKeyInfo getExistingKeyInfo( + OMMetadataManager omMetadataManager, + OzoneManagerProtocolProtos.KeyArgs keyArgs) throws IOException { + try { + String dbOzoneKey = getDBOzoneKey(omMetadataManager, + keyArgs.getVolumeName(), keyArgs.getBucketName(), + keyArgs.getKeyName()); + return getOmKeyInfoFromKeyTable(dbOzoneKey, keyArgs.getKeyName(), + omMetadataManager); + } catch (OMException ex) { + if (ex.getResult() == DIRECTORY_NOT_FOUND) { + return null; + } + throw ex; + } + } + @Override protected S3MultipartUploadCompleteResponse getOmClientResponse( OzoneManagerProtocolProtos.OMResponse.Builder omResponse, @@ -178,4 +197,3 @@ public BucketLayout getBucketLayout() { return BucketLayout.FILE_SYSTEM_OPTIMIZED; } } - diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java index 405cc706ef90..37620c435d7d 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.setupReplicationConfigValidation; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; @@ -33,7 +34,9 @@ import jakarta.annotation.Nonnull; import java.nio.file.Path; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import org.apache.commons.lang3.tuple.Pair; @@ -56,6 +59,7 @@ import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.AuditLogger; import org.apache.hadoop.ozone.audit.AuditMessage; import org.apache.hadoop.ozone.om.DeletingServiceMetrics; @@ -74,6 +78,7 @@ import org.apache.hadoop.ozone.om.OzoneManagerPrepareState; import org.apache.hadoop.ozone.om.ResolvedBucket; import org.apache.hadoop.ozone.om.ScmClient; +import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -82,6 +87,7 @@ import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; import org.apache.hadoop.ozone.om.request.snapshot.OMSnapshotCreateRequest; import org.apache.hadoop.ozone.om.request.snapshot.TestOMSnapshotCreateRequest; +import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse; import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; @@ -93,6 +99,7 @@ import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.mockito.Mockito; import org.slf4j.event.Level; @@ -133,6 +140,29 @@ public class TestOMKeyRequest { protected long txnLogId = 100000L; protected long version = 0L; + @Test + public void testAdmissionAtomicRewriteAddsGenerationToAuditMap() + throws Exception { + long expectedGeneration = 10L; + OmKeyInfo existingKeyInfo = new OmKeyInfo.Builder() + .setKeyName(keyName) + .setUpdateID(expectedGeneration) + .build(); + KeyArgs keyArgs = KeyArgs.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setExpectedDataGeneration(expectedGeneration) + .build(); + Map auditMap = new HashMap<>(); + + new TestableOMKeyRequest().validateAtomicRewriteAtAdmission( + existingKeyInfo, keyArgs, auditMap); + + assertEquals(String.valueOf(expectedGeneration), + auditMap.get(OzoneConsts.REWRITE_GENERATION)); + } + @BeforeEach public void setup() throws Exception { ozoneManager = mock(OzoneManager.class); @@ -282,6 +312,21 @@ protected OzoneConfiguration getOzoneConfiguration() { return new OzoneConfiguration(); } + private static final class TestableOMKeyRequest extends OMKeyRequest { + private TestableOMKeyRequest() { + super(OzoneManagerProtocolProtos.OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.CreateKey) + .setClientId(UUID.randomUUID().toString()) + .build()); + } + + @Override + public OMClientResponse validateAndUpdateCache( + OzoneManager ozoneManager, ExecutionContext context) { + return null; + } + } + /** * Verify path in open key table. Also, it returns OMKeyInfo for the given * key path. diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java index dbf82cc18ea0..a3c37ea777ae 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -41,6 +42,7 @@ import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part; import org.apache.hadoop.util.Time; @@ -62,6 +64,65 @@ public void testPreExecute() throws Exception { UUID.randomUUID().toString(), new ArrayList<>()); } + @Test + public void testPreExecuteRewritesExpectedETagToGeneration() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + String expectedETag = "matching-etag"; + long expectedGeneration = 10L; + + addVolumeAndBucket(volumeName, bucketName); + addCommittedKeyToTable(volumeName, bucketName, keyName, expectedGeneration, + expectedETag); + + OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + OMClientResponse initiateResponse = getS3InitiateMultipartUploadReq( + initiateMPURequest).validateAndUpdateCache(ozoneManager, 1L); + String uploadID = initiateResponse.getOMResponse() + .getInitiateMultiPartUploadResponse().getMultipartUploadID(); + + OMRequest request = withCompleteKeyArgs( + OMRequestTestUtils.createCompleteMPURequest(volumeName, bucketName, + keyName, uploadID, new ArrayList<>()), + keyArgs -> keyArgs.setExpectedETag(expectedETag)); + + OMRequest modifiedRequest = getS3MultipartUploadCompleteReq(request) + .preExecute(ozoneManager); + KeyArgs resolvedKeyArgs = modifiedRequest + .getCompleteMultiPartUploadRequest().getKeyArgs(); + + assertFalse(resolvedKeyArgs.hasExpectedETag()); + assertTrue(resolvedKeyArgs.hasExpectedDataGeneration()); + assertEquals(expectedGeneration, + resolvedKeyArgs.getExpectedDataGeneration()); + } + + @Test + public void testConditionalCompleteReturnsConflictBeforeMissingUpload() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + + addVolumeAndBucket(volumeName, bucketName); + addCommittedKeyToTable(volumeName, bucketName, keyName, 20L, + "new-etag"); + + OMRequest request = withCompleteKeyArgs( + OMRequestTestUtils.createCompleteMPURequest(volumeName, bucketName, + keyName, UUID.randomUUID().toString(), new ArrayList<>()), + keyArgs -> keyArgs.setExpectedDataGeneration(10L)); + + OMClientResponse response = getS3MultipartUploadCompleteReq(request) + .validateAndUpdateCache(ozoneManager, 3L); + + assertEquals(OzoneManagerProtocolProtos.Status.ATOMIC_WRITE_CONFLICT, + response.getOMResponse().getStatus()); + } + @Test public void testValidateAndUpdateCacheSuccess() throws Exception { String volumeName = UUID.randomUUID().toString(); @@ -212,6 +273,20 @@ protected void addVolumeAndBucket(String volumeName, String bucketName) omMetadataManager, getBucketLayout()); } + protected OmKeyInfo addCommittedKeyToTable(String volumeName, + String bucketName, String keyName, long updateID, String eTag) + throws Exception { + OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo( + volumeName, bucketName, keyName, + RatisReplicationConfig.getInstance(ONE)) + .setUpdateID(updateID) + .addMetadata(OzoneConsts.ETAG, eTag) + .build(); + omMetadataManager.getKeyTable(getBucketLayout()) + .put(getOzoneDBKey(volumeName, bucketName, keyName), keyInfo); + return keyInfo; + } + @Test public void testInvalidPartOrderError() throws Exception { String volumeName = UUID.randomUUID().toString(); @@ -373,5 +448,20 @@ protected String getKeyName() { protected long getNamespaceCount() { return 1L; } -} + private OMRequest withCompleteKeyArgs(OMRequest request, + KeyArgsUpdater updater) { + OzoneManagerProtocolProtos.MultipartUploadCompleteRequest completeRequest = + request.getCompleteMultiPartUploadRequest(); + KeyArgs.Builder keyArgs = completeRequest.getKeyArgs().toBuilder(); + updater.update(keyArgs); + return request.toBuilder() + .setCompleteMultiPartUploadRequest(completeRequest.toBuilder() + .setKeyArgs(keyArgs)) + .build(); + } + + private interface KeyArgsUpdater { + void update(KeyArgs.Builder keyArgs); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java index 9f0f5cbcdfd9..9e8280397e8f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java @@ -89,6 +89,35 @@ protected void addKeyToTable(String volumeName, String bucketName, omMetadataManager); } + @Override + protected OmKeyInfo addCommittedKeyToTable(String volumeName, + String bucketName, String keyName, long updateID, String eTag) + throws Exception { + String parentDir = OzoneFSUtils.getParentDir(keyName); + assertNotEquals("Parent doesn't exists!", parentDir, keyName); + + long parentID = OMRequestTestUtils.addParentsToDirTable(volumeName, + bucketName, parentDir, omMetadataManager); + long objectId = parentID + 1; + + OmKeyInfo omKeyInfoFSO = + OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, keyName, + RatisReplicationConfig.getInstance(ONE), + new OmKeyLocationInfoGroup(0L, new ArrayList<>(), true)) + .setObjectID(objectId) + .setParentObjectID(parentID) + .setUpdateID(updateID) + .addMetadata(OzoneConsts.ETAG, eTag) + .build(); + + String fileName = OzoneFSUtils.getFileName(keyName); + omKeyInfoFSO.setKeyName(fileName); + OMRequestTestUtils.addFileToKeyTable(false, false, + fileName, omKeyInfoFSO, 0L, omKeyInfoFSO.getObjectID(), + omMetadataManager); + return omKeyInfoFSO; + } + private long getParentID(String volumeName, String bucketName, String keyName) throws IOException { final long volumeId = omMetadataManager.getVolumeId(volumeName); From 576fcce9e6994a06481b364f164c6cba33e692a0 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 5 May 2026 01:26:02 +0800 Subject: [PATCH 2/4] update s3 conditional request to include the concurrent conflict detection Signed-off-by: peterxcli --- .../content/design/s3-conditional-requests.md | 143 +++++++++++------- 1 file changed, 85 insertions(+), 58 deletions(-) diff --git a/hadoop-hdds/docs/content/design/s3-conditional-requests.md b/hadoop-hdds/docs/content/design/s3-conditional-requests.md index 6e2d1d0eca0e..cb8153a0d81d 100644 --- a/hadoop-hdds/docs/content/design/s3-conditional-requests.md +++ b/hadoop-hdds/docs/content/design/s3-conditional-requests.md @@ -105,8 +105,8 @@ temporary part state. | | | | |---|---|---| |**Header**|**Meaning**|**Failure result**| -|`If-None-Match: "*"`|Complete the MPU only if no committed object currently exists at the destination key.|`412 Precondition Failed` if the destination key already exists.| -|`If-Match: ""`|Complete the MPU only if the current committed object exists and its ETag matches.|`412 Precondition Failed` if the object is missing or the ETag does not match.| +|`If-None-Match: "*"`|Complete the MPU only if no committed object currently exists at the destination key.|`412 Precondition Failed` if the destination key already exists when the request is admitted; `409 ConditionalRequestConflict` if a conflicting write wins before the admitted complete request commits.| +|`If-Match: ""`|Complete the MPU only if the current committed object exists and its ETag matches.|`412 Precondition Failed` if the object is missing or the ETag does not match when the request is admitted; `409 ConditionalRequestConflict` if a conflicting write wins before the admitted complete request commits.| #### Restrictions and Notes @@ -118,6 +118,9 @@ temporary part state. object for `If-None-Match: *`. - The MPU parts remain associated with the multipart upload until the complete request succeeds or the upload is aborted. +- After a `409 ConditionalRequestConflict`, the client must fetch the + latest object ETag, create a new multipart upload, re-upload the + parts, and complete that new upload using the latest condition. ### AWS S3 Conditional Read Specification @@ -259,7 +262,7 @@ sequenceDiagram opt Open key created GW->>OM: commitKey() OM->>OM: Recheck key absence during commit - OM-->>GW: Success or generation mismatch + OM-->>GW: Success or ATOMIC_WRITE_CONFLICT end else If-Match: etag Note over GW,OM: No pre-read for current ETag/updateID on the optimistic path @@ -269,10 +272,10 @@ sequenceDiagram opt Open key created GW->>OM: commitKey() OM->>OM: Reload open key and reuse atomic rewrite generation check - OM-->>GW: Success or KEY_NOT_FOUND + OM-->>GW: Success or ATOMIC_WRITE_CONFLICT end end - GW-->>User: 200 OK or 412 Precondition Failed + GW-->>User: 200 OK, 412 Precondition Failed, or 409 ConditionalRequestConflict ``` #### If-None-Match Implementation @@ -308,15 +311,17 @@ public static final long EXPECTED_DATA_GENERATION_CREATE_IF_NOT_EXISTS = -1L; 1. During the commit phase (or strict atomic create), the OM validates that the key still does not exist. 2. If a concurrent client created the key between the Create and Commit - phases, the transaction fails with a generation-mismatch error. + phases, the transaction fails with `ATOMIC_WRITE_CONFLICT`, which maps + to S3 `409 ConditionalRequestConflict`. ##### Race Condition Handling Using `OzoneConsts.EXPECTED_DATA_GENERATION_CREATE_IF_NOT_EXISTS = -1` ensures atomicity. If a concurrent write (Client B) commits between Client A's Create and Commit, Client A's commit fails the -`CREATE IF NOT EXISTS` validation check, preserving strict -create-if-not-exists semantics. +`CREATE IF NOT EXISTS` validation check with `ATOMIC_WRITE_CONFLICT`, +preserving strict create-if-not-exists semantics while telling the +client to retry from a fresh object view. > **Note**: This ability will be added along with > [HDDS-13963](https://issues.apache.org/jira/browse/HDDS-13963) @@ -395,15 +400,16 @@ from HDDS-10656: 2. Read current committed key from `KeyTable`. 3. Validate `currentKey.updateID == openKey.expectedDataGeneration`. 4. If match, commit succeeds. If mismatch (concurrent modification), - throw `KEY_NOT_FOUND` (maps to S3 412). + throw `ATOMIC_WRITE_CONFLICT` (maps to S3 409). 5. Clear the conditional fields before persisting the final committed key so they remain open-key state only. This approach ensures end-to-end atomicity: even if another client modifies the key between Create and Commit phases, the commit will -fail. The gateway never needs to fetch `updateID` itself; OM derives it -from the matched ETag during `createKey`, and the rest of the write then -rides on the standard atomic rewrite path. +fail with a retryable conflict instead of being reported as a failed +precondition. The gateway never needs to fetch `updateID` itself; OM +derives it from the matched ETag during `createKey`, and the rest of the +write then rides on the standard atomic rewrite path. #### Error Mapping @@ -411,18 +417,20 @@ rides on the standard atomic rewrite path. |---|---|---|---| |**OM Error**|**S3 Status**|**S3 Error Code**|**Scenario**| |`KEY_ALREADY_EXISTS`|412|PreconditionFailed|If-None-Match failed (key exists)| -|`KEY_NOT_FOUND`|412|PreconditionFailed|If-Match failed (key missing or concurrent modification)| +|`KEY_NOT_FOUND`|412|PreconditionFailed|If-Match failed during admission because the key is missing| |`ETAG_NOT_AVAILABLE`|412|PreconditionFailed|If-Match failed (key has no ETag, e.g., created via OFS)| |`ETAG_MISMATCH`|412|PreconditionFailed|If-Match failed (ETag mismatch)| +|`ATOMIC_WRITE_CONFLICT`|409|ConditionalRequestConflict|An admitted conditional write lost a concurrent commit race| ## AWS S3 Conditional CompleteMultipartUpload Implementation Conditional MPU completion should reuse the same conditional write fields -already present in `KeyArgs`, but unlike normal conditional `PUT`, it -does not need an open-key plus later commit bridge. The final key is -assembled and committed inside one OM transaction in -`S3MultipartUploadCompleteRequest.validateAndUpdateCache(...)` while -holding the normal bucket write lock. +already present in `KeyArgs`. Unlike normal conditional `PUT`, the +gateway does not stream object data through an open-key plus later +client-side commit path. However, the condition still needs the same +admission/commit split: OM resolves the destination precondition before +the complete request is admitted, then revalidates the resolved +generation during the serialized commit transaction. ### Request Sequence @@ -435,17 +443,23 @@ sequenceDiagram User->>GW: CompleteMultipartUpload with optional If-None-Match / If-Match GW->>GW: Parse conditional write headers GW->>OM: completeMultipartUpload(keyArgs + partsList) - OM->>OM: Acquire bucket lock - OM->>OM: Load current committed key and MPU state - OM->>OM: Validate destination precondition - alt Precondition failed + OM->>OM: Admission check: verify MPU exists and resolve condition + alt Admission precondition failed OM-->>GW: KEY_ALREADY_EXISTS / KEY_NOT_FOUND / ETAG_* GW-->>User: 412 Precondition Failed - else Preconditions pass - OM->>OM: Validate MPU parts and build final key - OM->>OM: Write key table entry and remove MPU state - OM-->>GW: Success - GW-->>User: 200 OK + else Request admitted + OM->>OM: Acquire bucket lock + OM->>OM: Load current committed key and MPU state + OM->>OM: Revalidate expectedDataGeneration at commit + alt Admitted condition became stale + OM-->>GW: ATOMIC_WRITE_CONFLICT + GW-->>User: 409 ConditionalRequestConflict + else Condition still holds + OM->>OM: Validate MPU parts and build final key + OM->>OM: Write key table entry and remove MPU state + OM-->>GW: Success + GW-->>User: 200 OK + end end ``` @@ -462,38 +476,50 @@ sequenceDiagram - `If-Match: ""` -> `expectedETag = ` 4. Ensure the OM protocol translator copies these optional fields from `OmKeyArgs` into the `CompleteMultiPartUploadRequest` `KeyArgs`. -5. Map conditional validation failures to the same S3 - `PreconditionFailed` response used by the other conditional write - paths. +5. Map admission-time conditional validation failures to + `PreconditionFailed`, and map commit-time + `ATOMIC_WRITE_CONFLICT` failures to + `ConditionalRequestConflict`. ### OM Validation -Validation should occur in -`S3MultipartUploadCompleteRequest.validateAndUpdateCache(...)` after the -request acquires the bucket write lock and before it assembles the final -key from the uploaded parts. +Validation is split between request admission and serialized commit. +This lets Ozone return `412 PreconditionFailed` when the caller's +condition is already false, and `409 ConditionalRequestConflict` when a +condition that was true at admission becomes stale before commit. The proposed flow is: -1. Load the current committed destination key from `keyTable`. -2. Reuse the same `If-Match` validation helper already used by +1. During admission, verify that the multipart upload still exists for + conditional complete requests. +2. Load the current committed destination key and call + `resolveConditionalWriteAtAdmission(...)`. +3. For `If-Match`, reuse the same ETag validation helper used by conditional `PUT` to convert `expectedETag` into - `expectedDataGeneration`. -3. Reuse the same atomic rewrite validation helper to evaluate: - - create-if-absent semantics for `If-None-Match: *` - - generation/ETag match semantics for `If-Match` -4. If validation passes, continue with the existing MPU complete logic: + `expectedDataGeneration`. For `If-None-Match: *`, validate the + create-if-absent marker as an admission-time precondition. +4. During `S3MultipartUploadCompleteRequest.validateAndUpdateCache(...)`, + while applying the request as a serialized OM transaction, reload the + current committed key and call `validateAtomicRewriteAtCommit(...)` + when `expectedDataGeneration` is present. +5. If the admitted generation is stale, fail with + `ATOMIC_WRITE_CONFLICT`. This check runs before reporting a missing + MPU state, so duplicate conditional complete attempts for the same + upload surface the object conflict instead of being reduced to + `NoSuchUpload`. +6. If validation passes, continue with the existing MPU complete logic: validate part order, validate part identity, compute the MPU ETag, write the final key, and delete the multipart metadata/open-key state. -5. Clear conditional-only fields before persisting the committed key so +7. Clear conditional-only fields before persisting the committed key so they remain request-scoped metadata rather than part of the final key state. -Because the final destination validation and the key-table write happen -under the same OM bucket lock in one request, this path does not need a -separate second-phase commit revalidation step like conditional -`PutObject`. +The complete operation still writes the final key in one OM transaction. +The additional commit-time generation check exists to distinguish a +stale admitted condition from an admission-time precondition failure and +to preserve AWS-compatible retry guidance for racing conditional +completes. ### Error Mapping @@ -504,13 +530,12 @@ separate second-phase commit revalidation step like conditional |`KEY_NOT_FOUND`|412|PreconditionFailed|`If-Match` failed because the current destination object is missing| |`ETAG_NOT_AVAILABLE`|412|PreconditionFailed|Destination key has no ETag metadata| |`ETAG_MISMATCH`|412|PreconditionFailed|Destination ETag mismatch| +|`ATOMIC_WRITE_CONFLICT`|409|ConditionalRequestConflict|The destination changed after the condition was admitted but before the complete request committed| -If Ozone later introduces an explicit conflict result for this path, the -gateway should map it to `409 ConditionalRequestConflict`, reusing the -same S3 error mapping as conditional `PutObject`. The initial MPU -complete design does not require a dedicated second-phase conflict -signal because the validation and final key update are already -serialized in one OM transaction. +On `409 ConditionalRequestConflict`, the client should fetch the latest +destination object ETag, initiate a new multipart upload, re-upload the +parts, and complete the new upload with a condition based on that latest +view. ## AWS S3 Conditional Read Implementation @@ -608,9 +633,9 @@ sequenceDiagram opt Destination open key created GW->>OM: commit destination key using source snapshot OM->>OM: Revalidate generation for conditional writes - OM-->>GW: Success or 412-mapped error + OM-->>GW: Success or ATOMIC_WRITE_CONFLICT end - GW-->>User: 200 OK or 412 Precondition Failed + GW-->>User: 200 OK, 412 Precondition Failed, or 409 ConditionalRequestConflict end ``` @@ -663,13 +688,15 @@ Conditional support should remain orthogonal to the existing metadata/tagging di |**Failure point**|**OM/Gateway result**|**S3 Status**|**Scenario**| |Source validator|Gateway precondition failure|412|Source ETag/date condition failed| |Destination validator|`KEY_ALREADY_EXISTS`|412|`If-None-Match` failed at destination| -|Destination validator|`KEY_NOT_FOUND`|412|`If-Match` failed because destination is missing or changed| +|Destination validator|`KEY_NOT_FOUND`|412|`If-Match` failed because destination is missing during admission| |Destination validator|`ETAG_NOT_AVAILABLE`|412|Destination key has no ETag metadata| |Destination validator|`ETAG_MISMATCH`|412|Destination ETag mismatch| +|Destination commit|`ATOMIC_WRITE_CONFLICT`|409|Destination changed after the conditional open was admitted| -This initial design intentionally reuses the `412` mapping already described for conditional writes. If Ozone later -wants to distinguish in-flight destination races with a dedicated `409 ConditionalRequestConflict`, that can be added -without changing the overall structure above. +Admission-time destination failures reuse the `412` mapping already +described for conditional writes. Once a destination condition has been +admitted, a later generation mismatch is a write conflict and should be +reported as `409 ConditionalRequestConflict`. ## AWS S3 Conditional Delete Implementation From a7dcd150816e5bfd15889da495b93126e8526498 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 6 May 2026 17:29:48 +0800 Subject: [PATCH 3/4] create key preExecute/validate and commit key preExecute as admission, only commit validate as commit Signed-off-by: peterxcli --- .../om/request/key/OMKeyCommitRequest.java | 35 +++++++++++ .../key/OMKeyCommitRequestWithFSO.java | 59 +++++++++++++++++++ .../ozone/om/request/key/OMKeyRequest.java | 18 +++++- .../request/key/TestOMKeyCommitRequest.java | 59 +++++++++++++++++++ 4 files changed, 169 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java index b53fe0aa29e5..533853713d90 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java @@ -129,12 +129,47 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { KeyArgs resolvedKeyArgs = resolveBucketAndCheckOpenKeyAcls(newKeyArgs.build(), ozoneManager, IAccessAuthorizer.ACLType.WRITE, commitKeyRequest.getClientID()); + validateConditionalCommitAtAdmission(ozoneManager.getMetadataManager(), + resolvedKeyArgs, commitKeyRequest.getClientID()); return request.toBuilder() .setCommitKeyRequest(commitKeyRequest.toBuilder() .setKeyArgs(resolvedKeyArgs)).build(); } + protected void validateConditionalCommitAtAdmission( + OMMetadataManager omMetadataManager, KeyArgs keyArgs, long clientId) + throws IOException { + OmKeyInfo openKeyInfo = getOpenKeyInfoForCommitAdmission( + omMetadataManager, keyArgs, clientId); + if (openKeyInfo == null) { + return; + } + + validateAtomicRewriteAtAdmission( + getCommittedKeyInfoForCommitAdmission(omMetadataManager, keyArgs), + openKeyInfo.getExpectedDataGeneration(), null); + } + + protected OmKeyInfo getOpenKeyInfoForCommitAdmission( + OMMetadataManager omMetadataManager, KeyArgs keyArgs, long clientId) + throws IOException { + String dbOpenKey = omMetadataManager.getOpenKey( + keyArgs.getVolumeName(), keyArgs.getBucketName(), + keyArgs.getKeyName(), clientId); + return omMetadataManager.getOpenKeyTable(getBucketLayout()) + .get(dbOpenKey); + } + + protected OmKeyInfo getCommittedKeyInfoForCommitAdmission( + OMMetadataManager omMetadataManager, KeyArgs keyArgs) + throws IOException { + String dbOzoneKey = omMetadataManager.getOzoneKey( + keyArgs.getVolumeName(), keyArgs.getBucketName(), + keyArgs.getKeyName()); + return omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java index 3c68a0b9ee31..eb220d1f0de6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java @@ -17,9 +17,12 @@ package org.apache.hadoop.ozone.om.request.key; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.BUCKET_LOCK; import com.google.common.annotations.VisibleForTesting; @@ -73,6 +76,62 @@ public OMKeyCommitRequestWithFSO(OMRequest omRequest, super(omRequest, bucketLayout); } + @Override + protected OmKeyInfo getOpenKeyInfoForCommitAdmission( + OMMetadataManager omMetadataManager, KeyArgs keyArgs, long clientId) + throws IOException { + try { + OmFSOFile fsoFile = getOmFSOFileForCommitAdmission( + omMetadataManager, keyArgs); + return OMFileRequest.getOmKeyInfoFromFileTable(true, + omMetadataManager, fsoFile.getOpenFileName(clientId), + keyArgs.getKeyName()); + } catch (OMException ex) { + if (isMissingNamespaceForCommitAdmission(ex)) { + return null; + } + throw ex; + } + } + + @Override + protected OmKeyInfo getCommittedKeyInfoForCommitAdmission( + OMMetadataManager omMetadataManager, KeyArgs keyArgs) + throws IOException { + try { + OmFSOFile fsoFile = getOmFSOFileForCommitAdmission( + omMetadataManager, keyArgs); + return omMetadataManager.getKeyTable(getBucketLayout()) + .get(fsoFile.getOzonePathKey()); + } catch (OMException ex) { + if (isMissingNamespaceForCommitAdmission(ex)) { + return null; + } + throw ex; + } + } + + private OmFSOFile getOmFSOFileForCommitAdmission( + OMMetadataManager omMetadataManager, KeyArgs keyArgs) + throws IOException { + String keyName = keyArgs.getKeyName(); + String errMsg = "Cannot create file : " + keyName + + " as parent directory doesn't exist"; + return new OmFSOFile.Builder() + .setVolumeName(keyArgs.getVolumeName()) + .setBucketName(keyArgs.getBucketName()) + .setKeyName(keyName) + .setOmMetadataManager(omMetadataManager) + .setErrMsg(errMsg) + .build(); + } + + private boolean isMissingNamespaceForCommitAdmission(OMException ex) { + return ex.getResult() == VOLUME_NOT_FOUND + || ex.getResult() == BUCKET_NOT_FOUND + || ex.getResult() == DIRECTORY_NOT_FOUND; + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java index 9025bb5f2c6e..53657c1b6e98 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java @@ -1340,8 +1340,22 @@ protected void validateAtomicRewriteAtAdmission(OmKeyInfo dbKeyInfo, if (!keyArgs.hasExpectedDataGeneration()) { return; } - validateExpectedDataGeneration(dbKeyInfo, - keyArgs.getExpectedDataGeneration(), auditMap, + validateAtomicRewriteAtAdmission(dbKeyInfo, + keyArgs.getExpectedDataGeneration(), auditMap); + } + + /** + * Validates the condition at request admission time. + * + * This form is for callers that resolve the admitted generation from + * persisted open-key state rather than from request key arguments. + */ + protected void validateAtomicRewriteAtAdmission(OmKeyInfo dbKeyInfo, + Long expectedGen, Map auditMap) throws OMException { + if (expectedGen == null) { + return; + } + validateExpectedDataGeneration(dbKeyInfo, expectedGen, auditMap, AtomicRewritePhase.ADMISSION); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java index 41b81471ff96..6c82b65ee9e4 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java @@ -350,6 +350,65 @@ public void testAtomicCreateIfNotExistsCommitKeyAlreadyExists() throws Exception assertEquals(ATOMIC_WRITE_CONFLICT, omClientResponse.getOMResponse().getStatus()); } + @Test + public void testAtomicRewritePreExecuteUsesAdmissionFailure() + throws Exception { + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + List allocatedLocationList = getKeyLocation( + DEFAULT_COMMIT_BLOCK_SIZE).stream() + .map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList()); + + OmKeyInfo.Builder openKeyInfoBuilder = OMRequestTestUtils.createOmKeyInfo( + volumeName, bucketName, keyName, replicationConfig, + new OmKeyLocationInfoGroup(version, new ArrayList<>())); + openKeyInfoBuilder.setExpectedDataGeneration(1L); + addKeyToOpenKeyTable(allocatedLocationList, openKeyInfoBuilder); + + OmKeyInfo staleClosedKey = OMRequestTestUtils.createOmKeyInfo( + volumeName, bucketName, keyName, replicationConfig, + new OmKeyLocationInfoGroup(version, new ArrayList<>())) + .setUpdateID(2L) + .build(); + omMetadataManager.getKeyTable(getBucketLayout()) + .put(getOzonePathKey(), staleClosedKey); + + OMException ex = assertThrows(OMException.class, + () -> doPreExecute(createCommitKeyRequest())); + assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ex.getResult()); + } + + @Test + public void testAtomicCreateIfNotExistsPreExecuteUsesAdmissionFailure() + throws Exception { + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + List allocatedLocationList = getKeyLocation( + DEFAULT_COMMIT_BLOCK_SIZE).stream() + .map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList()); + + OmKeyInfo.Builder openKeyInfoBuilder = OMRequestTestUtils.createOmKeyInfo( + volumeName, bucketName, keyName, replicationConfig, + new OmKeyLocationInfoGroup(version, new ArrayList<>())); + openKeyInfoBuilder.setExpectedDataGeneration( + OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS); + addKeyToOpenKeyTable(allocatedLocationList, openKeyInfoBuilder); + + OmKeyInfo existingClosedKey = OMRequestTestUtils.createOmKeyInfo( + volumeName, bucketName, keyName, replicationConfig, + new OmKeyLocationInfoGroup(version, new ArrayList<>())).build(); + omMetadataManager.getKeyTable(getBucketLayout()) + .put(getOzonePathKey(), existingClosedKey); + + OMException ex = assertThrows(OMException.class, + () -> doPreExecute(createCommitKeyRequest())); + assertEquals(OMException.ResultCodes.KEY_ALREADY_EXISTS, ex.getResult()); + } + @Test public void testValidateAndUpdateCacheWithUncommittedBlocks() throws Exception { From 5717a4d2c959e32a47a5c1204446e3bb781d65ac Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 6 May 2026 17:30:07 +0800 Subject: [PATCH 4/4] update docs Signed-off-by: peterxcli --- .../content/design/s3-conditional-requests.md | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/docs/content/design/s3-conditional-requests.md b/hadoop-hdds/docs/content/design/s3-conditional-requests.md index cb8153a0d81d..325c4df8f56f 100644 --- a/hadoop-hdds/docs/content/design/s3-conditional-requests.md +++ b/hadoop-hdds/docs/content/design/s3-conditional-requests.md @@ -301,13 +301,22 @@ public static final long EXPECTED_DATA_GENERATION_CREATE_IF_NOT_EXISTS = -1L; OzoneConsts.EXPECTED_DATA_GENERATION_CREATE_IF_NOT_EXISTS` in the outgoing `KeyArgs`. -##### OM Create Phase +##### OM CreateKey Admission Phase + +For conditional `PUT`, `CreateKey` admits the write and creates the +open-key state, but it does not make the destination object visible. +Both `CreateKey.preExecute(...)` and `CreateKey.validateAndUpdateCache(...)` +belong to the admission side of the conditional write flow. 1. OM receives request with `expectedDataGeneration == OzoneConsts.EXPECTED_DATA_GENERATION_CREATE_IF_NOT_EXISTS`. 2. **Pre-check**: If key is already in the OpenKeyTable or KeyTable, throw `KEY_ALREADY_EXISTS`. 3. If not exists, proceed to create the open key entry. -##### OM Commit Phase (Atomicity) +##### OM CommitKey Commit Phase (Atomicity) + +`CommitKey.preExecute(...)` is still admission-side request preparation. +Only `CommitKey.validateAndUpdateCache(...)` performs commit-time +conditional revalidation for `PutObject`. 1. During the commit phase (or strict atomic create), the OM validates that the key still does not exist. 2. If a concurrent client created the key between the Create and Commit @@ -365,10 +374,13 @@ part of the normal write path. `expectedDataGeneration`, the normal output-stream commit path carries that generation on `commitKey`. -##### OM Create Phase +##### OM CreateKey Admission Phase -Validation is performed within the `validateAndUpdateCache` method to -ensure atomicity within the Ratis state machine application. +For conditional `PUT`, the `CreateKey` request is the admission +transaction. `CreateKey.preExecute(...)` performs admission-side request +preparation, and `CreateKey.validateAndUpdateCache(...)` performs the +serialized admission check that either opens the key or returns a +precondition-style failure. 1. **Locking**: The OM acquires the write lock for the bucket/key. 2. **Key Lookup**: Retrieve the existing key from `KeyTable`. @@ -390,10 +402,12 @@ ensure atomicity within the Ratis state machine application. metadata containing that generation, so the later `commitKey` request can carry the same resolved rewrite condition. -##### OM Commit Phase +##### OM CommitKey Commit Phase -The commit phase reuses the existing atomic rewrite validation logic -from HDDS-10656: +`CommitKey.preExecute(...)` is also admission-side request preparation. +Only `CommitKey.validateAndUpdateCache(...)` is the commit phase for the +conditional write. This phase reuses the existing atomic rewrite +validation logic from HDDS-10656: 1. Read open key entry (contains `expectedDataGeneration` set during create phase from the ETag-validated key).