Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.exceptions.VeniceBlobTransferIncompatibleSchemaException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.SslUtils;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslHandler;
import java.io.File;
Expand All @@ -32,6 +35,24 @@ public class BlobTransferUtils {
public static final String BLOB_TRANSFER_STATUS = "X-Blob-Transfer-Status";
public static final String BLOB_TRANSFER_COMPLETED = "Completed";
public static final String BLOB_TRANSFER_TYPE = "X-Blob-Transfer-Type";
/**
* Protocol version the peer used to serialize the {@code PartitionState} embedded in
* the metadata response body. The client uses this to fail fast when the local binary
* does not have a reader for that version, rather than discovering the mismatch during
* deserialization after the body has been fully received.
*/
public static final String BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION =
"X-Blob-Transfer-Partition-State-Schema-Version";
/** Same purpose as {@link #BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION} but for {@code StoreVersionState}. */
public static final String BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION =
"X-Blob-Transfer-Store-Version-State-Schema-Version";
/**
* Marker header set on the server's 400 BAD_REQUEST response when it rejects a
* blob-transfer request because the requester's advertised schema versions do not
* match the server's local versions. Lets the client distinguish a schema-version
* rejection from any other 400 without parsing the body.
*/
public static final String BLOB_TRANSFER_SCHEMA_MISMATCH = "X-Blob-Transfer-Schema-Mismatch";

public enum BlobTransferType {
FILE, METADATA
Expand Down Expand Up @@ -89,6 +110,119 @@ public static boolean isMetadataMessage(HttpResponse msg) {
return metadataHeader.equals(BlobTransferUtils.BlobTransferType.METADATA.name());
}

/**
* Validate that the schema-version headers on a P2P metadata response match the
* local binary's compiled-in {@code currentProtocolVersion} for each protocol.
* Throws {@link VeniceBlobTransferIncompatibleSchemaException} when at least one
* header is present and does not equal the local version — letting the caller fail
* fast at HTTP header-parse time, before any body is consumed.
*
* <p>An exact-match policy is used (rather than e.g. "peer &lt;= local"). Blob
* transfer is the fast path; if the binaries on the two ends are not in lock-step
* we want to step aside and let Kafka bootstrap take over rather than rely on
* cross-version Avro promotion of the partition metadata. Skew between peers is
* limited to rolling-deploy windows, so the cost of being strict is bounded.
*
* <p>Behaviour for the absent / malformed cases is intentionally permissive so a
* server-side rollout of the new headers cannot break peers that haven't been
* upgraded yet, and so a header parsing bug cannot crash the channel:
* <ul>
* <li>Both headers absent — pass through (peer is on an older binary).</li>
* <li>Header value non-numeric or out of byte range — log a warning and pass
* through; the existing deserialization-time exception remains as the safety
* net for the truly incompatible case (no regression vs. today).</li>
* </ul>
*/
public static void validateMetadataResponseSchemaVersions(HttpResponse response, String peerHost) {
String psHeader = response.headers().get(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
String svsHeader = response.headers().get(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);
if (psHeader == null && svsHeader == null) {
return;
}

int peerPs = parseProtocolVersionHeader(psHeader, BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
int peerSvs = parseProtocolVersionHeader(svsHeader, BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);

int localPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion();
int localSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion();

boolean psMismatch = peerPs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerPs != localPs;
boolean svsMismatch =
peerSvs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerSvs != localSvs;

if (psMismatch || svsMismatch) {
LOGGER.warn(
"Aborting P2P blob transfer from peer {}: metadata schema version mismatch"
+ " (peer PartitionState={}, StoreVersionState={}; local PartitionState={}, StoreVersionState={})",
peerHost,
renderVersion(peerPs),
renderVersion(peerSvs),
localPs,
localSvs);
throw new VeniceBlobTransferIncompatibleSchemaException(peerHost, peerPs, peerSvs, localPs, localSvs);
}
}

/**
* Server-side counterpart of {@link #validateMetadataResponseSchemaVersions}: compare
* the schema-version headers on a P2P blob-transfer GET request against the local
* binary's compiled-in {@code currentProtocolVersion}. Used by the server right next
* to the table-format check so a schema mismatch is rejected with a 400 BAD_REQUEST
* before any file work begins — otherwise the client would pay for the entire file
* transfer and only discover the mismatch at the metadata stage.
*
* <p>Same equality policy as the response side. Returns a diagnostic string suitable
* for the response body when the request is incompatible, or {@code null} when it
* is compatible (or when both headers are absent — older clients that have not yet
* been upgraded to advertise their versions).
*/
public static String compareRequestedSchemaVersionsAgainstLocal(HttpRequest request) {
String psHeader = request.headers().get(BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
String svsHeader = request.headers().get(BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);
if (psHeader == null && svsHeader == null) {
return null;
}

int peerPs = parseProtocolVersionHeader(psHeader, BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
int peerSvs = parseProtocolVersionHeader(svsHeader, BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);

int localPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion();
int localSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion();

boolean psMismatch = peerPs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerPs != localPs;
boolean svsMismatch =
peerSvs != VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN && peerSvs != localSvs;

if (psMismatch || svsMismatch) {
return "Blob transfer schema version mismatch: requester PartitionState=" + renderVersion(peerPs)
+ ", StoreVersionState=" + renderVersion(peerSvs) + "; local PartitionState=" + localPs
+ ", StoreVersionState=" + localSvs;
}
return null;
}

private static int parseProtocolVersionHeader(String value, String headerName) {
if (value == null) {
return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
}
try {
int parsed = Integer.parseInt(value.trim());
// Protocol versions are encoded into a single byte on the wire (see InternalAvroSpecificSerializer).
if (parsed < 0 || parsed > Byte.MAX_VALUE) {
LOGGER.warn("Out-of-range value '{}' for blob-transfer header {}; treating as unknown.", value, headerName);
return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
}
return parsed;
} catch (NumberFormatException e) {
LOGGER.warn("Malformed value '{}' for blob-transfer header {}; treating as unknown.", value, headerName);
return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
}
}

private static String renderVersion(int v) {
return v == VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN ? "<unknown>" : Integer.toString(v);
}

/**
* Generate MD5 checksum for a file
* @param filePath the path to the file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.venice.listener.VerifySslHandler;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.LogContext;
import com.linkedin.venice.utils.Utils;
Expand Down Expand Up @@ -390,10 +391,21 @@ private FullHttpRequest prepareRequest(
int version,
int partition,
BlobTransferTableFormat requestTableFormat) {
return new DefaultFullHttpRequest(
FullHttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
HttpMethod.GET,
String.format("/%s/%d/%d/%s", storeName, version, partition, requestTableFormat.name()));
// Advertise the schema versions this client can deserialize so the server can
// reject the request before any file work begins on a mismatch.
request.headers()
.set(
BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION,
AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
request.headers()
.set(
BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
return request;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.linkedin.davinci.blobtransfer.client;

import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION;

import com.linkedin.davinci.blobtransfer.BlobTransferPayload;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException;
import com.linkedin.venice.exceptions.VeniceBlobTransferIncompatibleSchemaException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Utils;
Expand Down Expand Up @@ -103,6 +108,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
if (response.status().equals(HttpResponseStatus.NOT_FOUND)) {
throw new VeniceBlobTransferFileNotFoundException(
"Requested files from remote peer are not found. Response: " + response.status());
} else if (response.headers().contains(BLOB_TRANSFER_SCHEMA_MISMATCH)) {
throw buildSchemaMismatchExceptionFromErrorResponse(ctx, response);
} else {
throw new VeniceException("Failed to fetch file from remote peer. Response: " + response.status());
}
Expand All @@ -111,6 +118,13 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
// redirect the message to the next handler if it's a metadata transfer
boolean isMetadataMessage = BlobTransferUtils.isMetadataMessage(response);
if (isMetadataMessage) {
// Fail fast at header-parse time if the peer's metadata is serialized with a
// protocol version this binary cannot read; throwing here flows through
// exceptionCaught -> handleExceptionGracefully and fails the per-host transfer
// future immediately, so the next peer (or Kafka bootstrap) can take over
// without waiting for blobReceiveTimeoutInMin.
BlobTransferUtils
.validateMetadataResponseSchemaVersions(response, String.valueOf(ctx.channel().remoteAddress()));
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
return;
Expand Down Expand Up @@ -287,6 +301,40 @@ void handleExceptionGracefully(Throwable cause) {

}

/**
* Build a {@link VeniceBlobTransferIncompatibleSchemaException} from a server error
* response carrying the {@link BlobTransferUtils#BLOB_TRANSFER_SCHEMA_MISMATCH}
* marker. The server echoes its own schema versions in the response headers; the
* "peer" fields on the exception are those server-side values, the "local" fields
* are this client's compiled-in versions.
*/
private VeniceBlobTransferIncompatibleSchemaException buildSchemaMismatchExceptionFromErrorResponse(
ChannelHandlerContext ctx,
HttpResponse response) {
int peerPs = readVersionHeader(response, BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION);
int peerSvs = readVersionHeader(response, BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION);
int localPs = AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion();
int localSvs = AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion();
return new VeniceBlobTransferIncompatibleSchemaException(
String.valueOf(ctx.channel().remoteAddress()),
peerPs,
peerSvs,
localPs,
localSvs);
}

private static int readVersionHeader(HttpResponse response, String name) {
String value = response.headers().get(name);
if (value == null) {
return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
}
try {
return Integer.parseInt(value.trim());
} catch (NumberFormatException e) {
return VeniceBlobTransferIncompatibleSchemaException.VERSION_UNKNOWN;
}
}

private String getFileNameFromHeader(HttpResponse response) {
String contentDisposition = response.headers().get(HttpHeaderNames.CONTENT_DISPOSITION);
if (contentDisposition != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.linkedin.davinci.blobtransfer.server;

import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_SCHEMA_MISMATCH;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType;
Expand All @@ -18,6 +21,7 @@
import com.linkedin.davinci.stats.AggBlobTransferStats;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.request.RequestHelper;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.Utils;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -127,6 +131,16 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque
return;
}

// Check the requester's metadata schema versions before doing any file work.
// If the local binary cannot serialize PartitionState/StoreVersionState in a
// version the requester can read, fail fast with 400 + a marker header so the
// client doesn't pay for file bytes only to reject the metadata at the end.
String schemaMismatch = BlobTransferUtils.compareRequestedSchemaVersionsAgainstLocal(httpRequest);
if (schemaMismatch != null) {
sendSchemaMismatchResponse(ctx, schemaMismatch);
return;
}

// Check the concurrent request limit
if (globalConcurrentTransferRequests.get() >= maxAllowedConcurrentSnapshotUsers) {
String errMessage =
Expand Down Expand Up @@ -329,6 +343,30 @@ private void sendFile(
});
}

/**
* Send a 400 BAD_REQUEST response for a schema-version mismatch. Echoes the
* server's local protocol versions so the client can build a typed exception
* with full diagnostic context, and sets a marker header so the client can
* distinguish this from any other 400.
*/
private void sendSchemaMismatchResponse(ChannelHandlerContext ctx, String diagnosticBody) {
byte[] body = diagnosticBody.getBytes();
FullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(body));
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.length);
response.headers().set(BLOB_TRANSFER_SCHEMA_MISMATCH, "true");
response.headers()
.set(
BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION,
AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
response.headers()
.set(
BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());
LOGGER.warn("Rejecting blob-transfer request from {}: {}", ctx.channel().remoteAddress(), diagnosticBody);
ctx.writeAndFlush(response);
}

/**
* Send metadata for the given blob transfer request
* @param ctx the channel context
Expand All @@ -347,6 +385,14 @@ public void sendMetadata(ChannelHandlerContext ctx, BlobTransferPartitionMetadat
metadataResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, metadataBytes.length);
metadataResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, APPLICATION_JSON);
metadataResponse.headers().set(BLOB_TRANSFER_TYPE, BlobTransferType.METADATA);
metadataResponse.headers()
.set(
BLOB_TRANSFER_PARTITION_STATE_SCHEMA_VERSION,
AvroProtocolDefinition.PARTITION_STATE.getCurrentProtocolVersion());
metadataResponse.headers()
.set(
BLOB_TRANSFER_STORE_VERSION_STATE_SCHEMA_VERSION,
AvroProtocolDefinition.STORE_VERSION_STATE.getCurrentProtocolVersion());

ctx.writeAndFlush(metadataResponse).addListener(future -> {
if (future.isSuccess()) {
Expand Down
Loading