diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java
index 725c6901d0e..7175b9b9108 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java
@@ -10,10 +10,13 @@
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.SSLConfig;
+import com.linkedin.venice.exceptions.VeniceBlobTransferIncompatibleSchemaException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.security.SSLFactory;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.SslUtils;
+import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslHandler;
import java.io.File;
@@ -32,6 +35,24 @@ public class BlobTransferUtils {
public static final String BLOB_TRANSFER_STATUS = "X-Blob-Transfer-Status";
public static final String BLOB_TRANSFER_COMPLETED = "Completed";
public static final String BLOB_TRANSFER_TYPE = "X-Blob-Transfer-Type";
+ /**
+ * Protocol version the peer used to serialize the {@code PartitionState} embedded in
+ * the metadata response body. The client uses this to fail fast when the local binary
+ * does not have a reader for that version, rather than discovering the mismatch during
+ * deserialization after the body has been fully received.
+ */
+ public static final String BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION =
+ "X-Blob-Transfer-Partition-State-Schema-Version";
+ /** Same purpose as {@link #BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION} but for {@code StoreVersionState}. */
+ public static final String BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION =
+ "X-Blob-Transfer-Store-Version-State-Schema-Version";
+ /**
+ * Marker header set on the server's 400 BAD_REQUEST response when it rejects a
+ * blob-transfer request because the requester's advertised schema versions do not
+ * match the server's local versions. Lets the client distinguish a schema-version
+ * rejection from any other 400 without parsing the body.
+ */
+ public static final String BLOB_TRANSFER_SCHEMA_MISMATCH = "X-Blob-Transfer-Schema-Mismatch";
public enum BlobTransferType {
FILE, METADATA
@@ -89,6 +110,119 @@ public static boolean isMetadataMessage(HttpResponse msg) {
return metadataHeader.equals(BlobTransferUtils.BlobTransferType.METADATA.name());
}
+ /**
+ * Validate that the schema-version headers on a P2P metadata response match the
+ * local binary's compiled-in {@code currentProtocolVersion} for each protocol.
+ * Throws {@link VeniceBlobTransferIncompatibleSchemaException} when at least one
+ * header is present and does not equal the local version — letting the caller fail
+ * fast at HTTP header-parse time, before any body is consumed.
+ *
+ *
An exact-match policy is used (rather than e.g. "peer <= local"). Blob
+ * transfer is the fast path; if the binaries on the two ends are not in lock-step
+ * we want to step aside and let Kafka bootstrap take over rather than rely on
+ * cross-version Avro promotion of the partition metadata. Skew between peers is
+ * limited to rolling-deploy windows, so the cost of being strict is bounded.
+ *
+ *
Behaviour for the absent / malformed cases is intentionally permissive so a
+ * server-side rollout of the new headers cannot break peers that haven't been
+ * upgraded yet, and so a header parsing bug cannot crash the channel:
+ *
+ * - Both headers absent — pass through (peer is on an older binary).
+ * - Header value non-numeric or out of byte range — log a warning and pass
+ * through; the existing deserialization-time exception remains as the safety
+ * net for the truly incompatible case (no regression vs. today).
+ *
+ */
+ public static void validateMetadataResponseSchemaVersions(HttpResponse response, String peerHost) {
+ String psHeader = response.headers().get(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
+ String svsHeader = response.headers().get(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);
+ if (psHeader == null && svsHeader == null) {
+ return;
+ }
+
+ int peerPs = parseProtocolVersionHeader(psHeader, BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
+ int peerSvs = parseProtocolVersionHeader(svsHeader, BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);
+
+ int localPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion();
+ int localSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion();
+
+ boolean psMismatch = peerPs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerPs != localPs;
+ boolean svsMismatch =
+ peerSvs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerSvs != localSvs;
+
+ if (psMismatch || svsMismatch) {
+ LOGGER.warn(
+ "Aborting P2P blob transfer from peer {}: metadata schema version mismatch"
+ + " (peer PartitionState={}, StoreVersionState={}; local PartitionState={}, StoreVersionState={})",
+ peerHost,
+ renderVersion(peerPs),
+ renderVersion(peerSvs),
+ localPs,
+ localSvs);
+ throw new VeniceBlobTransferIncompatibleSchemaException(peerHost, peerPs, peerSvs, localPs, localSvs);
+ }
+ }
+
+ /**
+ * Server-side counterpart of {@link #validateMetadataResponseSchemaVersions}: compare
+ * the schema-version headers on a P2P blob-transfer GET request against the local
+ * binary's compiled-in {@code currentProtocolVersion}. Used by the server right next
+ * to the table-format check so a schema mismatch is rejected with a 400 BAD_REQUEST
+ * before any file work begins — otherwise the client would pay for the entire file
+ * transfer and only discover the mismatch at the metadata stage.
+ *
+ * Same equality policy as the response side. Returns a diagnostic string suitable
+ * for the response body when the request is incompatible, or {@code null} when it
+ * is compatible (or when both headers are absent — older clients that have not yet
+ * been upgraded to advertise their versions).
+ */
+ public static String compareRequestedSchemaVersionsAgainstLocal(HttpRequest request) {
+ String psHeader = request.headers().get(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
+ String svsHeader = request.headers().get(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);
+ if (psHeader == null && svsHeader == null) {
+ return null;
+ }
+
+ int peerPs = parseProtocolVersionHeader(psHeader, BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
+ int peerSvs = parseProtocolVersionHeader(svsHeader, BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);
+
+ int localPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion();
+ int localSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion();
+
+ boolean psMismatch = peerPs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerPs != localPs;
+ boolean svsMismatch =
+ peerSvs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerSvs != localSvs;
+
+ if (psMismatch || svsMismatch) {
+ return "Blob transfer schema version mismatch: requester PartitionState=" + renderVersion(peerPs)
+ + ", StoreVersionState=" + renderVersion(peerSvs) + "; local PartitionState=" + localPs
+ + ", StoreVersionState=" + localSvs;
+ }
+ return null;
+ }
+
+ private static int parseProtocolVersionHeader(String value, String headerName) {
+ if (value == null) {
+ return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
+ }
+ try {
+ int parsed = Integer.parseInt(value.trim());
+ // Protocol versions are encoded into a single byte on the wire (see InternalAvroSpecificSerializer).
+ if (parsed < 0 || parsed > Byte.MAX_VALUE) {
+ LOGGER.warn("Out-of-range value '{}' for blob-transfer header {}; treating as unknown.", value, headerName);
+ return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
+ }
+ return parsed;
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Malformed value '{}' for blob-transfer header {}; treating as unknown.", value, headerName);
+ return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
+ }
+ }
+
+ private static String renderVersion(int v) {
+ return v == VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN ? "" : Integer.toString(v);
+ }
+
/**
* Generate MD5 checksum for a file
* @param filePath the path to the file
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java
index d1208cc4e43..689c664cf90 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java
@@ -11,6 +11,7 @@
import com.linkedin.venice.listener.VerifySslHandler;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.security.SSLFactory;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.LogContext;
import com.linkedin.venice.utils.Utils;
@@ -390,10 +391,21 @@ private FullHttpRequest prepareRequest(
int version,
int partition,
BlobTransferTableFormat requestTableFormat) {
- return new DefaultFullHttpRequest(
+ FullHttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
HttpMethod.GET,
String.format("/%s/%d/%d/%s", storeName, version, partition, requestTableFormat.name()));
+ // Advertise the schema versions this client can deserialize so the server can
+ // reject the request before any file work begins on a mismatch.
+ request.headers()
+ .set(
+ BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ request.headers()
+ .set(
+ BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+ return request;
}
/**
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java
index 023f48aade2..8413e4e60dc 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java
@@ -1,13 +1,18 @@
package com.linkedin.davinci.blobtransfer.client;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION;
import com.linkedin.davinci.blobtransfer.BlobTransferPayload;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException;
+import com.linkedin.venice.exceptions.VeniceBlobTransferIncompatibleSchemaException;
import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Utils;
@@ -103,6 +108,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
if (response.status().equals(HttpResponseStatus.NOT_FOUND)) {
throw new VeniceBlobTransferFileNotFoundException(
"Requested files from remote peer are not found. Response: " + response.status());
+ } else if (response.headers().contains(BLOB_TRANSFER_SCHEMA_MISMATCH)) {
+ throw buildSchemaMismatchExceptionFromErrorResponse(ctx, response);
} else {
throw new VeniceException("Failed to fetch file from remote peer. Response: " + response.status());
}
@@ -111,6 +118,13 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
// redirect the message to the next handler if it's a metadata transfer
boolean isMetadataMessage = BlobTransferUtils.isMetadataMessage(response);
if (isMetadataMessage) {
+ // Fail fast at header-parse time if the peer's metadata is serialized with a
+ // protocol version this binary cannot read; throwing here flows through
+ // exceptionCaught -> handleExceptionGracefully and fails the per-host transfer
+ // future immediately, so the next peer (or Kafka bootstrap) can take over
+ // without waiting for blobReceiveTimeoutInMin.
+ BlobTransferUtils
+ .validateMetadataResponseSchemaVersions(response, String.valueOf(ctx.channel().remoteAddress()));
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
return;
@@ -287,6 +301,40 @@ void handleExceptionGracefully(Throwable cause) {
}
+ /**
+ * Build a {@link VeniceBlobTransferIncompatibleSchemaException} from a server error
+ * response carrying the {@link BlobTransferUtils#BLOB_TRANSFER_SCHEMA_MISMATCH}
+ * marker. The server echoes its own schema versions in the response headers; the
+ * "peer" fields on the exception are those server-side values, the "local" fields
+ * are this client's compiled-in versions.
+ */
+ private VeniceBlobTransferIncompatibleSchemaException buildSchemaMismatchExceptionFromErrorResponse(
+ ChannelHandlerContext ctx,
+ HttpResponse response) {
+ int peerPs = readVersionHeader(response, BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
+ int peerSvs = readVersionHeader(response, BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);
+ int localPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion();
+ int localSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion();
+ return new VeniceBlobTransferIncompatibleSchemaException(
+ String.valueOf(ctx.channel().remoteAddress()),
+ peerPs,
+ peerSvs,
+ localPs,
+ localSvs);
+ }
+
+ private static int readVersionHeader(HttpResponse response, String name) {
+ String value = response.headers().get(name);
+ if (value == null) {
+ return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
+ }
+ try {
+ return Integer.parseInt(value.trim());
+ } catch (NumberFormatException e) {
+ return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
+ }
+ }
+
private String getFileNameFromHeader(HttpResponse response) {
String contentDisposition = response.headers().get(HttpHeaderNames.CONTENT_DISPOSITION);
if (contentDisposition != null) {
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java
index 13bf73d2f99..31226c9910e 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java
@@ -1,7 +1,10 @@
package com.linkedin.davinci.blobtransfer.server;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType;
@@ -18,6 +21,7 @@
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.request.RequestHelper;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.Utils;
import io.netty.buffer.Unpooled;
@@ -127,6 +131,16 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque
return;
}
+ // Check the requester's metadata schema versions before doing any file work.
+ // If the local binary cannot serialize PartitionState/StoreVersionState in a
+ // version the requester can read, fail fast with 400 + a marker header so the
+ // client doesn't pay for file bytes only to reject the metadata at the end.
+ String schemaMismatch = BlobTransferUtils.compareRequestedSchemaVersionsAgainstLocal(httpRequest);
+ if (schemaMismatch != null) {
+ sendSchemaMismatchResponse(ctx, schemaMismatch);
+ return;
+ }
+
// Check the concurrent request limit
if (globalConcurrentTransferRequests.get() >= maxAllowedConcurrentSnapshotUsers) {
String errMessage =
@@ -329,6 +343,30 @@ private void sendFile(
});
}
+ /**
+ * Send a 400 BAD_REQUEST response for a schema-version mismatch. Echoes the
+ * server's local protocol versions so the client can build a typed exception
+ * with full diagnostic context, and sets a marker header so the client can
+ * distinguish this from any other 400.
+ */
+ private void sendSchemaMismatchResponse(ChannelHandlerContext ctx, String diagnosticBody) {
+ byte[] body = diagnosticBody.getBytes();
+ FullHttpResponse response =
+ new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(body));
+ response.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.length);
+ response.headers().set(BLOB_TRANSFER_SCHEMA_MISMATCH, "true");
+ response.headers()
+ .set(
+ BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ response.headers()
+ .set(
+ BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+ LOGGER.warn("Rejecting blob-transfer request from {}: {}", ctx.channel().remoteAddress(), diagnosticBody);
+ ctx.writeAndFlush(response);
+ }
+
/**
* Send metadata for the given blob transfer request
* @param ctx the channel context
@@ -347,6 +385,14 @@ public void sendMetadata(ChannelHandlerContext ctx, BlobTransferPartitionMetadat
metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, metadataBytes.length);
metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, APPLICATION_JSON);
metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA);
+ metadataResponse.headers()
+ .set(
+ BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ metadataResponse.headers()
+ .set(
+ BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
ctx.writeAndFlush(metadataResponse).addListener(future -> {
if (future.isSuccess()) {
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java
index b2c2afb4f91..e307d564516 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java
@@ -1,7 +1,10 @@
package com.linkedin.davinci.blobtransfer;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType;
import static com.linkedin.venice.utils.TestUtils.DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING;
@@ -14,6 +17,7 @@
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.davinci.storage.StorageMetadataService;
+import com.linkedin.venice.exceptions.VeniceBlobTransferIncompatibleSchemaException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.state.IncrementalPushReplicaStatus;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
@@ -524,6 +528,275 @@ public void testReaderIdleEventBeforeTransferComplete() throws IOException {
}
}
+ @Test
+ public void testIncompatiblePartitionStateSchemaVersionFailsFast() {
+ int incompatibleVersion = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion() + 100;
+ FullHttpResponse metadataResponse =
+ new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
+ metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA);
+ metadataResponse.headers().set(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, incompatibleVersion);
+ metadataResponse.headers()
+ .set(
+ BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+
+ ch.writeInbound(metadataResponse);
+
+ try {
+ inputStreamFuture.toCompletableFuture().get(1, TimeUnit.SECONDS);
+ Assert.fail("Expected exception not thrown");
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getCause() instanceof VeniceBlobTransferIncompatibleSchemaException,
+ "Expected VeniceBlobTransferIncompatibleSchemaException, got: " + e.getCause());
+ VeniceBlobTransferIncompatibleSchemaException ex = (VeniceBlobTransferIncompatibleSchemaException) e.getCause();
+ Assert.assertEquals(ex.getPeerPartitionStateVersion(), incompatibleVersion);
+ Assert.assertEquals(
+ ex.getPeerStoreVersionStateVersion(),
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+ Assert.assertEquals(
+ ex.getLocalPartitionStateVersion(),
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ Assert.assertEquals(
+ ex.getLocalStoreVersionStateVersion(),
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+ }
+ }
+
+ @Test
+ public void testIncompatibleStoreVersionStateSchemaVersionFailsFast() {
+ int incompatibleVersion = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion() + 100;
+ FullHttpResponse metadataResponse =
+ new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
+ metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA);
+ metadataResponse.headers()
+ .set(
+ BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ metadataResponse.headers().set(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, incompatibleVersion);
+
+ ch.writeInbound(metadataResponse);
+
+ try {
+ inputStreamFuture.toCompletableFuture().get(1, TimeUnit.SECONDS);
+ Assert.fail("Expected exception not thrown");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getCause() instanceof VeniceBlobTransferIncompatibleSchemaException);
+ VeniceBlobTransferIncompatibleSchemaException ex = (VeniceBlobTransferIncompatibleSchemaException) e.getCause();
+ Assert.assertEquals(ex.getPeerStoreVersionStateVersion(), incompatibleVersion);
+ }
+ }
+
+ /**
+ * Rolling-deploy contract (NEW requester ← OLD sender, multi-file end-to-end):
+ * a peer on an older binary returns a 200 metadata response with NEITHER of
+ * the schema-version response headers set. An upgraded client must accept the
+ * full transfer (files → metadata → STATUS) without throwing
+ * {@link VeniceBlobTransferIncompatibleSchemaException}. Both files must land
+ * on disk and the metadata must parse. This pins the realistic multi-file
+ * variant of the single-metadata case in {@link #testSingleMetadataTransfer}.
+ */
+ @Test
+ public void testFullTransferWithoutSchemaVersionResponseHeadersFromOldPeer()
+ throws JsonProcessingException, ExecutionException, InterruptedException, IOException, TimeoutException {
+ DefaultHttpResponse fileResponse1 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ fileResponse1.headers().add("Content-Disposition", "filename=\"test_file1.txt\"");
+ fileResponse1.headers().add("Content-Length", "5");
+ fileResponse1.headers().add(BLOB_TRANSFER_TYPE, BlobTransferType.FILE);
+ fileResponse1.headers().add("Content-MD5", checksumGenerateHelper("12345"));
+ HttpContent chunk1 = new DefaultLastHttpContent(Unpooled.copiedBuffer("12345", CharsetUtil.UTF_8));
+
+ DefaultHttpResponse fileResponse2 = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ fileResponse2.headers().add("Content-Disposition", "filename=\"test_file2.txt\"");
+ fileResponse2.headers().add("Content-Length", "10");
+ fileResponse2.headers().add(BLOB_TRANSFER_TYPE, BlobTransferType.FILE);
+ fileResponse2.headers().add("Content-MD5", checksumGenerateHelper("6789013579"));
+ HttpContent chunk2a = new DefaultHttpContent(Unpooled.copiedBuffer("67890", CharsetUtil.UTF_8));
+ HttpContent chunk2b = new DefaultLastHttpContent(Unpooled.copiedBuffer("13579", CharsetUtil.UTF_8));
+
+ BlobTransferPartitionMetadata expectedMetadata = new BlobTransferPartitionMetadata();
+ expectedMetadata.setTopicName(TEST_STORE + "_v" + TEST_VERSION);
+ expectedMetadata.setPartitionId(TEST_PARTITION);
+ InternalAvroSpecificSerializer partitionStateSerializer =
+ AvroProtocolDefinition.PARTITION_STATE.getSerializer();
+ OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer, DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING);
+ offsetRecord.setOffsetLag(1000L);
+ expectedMetadata.setOffsetRecord(ByteBuffer.wrap(offsetRecord.toBytes()));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String metadataJson = objectMapper.writeValueAsString(expectedMetadata);
+ byte[] metadataBytes = metadataJson.getBytes(CharsetUtil.UTF_8);
+
+ FullHttpResponse metadataResponse = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1,
+ HttpResponseStatus.OK,
+ Unpooled.copiedBuffer(metadataJson, CharsetUtil.UTF_8));
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, metadataBytes.length);
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
+ metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA);
+ // Intentionally NOT setting BLOB_TRANSFER_*_SCHEMA_VERSION — models an OLD sender.
+
+ DefaultHttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ endOfTransfer.headers().add(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED);
+
+ ch.writeInbound(fileResponse1);
+ ch.writeInbound(chunk1);
+ ch.writeInbound(fileResponse2);
+ ch.writeInbound(chunk2a);
+ ch.writeInbound(chunk2b);
+ ch.writeInbound(metadataResponse);
+ ch.writeInbound(endOfTransfer);
+
+ inputStreamFuture.toCompletableFuture().get(1, TimeUnit.MINUTES);
+ Assert.assertTrue(inputStreamFuture.toCompletableFuture().isDone());
+
+ BlobTransferPayload payload = new BlobTransferPayload(
+ baseDir.toString(),
+ TEST_STORE,
+ TEST_VERSION,
+ TEST_PARTITION,
+ BlobTransferUtils.BlobTransferTableFormat.BLOCK_BASED_TABLE);
+ Path dest = Paths.get(payload.getPartitionDir());
+ Assert.assertTrue(Files.exists(dest.resolve("test_file1.txt")));
+ Assert.assertEquals(Files.size(dest.resolve("test_file1.txt")), 5);
+ Assert.assertTrue(Files.exists(dest.resolve("test_file2.txt")));
+ Assert.assertEquals(Files.size(dest.resolve("test_file2.txt")), 10);
+
+ BlobTransferPartitionMetadata actualMetadata = clientMetadataHandler.getMetadata();
+ Assert.assertNotNull(actualMetadata);
+ Assert.assertEquals(actualMetadata.getTopicName(), expectedMetadata.getTopicName());
+ Assert.assertEquals(actualMetadata.getPartitionId(), expectedMetadata.getPartitionId());
+ Assert.assertEquals(actualMetadata.getOffsetRecord(), expectedMetadata.getOffsetRecord());
+ }
+
+ /**
+ * Backward compatibility: a peer running an older binary that does not yet emit
+ * the schema-version headers must not break P2P transfer for upgraded clients.
+ * The existing happy-path (covered by {@link #testSingleMetadataTransfer}) already
+ * exercises the no-headers case; this test makes the contract explicit by sending
+ * only one of the two headers and asserting transfer proceeds normally.
+ */
+ @Test
+ public void testSingleSchemaVersionHeaderMissingPassesThrough()
+ throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException {
+ BlobTransferPartitionMetadata expectedMetadata = new BlobTransferPartitionMetadata();
+ expectedMetadata.setTopicName(TEST_STORE + "_v" + TEST_VERSION);
+ expectedMetadata.setPartitionId(TEST_PARTITION);
+ InternalAvroSpecificSerializer partitionStateSerializer =
+ AvroProtocolDefinition.PARTITION_STATE.getSerializer();
+ OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer, DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING);
+ expectedMetadata.setOffsetRecord(ByteBuffer.wrap(offsetRecord.toBytes()));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String metadataJson = objectMapper.writeValueAsString(expectedMetadata);
+ byte[] metadataBytes = metadataJson.getBytes(CharsetUtil.UTF_8);
+
+ FullHttpResponse metadataResponse = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1,
+ HttpResponseStatus.OK,
+ Unpooled.copiedBuffer(metadataJson, CharsetUtil.UTF_8));
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, metadataBytes.length);
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
+ metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA);
+ // Only the StoreVersionState header is present; PartitionState header is absent.
+ metadataResponse.headers()
+ .set(
+ BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+
+ DefaultHttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ endOfTransfer.headers().add(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED);
+
+ ch.writeInbound(metadataResponse);
+ ch.writeInbound(endOfTransfer);
+
+ inputStreamFuture.toCompletableFuture().get(1, TimeUnit.MINUTES);
+ Assert.assertTrue(inputStreamFuture.toCompletableFuture().isDone());
+ Assert.assertNotNull(clientMetadataHandler.getMetadata());
+ }
+
+ @Test
+ public void testMalformedSchemaVersionHeaderPassesThrough()
+ throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException {
+ BlobTransferPartitionMetadata expectedMetadata = new BlobTransferPartitionMetadata();
+ expectedMetadata.setTopicName(TEST_STORE + "_v" + TEST_VERSION);
+ expectedMetadata.setPartitionId(TEST_PARTITION);
+ InternalAvroSpecificSerializer partitionStateSerializer =
+ AvroProtocolDefinition.PARTITION_STATE.getSerializer();
+ OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer, DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING);
+ expectedMetadata.setOffsetRecord(ByteBuffer.wrap(offsetRecord.toBytes()));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String metadataJson = objectMapper.writeValueAsString(expectedMetadata);
+ byte[] metadataBytes = metadataJson.getBytes(CharsetUtil.UTF_8);
+
+ FullHttpResponse metadataResponse = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1,
+ HttpResponseStatus.OK,
+ Unpooled.copiedBuffer(metadataJson, CharsetUtil.UTF_8));
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, metadataBytes.length);
+ metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
+ metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA);
+ // Header value cannot be parsed as an integer — must not crash the channel.
+ metadataResponse.headers().set(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, "not-a-number");
+ metadataResponse.headers().set(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, "9999");
+
+ DefaultHttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ endOfTransfer.headers().add(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED);
+
+ ch.writeInbound(metadataResponse);
+ ch.writeInbound(endOfTransfer);
+
+ inputStreamFuture.toCompletableFuture().get(1, TimeUnit.MINUTES);
+ Assert.assertTrue(inputStreamFuture.toCompletableFuture().isDone());
+ Assert.assertNotNull(clientMetadataHandler.getMetadata());
+ }
+
+ /**
+ * Server pre-empted the file transfer with a 400 BAD_REQUEST carrying the
+ * schema-mismatch marker header (the request-side check fired before any file
+ * work). Client must fail the transfer future with the typed exception, populated
+ * from the server-echoed versions in the response headers.
+ */
+ @Test
+ public void testServerSchemaMismatchRejectionThrowsTypedException() {
+ int serverPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion() + 50;
+ int serverSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion() + 50;
+ String body = "Blob transfer schema version mismatch (synthetic for test)";
+ FullHttpResponse rejection = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1,
+ HttpResponseStatus.BAD_REQUEST,
+ Unpooled.copiedBuffer(body, CharsetUtil.UTF_8));
+ rejection.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.getBytes(CharsetUtil.UTF_8).length);
+ rejection.headers().set(BLOB_TRANSFER_SCHEMA_MISMATCH, "true");
+ rejection.headers().set(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, serverPs);
+ rejection.headers().set(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION, serverSvs);
+
+ ch.writeInbound(rejection);
+
+ try {
+ inputStreamFuture.toCompletableFuture().get(1, TimeUnit.SECONDS);
+ Assert.fail("Expected exception not thrown");
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getCause() instanceof VeniceBlobTransferIncompatibleSchemaException,
+ "Expected VeniceBlobTransferIncompatibleSchemaException, got: " + e.getCause());
+ VeniceBlobTransferIncompatibleSchemaException ex = (VeniceBlobTransferIncompatibleSchemaException) e.getCause();
+ Assert.assertEquals(ex.getPeerPartitionStateVersion(), serverPs);
+ Assert.assertEquals(ex.getPeerStoreVersionStateVersion(), serverSvs);
+ Assert.assertEquals(
+ ex.getLocalPartitionStateVersion(),
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ Assert.assertEquals(
+ ex.getLocalStoreVersionStateVersion(),
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+ }
+ }
+
/**
* Generate checksum via string content
*/
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java
index 09f810da024..723f9c26426 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java
@@ -1,7 +1,10 @@
package com.linkedin.davinci.blobtransfer;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS;
+import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType;
import static com.linkedin.venice.response.VeniceReadResponseStatus.TOO_MANY_REQUESTS;
@@ -347,6 +350,9 @@ public void testTransferMultipleFiles() throws IOException {
DefaultHttpResponse httpResponse = (DefaultHttpResponse) response;
Assert.assertTrue(fileNames.contains(httpResponse.headers().get(HttpHeaderNames.CONTENT_DISPOSITION)));
Assert.assertTrue(fileChecksums.contains(httpResponse.headers().get(HttpHeaderNames.CONTENT_MD5)));
+ // file responses must not carry the schema-version headers; they only apply to metadata responses.
+ Assert.assertNull(httpResponse.headers().get(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION));
+ Assert.assertNull(httpResponse.headers().get(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION));
fileNames.remove(httpResponse.headers().get(HttpHeaderNames.CONTENT_DISPOSITION));
fileChecksums.remove(httpResponse.headers().get(HttpHeaderNames.CONTENT_MD5));
response = ch.readOutbound();
@@ -368,6 +374,12 @@ public void testTransferMultipleFiles() throws IOException {
Assert.assertTrue(response instanceof FullHttpResponse);
FullHttpResponse metadataResponse = (FullHttpResponse) response;
Assert.assertEquals(metadataResponse.headers().get(BLOB_TRANSFER_TYPE), BlobTransferType.METADATA.toString());
+ Assert.assertEquals(
+ metadataResponse.headers().getInt(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION).intValue(),
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ Assert.assertEquals(
+ metadataResponse.headers().getInt(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION).intValue(),
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
ByteBuf content = metadataResponse.content();
byte[] metadataBytes = new byte[content.readableBytes()];
@@ -398,6 +410,141 @@ public void testTransferMultipleFiles() throws IOException {
Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0);
}
+ /**
+ * Server rejects a request whose advertised PartitionState/StoreVersionState
+ * schema versions don't match the local binary's, with a 400 + marker header,
+ * BEFORE any file work begins. Without this, the client would receive every
+ * file byte and only discover the mismatch at the metadata stage.
+ */
+ @Test
+ public void testRejectRequestWithMismatchedSchemaVersion() throws IOException {
+ // The snapshot dir is set up so that if the schema check were bypassed, file
+ // responses would be produced; assert they are NOT produced here.
+ Path snapshotDir = Paths.get(RocksDBUtils.composeSnapshotDir(baseDir.toString(), "myStore_v1", 10));
+ Files.createDirectories(snapshotDir);
+ Files.write(snapshotDir.resolve("file1").toAbsolutePath(), "hello".getBytes());
+
+ int incompatibleVersion = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion() + 100;
+ FullHttpRequest request =
+ new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/myStore/1/10/BLOCK_BASED_TABLE");
+ request.headers().set(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION, incompatibleVersion);
+ request.headers()
+ .set(
+ BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+
+ ch.writeInbound(request);
+
+ Object response = ch.readOutbound();
+ Assert.assertTrue(response instanceof FullHttpResponse);
+ FullHttpResponse rejection = (FullHttpResponse) response;
+ Assert.assertEquals(rejection.status(), HttpResponseStatus.BAD_REQUEST);
+ Assert.assertEquals(rejection.headers().get(BLOB_TRANSFER_SCHEMA_MISMATCH), "true");
+ // Server echoes its local versions so the client can build a typed exception.
+ Assert.assertEquals(
+ rejection.headers().getInt(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION).intValue(),
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ Assert.assertEquals(
+ rejection.headers().getInt(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION).intValue(),
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+ // No further outbound messages — no file response was produced.
+ Assert.assertNull(ch.readOutbound());
+ }
+
+ /**
+ * Rolling-deploy contract (NEW requester → NEW sender, matching versions):
+ * a request that advertises schema-version headers equal to the server's
+ * local versions must drive a full transfer (file → metadata → STATUS),
+ * with the schema-mismatch marker absent on every response and the new
+ * schema-version headers attached to the metadata response.
+ */
+ @Test
+ public void testRequestWithMatchingSchemaVersionHeadersCompletesTransfer() throws IOException {
+ StorageEngine localStorageEngine = Mockito.mock(StorageEngine.class);
+ Mockito.doReturn(localStorageEngine).when(storageEngineRepository).getLocalStorageEngine(Mockito.any());
+ Mockito.doReturn(true).when(localStorageEngine).containsPartition(Mockito.anyInt());
+
+ StoreVersionState storeVersionState = new StoreVersionState();
+ Mockito.doReturn(storeVersionState).when(storageMetadataService).getStoreVersionState(Mockito.any());
+ InternalAvroSpecificSerializer partitionStateSerializer =
+ AvroProtocolDefinition.PARTITION_STATE.getSerializer();
+ OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer, DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING);
+ offsetRecord.setOffsetLag(1000L);
+ Mockito.doReturn(offsetRecord).when(storageMetadataService).getLastOffset(Mockito.any(), Mockito.anyInt(), any());
+
+ Path snapshotDir = Paths.get(RocksDBUtils.composeSnapshotDir(baseDir.toString(), "myStore_v1", 10));
+ Files.createDirectories(snapshotDir);
+ Files.write(snapshotDir.resolve("file1").toAbsolutePath(), "hello".getBytes());
+ Mockito.doNothing().when(blobSnapshotManager).createSnapshot(Mockito.anyString(), Mockito.anyInt());
+
+ FullHttpRequest request =
+ new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/myStore/1/10/BLOCK_BASED_TABLE");
+ request.headers()
+ .set(
+ BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ request.headers()
+ .set(
+ BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+
+ ch.writeInbound(request);
+
+ // File response
+ Object response = ch.readOutbound();
+ Assert.assertTrue(response instanceof DefaultHttpResponse);
+ DefaultHttpResponse fileResponse = (DefaultHttpResponse) response;
+ Assert.assertEquals(fileResponse.headers().get(BLOB_TRANSFER_TYPE), BlobTransferType.FILE.toString());
+ Assert.assertNull(fileResponse.headers().get(BLOB_TRANSFER_SCHEMA_MISMATCH));
+ // File chunk
+ response = ch.readOutbound();
+ Assert.assertTrue(response instanceof HttpChunkedInput);
+
+ // Metadata response — must echo current schema-version headers, no mismatch marker.
+ response = ch.readOutbound();
+ Assert.assertTrue(response instanceof FullHttpResponse);
+ FullHttpResponse metadataResponse = (FullHttpResponse) response;
+ Assert.assertEquals(metadataResponse.headers().get(BLOB_TRANSFER_TYPE), BlobTransferType.METADATA.toString());
+ Assert.assertNull(metadataResponse.headers().get(BLOB_TRANSFER_SCHEMA_MISMATCH));
+ Assert.assertEquals(
+ metadataResponse.headers().getInt(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION).intValue(),
+ AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
+ Assert.assertEquals(
+ metadataResponse.headers().getInt(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION).intValue(),
+ AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
+
+ // STATUS response
+ response = ch.readOutbound();
+ Assert.assertTrue(response instanceof DefaultHttpResponse);
+ Assert.assertEquals(((DefaultHttpResponse) response).headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED);
+
+ ch.pipeline().fireUserEventTriggered(IdleStateEvent.ALL_IDLE_STATE_EVENT);
+ Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0);
+ }
+
+ /**
+ * Backward compatibility: a request that does NOT carry the schema-version
+ * headers (peer is on an older binary) must not be rejected — the server should
+ * fall through to the existing flow.
+ */
+ @Test
+ public void testRequestWithoutSchemaVersionHeadersIsNotRejected() throws IOException {
+ Path snapshotDir = Paths.get(RocksDBUtils.composeSnapshotDir(baseDir.toString(), "myStore_v1", 10));
+ Files.createDirectories(snapshotDir);
+
+ FullHttpRequest request =
+ new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/myStore/1/10/BLOCK_BASED_TABLE");
+ // Intentionally NOT setting BLOB_TRANSFER_*_SCHEMA_VERSION headers.
+
+ ch.writeInbound(request);
+
+ Object response = ch.readOutbound();
+ Assert.assertTrue(response instanceof FullHttpResponse);
+ // Falls through to the existing not-found path because no real metadata service is wired.
+ // The key assertion is that the response is NOT a schema-mismatch rejection.
+ Assert.assertNull(((FullHttpResponse) response).headers().get(BLOB_TRANSFER_SCHEMA_MISMATCH));
+ }
+
/**
* Test when fail to get the metadata from storageMetadataService, it should return error to client.
* @throws IOException
diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaException.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaException.java
new file mode 100644
index 00000000000..c4e878ce4de
--- /dev/null
+++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaException.java
@@ -0,0 +1,80 @@
+package com.linkedin.venice.exceptions;
+
+/**
+ * Thrown when a P2P blob-transfer client receives a metadata response from a peer
+ * whose serialization protocol version for {@code PartitionState} or
+ * {@code StoreVersionState} does not match the local binary's compiled-in
+ * {@code currentProtocolVersion}. Used to fail fast at HTTP header-parse time,
+ * before any body is consumed, so the transfer can fall over to the next peer or
+ * to Kafka bootstrap without waiting for the per-host receive timeout.
+ *
+ * Both sides advertise their {@code currentProtocolVersion}; the policy is exact
+ * equality, since blob transfer is the fast path and any binary skew between peers
+ * is bounded to rolling-deploy windows.
+ */
+public class VeniceBlobTransferIncompatibleSchemaException extends VeniceException {
+ /** Sentinel meaning the peer did not provide a value for this protocol. */
+ public static final int VERSION_UNKNOWN = -1;
+
+ private final String peerHost;
+ private final int peerPartitionStateVersion;
+ private final int peerStoreVersionStateVersion;
+ private final int localPartitionStateVersion;
+ private final int localStoreVersionStateVersion;
+
+ public VeniceBlobTransferIncompatibleSchemaException(
+ String peerHost,
+ int peerPartitionStateVersion,
+ int peerStoreVersionStateVersion,
+ int localPartitionStateVersion,
+ int localStoreVersionStateVersion) {
+ super(
+ buildMessage(
+ peerHost,
+ peerPartitionStateVersion,
+ peerStoreVersionStateVersion,
+ localPartitionStateVersion,
+ localStoreVersionStateVersion));
+ this.peerHost = peerHost;
+ this.peerPartitionStateVersion = peerPartitionStateVersion;
+ this.peerStoreVersionStateVersion = peerStoreVersionStateVersion;
+ this.localPartitionStateVersion = localPartitionStateVersion;
+ this.localStoreVersionStateVersion = localStoreVersionStateVersion;
+ }
+
+ private static String buildMessage(
+ String peerHost,
+ int peerPartitionStateVersion,
+ int peerStoreVersionStateVersion,
+ int localPartitionStateVersion,
+ int localStoreVersionStateVersion) {
+ return "Peer " + peerHost + " serialized blob-transfer metadata with protocol versions" + " PartitionState="
+ + render(peerPartitionStateVersion) + ", StoreVersionState=" + render(peerStoreVersionStateVersion)
+ + "; local binary has PartitionState=" + localPartitionStateVersion + ", StoreVersionState="
+ + localStoreVersionStateVersion;
+ }
+
+ private static String render(int v) {
+ return v == VERSION_UNKNOWN ? "" : Integer.toString(v);
+ }
+
+ public String getPeerHost() {
+ return peerHost;
+ }
+
+ public int getPeerPartitionStateVersion() {
+ return peerPartitionStateVersion;
+ }
+
+ public int getPeerStoreVersionStateVersion() {
+ return peerStoreVersionStateVersion;
+ }
+
+ public int getLocalPartitionStateVersion() {
+ return localPartitionStateVersion;
+ }
+
+ public int getLocalStoreVersionStateVersion() {
+ return localStoreVersionStateVersion;
+ }
+}
diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java
new file mode 100644
index 00000000000..e2ca6b910e8
--- /dev/null
+++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/exceptions/VeniceBlobTransferIncompatibleSchemaExceptionTest.java
@@ -0,0 +1,67 @@
+package com.linkedin.venice.exceptions;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class VeniceBlobTransferIncompatibleSchemaExceptionTest {
+ @Test
+ public void testKnownVersionsOnBothSides() {
+ String peerHost = "peer-host:1234";
+ VeniceBlobTransferIncompatibleSchemaException e =
+ new VeniceBlobTransferIncompatibleSchemaException(peerHost, 4, 5, 6, 7);
+
+ Assert.assertEquals(e.getPeerHost(), peerHost);
+ Assert.assertEquals(e.getPeerPartitionStateVersion(), 4);
+ Assert.assertEquals(e.getPeerStoreVersionStateVersion(), 5);
+ Assert.assertEquals(e.getLocalPartitionStateVersion(), 6);
+ Assert.assertEquals(e.getLocalStoreVersionStateVersion(), 7);
+
+ String message = e.getMessage();
+ Assert.assertTrue(message.contains(peerHost), "message should mention peer host: " + message);
+ Assert.assertTrue(message.contains("PartitionState=4"), "message should mention peer PartitionState: " + message);
+ Assert.assertTrue(
+ message.contains("StoreVersionState=5"),
+ "message should mention peer StoreVersionState: " + message);
+ Assert.assertTrue(message.contains("PartitionState=6"), "message should mention local PartitionState: " + message);
+ Assert.assertTrue(
+ message.contains("StoreVersionState=7"),
+ "message should mention local StoreVersionState: " + message);
+ Assert.assertFalse(message.contains(""), "all versions are known, message should not contain ");
+ }
+
+ @Test
+ public void testPeerVersionsUnknownRendersUnknown() {
+ String peerHost = "older-peer:5678";
+ VeniceBlobTransferIncompatibleSchemaException e = new VeniceBlobTransferIncompatibleSchemaException(
+ peerHost,
+ VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN,
+ VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN,
+ 3,
+ 9);
+
+ Assert
+ .assertEquals(e.getPeerPartitionStateVersion(), VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN);
+ Assert.assertEquals(
+ e.getPeerStoreVersionStateVersion(),
+ VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN);
+ Assert.assertEquals(e.getLocalPartitionStateVersion(), 3);
+ Assert.assertEquals(e.getLocalStoreVersionStateVersion(), 9);
+
+ String message = e.getMessage();
+ Assert.assertTrue(
+ message.contains("PartitionState="),
+ "unknown peer PartitionState should render as : " + message);
+ Assert.assertTrue(
+ message.contains("StoreVersionState="),
+ "unknown peer StoreVersionState should render as : " + message);
+ Assert.assertTrue(message.contains("PartitionState=3"), "local PartitionState should be rendered: " + message);
+ Assert
+ .assertTrue(message.contains("StoreVersionState=9"), "local StoreVersionState should be rendered: " + message);
+ }
+
+ @Test
+ public void testVersionUnknownSentinel() {
+ Assert.assertEquals(VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN, -1);
+ }
+}