From 8d7223cb1176e628a95aa40398f4ae04e0e2adc0 Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Sat, 16 May 2026 00:59:15 -0700 Subject: [PATCH 1/6] [controller][schema] Coerce legacy numeric defaults during store migration Adds a destination-side rewrite ({0 -> 0.0} on float-typed fields, etc.) so legacy schemas registered before validateNumericDefaultValueTypes was enforced can be migrated into clusters where the controller's STRICT parser now rejects them. Gated on storeConfig.migrationDestCluster, so non-migration writes are unaffected; defensively re-strict-parses the output so non-numeric violations (bad names, dangling content, union default not first branch) still fail loudly. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../utils/TestAvroSchemaParseUtils.java | 65 +++++++ .../venice/schema/AvroSchemaParseUtils.java | 112 ++++++++++++ .../venice/endToEnd/TestStoreMigration.java | 172 ++++++++++++++++++ .../venice/controller/VeniceHelixAdmin.java | 50 +++++ .../controller/VeniceParentHelixAdmin.java | 2 + .../AbstractTestVeniceParentHelixAdmin.java | 4 + .../TestVeniceHelixAdminWithoutCluster.java | 90 +++++++++ 7 files changed, 495 insertions(+) diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/utils/TestAvroSchemaParseUtils.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/utils/TestAvroSchemaParseUtils.java index 8766e3621ff..c6c11363012 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/utils/TestAvroSchemaParseUtils.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/utils/TestAvroSchemaParseUtils.java @@ -56,6 +56,71 @@ public void testParseSchemaSuccessfullyWhenExtendedValidationEnabled() { AvroSchemaParseUtils.parseSchemaFromJSON(SCHEMA_PASS_EXTENDED_VALIDATION, extendedSchemaValidityCheckEnabled); } + // Legacy-style schema with an int default on a float field — fails STRICT (numeric tier check), + // accepted by LOOSE_NUMERICS, and rewritten by 
coerceNumericDefaultsToFieldType. + private static final String SCHEMA_NUMERIC_DEFAULT_MISMATCH = + "{\n" + " \"type\": \"record\",\n" + " \"name\": \"Scores\",\n" + " \"fields\": [\n" + + " { \"name\": \"score\", \"type\": \"float\", \"default\": 0 }\n" + " ]\n" + "}"; + + // Union with default whose declared type matches the second branch, not the first — a STRICT + // failure that's outside the numeric-default tier. Confirms LOOSE_NUMERICS only opens the one door. + private static final String SCHEMA_UNION_DEFAULT_NOT_FIRST_BRANCH = + "{\n" + " \"type\": \"record\",\n" + " \"name\": \"BadUnionDefault\",\n" + " \"fields\": [\n" + + " { \"name\": \"f\", \"type\": [\"int\", \"null\"], \"default\": null }\n" + " ]\n" + "}"; + + @Test + public void testLooseNumericAcceptsIntDefaultOnFloatField() { + // STRICT rejects this — the historical migration foot-gun. + Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(SCHEMA_NUMERIC_DEFAULT_MISMATCH)); + + // LOOSE_NUMERICS accepts it. + Schema parsed = AvroSchemaParseUtils.parseSchemaFromJSONLooseNumericValidation(SCHEMA_NUMERIC_DEFAULT_MISMATCH); + Assert.assertNotNull(parsed); + } + + @Test + public void testLooseNumericRejectsNonNumericStrictViolation() { + // The union-default-not-first-branch case is a STRICT failure that LOOSE_NUMERICS preserves — + // it's outside the numeric-default tier this preset relaxes. + Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONLooseNumericValidation(SCHEMA_UNION_DEFAULT_NOT_FIRST_BRANCH)); + } + + @Test + public void testCoerceNumericDefaultsRewritesIntDefaultOnFloatField() { + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(SCHEMA_NUMERIC_DEFAULT_MISMATCH); + Assert.assertNotEquals(coerced, SCHEMA_NUMERIC_DEFAULT_MISMATCH, "Legacy schema must be rewritten"); + // Output must be strict-parse-clean — that's the whole point. 
+ Schema strictParsed = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + Assert.assertNotNull(strictParsed); + } + + @Test + public void testCoerceNumericDefaultsIsNoOpForCleanSchema() { + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(SCHEMA_PASS_EXTENDED_VALIDATION); + Assert.assertSame(coerced, SCHEMA_PASS_EXTENDED_VALIDATION, "Clean schema must be returned by identity"); + } + + @Test + public void testCoerceNumericDefaultsDoesNotFixNonNumericIssues() { + // The walker only fixes numeric default mismatches. Other STRICT violations must remain. + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(SCHEMA_UNION_DEFAULT_NOT_FIRST_BRANCH); + Assert.assertThrows(Exception.class, () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced)); + } + + @Test + public void testCoerceNumericDefaultsHandlesAllNumericTiers() { + String schema = "{\"type\":\"record\",\"name\":\"AllTiers\",\"fields\":[" + + "{\"name\":\"f\",\"type\":\"float\",\"default\":0}," + "{\"name\":\"d\",\"type\":\"double\",\"default\":1}," + + "{\"name\":\"l\",\"type\":\"long\",\"default\":2.0}," + "{\"name\":\"i\",\"type\":\"int\",\"default\":3.0}" + + "]}"; + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schema); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + @Test public void testParseSchemaStrWithDifferentConfigurations() { String schemaStr = "{" + " \"doc\": \"Value in the store\"," + " \"fields\": [" + " {\n" diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/AvroSchemaParseUtils.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/AvroSchemaParseUtils.java index 21220fa173c..8e1285fc853 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/AvroSchemaParseUtils.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/AvroSchemaParseUtils.java @@ -1,8 +1,18 @@ 
package com.linkedin.venice.schema; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.FloatNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.LongNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.avroutil1.compatibility.SchemaParseConfiguration; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.utils.ObjectMapperFactory; +import java.io.IOException; +import java.util.Iterator; import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,7 +48,109 @@ public static Schema parseSchemaFromJSONStrictValidation(String jsonSchema) { return AvroCompatibilityHelper.parse(jsonSchema, SchemaParseConfiguration.STRICT, null).getMainSchema(); } + /** + * Parses with {@link SchemaParseConfiguration#LOOSE_NUMERICS}, which differs from STRICT only by + * disabling {@code validateNumericDefaultValueTypes}. Numeric defaults whose JSON tier doesn't + * match the field's declared numeric type (e.g. {@code "default": 0} for a {@code float} field) + * are accepted at parse time. All other strict checks (name validation, + * default-actually-matches-type, dangling-content) still apply. + * + * Note: Avro retains the original {@code JsonNode} as the default — it does NOT coerce the in-memory + * default to the declared type. Use {@link #coerceNumericDefaultsToFieldType(String)} if you need a + * string form that subsequently passes STRICT. 
+ */ + public static Schema parseSchemaFromJSONLooseNumericValidation(String jsonSchema) { + return AvroCompatibilityHelper.parse(jsonSchema, SchemaParseConfiguration.LOOSE_NUMERICS, null).getMainSchema(); + } + public static Schema parseSchemaFromJSONLooseValidation(String jsonSchema) { return AvroCompatibilityHelper.parse(jsonSchema, SchemaParseConfiguration.LOOSE, null).getMainSchema(); } + + /** + * Walks the schema JSON and coerces numeric default values to the declared primitive numeric field + * type. For example {@code {"name":"score","type":"float","default":0}} becomes + * {@code {"name":"score","type":"float","default":0.0}} — making the schema strict-parse-clean + * without changing its semantic meaning (Avro fingerprint / parsing canonical form is unaffected + * since defaults are not part of the canonical form). + * + * Only acts on field specs whose {@code type} is a textual primitive numeric name and whose + * {@code default} is a numeric JSON value of a different tier. Does not touch unions, complex + * types, non-numeric defaults, or fields whose default already matches the declared type. + * + * Returns the input unchanged if no coercion was needed. + * + * Intended for use during store migration to rewrite legacy schemas registered before + * {@code validateNumericDefaultValueTypes} was enforced. 
+   */
+  public static String coerceNumericDefaultsToFieldType(String jsonSchema) {
+    try {
+      ObjectMapper mapper = ObjectMapperFactory.getInstance();
+      JsonNode root = mapper.readTree(jsonSchema);
+      // Only re-serialize when something was rewritten; otherwise hand back the caller's exact
+      // string so clean schemas are returned by identity.
+      if (coerceInPlace(root)) {
+        return mapper.writeValueAsString(root);
+      }
+      return jsonSchema;
+    } catch (IOException e) {
+      throw new VeniceException("Failed to parse JSON for numeric default coercion: " + jsonSchema, e);
+    }
+  }
+
+  /**
+   * Recursively walks {@code node} and, for every JSON object that has a textual {@code "type"} and
+   * a numeric {@code "default"}, replaces the default with a node matching the declared type (see
+   * {@link #coerceNumber(JsonNode, String)}). Arrays and nested objects are visited; scalar nodes
+   * are ignored.
+   *
+   * @param node subtree of the schema JSON; mutated in place
+   * @return {@code true} if at least one default was replaced anywhere in the subtree
+   */
+  private static boolean coerceInPlace(JsonNode node) {
+    boolean changed = false;
+    if (node.isArray()) {
+      for (JsonNode child: node) {
+        if (coerceInPlace(child)) {
+          changed = true;
+        }
+      }
+      return changed;
+    }
+    if (!node.isObject()) {
+      return false;
+    }
+    ObjectNode obj = (ObjectNode) node;
+    JsonNode typeNode = obj.get("type");
+    JsonNode defaultNode = obj.get("default");
+    if (typeNode != null && typeNode.isTextual() && defaultNode != null && defaultNode.isNumber()) {
+      JsonNode coerced = coerceNumber(defaultNode, typeNode.asText());
+      if (coerced != null) {
+        obj.set("default", coerced);
+        changed = true;
+      }
+    }
+    for (Iterator<String> it = obj.fieldNames(); it.hasNext();) {
+      String key = it.next();
+      // The value under "default" is data, not schema. A record/map default that happens to carry
+      // "type" and "default" keys must not be rewritten as if it were a field declaration.
+      if ("default".equals(key)) {
+        continue;
+      }
+      if (coerceInPlace(obj.get(key))) {
+        changed = true;
+      }
+    }
+    return changed;
+  }
+
+  /**
+   * Builds a replacement for {@code value} whose JSON node type matches {@code declaredType}.
+   *
+   * @param value numeric default as parsed from the schema JSON
+   * @param declaredType textual {@code "type"} of the enclosing object
+   * @return the replacement node, or {@code null} when no rewrite is needed: the node already
+   *         matches, or {@code declaredType} is not {@code float}, {@code double}, {@code int} or
+   *         {@code long}
+   */
+  private static JsonNode coerceNumber(JsonNode value, String declaredType) {
+    switch (declaredType) {
+      case "float":
+        if (value.isFloat() || value.isDouble()) {
+          return null;
+        }
+        return new FloatNode(value.floatValue());
+      case "double":
+        if (value.isDouble()) {
+          return null;
+        }
+        return new DoubleNode(value.doubleValue());
+      case "int":
+        if (value.isInt()) {
+          return null;
+        }
+        // NOTE(review): intValue() narrows, so a fractional or out-of-range default is truncated.
+        return new IntNode(value.intValue());
+      case "long":
+        // Jackson parses integer literals that fit in 32 bits as IntNode, so a clean
+        // {"type":"long","default":2} arrives here as an int node. It serializes exactly like a
+        // LongNode; reporting it as a mismatch would mark a clean schema as changed and force a
+        // needless re-serialization.
+        if (value.isInt() || value.isLong()) {
+          return null;
+        }
+        return new LongNode(value.longValue());
+      default:
+        return null;
+    }
+  }
 }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java index db1fb3a870b..44dd72c1c56 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java @@ -44,6 +44,7 @@ import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.ControllerResponse; +import com.linkedin.venice.controllerapi.MultiSchemaResponse; import com.linkedin.venice.controllerapi.StoreResponse; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.exceptions.VeniceException; @@ -70,6 +71,7 @@ import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; +import com.linkedin.venice.schema.AvroSchemaParseUtils; import com.linkedin.venice.stats.ClientType; import com.linkedin.venice.stats.VeniceMetricsRepository; import com.linkedin.venice.system.store.MetaStoreDataType; @@ -623,6 +625,47 @@ private void readFromStore(AvroGenericStoreClient client) client.get(Integer.toString(key)).get(); } + /** + * Variant of {@link #createAndPushStore} that uses a record-typed value schema (NameV1) instead of + * the default {@code "string"}. Necessary when a subsequent test step appends a record-typed v2 + * value schema — record vs. primitive-string aren't backward-compatible, so the schema-compat check + * rejects a record v2 on top of a string v1. 
+ */ + private Properties createAndPushStoreWithNameRecordV1(String clusterName, String storeName) throws Exception { + File inputDir = getTempDataDirectory(); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + Properties props = + IntegrationTestPushUtils.defaultVPJProps(twoLayerMultiRegionMultiClusterWrapper, inputDirPath, storeName); + props.put(SEND_CONTROL_MESSAGES_DIRECTLY, true); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV1Schema(inputDir, RECORD_COUNT); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + + UpdateStoreQueryParams updateStoreQueryParams = + new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA) + .setHybridRewindSeconds(TEST_TIMEOUT) + .setHybridOffsetLagThreshold(2L) + .setHybridStoreDiskQuotaEnabled(true) + .setLargestUsedRTVersionNumber(2) + .setRealTimeTopicName(Utils.composeRealTimeTopic(props.getProperty(VENICE_STORE_NAME_PROP), 1)) + .setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT) + .setStorageNodeReadQuotaEnabled(true); + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, updateStoreQueryParams) + .close(); + + try (ControllerClient childControllerClient0 = new ControllerClient(clusterName, childControllerUrl0)) { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + StoreResponse response = childControllerClient0.getStore(storeName); + Assert.assertNotNull(response.getStore()); + }); + } + + try (VenicePushJob job = new VenicePushJob("Test push job", props)) { + job.run(); + } + return props; + } + private void batchReadFromStore(AvroGenericStoreClient client) throws ExecutionException, InterruptedException { Set keys = new HashSet<>(); @@ -1023,6 +1066,135 @@ public void testAutoStoreMigration() throws Exception { } + /** + * Exercises the 
migration path against a legacy value schema with a numeric default that mismatches + * the declared field type ({@code {"type":"float","default":0}}). The legacy schema is injected + * directly into source's schema repos via {@code HelixReadWriteSchemaRepository.addValueSchema} + * (which uses {@code SchemaEntry}'s LOOSE parser, bypassing the controller's STRICT pre-check) — + * simulating a store registered before {@code validateNumericDefaultValueTypes} was enforced. + * + * After migration completes, asserts: + * - reads succeed against the dest cluster (D2 client redirects), and + * - dest's value schema v2 is the coerced ({@code 0} → {@code 0.0}) form that passes STRICT. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStoreMigrationWithLegacyNumericDefaultSchema() throws Exception { + String storeName = Utils.getUniqueString("testMigrationLegacyDefault"); + // v1 must be a record schema so that the legacy v2 (record-with-extra-float-field) is backward- + // compatible with it. The default createAndPushStore uses "string" as the value schema, which is + // not compatible with a record-typed v2. + createAndPushStoreWithNameRecordV1(srcClusterName, storeName); + + // Legacy v2 schema: NameRecord V1's fields + an extra float field with int default. STRICT rejects + // {@code "default":0} on a float; SchemaEntry's LOOSE parser accepts it, mimicking the pre-strict + // legacy state. The schema is backward-compatible with v1 (new field has default). + String legacyValueSchemaStr = "{\"type\":\"record\",\"name\":\"nameRecord\",\"namespace\":\"example.avro\"," + + "\"fields\":[" + "{\"name\":\"firstName\",\"type\":\"string\",\"default\":\"\"}," + + "{\"name\":\"lastName\",\"type\":\"string\",\"default\":\"\"}," + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0}" + "]}"; + + // Bypass controller-level STRICT validation by writing the schema through the underlying + // ReadWriteSchemaRepository on both source's parent and source's child admin instances. 
+ // For parent: getVeniceAdmin() returns VeniceParentHelixAdmin; reach into its inner + // VeniceHelixAdmin to access the schema repo. For child: getVeniceHelixAdmin() works directly. + VeniceHelixAdmin srcParentAdmin = + ((com.linkedin.venice.controller.VeniceParentHelixAdmin) twoLayerMultiRegionMultiClusterWrapper + .getLeaderParentControllerWithRetries(srcClusterName) + .getVeniceAdmin()).getVeniceHelixAdmin(); + VeniceHelixAdmin srcChildAdmin = + multiClusterWrapper.getClusters().get(srcClusterName).getLeaderVeniceController().getVeniceHelixAdmin(); + int legacySchemaId = 2; + srcParentAdmin.getHelixVeniceClusterResources(srcClusterName) + .getSchemaRepository() + .addValueSchema(storeName, legacyValueSchemaStr, legacySchemaId); + srcChildAdmin.getHelixVeniceClusterResources(srcClusterName) + .getSchemaRepository() + .addValueSchema(storeName, legacyValueSchemaStr, legacySchemaId); + + // Sanity-check: source has the legacy schema with the int default (NOT coerced — source is not + // a migration destination, so normalize is a no-op there). + try (ControllerClient srcParentClient = new ControllerClient(srcClusterName, parentControllerUrl)) { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + MultiSchemaResponse resp = srcParentClient.getAllValueSchema(storeName); + Assert.assertFalse(resp.isError(), "Source getAllValueSchema returned error: " + resp.getError()); + MultiSchemaResponse.Schema v2 = findById(resp, legacySchemaId); + assertNotNull(v2, "Source must have legacy v2 schema after injection"); + try { + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(v2.getSchemaStr()); + fail("Source's legacy schema is expected to fail STRICT parse, but it passed: " + v2.getSchemaStr()); + } catch (Exception expected) { + // expected + } + }); + } + + // Drive the migration end-to-end and verify the D2 client transparently redirects. 
+ String srcD2ServiceName = multiClusterWrapper.getClusterToD2().get(srcClusterName); + String destD2ServiceName = multiClusterWrapper.getClusterToD2().get(destClusterName); + D2Client d2Client = + D2TestUtils.getAndStartD2Client(multiClusterWrapper.getClusters().get(srcClusterName).getZk().getAddress()); + ClientConfig clientConfig = + ClientConfig.defaultGenericClientConfig(storeName).setD2ServiceName(srcD2ServiceName).setD2Client(d2Client); + + try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient(clientConfig)) { + // Pre-migration: store reads succeed against the source cluster. + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> readFromStore(client)); + + StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); + StoreMigrationTestUtil + .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0); + + // Read verification: after the discovery flips, the same client transparently routes to dest. + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> { + readFromStore(client); + AbstractAvroStoreClient castClient = + (AbstractAvroStoreClient) ((StatTrackingStoreClient) client) + .getInnerStoreClient(); + Assert.assertTrue( + castClient.toString().contains(destD2ServiceName), + "Client did not pick up dest D2 service name after migration; toString=" + castClient); + }); + } + + // Schema verification on dest: v2 must be the coerced form ({@code 0} → {@code 0.0}) and pass STRICT. + try (ControllerClient destParentClient = new ControllerClient(destClusterName, parentControllerUrl); + ControllerClient destChildClient = new ControllerClient(destClusterName, childControllerUrl0)) { + // Parent. 
+ TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { + MultiSchemaResponse resp = destParentClient.getAllValueSchema(storeName); + Assert.assertFalse(resp.isError(), "Dest parent getAllValueSchema returned error: " + resp.getError()); + MultiSchemaResponse.Schema v2 = findById(resp, legacySchemaId); + assertNotNull(v2, "Dest parent must have v2 schema after migration"); + // The whole point of normalize: dest's stored form passes STRICT. + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(v2.getSchemaStr()); + Assert.assertTrue( + v2.getSchemaStr().contains("0.0"), + "Dest parent v2 schema must have coerced default 0.0; got " + v2.getSchemaStr()); + }); + + // Child controller in dc-0 (the actual ingestion fabric). + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { + MultiSchemaResponse resp = destChildClient.getAllValueSchema(storeName); + Assert.assertFalse(resp.isError(), "Dest child getAllValueSchema returned error: " + resp.getError()); + MultiSchemaResponse.Schema v2 = findById(resp, legacySchemaId); + assertNotNull(v2, "Dest child must have v2 schema after migration"); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(v2.getSchemaStr()); + Assert.assertTrue( + v2.getSchemaStr().contains("0.0"), + "Dest child v2 schema must have coerced default 0.0; got " + v2.getSchemaStr()); + }); + } + } + + private static MultiSchemaResponse.Schema findById(MultiSchemaResponse resp, int id) { + for (MultiSchemaResponse.Schema s: resp.getSchemas()) { + if (s.getId() == id) { + return s; + } + } + return null; + } + private void verifyKillMessageInParticipantStore( VeniceClusterWrapper clusterWrapper, String topic, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 3a2835b338d..6ee88be79ba 100644 --- 
a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -1210,6 +1210,7 @@ public void createStore( HelixVeniceClusterResources clusterResources = getHelixVeniceClusterResources(clusterName); LOGGER.info("Start creating store {} in cluster {} with owner {}", storeName, clusterName, owner); try (AutoCloseableLock ignore = clusterResources.getClusterLockManager().createStoreWriteLock(storeName)) { + valueSchema = normalizeSchemaForMigration(clusterName, storeName, valueSchema); checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, true); VeniceControllerClusterConfig config = getHelixVeniceClusterResources(clusterName).getConfig(); Store newStore = new ZKStore( @@ -2410,6 +2411,50 @@ protected void checkPreConditionForCreateStore( new SchemaEntry(SchemaData.INVALID_VALUE_SCHEMA_ID, valueSchema); } + /** + * If {@code storeName} is migrating into {@code clusterName}, accept schemas that fail strict + * parse only because of {@code validateNumericDefaultValueTypes} (e.g. legacy + * {@code {"type":"float","default":0}}) by walking the JSON and coercing numeric defaults to the + * declared field type. The output is strict-parse-clean, which keeps downstream consumers that + * strict-parse (DaVinci's {@code SchemaUtils.annotateValueSchema}, VPJ, Samza producer) working. + * + * Re-strict-parses the coerced output as a defensive check so anything beyond the numeric-default + * tier (bad names, dangling content, union default not first branch) still fails loudly. + * + * For non-migration calls, and for migration calls whose input is already strict-clean, returns + * the input unchanged — so this can be wired into entry points idempotently. + * + * @return possibly-coerced schema string that is guaranteed to pass strict parsing. 
+   */
+  String normalizeSchemaForMigration(String clusterName, String storeName, String schemaStr) {
+    ZkStoreConfigAccessor accessor = getStoreConfigAccessor(clusterName);
+    if (!accessor.containsConfig(storeName)) {
+      return schemaStr;
+    }
+    StoreConfig cfg = accessor.getStoreConfig(storeName);
+    if (cfg == null || !clusterName.equals(cfg.getMigrationDestCluster())) {
+      return schemaStr;
+    }
+    // Migration context. If strict already passes, leave the string unchanged so we don't
+    // introduce gratuitous diffs against the source schema.
+    try {
+      AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr);
+      return schemaStr;
+    } catch (Exception strictFailure) {
+      try {
+        String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schemaStr);
+        // Defensive: the coercion only repairs numeric-default tier mismatches. Any other strict
+        // violation (union default not first branch, bad names, etc.) is left untouched by it, so
+        // re-parse strictly and let those fail here.
+        AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced);
+        if (!coerced.equals(schemaStr)) {
+          LOGGER.info(
+              "Coerced numeric default(s) in value schema for store {} migrating into cluster {}.",
+              storeName,
+              clusterName);
+        }
+        return coerced;
+      } catch (RuntimeException stillInvalid) {
+        // Keep the failure on the original string attached: the exception thrown here describes
+        // the coerced form, which is not what the caller submitted.
+        if (stillInvalid != strictFailure) {
+          stillInvalid.addSuppressed(strictFailure);
+        }
+        throw stillInvalid;
+      }
+    }
+  }
+
   void checkStoreGraveyardForRecreation(String clusterName, String storeName) {
     HelixVeniceClusterResources clusterResources = getHelixVeniceClusterResources(clusterName);
 
@@ -7329,6 +7374,7 @@ public SchemaEntry addValueSchema(
       String valueSchemaStr,
       DirectionalSchemaCompatibilityType expectedCompatibilityType) {
     checkControllerLeadershipFor(clusterName);
+    valueSchemaStr = normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr);
     ReadWriteSchemaRepository schemaRepository = getHelixVeniceClusterResources(clusterName).getSchemaRepository();
     SchemaEntry schemaEntry = schemaRepository.addValueSchema(storeName, valueSchemaStr, expectedCompatibilityType);
     // For duplicates, addValueSchema returns DUPLICATE_VALUE_SCHEMA_CODE; look up the real id so callers
@@
-7352,6 +7398,7 @@ public SchemaEntry addValueSchema( int schemaId, DirectionalSchemaCompatibilityType compatibilityType) { checkControllerLeadershipFor(clusterName); + valueSchemaStr = normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr); ReadWriteSchemaRepository schemaRepository = getHelixVeniceClusterResources(clusterName).getSchemaRepository(); int newValueSchemaId = schemaRepository.preCheckValueSchemaAndGetNextAvailableId(storeName, valueSchemaStr, compatibilityType); @@ -7431,6 +7478,8 @@ public SchemaEntry addSupersetSchema( String supersetSchemaStr, int supersetSchemaId) { checkControllerLeadershipFor(clusterName); + valueSchema = normalizeSchemaForMigration(clusterName, storeName, valueSchema); + supersetSchemaStr = normalizeSchemaForMigration(clusterName, storeName, supersetSchemaStr); ReadWriteSchemaRepository schemaRepository = getHelixVeniceClusterResources(clusterName).getSchemaRepository(); final SchemaEntry existingSupersetSchemaEntry = schemaRepository.getValueSchema(storeName, supersetSchemaId); @@ -7474,6 +7523,7 @@ int checkPreConditionForAddValueSchemaAndGetNewSchemaId( String storeName, String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType) { + valueSchemaStr = normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr); AvroSchemaUtils.validateAvroSchemaStr(valueSchemaStr); AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr); validateValueSchemaUsingRandomGenerator(valueSchemaStr, clusterName, storeName); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 402c1a05836..a2f757b4302 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ 
-1046,6 +1046,7 @@ public void createStore( Optional accessPermissions) { acquireAdminMessageLock(clusterName, storeName); try { + valueSchema = getVeniceHelixAdmin().normalizeSchemaForMigration(clusterName, storeName, valueSchema); getVeniceHelixAdmin() .checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, false); LOGGER.info("Adding store: {} to cluster: {}", storeName, clusterName); @@ -4112,6 +4113,7 @@ public SchemaEntry addValueSchema( DirectionalSchemaCompatibilityType expectedCompatibilityType) { acquireAdminMessageLock(clusterName, storeName); try { + newValueSchemaStr = getVeniceHelixAdmin().normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr); Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(newValueSchemaStr); final Store store = getVeniceHelixAdmin().getStore(clusterName, storeName); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java index cbc06b01b4f..fb6bd04d2dd 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java @@ -97,6 +97,10 @@ public void setupInternalMocks() { internalAdmin = mock(VeniceHelixAdmin.class); when(internalAdmin.isHybrid((HybridStoreConfig) any())).thenCallRealMethod(); + // Default: normalizeSchemaForMigration is a pass-through. Tests exercising the migration + // coercion path can override to return a different string. 
+ when(internalAdmin.normalizeSchemaForMigration(anyString(), anyString(), anyString())) + .thenAnswer(inv -> inv.getArgument(2, String.class)); doReturn(topicManager).when(internalAdmin).getTopicManager(); SchemaEntry mockEntry = new SchemaEntry(0, TEST_SCHEMA); doReturn(mockEntry).when(internalAdmin).getKeySchema(anyString(), anyString()); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java index 3a4853e7f6b..751b8ca06fc 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java @@ -476,4 +476,94 @@ public void testWriteEndOfPushThrowsForNonExistentStore() throws Exception { admin.writeEndOfPush("cluster", "missing_store", 1, false, null); } + + // ---- normalizeSchemaForMigration --------------------------------------------------------- + + private static final String CLEAN_VALUE_SCHEMA = + "{\"type\":\"record\",\"name\":\"Clean\",\"fields\":[" + "{\"name\":\"v\",\"type\":\"string\"}]}"; + + // STRICT rejects this (validateNumericDefaultValueTypes), LOOSE_NUMERICS accepts and coerces. + private static final String LEGACY_NUMERIC_DEFAULT_SCHEMA = "{\"type\":\"record\",\"name\":\"Scores\",\"fields\":[" + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0}]}"; + + // Union with default whose declared type matches the second branch — STRICT failure that + // LOOSE_NUMERICS does NOT relax (it's outside the numeric-default tier). 
+ private static final String NON_NUMERIC_STRICT_VIOLATION_SCHEMA = "{\"type\":\"record\",\"name\":\"BadUnion\"," + + "\"fields\":[{\"name\":\"f\",\"type\":[\"int\",\"null\"],\"default\":null}]}"; + + private static VeniceHelixAdmin newNormalizeMock(String clusterName, String storeName, StoreConfig storeConfig) { + VeniceHelixAdmin admin = mock(VeniceHelixAdmin.class); + ZkStoreConfigAccessor accessor = mock(ZkStoreConfigAccessor.class); + doReturn(storeConfig != null).when(accessor).containsConfig(storeName); + doReturn(storeConfig).when(accessor).getStoreConfig(storeName); + doReturn(accessor).when(admin).getStoreConfigAccessor(clusterName); + doCallRealMethod().when(admin).normalizeSchemaForMigration(anyString(), anyString(), anyString()); + return admin; + } + + @Test + public void testNormalizeReturnsInputWhenStoreConfigAbsent() { + String cluster = "venice-dest"; + String store = "legacy_store"; + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, null); + Assert.assertSame( + admin.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA), + LEGACY_NUMERIC_DEFAULT_SCHEMA, + "Non-migration context (no storeConfig) must return input by identity to avoid parse cost"); + } + + @Test + public void testNormalizeReturnsInputWhenMigrationDestIsAnotherCluster() { + String cluster = "venice-dest"; + String store = "legacy_store"; + StoreConfig cfg = mock(StoreConfig.class); + doReturn("some_other_dest").when(cfg).getMigrationDestCluster(); + + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg); + Assert.assertSame( + admin.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA), + LEGACY_NUMERIC_DEFAULT_SCHEMA, + "When migrationDestCluster does not match this cluster, input must pass through unchanged"); + } + + @Test + public void testNormalizeReturnsInputWhenStrictAlreadyPasses() { + String cluster = "venice-dest"; + String store = "migrating_store"; + StoreConfig cfg = mock(StoreConfig.class); + 
doReturn(cluster).when(cfg).getMigrationDestCluster(); + + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg); + Assert.assertSame( + admin.normalizeSchemaForMigration(cluster, store, CLEAN_VALUE_SCHEMA), + CLEAN_VALUE_SCHEMA, + "Strict-clean input under migration context must not be reserialized"); + } + + @Test + public void testNormalizeReserializesLegacyNumericDefault() { + String cluster = "venice-dest"; + String store = "migrating_store"; + StoreConfig cfg = mock(StoreConfig.class); + doReturn(cluster).when(cfg).getMigrationDestCluster(); + + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg); + String normalized = admin.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA); + + Assert.assertNotEquals(normalized, LEGACY_NUMERIC_DEFAULT_SCHEMA, "Legacy schema must be reserialized"); + // The whole point: output must be strict-parse-clean so downstream consumers don't trip. + com.linkedin.venice.schema.AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(normalized); + } + + @Test(expectedExceptions = Exception.class) + public void testNormalizeRejectsNonNumericStrictViolation() { + String cluster = "venice-dest"; + String store = "migrating_store"; + StoreConfig cfg = mock(StoreConfig.class); + doReturn(cluster).when(cfg).getMigrationDestCluster(); + + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg); + // Migration context, but the violation is outside the numeric-default tier — must propagate. 
+ admin.normalizeSchemaForMigration(cluster, store, NON_NUMERIC_STRICT_VIOLATION_SCHEMA); + } } From 05bca130033935981d9aaed866e5e95c36230e9c Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Mon, 18 May 2026 11:23:55 -0700 Subject: [PATCH 2/6] Fix spotbugs issue --- .../linkedin/venice/endToEnd/TestStoreMigration.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java index 44dd72c1c56..0993e2c499f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java @@ -1119,12 +1119,11 @@ public void testStoreMigrationWithLegacyNumericDefaultSchema() throws Exception Assert.assertFalse(resp.isError(), "Source getAllValueSchema returned error: " + resp.getError()); MultiSchemaResponse.Schema v2 = findById(resp, legacySchemaId); assertNotNull(v2, "Source must have legacy v2 schema after injection"); - try { - AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(v2.getSchemaStr()); - fail("Source's legacy schema is expected to fail STRICT parse, but it passed: " + v2.getSchemaStr()); - } catch (Exception expected) { - // expected - } + // Source must still hold the un-coerced legacy form — STRICT parse must fail. normalize + // is gated on this cluster being the migration *destination*, so it's a no-op on source. 
+ Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(v2.getSchemaStr())); }); } From fe09cad96d3d3b8e12ba791f763d921a8b38f78b Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Mon, 18 May 2026 11:38:02 -0700 Subject: [PATCH 3/6] [schema] Add branch-coverage tests for AvroSchemaParseUtils MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing tests for parseSchemaFromJSONLooseNumericValidation and coerceNumericDefaultsToFieldType live in venice-push-job; diff coverage on venice-client-common (where the class lives) only sees branches exercised by tests in the same module. Adds 16 focused tests covering the strict/loose/loose-numeric parsers, the parseSchemaFromJSON wrapper with both extendedSchemaValidityCheckEnabled values, and every branch of the JSON walker (each numeric tier, nested record recursion, non-textual type passthrough, identity short-circuit for clean input, IOException → VeniceException wrap). Diff coverage on the changed lines: 76.92% branches (45% required). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../schema/TestAvroSchemaParseUtils.java | 162 ++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java new file mode 100644 index 00000000000..440f5eea0fb --- /dev/null +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java @@ -0,0 +1,162 @@ +package com.linkedin.venice.schema; + +import com.linkedin.venice.exceptions.VeniceException; +import org.apache.avro.Schema; +import org.testng.Assert; +import org.testng.annotations.Test; + + +/** + * Branch-coverage tests for {@link AvroSchemaParseUtils}. + * + * Lives in {@code venice-client-common} (alongside the class under test) so coverage tooling on + * this module sees the exercised branches. There is a similar test file in {@code venice-push-job} + * that covers a few overlapping cases but doesn't count toward this module's diff coverage. + */ +public class TestAvroSchemaParseUtils { + // STRICT-clean record schema. + private static final String CLEAN_RECORD_SCHEMA = + "{\"type\":\"record\",\"name\":\"Clean\",\"fields\":[" + "{\"name\":\"v\",\"type\":\"string\"}]}"; + + // Legacy: int default on a float field — fails STRICT (numeric tier), accepted by LOOSE_NUMERICS. + private static final String LEGACY_INT_ON_FLOAT = "{\"type\":\"record\",\"name\":\"Scores\",\"fields\":[" + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0}]}"; + + // Union default whose JSON tier matches the second branch, not the first — STRICT failure outside + // the numeric-default tier. LOOSE_NUMERICS must still reject this. 
+ private static final String UNION_DEFAULT_NOT_FIRST_BRANCH = "{\"type\":\"record\",\"name\":\"BadUnion\",\"fields\":[" + + "{\"name\":\"f\",\"type\":[\"int\",\"null\"],\"default\":null}]}"; + + // Schema mixing all four numeric tiers with mismatched defaults — exercises every case in the + // coerceNumber switch. + private static final String ALL_NUMERIC_TIERS = "{\"type\":\"record\",\"name\":\"AllTiers\",\"fields\":[" + + "{\"name\":\"f\",\"type\":\"float\",\"default\":0}," + "{\"name\":\"d\",\"type\":\"double\",\"default\":1}," + + "{\"name\":\"l\",\"type\":\"long\",\"default\":2.0}," + "{\"name\":\"i\",\"type\":\"int\",\"default\":3.0}]}"; + + // ----- parseSchemaFromJSONStrictValidation ---------------------------------------------------- + + @Test + public void strictAcceptsCleanSchema() { + Schema s = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(CLEAN_RECORD_SCHEMA); + Assert.assertEquals(s.getType(), Schema.Type.RECORD); + } + + @Test + public void strictRejectsLegacyIntOnFloat() { + Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(LEGACY_INT_ON_FLOAT)); + } + + // ----- parseSchemaFromJSONLooseNumericValidation ---------------------------------------------- + + @Test + public void looseNumericAcceptsLegacyIntOnFloat() { + Schema s = AvroSchemaParseUtils.parseSchemaFromJSONLooseNumericValidation(LEGACY_INT_ON_FLOAT); + Assert.assertNotNull(s); + } + + @Test + public void looseNumericStillRejectsNonNumericStrictViolations() { + // The union-default-not-first-branch case is outside the numeric-default tier this preset + // relaxes — must still throw. 
+ Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONLooseNumericValidation(UNION_DEFAULT_NOT_FIRST_BRANCH)); + } + + // ----- parseSchemaFromJSONLooseValidation ----------------------------------------------------- + + @Test + public void looseAcceptsLegacyIntOnFloat() { + Schema s = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(LEGACY_INT_ON_FLOAT); + Assert.assertNotNull(s); + } + + // ----- parseSchemaFromJSON(str, extendedSchemaValidityCheckEnabled) --------------------------- + + @Test + public void parseSchemaFromJSONStrictPassesThroughForCleanSchema() { + // Clean schema → strict parser succeeds, no fallback triggered. + Schema s = AvroSchemaParseUtils.parseSchemaFromJSON(CLEAN_RECORD_SCHEMA, true); + Assert.assertEquals(s.getType(), Schema.Type.RECORD); + } + + @Test(expectedExceptions = VeniceException.class) + public void parseSchemaFromJSONThrowsWhenExtendedValidationOnAndStrictFails() { + AvroSchemaParseUtils.parseSchemaFromJSON(LEGACY_INT_ON_FLOAT, true); + } + + @Test + public void parseSchemaFromJSONFallsBackToLooseWhenExtendedValidationOff() { + // Strict fails, but extendedSchemaValidityCheckEnabled=false so we fall back to LOOSE and + // return a parsed schema rather than throwing. + Schema s = AvroSchemaParseUtils.parseSchemaFromJSON(LEGACY_INT_ON_FLOAT, false); + Assert.assertNotNull(s); + } + + // ----- coerceNumericDefaultsToFieldType ------------------------------------------------------- + + @Test + public void coerceRewritesIntDefaultOnFloatField() { + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(LEGACY_INT_ON_FLOAT); + Assert.assertNotEquals(coerced, LEGACY_INT_ON_FLOAT, "Walker must rewrite the legacy default"); + // Round-trip: the rewritten output must pass STRICT. 
+ AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + + @Test + public void coerceHandlesAllFourNumericTiers() { + // Walks each branch of coerceNumber's switch (float / double / int / long with mismatched JSON tier). + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(ALL_NUMERIC_TIERS); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + + @Test + public void coerceReturnsInputByIdentityWhenCleanSchema() { + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(CLEAN_RECORD_SCHEMA); + Assert.assertSame(coerced, CLEAN_RECORD_SCHEMA, "Clean schema must short-circuit the walker"); + } + + @Test + public void coerceLeavesNonNumericStrictViolationsAlone() { + // The walker is a numeric-default fixer only. A non-numeric STRICT violation must still trip + // strict parse on the walker's output. + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(UNION_DEFAULT_NOT_FIRST_BRANCH); + Assert.assertThrows(Exception.class, () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced)); + } + + @Test + public void coerceRecursesIntoNestedRecord() { + // Nested record with a legacy numeric default inside — exercises the recursion path through + // a non-array, non-primitive {@code type} value. + String nested = "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":[" + + "{\"name\":\"inner\",\"type\":{\"type\":\"record\",\"name\":\"Inner\",\"fields\":[" + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0}]}}]}"; + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(nested); + Assert.assertNotEquals(coerced, nested); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + + @Test + public void coerceLeavesNonNumericDefaultsUnchanged() { + // String-typed field with a string default — not a coercion target. 
+ String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(CLEAN_RECORD_SCHEMA); + Assert.assertSame(coerced, CLEAN_RECORD_SCHEMA); + } + + @Test + public void coerceLeavesNonTextualTypeUnchanged() { + // When the {@code type} is itself an object (e.g. a nested record definition) rather than a + // textual primitive name, the field-spec check skips and only recursion fires. + String schema = "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":[" + + "{\"name\":\"items\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}"; + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schema); + Assert.assertSame(coerced, schema); + } + + @Test(expectedExceptions = VeniceException.class) + public void coerceWrapsInvalidJsonAsVeniceException() { + AvroSchemaParseUtils.coerceNumericDefaultsToFieldType("{not valid json"); + } +} From 124be1244d301b79ae5cc286ee0eb3ab5a168b03 Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Mon, 18 May 2026 12:05:17 -0700 Subject: [PATCH 4/6] Fix spotbugs ES_COMPARING_STRINGS_WITH_EQ in AvroSchemaParseUtils tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Assert.assertSame on String operands compiles to ==, which trips ES_COMPARING_STRINGS_WITH_EQ. Switch to Assert.assertEquals — the observable behavior (walker returns the input unchanged for clean or non-textual-type schemas) is still verified. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../linkedin/venice/schema/TestAvroSchemaParseUtils.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java index 440f5eea0fb..f46eb4e8ef2 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java @@ -113,9 +113,9 @@ public void coerceHandlesAllFourNumericTiers() { } @Test - public void coerceReturnsInputByIdentityWhenCleanSchema() { + public void coerceReturnsInputUnchangedWhenCleanSchema() { String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(CLEAN_RECORD_SCHEMA); - Assert.assertSame(coerced, CLEAN_RECORD_SCHEMA, "Clean schema must short-circuit the walker"); + Assert.assertEquals(coerced, CLEAN_RECORD_SCHEMA, "Clean schema must short-circuit the walker"); } @Test @@ -142,7 +142,7 @@ public void coerceRecursesIntoNestedRecord() { public void coerceLeavesNonNumericDefaultsUnchanged() { // String-typed field with a string default — not a coercion target. 
String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(CLEAN_RECORD_SCHEMA); - Assert.assertSame(coerced, CLEAN_RECORD_SCHEMA); + Assert.assertEquals(coerced, CLEAN_RECORD_SCHEMA); } @Test @@ -152,7 +152,7 @@ public void coerceLeavesNonTextualTypeUnchanged() { String schema = "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":[" + "{\"name\":\"items\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}"; String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schema); - Assert.assertSame(coerced, schema); + Assert.assertEquals(coerced, schema); } @Test(expectedExceptions = VeniceException.class) From e3ff7303ffb709915e021af0b16b8b8b16f8374e Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Wed, 20 May 2026 11:12:53 -0700 Subject: [PATCH 5/6] [schema] Pin DoubleNode-on-float strict-parse behavior MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Jackson parses any JSON decimal literal (e.g. 0.0) into a DoubleNode regardless of the declared field type. The "float" branch of coerceNumber short-circuits on value.isDouble(), so legacy schemas written as {"type":"float","default":0.0} are NOT rewritten by the walker — they pass through unchanged. Empirically avro-util1's STRICT parser accepts DoubleNode-on-float (the numeric-tier check is asymmetric: rejects IntNode-on-float, accepts DoubleNode-on-float), so the output is still strict-clean. Adds a regression-pinning test for this combination. If avro-util1 ever tightens the float numeric-tier check to be symmetric, this test will fail and the "float" branch will need to coerce DoubleNode -> FloatNode. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../venice/schema/TestAvroSchemaParseUtils.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java index f46eb4e8ef2..6f6fbae7ff2 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java @@ -97,6 +97,21 @@ public void parseSchemaFromJSONFallsBackToLooseWhenExtendedValidationOff() { // ----- coerceNumericDefaultsToFieldType ------------------------------------------------------- + @Test + public void doubleDefaultOnFloatFieldRoundTripsCleanly() { + // Jackson parses any JSON decimal literal (e.g. {@code 0.0}) as a DoubleNode regardless of the + // declared field type. The {@code "float"} branch of {@code coerceNumber} short-circuits on + // {@code isDouble()} — i.e. it does NOT rewrite DoubleNode-on-float to FloatNode. This test + // pins down that empirically: avro-util1's STRICT parser accepts DoubleNode-on-float, so the + // walker's short-circuit produces a strict-clean output. If avro-util1 ever tightens the float + // numeric-tier check to be symmetric (rejecting double-typed defaults on float fields), this + // test will fail and the {@code "float"} branch will need to coerce DoubleNode -> FloatNode. 
+ String doubleOnFloat = "{\"type\":\"record\",\"name\":\"Scores\",\"fields\":[" + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0.0}]}"; + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(doubleOnFloat); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + @Test public void coerceRewritesIntDefaultOnFloatField() { String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(LEGACY_INT_ON_FLOAT); From 9c88232f43b97c7a93bf16028221c725dcf7372b Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Wed, 20 May 2026 11:49:12 -0700 Subject: [PATCH 6/6] [controller] Preserve original strict failure during migration coercion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the initial strict parse fails inside normalizeSchemaForMigration, log strictFailure at INFO before attempting the LOOSE_NUMERICS-based coercion, and on the post-coercion strict re-check attach strictFailure as a suppressed exception of whatever the second parse throws. For non-numeric violations (union default not first branch, bad names, dangling content) the post-coercion strict parse still fails — and without chaining, the operator only sees that second exception and has no idea what was wrong with the source schema. The suppressed entry puts the original message into the same stack trace. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../venice/controller/VeniceHelixAdmin.java | 15 +++++++++++++-- .../TestVeniceHelixAdminWithoutCluster.java | 13 +++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 6ee88be79ba..1cdb1cedbea 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -2441,10 +2441,21 @@ String normalizeSchemaForMigration(String clusterName, String storeName, String AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr); return schemaStr; } catch (Exception strictFailure) { + LOGGER.info( + "Strict parse failed for store {} migrating into cluster {}; attempting numeric-default coercion.", + storeName, + clusterName, + strictFailure); String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schemaStr); // Defensive: anything LOOSE_NUMERICS would have been lenient about (union default not first - // branch, bad names, etc.) is outside the coercion scope and must still fail strict. - AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + // branch, bad names, etc.) is outside the coercion scope and must still fail strict. When it + // does, surface the *original* strict failure too — it's the one the operator needs to see. 
+ try { + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } catch (Exception coercedFailure) { + coercedFailure.addSuppressed(strictFailure); + throw coercedFailure; + } if (!coerced.equals(schemaStr)) { LOGGER.info( "Coerced numeric default(s) in value schema for store {} migrating into cluster {}.", diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java index 751b8ca06fc..e8458017d83 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java @@ -555,7 +555,7 @@ public void testNormalizeReserializesLegacyNumericDefault() { com.linkedin.venice.schema.AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(normalized); } - @Test(expectedExceptions = Exception.class) + @Test public void testNormalizeRejectsNonNumericStrictViolation() { String cluster = "venice-dest"; String store = "migrating_store"; @@ -564,6 +564,15 @@ public void testNormalizeRejectsNonNumericStrictViolation() { VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg); // Migration context, but the violation is outside the numeric-default tier — must propagate. - admin.normalizeSchemaForMigration(cluster, store, NON_NUMERIC_STRICT_VIOLATION_SCHEMA); + // The original strict failure must ride along as a suppressed exception so the operator can + // see what was actually wrong with the source schema, not just that post-coercion strict tripped. 
+ try { + admin.normalizeSchemaForMigration(cluster, store, NON_NUMERIC_STRICT_VIOLATION_SCHEMA); + Assert.fail("Expected normalize to throw on non-numeric strict violation"); + } catch (Exception coercedFailure) { + Assert.assertTrue( + coercedFailure.getSuppressed().length >= 1, + "Original strict failure should be attached as a suppressed exception, but none was found"); + } } }