From 03515c94f316054398d9ef623b9aef0640667397 Mon Sep 17 00:00:00 2001 From: Jingyan Li Date: Thu, 30 Apr 2026 15:40:24 -0700 Subject: [PATCH 1/4] [blob-transfer] reject schema-version mismatch on the request side MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit added a fast-fail check on the metadata response, but the server's response order is files first, metadata last — so a client that catches the mismatch at the metadata stage has already paid for the entire file transfer. This commit moves the primary check to the request side, modeled on the existing snapshot-table-format check next to it. Client `prepareRequest` now stamps the two schema-version headers on the GET. Server `channelRead0` calls a new `BlobTransferUtils.compareRequestedSchemaVersionsAgainstLocal(...)` right after the table-format validation. On mismatch it returns 400 BAD_REQUEST with an `X-Blob-Transfer-Schema-Mismatch: true` marker header and echoes its own protocol versions in the response, BEFORE any file work begins. Client `P2PFileTransferClientHandler.channelRead0` recognizes the marker on a non-OK response and throws the typed `VeniceBlobTransferIncompatibleSchemaException` with full peer-vs-local context (peer = server's local versions echoed in the rejection). Backward compat preserved: a request without the new headers (older client) is not rejected — the server falls through to the existing flow. The response-side check from the prior commit stays as the safety net for the inverse case (older server that does not yet validate requests will still send full metadata; new client catches at metadata-parse time instead of waiting for the receive timeout). Tests cover: server rejects mismatched request with 400 + marker + no file responses; server doesn't reject requests without the headers (backward compat); client builds the typed exception from the rejection response with correct peer/local versions populated. 
Existing integration tests in TestNettyP2PBlobTransferManager (real client↔server flow) all pass unchanged because the new client always advertises versions that match the new server. Co-Authored-By: Claude Opus 4.7 (1M context) [blob-transfer] add fast-fail schema version check on metadata response The P2P blob-transfer client deserialized the peer's PartitionState bytes inside P2PMetadataTransferHandler without a SchemaReader, so any peer that serialized PartitionState with a protocol version higher than this binary knows triggered VeniceMessageException at the Netty pipeline tail after the body had been fully transferred. The transfer future was never failed explicitly, so the replica had to wait for blobReceiveTimeoutInMin before falling back to Kafka bootstrap. We saw this on ltx1-app12860.stg with PartitionState v21 during the rolling deploy that introduced PR #2707. Server now stamps two new headers on the metadata response: X-Blob-Transfer-Partition-State-Schema-Version X-Blob-Transfer-Store-Version-State-Schema-Version each carrying the local AvroProtocolDefinition.X.getCurrentProtocolVersion(). Client compares peer's value to its own current version at HTTP header-parse time, before any body is consumed, and throws the new VeniceBlobTransferIncompatibleSchemaException on mismatch. Throwing from channelRead0 flows through the existing exceptionCaught -> completeExceptionally -> ctx.close() path, so the per-host transfer future fails immediately and the orchestrator can pick the next peer (or fall back to Kafka) without waiting on the receive timeout. Policy is exact equality. Blob transfer is the fast path; if binaries are not in lock-step we'd rather skip P2P and let Kafka handle bootstrap than rely on cross-version metadata promotion. Skew is bounded to rolling-deploy windows. 
Backward-compat is preserved during rollout: a missing header passes through (peer not yet upgraded), and a malformed/out-of-range header logs a warning and passes through (parse bug must not crash the channel). The existing deserialize-time exception in InternalAvroSpecificSerializer remains as the safety net for the truly incompatible case, so no regression vs. today. Tests cover: header stamped on metadata response only (not on file responses), known-version pass-through, mismatched PartitionState fails fast, mismatched StoreVersionState fails fast, single-header-missing pass-through, and malformed/out-of-range pass-through. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../blobtransfer/BlobTransferUtils.java | 134 ++++++++++++ .../client/NettyFileTransferClient.java | 14 +- .../client/P2PFileTransferClientHandler.java | 48 +++++ .../server/P2PFileTransferServerHandler.java | 46 +++++ .../TestP2PFileTransferClientHandler.java | 191 ++++++++++++++++++ .../TestP2PFileTransferServerHandler.java | 76 +++++++ ...obTransferIncompatibleSchemaException.java | 80 ++++++++ 7 files changed, 588 insertions(+), 1 deletion(-) create mode 100644 internal/venice-client-common/src/main/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaException.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java index 725c6901d0e..7175b9b9108 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java @@ -10,10 +10,13 @@ import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.venice.SSLConfig; +import com.linkedin.venice.exceptions.VeniceBlobTransferIncompatibleSchemaException; import 
com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.Version; import com.linkedin.venice.security.SSLFactory; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.utils.SslUtils; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.ssl.SslHandler; import java.io.File; @@ -32,6 +35,24 @@ public class BlobTransferUtils { public static final String BLOB_TRANSFER_STATUS = "X-Blob-Transfer-Status"; public static final String BLOB_TRANSFER_COMPLETED = "Completed"; public static final String BLOB_TRANSFER_TYPE = "X-Blob-Transfer-Type"; + /** + * Protocol version the peer used to serialize the {@code PartitionState} embedded in + * the metadata response body. The client uses this to fail fast when the local binary + * does not have a reader for that version, rather than discovering the mismatch during + * deserialization after the body has been fully received. + */ + public static final String BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION = + "X-Blob-Transfer-Partition-State-Schema-Version"; + /** Same purpose as {@link #BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION} but for {@code StoreVersionState}. */ + public static final String BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION = + "X-Blob-Transfer-Store-Version-State-Schema-Version"; + /** + * Marker header set on the server's 400 BAD_REQUEST response when it rejects a + * blob-transfer request because the requester's advertised schema versions do not + * match the server's local versions. Lets the client distinguish a schema-version + * rejection from any other 400 without parsing the body. 
+ */ + public static final String BLOB_TRANSFER_SCHEMA_MISMATCH = "X-Blob-Transfer-Schema-Mismatch"; public enum BlobTransferType { FILE, METADATA @@ -89,6 +110,119 @@ public static boolean isMetadataMessage(HttpResponse msg) { return metadataHeader.equals(BlobTransferUtils.BlobTransferType.METADATA.name()); } + /** + * Validate that the schema-version headers on a P2P metadata response match the + * local binary's compiled-in {@code currentProtocolVersion} for each protocol. + * Throws {@link VeniceBlobTransferIncompatibleSchemaException} when at least one + * header is present and does not equal the local version — letting the caller fail + * fast at HTTP header-parse time, before any body is consumed. + * + *

<p>An exact-match policy is used (rather than e.g. "peer <= local"). Blob + * transfer is the fast path; if the binaries on the two ends are not in lock-step + * we want to step aside and let Kafka bootstrap take over rather than rely on + * cross-version Avro promotion of the partition metadata. Skew between peers is + * limited to rolling-deploy windows, so the cost of being strict is bounded. + * + *

<p>Behaviour for the absent / malformed cases is intentionally permissive so a + * server-side rollout of the new headers cannot break peers that haven't been + * upgraded yet, and so a header parsing bug cannot crash the channel: + * <ul> + * <li>Both headers absent — pass through (peer is on an older binary).</li> + * <li>Header value non-numeric or out of byte range — log a warning and pass + * through; the existing deserialization-time exception remains as the safety + * net for the truly incompatible case (no regression vs. today).</li> + * </ul>
+ */ + public static void validateMetadataResponseSchemaVersions(HttpResponse response, String peerHost) { + String psHeader = response.headers().get(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION); + String svsHeader = response.headers().get(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION); + if (psHeader == null && svsHeader == null) { + return; + } + + int peerPs = parseProtocolVersionHeader(psHeader, BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION); + int peerSvs = parseProtocolVersionHeader(svsHeader, BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION); + + int localPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion(); + int localSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion(); + + boolean psMismatch = peerPs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerPs != localPs; + boolean svsMismatch = + peerSvs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerSvs != localSvs; + + if (psMismatch || svsMismatch) { + LOGGER.warn( + "Aborting P2P blob transfer from peer {}: metadata schema version mismatch" + + " (peer PartitionState={}, StoreVersionState={}; local PartitionState={}, StoreVersionState={})", + peerHost, + renderVersion(peerPs), + renderVersion(peerSvs), + localPs, + localSvs); + throw new VeniceBlobTransferIncompatibleSchemaException(peerHost, peerPs, peerSvs, localPs, localSvs); + } + } + + /** + * Server-side counterpart of {@link #validateMetadataResponseSchemaVersions}: compare + * the schema-version headers on a P2P blob-transfer GET request against the local + * binary's compiled-in {@code currentProtocolVersion}. Used by the server right next + * to the table-format check so a schema mismatch is rejected with a 400 BAD_REQUEST + * before any file work begins — otherwise the client would pay for the entire file + * transfer and only discover the mismatch at the metadata stage. + * + *

Same equality policy as the response side. Returns a diagnostic string suitable + * for the response body when the request is incompatible, or {@code null} when it + * is compatible (or when both headers are absent — older clients that have not yet + * been upgraded to advertise their versions). + */ + public static String compareRequestedSchemaVersionsAgainstLocal(HttpRequest request) { + String psHeader = request.headers().get(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION); + String svsHeader = request.headers().get(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION); + if (psHeader == null && svsHeader == null) { + return null; + } + + int peerPs = parseProtocolVersionHeader(psHeader, BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION); + int peerSvs = parseProtocolVersionHeader(svsHeader, BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION); + + int localPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion(); + int localSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion(); + + boolean psMismatch = peerPs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerPs != localPs; + boolean svsMismatch = + peerSvs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerSvs != localSvs; + + if (psMismatch || svsMismatch) { + return "Blob transfer schema version mismatch: requester PartitionState=" + renderVersion(peerPs) + + ", StoreVersionState=" + renderVersion(peerSvs) + "; local PartitionState=" + localPs + + ", StoreVersionState=" + localSvs; + } + return null; + } + + private static int parseProtocolVersionHeader(String value, String headerName) { + if (value == null) { + return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN; + } + try { + int parsed = Integer.parseInt(value.trim()); + // Protocol versions are encoded into a single byte on the wire (see InternalAvroSpecificSerializer). 
+ if (parsed < 0 || parsed > Byte.MAX_VALUE) { + LOGGER.warn("Out-of-range value '{}' for blob-transfer header {}; treating as unknown.", value, headerName); + return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN; + } + return parsed; + } catch (NumberFormatException e) { + LOGGER.warn("Malformed value '{}' for blob-transfer header {}; treating as unknown.", value, headerName); + return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN; + } + } + + private static String renderVersion(int v) { + return v == VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN ? "" : Integer.toString(v); + } + /** * Generate MD5 checksum for a file * @param filePath the path to the file diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java index d1208cc4e43..689c664cf90 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java @@ -11,6 +11,7 @@ import com.linkedin.venice.listener.VerifySslHandler; import com.linkedin.venice.meta.Version; import com.linkedin.venice.security.SSLFactory; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.utils.DaemonThreadFactory; import com.linkedin.venice.utils.LogContext; import com.linkedin.venice.utils.Utils; @@ -390,10 +391,21 @@ private FullHttpRequest prepareRequest( int version, int partition, BlobTransferTableFormat requestTableFormat) { - return new DefaultFullHttpRequest( + FullHttpRequest request = new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.GET, String.format("/%s/%d/%d/%s", storeName, version, partition, requestTableFormat.name())); + // Advertise the schema versions this client can 
deserialize so the server can + // reject the request before any file work begins on a mismatch. + request.headers() + .set( + BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + request.headers() + .set( + BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + return request; } /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java index 023f48aade2..8413e4e60dc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java @@ -1,13 +1,18 @@ package com.linkedin.davinci.blobtransfer.client; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION; import com.linkedin.davinci.blobtransfer.BlobTransferPayload; import com.linkedin.davinci.blobtransfer.BlobTransferUtils; import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException; +import com.linkedin.venice.exceptions.VeniceBlobTransferIncompatibleSchemaException; import com.linkedin.venice.exceptions.VeniceException; +import 
com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.store.rocksdb.RocksDBUtils; import com.linkedin.venice.utils.LatencyUtils; import com.linkedin.venice.utils.Utils; @@ -103,6 +108,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex if (response.status().equals(HttpResponseStatus.NOT_FOUND)) { throw new VeniceBlobTransferFileNotFoundException( "Requested files from remote peer are not found. Response: " + response.status()); + } else if (response.headers().contains(BLOB_TRANSFER_SCHEMA_MISMATCH)) { + throw buildSchemaMismatchExceptionFromErrorResponse(ctx, response); } else { throw new VeniceException("Failed to fetch file from remote peer. Response: " + response.status()); } @@ -111,6 +118,13 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex // redirect the message to the next handler if it's a metadata transfer boolean isMetadataMessage = BlobTransferUtils.isMetadataMessage(response); if (isMetadataMessage) { + // Fail fast at header-parse time if the peer's metadata is serialized with a + // protocol version this binary cannot read; throwing here flows through + // exceptionCaught -> handleExceptionGracefully and fails the per-host transfer + // future immediately, so the next peer (or Kafka bootstrap) can take over + // without waiting for blobReceiveTimeoutInMin. + BlobTransferUtils + .validateMetadataResponseSchemaVersions(response, String.valueOf(ctx.channel().remoteAddress())); ReferenceCountUtil.retain(msg); ctx.fireChannelRead(msg); return; @@ -287,6 +301,40 @@ void handleExceptionGracefully(Throwable cause) { } + /** + * Build a {@link VeniceBlobTransferIncompatibleSchemaException} from a server error + * response carrying the {@link BlobTransferUtils#BLOB_TRANSFER_SCHEMA_MISMATCH} + * marker. 
The server echoes its own schema versions in the response headers; the + * "peer" fields on the exception are those server-side values, the "local" fields + * are this client's compiled-in versions. + */ + private VeniceBlobTransferIncompatibleSchemaException buildSchemaMismatchExceptionFromErrorResponse( + ChannelHandlerContext ctx, + HttpResponse response) { + int peerPs = readVersionHeader(response, BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION); + int peerSvs = readVersionHeader(response, BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION); + int localPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion(); + int localSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion(); + return new VeniceBlobTransferIncompatibleSchemaException( + String.valueOf(ctx.channel().remoteAddress()), + peerPs, + peerSvs, + localPs, + localSvs); + } + + private static int readVersionHeader(HttpResponse response, String name) { + String value = response.headers().get(name); + if (value == null) { + return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN; + } + try { + return Integer.parseInt(value.trim()); + } catch (NumberFormatException e) { + return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN; + } + } + private String getFileNameFromHeader(HttpResponse response) { String contentDisposition = response.headers().get(HttpHeaderNames.CONTENT_DISPOSITION); if (contentDisposition != null) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java index 13bf73d2f99..31226c9910e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java @@ -1,7 +1,10 @@ package 
com.linkedin.davinci.blobtransfer.server; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType; @@ -18,6 +21,7 @@ import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.venice.meta.Version; import com.linkedin.venice.request.RequestHelper; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.Utils; import io.netty.buffer.Unpooled; @@ -127,6 +131,16 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque return; } + // Check the requester's metadata schema versions before doing any file work. + // If the local binary cannot serialize PartitionState/StoreVersionState in a + // version the requester can read, fail fast with 400 + a marker header so the + // client doesn't pay for file bytes only to reject the metadata at the end. 
+ String schemaMismatch = BlobTransferUtils.compareRequestedSchemaVersionsAgainstLocal(httpRequest); + if (schemaMismatch != null) { + sendSchemaMismatchResponse(ctx, schemaMismatch); + return; + } + // Check the concurrent request limit if (globalConcurrentTransferRequests.get() >= maxAllowedConcurrentSnapshotUsers) { String errMessage = @@ -329,6 +343,30 @@ private void sendFile( }); } + /** + * Send a 400 BAD_REQUEST response for a schema-version mismatch. Echoes the + * server's local protocol versions so the client can build a typed exception + * with full diagnostic context, and sets a marker header so the client can + * distinguish this from any other 400. + */ + private void sendSchemaMismatchResponse(ChannelHandlerContext ctx, String diagnosticBody) { + byte[] body = diagnosticBody.getBytes(); + FullHttpResponse response = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(body)); + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.length); + response.headers().set(BLOB_TRANSFER_SCHEMA_MISMATCH, "true"); + response.headers() + .set( + BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + response.headers() + .set( + BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + LOGGER.warn("Rejecting blob-transfer request from {}: {}", ctx.channel().remoteAddress(), diagnosticBody); + ctx.writeAndFlush(response); + } + /** * Send metadata for the given blob transfer request * @param ctx the channel context @@ -347,6 +385,14 @@ public void sendMetadata(ChannelHandlerContext ctx, BlobTransferPartitionMetadat metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, metadataBytes.length); metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, APPLICATION_JSON); metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA); + 
metadataResponse.headers() + .set( + BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + metadataResponse.headers() + .set( + BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); ctx.writeAndFlush(metadataResponse).addListener(future -> { if (future.isSuccess()) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java index b2c2afb4f91..74eb1963d52 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java @@ -1,7 +1,10 @@ package com.linkedin.davinci.blobtransfer; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType; import static com.linkedin.venice.utils.TestUtils.DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING; @@ -14,6 +17,7 @@ import com.linkedin.davinci.notifier.VeniceNotifier; import com.linkedin.davinci.stats.AggBlobTransferStats; import com.linkedin.davinci.storage.StorageMetadataService; +import 
com.linkedin.venice.exceptions.VeniceBlobTransferIncompatibleSchemaException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.state.IncrementalPushReplicaStatus; import com.linkedin.venice.kafka.protocol.state.PartitionState; @@ -524,6 +528,193 @@ public void testReaderIdleEventBeforeTransferComplete() throws IOException { } } + @Test + public void testIncompatiblePartitionStateSchemaVersionFailsFast() { + int incompatibleVersion = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion() + 100; + FullHttpResponse metadataResponse = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA); + metadataResponse.headers().set(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, incompatibleVersion); + metadataResponse.headers() + .set( + BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + + ch.writeInbound(metadataResponse); + + try { + inputStreamFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); + Assert.fail("Expected exception not thrown"); + } catch (Exception e) { + Assert.assertTrue( + e.getCause() instanceof VeniceBlobTransferIncompatibleSchemaException, + "Expected VeniceBlobTransferIncompatibleSchemaException, got: " + e.getCause()); + VeniceBlobTransferIncompatibleSchemaException ex = (VeniceBlobTransferIncompatibleSchemaException) e.getCause(); + Assert.assertEquals(ex.getPeerPartitionStateVersion(), incompatibleVersion); + Assert.assertEquals( + ex.getPeerStoreVersionStateVersion(), + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + Assert.assertEquals( + ex.getLocalPartitionStateVersion(), + 
AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + Assert.assertEquals( + ex.getLocalStoreVersionStateVersion(), + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + } + } + + @Test + public void testIncompatibleStoreVersionStateSchemaVersionFailsFast() { + int incompatibleVersion = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion() + 100; + FullHttpResponse metadataResponse = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA); + metadataResponse.headers() + .set( + BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + metadataResponse.headers().set(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, incompatibleVersion); + + ch.writeInbound(metadataResponse); + + try { + inputStreamFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); + Assert.fail("Expected exception not thrown"); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof VeniceBlobTransferIncompatibleSchemaException); + VeniceBlobTransferIncompatibleSchemaException ex = (VeniceBlobTransferIncompatibleSchemaException) e.getCause(); + Assert.assertEquals(ex.getPeerStoreVersionStateVersion(), incompatibleVersion); + } + } + + /** + * Backward compatibility: a peer running an older binary that does not yet emit + * the schema-version headers must not break P2P transfer for upgraded clients. + * The existing happy-path (covered by {@link #testSingleMetadataTransfer}) already + * exercises the no-headers case; this test makes the contract explicit by sending + * only one of the two headers and asserting transfer proceeds normally. 
+ */ + @Test + public void testSingleSchemaVersionHeaderMissingPassesThrough() + throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException { + BlobTransferPartitionMetadata expectedMetadata = new BlobTransferPartitionMetadata(); + expectedMetadata.setTopicName(TEST_STORE + "_v" + TEST_VERSION); + expectedMetadata.setPartitionId(TEST_PARTITION); + InternalAvroSpecificSerializer partitionStateSerializer = + AvroProtocolDefinition.PARTITION_STATE.getSerializer(); + OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer, DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING); + expectedMetadata.setOffsetRecord(ByteBuffer.wrap(offsetRecord.toBytes())); + + ObjectMapper objectMapper = new ObjectMapper(); + String metadataJson = objectMapper.writeValueAsString(expectedMetadata); + byte[] metadataBytes = metadataJson.getBytes(CharsetUtil.UTF_8); + + FullHttpResponse metadataResponse = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.OK, + Unpooled.copiedBuffer(metadataJson, CharsetUtil.UTF_8)); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, metadataBytes.length); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA); + // Only the StoreVersionState header is present; PartitionState header is absent. 
+ metadataResponse.headers() + .set( + BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + + DefaultHttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + endOfTransfer.headers().add(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED); + + ch.writeInbound(metadataResponse); + ch.writeInbound(endOfTransfer); + + inputStreamFuture.toCompletableFuture().get(1, TimeUnit.MINUTES); + Assert.assertTrue(inputStreamFuture.toCompletableFuture().isDone()); + Assert.assertNotNull(clientMetadataHandler.getMetadata()); + } + + @Test + public void testMalformedSchemaVersionHeaderPassesThrough() + throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException { + BlobTransferPartitionMetadata expectedMetadata = new BlobTransferPartitionMetadata(); + expectedMetadata.setTopicName(TEST_STORE + "_v" + TEST_VERSION); + expectedMetadata.setPartitionId(TEST_PARTITION); + InternalAvroSpecificSerializer partitionStateSerializer = + AvroProtocolDefinition.PARTITION_STATE.getSerializer(); + OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer, DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING); + expectedMetadata.setOffsetRecord(ByteBuffer.wrap(offsetRecord.toBytes())); + + ObjectMapper objectMapper = new ObjectMapper(); + String metadataJson = objectMapper.writeValueAsString(expectedMetadata); + byte[] metadataBytes = metadataJson.getBytes(CharsetUtil.UTF_8); + + FullHttpResponse metadataResponse = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.OK, + Unpooled.copiedBuffer(metadataJson, CharsetUtil.UTF_8)); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, metadataBytes.length); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA); + // Header value cannot be parsed as an integer — must 
not crash the channel. + metadataResponse.headers().set(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, "not-a-number"); + metadataResponse.headers().set(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, "9999"); + + DefaultHttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + endOfTransfer.headers().add(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED); + + ch.writeInbound(metadataResponse); + ch.writeInbound(endOfTransfer); + + inputStreamFuture.toCompletableFuture().get(1, TimeUnit.MINUTES); + Assert.assertTrue(inputStreamFuture.toCompletableFuture().isDone()); + Assert.assertNotNull(clientMetadataHandler.getMetadata()); + } + + /** + * Server pre-empted the file transfer with a 400 BAD_REQUEST carrying the + * schema-mismatch marker header (the request-side check fired before any file + * work). Client must fail the transfer future with the typed exception, populated + * from the server-echoed versions in the response headers. + */ + @Test + public void testServerSchemaMismatchRejectionThrowsTypedException() { + int serverPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion() + 50; + int serverSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion() + 50; + String body = "Blob transfer schema version mismatch (synthetic for test)"; + FullHttpResponse rejection = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.BAD_REQUEST, + Unpooled.copiedBuffer(body, CharsetUtil.UTF_8)); + rejection.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.getBytes(CharsetUtil.UTF_8).length); + rejection.headers().set(BLOB_TRANSFER_SCHEMA_MISMATCH, "true"); + rejection.headers().set(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, serverPs); + rejection.headers().set(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, serverSvs); + + ch.writeInbound(rejection); + + try { + inputStreamFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); + Assert.fail("Expected exception not 
thrown"); + } catch (Exception e) { + Assert.assertTrue( + e.getCause() instanceof VeniceBlobTransferIncompatibleSchemaException, + "Expected VeniceBlobTransferIncompatibleSchemaException, got: " + e.getCause()); + VeniceBlobTransferIncompatibleSchemaException ex = (VeniceBlobTransferIncompatibleSchemaException) e.getCause(); + Assert.assertEquals(ex.getPeerPartitionStateVersion(), serverPs); + Assert.assertEquals(ex.getPeerStoreVersionStateVersion(), serverSvs); + Assert.assertEquals( + ex.getLocalPartitionStateVersion(), + AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + Assert.assertEquals( + ex.getLocalStoreVersionStateVersion(), + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + } + } + /** * Generate checksum via string content */ diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index 09f810da024..83d1a3e7eeb 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java @@ -1,7 +1,10 @@ package com.linkedin.davinci.blobtransfer; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE; import static 
com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType; import static com.linkedin.venice.response.VeniceReadResponseStatus.TOO_MANY_REQUESTS; @@ -347,6 +350,9 @@ public void testTransferMultipleFiles() throws IOException { DefaultHttpResponse httpResponse = (DefaultHttpResponse) response; Assert.assertTrue(fileNames.contains(httpResponse.headers().get(HttpHeaderNames.CONTENT_DISPOSITION))); Assert.assertTrue(fileChecksums.contains(httpResponse.headers().get(HttpHeaderNames.CONTENT_MD5))); + // file responses must not carry the schema-version headers; they only apply to metadata responses. + Assert.assertNull(httpResponse.headers().get(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION)); + Assert.assertNull(httpResponse.headers().get(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION)); fileNames.remove(httpResponse.headers().get(HttpHeaderNames.CONTENT_DISPOSITION)); fileChecksums.remove(httpResponse.headers().get(HttpHeaderNames.CONTENT_MD5)); response = ch.readOutbound(); @@ -368,6 +374,12 @@ public void testTransferMultipleFiles() throws IOException { Assert.assertTrue(response instanceof FullHttpResponse); FullHttpResponse metadataResponse = (FullHttpResponse) response; Assert.assertEquals(metadataResponse.headers().get(BLOB_TRANSFER_TYPE), BlobTransferType.METADATA.toString()); + Assert.assertEquals( + metadataResponse.headers().getInt(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION).intValue(), + AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + Assert.assertEquals( + metadataResponse.headers().getInt(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION).intValue(), + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); ByteBuf content = metadataResponse.content(); byte[] metadataBytes = new byte[content.readableBytes()]; @@ -398,6 +410,70 @@ public void testTransferMultipleFiles() throws IOException { Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); } + /** + * Server rejects 
a request whose advertised PartitionState/StoreVersionState + * schema versions don't match the local binary's, with a 400 + marker header, + * BEFORE any file work begins. Without this, the client would receive every + * file byte and only discover the mismatch at the metadata stage. + */ + @Test + public void testRejectRequestWithMismatchedSchemaVersion() throws IOException { + // The snapshot dir is set up so that if the schema check were bypassed, file + // responses would be produced; assert they are NOT produced here. + Path snapshotDir = Paths.get(RocksDBUtils.composeSnapshotDir(baseDir.toString(), "myStore_v1", 10)); + Files.createDirectories(snapshotDir); + Files.write(snapshotDir.resolve("file1").toAbsolutePath(), "hello".getBytes()); + + int incompatibleVersion = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion() + 100; + FullHttpRequest request = + new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/myStore/1/10/BLOCK_BASED_TABLE"); + request.headers().set(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, incompatibleVersion); + request.headers() + .set( + BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + + ch.writeInbound(request); + + Object response = ch.readOutbound(); + Assert.assertTrue(response instanceof FullHttpResponse); + FullHttpResponse rejection = (FullHttpResponse) response; + Assert.assertEquals(rejection.status(), HttpResponseStatus.BAD_REQUEST); + Assert.assertEquals(rejection.headers().get(BLOB_TRANSFER_SCHEMA_MISMATCH), "true"); + // Server echoes its local versions so the client can build a typed exception. 
+ Assert.assertEquals( + rejection.headers().getInt(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION).intValue(), + AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + Assert.assertEquals( + rejection.headers().getInt(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION).intValue(), + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + // No further outbound messages — no file response was produced. + Assert.assertNull(ch.readOutbound()); + } + + /** + * Backward compatibility: a request that does NOT carry the schema-version + * headers (peer is on an older binary) must not be rejected — the server should + * fall through to the existing flow. + */ + @Test + public void testRequestWithoutSchemaVersionHeadersIsNotRejected() throws IOException { + Path snapshotDir = Paths.get(RocksDBUtils.composeSnapshotDir(baseDir.toString(), "myStore_v1", 10)); + Files.createDirectories(snapshotDir); + + FullHttpRequest request = + new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/myStore/1/10/BLOCK_BASED_TABLE"); + // Intentionally NOT setting BLOB_TRANSFER_*_SCHEMA_VERSION headers. + + ch.writeInbound(request); + + Object response = ch.readOutbound(); + Assert.assertTrue(response instanceof FullHttpResponse); + // Falls through to the existing not-found path because no real metadata service is wired. + // The key assertion is that the response is NOT a schema-mismatch rejection. + Assert.assertNull(((FullHttpResponse) response).headers().get(BLOB_TRANSFER_SCHEMA_MISMATCH)); + } + /** * Test when fail to get the metadata from storageMetadataService, it should return error to client. 
* @throws IOException diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaException.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaException.java new file mode 100644 index 00000000000..c4e878ce4de --- /dev/null +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaException.java @@ -0,0 +1,80 @@ +package com.linkedin.venice.exceptions; + +/** + * Thrown when a P2P blob-transfer client receives a metadata response from a peer + * whose serialization protocol version for {@code PartitionState} or + * {@code StoreVersionState} does not match the local binary's compiled-in + * {@code currentProtocolVersion}. Used to fail fast at HTTP header-parse time, + * before any body is consumed, so the transfer can fall over to the next peer or + * to Kafka bootstrap without waiting for the per-host receive timeout. + * + *

Both sides advertise their {@code currentProtocolVersion}; the policy is exact + * equality, since blob transfer is the fast path and any binary skew between peers + * is bounded to rolling-deploy windows. + */ +public class VeniceBlobTransferIncompatibleSchemaException extends VeniceException { + /** Sentinel meaning the peer did not provide a value for this protocol. */ + public static final int VERSION_UNKNOWN = -1; + + private final String peerHost; + private final int peerPartitionStateVersion; + private final int peerStoreVersionStateVersion; + private final int localPartitionStateVersion; + private final int localStoreVersionStateVersion; + + public VeniceBlobTransferIncompatibleSchemaException( + String peerHost, + int peerPartitionStateVersion, + int peerStoreVersionStateVersion, + int localPartitionStateVersion, + int localStoreVersionStateVersion) { + super( + buildMessage( + peerHost, + peerPartitionStateVersion, + peerStoreVersionStateVersion, + localPartitionStateVersion, + localStoreVersionStateVersion)); + this.peerHost = peerHost; + this.peerPartitionStateVersion = peerPartitionStateVersion; + this.peerStoreVersionStateVersion = peerStoreVersionStateVersion; + this.localPartitionStateVersion = localPartitionStateVersion; + this.localStoreVersionStateVersion = localStoreVersionStateVersion; + } + + private static String buildMessage( + String peerHost, + int peerPartitionStateVersion, + int peerStoreVersionStateVersion, + int localPartitionStateVersion, + int localStoreVersionStateVersion) { + return "Peer " + peerHost + " serialized blob-transfer metadata with protocol versions" + " PartitionState=" + + render(peerPartitionStateVersion) + ", StoreVersionState=" + render(peerStoreVersionStateVersion) + + "; local binary has PartitionState=" + localPartitionStateVersion + ", StoreVersionState=" + + localStoreVersionStateVersion; + } + + private static String render(int v) { + return v == VERSION_UNKNOWN ? 
"" : Integer.toString(v); + } + + public String getPeerHost() { + return peerHost; + } + + public int getPeerPartitionStateVersion() { + return peerPartitionStateVersion; + } + + public int getPeerStoreVersionStateVersion() { + return peerStoreVersionStateVersion; + } + + public int getLocalPartitionStateVersion() { + return localPartitionStateVersion; + } + + public int getLocalStoreVersionStateVersion() { + return localStoreVersionStateVersion; + } +} From cccd9335f6bb872a3d3bb36408f3c5b31cb3e484 Mon Sep 17 00:00:00 2001 From: Jingyan Li Date: Tue, 19 May 2026 16:05:32 -0700 Subject: [PATCH 2/4] add tests --- .../TestP2PFileTransferClientHandler.java | 82 +++++++++++++++++++ .../TestP2PFileTransferServerHandler.java | 71 ++++++++++++++++ 2 files changed, 153 insertions(+) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java index 74eb1963d52..e307d564516 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java @@ -591,6 +591,88 @@ public void testIncompatibleStoreVersionStateSchemaVersionFailsFast() { } } + /** + * Rolling-deploy contract (NEW requester ← OLD sender, multi-file end-to-end): + * a peer on an older binary returns a 200 metadata response with NEITHER of + * the schema-version response headers set. An upgraded client must accept the + * full transfer (files → metadata → STATUS) without throwing + * {@link VeniceBlobTransferIncompatibleSchemaException}. Both files must land + * on disk and the metadata must parse. This pins the realistic multi-file + * variant of the single-metadata case in {@link #testSingleMetadataTransfer}. 
+ */ + @Test + public void testFullTransferWithoutSchemaVersionResponseHeadersFromOldPeer() + throws JsonProcessingException, ExecutionException, InterruptedException, IOException, TimeoutException { + DefaultHttpResponse fileResponse1 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + fileResponse1.headers().add("Content-Disposition", "filename=\"test_file1.txt\""); + fileResponse1.headers().add("Content-Length", "5"); + fileResponse1.headers().add(BLOB_TRANSFER_TYPE, BlobTransferType.FILE); + fileResponse1.headers().add("Content-MD5", checksumGenerateHelper("12345")); + HttpContent chunk1 = new DefaultLastHttpContent(Unpooled.copiedBuffer("12345", CharsetUtil.UTF_8)); + + DefaultHttpResponse fileResponse2 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + fileResponse2.headers().add("Content-Disposition", "filename=\"test_file2.txt\""); + fileResponse2.headers().add("Content-Length", "10"); + fileResponse2.headers().add(BLOB_TRANSFER_TYPE, BlobTransferType.FILE); + fileResponse2.headers().add("Content-MD5", checksumGenerateHelper("6789013579")); + HttpContent chunk2a = new DefaultHttpContent(Unpooled.copiedBuffer("67890", CharsetUtil.UTF_8)); + HttpContent chunk2b = new DefaultLastHttpContent(Unpooled.copiedBuffer("13579", CharsetUtil.UTF_8)); + + BlobTransferPartitionMetadata expectedMetadata = new BlobTransferPartitionMetadata(); + expectedMetadata.setTopicName(TEST_STORE + "_v" + TEST_VERSION); + expectedMetadata.setPartitionId(TEST_PARTITION); + InternalAvroSpecificSerializer partitionStateSerializer = + AvroProtocolDefinition.PARTITION_STATE.getSerializer(); + OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer, DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING); + offsetRecord.setOffsetLag(1000L); + expectedMetadata.setOffsetRecord(ByteBuffer.wrap(offsetRecord.toBytes())); + + ObjectMapper objectMapper = new ObjectMapper(); + String metadataJson = objectMapper.writeValueAsString(expectedMetadata); + 
byte[] metadataBytes = metadataJson.getBytes(CharsetUtil.UTF_8); + + FullHttpResponse metadataResponse = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.OK, + Unpooled.copiedBuffer(metadataJson, CharsetUtil.UTF_8)); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, metadataBytes.length); + metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA); + // Intentionally NOT setting BLOB_TRANSFER_*_SCHEMA_VERSION — models an OLD sender. + + DefaultHttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + endOfTransfer.headers().add(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED); + + ch.writeInbound(fileResponse1); + ch.writeInbound(chunk1); + ch.writeInbound(fileResponse2); + ch.writeInbound(chunk2a); + ch.writeInbound(chunk2b); + ch.writeInbound(metadataResponse); + ch.writeInbound(endOfTransfer); + + inputStreamFuture.toCompletableFuture().get(1, TimeUnit.MINUTES); + Assert.assertTrue(inputStreamFuture.toCompletableFuture().isDone()); + + BlobTransferPayload payload = new BlobTransferPayload( + baseDir.toString(), + TEST_STORE, + TEST_VERSION, + TEST_PARTITION, + BlobTransferUtils.BlobTransferTableFormat.BLOCK_BASED_TABLE); + Path dest = Paths.get(payload.getPartitionDir()); + Assert.assertTrue(Files.exists(dest.resolve("test_file1.txt"))); + Assert.assertEquals(Files.size(dest.resolve("test_file1.txt")), 5); + Assert.assertTrue(Files.exists(dest.resolve("test_file2.txt"))); + Assert.assertEquals(Files.size(dest.resolve("test_file2.txt")), 10); + + BlobTransferPartitionMetadata actualMetadata = clientMetadataHandler.getMetadata(); + Assert.assertNotNull(actualMetadata); + Assert.assertEquals(actualMetadata.getTopicName(), expectedMetadata.getTopicName()); + Assert.assertEquals(actualMetadata.getPartitionId(), expectedMetadata.getPartitionId()); + 
Assert.assertEquals(actualMetadata.getOffsetRecord(), expectedMetadata.getOffsetRecord()); + } + /** * Backward compatibility: a peer running an older binary that does not yet emit * the schema-version headers must not break P2P transfer for upgraded clients. diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index 83d1a3e7eeb..723f9c26426 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java @@ -451,6 +451,77 @@ public void testRejectRequestWithMismatchedSchemaVersion() throws IOException { Assert.assertNull(ch.readOutbound()); } + /** + * Rolling-deploy contract (NEW requester → NEW sender, matching versions): + * a request that advertises schema-version headers equal to the server's + * local versions must drive a full transfer (file → metadata → STATUS), + * with the schema-mismatch marker absent on every response and the new + * schema-version headers attached to the metadata response. 
+ */ + @Test + public void testRequestWithMatchingSchemaVersionHeadersCompletesTransfer() throws IOException { + StorageEngine localStorageEngine = Mockito.mock(StorageEngine.class); + Mockito.doReturn(localStorageEngine).when(storageEngineRepository).getLocalStorageEngine(Mockito.any()); + Mockito.doReturn(true).when(localStorageEngine).containsPartition(Mockito.anyInt()); + + StoreVersionState storeVersionState = new StoreVersionState(); + Mockito.doReturn(storeVersionState).when(storageMetadataService).getStoreVersionState(Mockito.any()); + InternalAvroSpecificSerializer partitionStateSerializer = + AvroProtocolDefinition.PARTITION_STATE.getSerializer(); + OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer, DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING); + offsetRecord.setOffsetLag(1000L); + Mockito.doReturn(offsetRecord).when(storageMetadataService).getLastOffset(Mockito.any(), Mockito.anyInt(), any()); + + Path snapshotDir = Paths.get(RocksDBUtils.composeSnapshotDir(baseDir.toString(), "myStore_v1", 10)); + Files.createDirectories(snapshotDir); + Files.write(snapshotDir.resolve("file1").toAbsolutePath(), "hello".getBytes()); + Mockito.doNothing().when(blobSnapshotManager).createSnapshot(Mockito.anyString(), Mockito.anyInt()); + + FullHttpRequest request = + new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/myStore/1/10/BLOCK_BASED_TABLE"); + request.headers() + .set( + BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + request.headers() + .set( + BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + + ch.writeInbound(request); + + // File response + Object response = ch.readOutbound(); + Assert.assertTrue(response instanceof DefaultHttpResponse); + DefaultHttpResponse fileResponse = (DefaultHttpResponse) response; + Assert.assertEquals(fileResponse.headers().get(BLOB_TRANSFER_TYPE), 
BlobTransferType.FILE.toString()); + Assert.assertNull(fileResponse.headers().get(BLOB_TRANSFER_SCHEMA_MISMATCH)); + // File chunk + response = ch.readOutbound(); + Assert.assertTrue(response instanceof HttpChunkedInput); + + // Metadata response — must echo current schema-version headers, no mismatch marker. + response = ch.readOutbound(); + Assert.assertTrue(response instanceof FullHttpResponse); + FullHttpResponse metadataResponse = (FullHttpResponse) response; + Assert.assertEquals(metadataResponse.headers().get(BLOB_TRANSFER_TYPE), BlobTransferType.METADATA.toString()); + Assert.assertNull(metadataResponse.headers().get(BLOB_TRANSFER_SCHEMA_MISMATCH)); + Assert.assertEquals( + metadataResponse.headers().getInt(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION).intValue(), + AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion()); + Assert.assertEquals( + metadataResponse.headers().getInt(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION).intValue(), + AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion()); + + // STATUS response + response = ch.readOutbound(); + Assert.assertTrue(response instanceof DefaultHttpResponse); + Assert.assertEquals(((DefaultHttpResponse) response).headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED); + + ch.pipeline().fireUserEventTriggered(IdleStateEvent.ALL_IDLE_STATE_EVENT); + Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); + } + /** * Backward compatibility: a request that does NOT carry the schema-version * headers (peer is on an older binary) must not be rejected — the server should From 85495c764db385b14b3830eaf6a81fb29b6e9643 Mon Sep 17 00:00:00 2001 From: Jingyan Li Date: Wed, 20 May 2026 15:03:50 -0700 Subject: [PATCH 3/4] [blob-transfer] add unit tests for VeniceBlobTransferIncompatibleSchemaException Covers the constructor, all getters, message formatting, and both branches of render() (known version and VERSION_UNKNOWN sentinel). 
Bumps venice-client-common diff branch coverage from 91.6% to 92.74%. --- ...ansferIncompatibleSchemaExceptionTest.java | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java new file mode 100644 index 00000000000..6257d185be4 --- /dev/null +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java @@ -0,0 +1,74 @@ +package com.linkedin.venice.exceptions; + +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class VeniceBlobTransferIncompatibleSchemaExceptionTest { + @Test + public void testKnownVersionsOnBothSides() { + String peerHost = "peer-host:1234"; + VeniceBlobTransferIncompatibleSchemaException e = + new VeniceBlobTransferIncompatibleSchemaException(peerHost, 4, 5, 6, 7); + + Assert.assertEquals(e.getPeerHost(), peerHost); + Assert.assertEquals(e.getPeerPartitionStateVersion(), 4); + Assert.assertEquals(e.getPeerStoreVersionStateVersion(), 5); + Assert.assertEquals(e.getLocalPartitionStateVersion(), 6); + Assert.assertEquals(e.getLocalStoreVersionStateVersion(), 7); + + String message = e.getMessage(); + Assert.assertTrue(message.contains(peerHost), "message should mention peer host: " + message); + Assert.assertTrue(message.contains("PartitionState=4"), "message should mention peer PartitionState: " + message); + Assert.assertTrue( + message.contains("StoreVersionState=5"), + "message should mention peer StoreVersionState: " + message); + Assert.assertTrue(message.contains("PartitionState=6"), "message should mention local 
PartitionState: " + message); + Assert.assertTrue( + message.contains("StoreVersionState=7"), + "message should mention local StoreVersionState: " + message); + Assert.assertFalse(message.contains(""), "all versions are known, message should not contain "); + } + + @Test + public void testPeerVersionsUnknownRendersUnknown() { + String peerHost = "older-peer:5678"; + VeniceBlobTransferIncompatibleSchemaException e = new VeniceBlobTransferIncompatibleSchemaException( + peerHost, + VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN, + VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN, + 3, + 9); + + Assert + .assertEquals(e.getPeerPartitionStateVersion(), VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN); + Assert.assertEquals( + e.getPeerStoreVersionStateVersion(), + VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN); + Assert.assertEquals(e.getLocalPartitionStateVersion(), 3); + Assert.assertEquals(e.getLocalStoreVersionStateVersion(), 9); + + String message = e.getMessage(); + Assert.assertTrue( + message.contains("PartitionState="), + "unknown peer PartitionState should render as : " + message); + Assert.assertTrue( + message.contains("StoreVersionState="), + "unknown peer StoreVersionState should render as : " + message); + Assert.assertTrue(message.contains("PartitionState=3"), "local PartitionState should be rendered: " + message); + Assert + .assertTrue(message.contains("StoreVersionState=9"), "local StoreVersionState should be rendered: " + message); + } + + @Test + public void testIsVeniceException() { + VeniceBlobTransferIncompatibleSchemaException e = + new VeniceBlobTransferIncompatibleSchemaException("host", 1, 2, 1, 2); + Assert.assertTrue(e instanceof VeniceException); + } + + @Test + public void testVersionUnknownSentinel() { + Assert.assertEquals(VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN, -1); + } +} From eb767b6547ab06f07959c03c48f7baf9b65e86b7 Mon Sep 17 00:00:00 2001 From: Jingyan 
Li Date: Wed, 20 May 2026 15:17:38 -0700 Subject: [PATCH 4/4] Remove redundant instanceof check that fails SpotBugs testIsVeniceException compared a VeniceBlobTransferIncompatibleSchemaException against VeniceException via instanceof. Since the class extends VeniceException, the relationship is enforced at compile time and SpotBugs flags the runtime check as vacuous (BC_VACUOUS_INSTANCEOF). The remaining three tests still cover the constructor, all getters, both branches of render(), and the VERSION_UNKNOWN sentinel. --- .../VeniceBlobTransferIncompatibleSchemaExceptionTest.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java index 6257d185be4..e2ca6b910e8 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java @@ -60,13 +60,6 @@ public void testPeerVersionsUnknownRendersUnknown() { .assertTrue(message.contains("StoreVersionState=9"), "local StoreVersionState should be rendered: " + message); } - @Test - public void testIsVeniceException() { - VeniceBlobTransferIncompatibleSchemaException e = - new VeniceBlobTransferIncompatibleSchemaException("host", 1, 2, 1, 2); - Assert.assertTrue(e instanceof VeniceException); - } - @Test public void testVersionUnknownSentinel() { Assert.assertEquals(VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN, -1);