diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/utils/TestAvroSchemaParseUtils.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/utils/TestAvroSchemaParseUtils.java index 8766e3621ff..c6c11363012 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/utils/TestAvroSchemaParseUtils.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/utils/TestAvroSchemaParseUtils.java @@ -56,6 +56,71 @@ public void testParseSchemaSuccessfullyWhenExtendedValidationEnabled() { AvroSchemaParseUtils.parseSchemaFromJSON(SCHEMA_PASS_EXTENDED_VALIDATION, extendedSchemaValidityCheckEnabled); } + // Legacy-style schema with an int default on a float field — fails STRICT (numeric tier check), + // accepted by LOOSE_NUMERICS, and rewritten by coerceNumericDefaultsToFieldType. + private static final String SCHEMA_NUMERIC_DEFAULT_MISMATCH = + "{\n" + " \"type\": \"record\",\n" + " \"name\": \"Scores\",\n" + " \"fields\": [\n" + + " { \"name\": \"score\", \"type\": \"float\", \"default\": 0 }\n" + " ]\n" + "}"; + + // Union with default whose declared type matches the second branch, not the first — a STRICT + // failure that's outside the numeric-default tier. Confirms LOOSE_NUMERICS only opens the one door. + private static final String SCHEMA_UNION_DEFAULT_NOT_FIRST_BRANCH = + "{\n" + " \"type\": \"record\",\n" + " \"name\": \"BadUnionDefault\",\n" + " \"fields\": [\n" + + " { \"name\": \"f\", \"type\": [\"int\", \"null\"], \"default\": null }\n" + " ]\n" + "}"; + + @Test + public void testLooseNumericAcceptsIntDefaultOnFloatField() { + // STRICT rejects this — the historical migration foot-gun. + Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(SCHEMA_NUMERIC_DEFAULT_MISMATCH)); + + // LOOSE_NUMERICS accepts it. + Schema parsed = AvroSchemaParseUtils.parseSchemaFromJSONLooseNumericValidation(SCHEMA_NUMERIC_DEFAULT_MISMATCH); + Assert.assertNotNull(parsed); + } + + @Test + public void testLooseNumericRejectsNonNumericStrictViolation() { + // The union-default-not-first-branch case is a STRICT failure that LOOSE_NUMERICS preserves — + // it's outside the numeric-default tier this preset relaxes. + Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONLooseNumericValidation(SCHEMA_UNION_DEFAULT_NOT_FIRST_BRANCH)); + } + + @Test + public void testCoerceNumericDefaultsRewritesIntDefaultOnFloatField() { + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(SCHEMA_NUMERIC_DEFAULT_MISMATCH); + Assert.assertNotEquals(coerced, SCHEMA_NUMERIC_DEFAULT_MISMATCH, "Legacy schema must be rewritten"); + // Output must be strict-parse-clean — that's the whole point. + Schema strictParsed = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + Assert.assertNotNull(strictParsed); + } + + @Test + public void testCoerceNumericDefaultsIsNoOpForCleanSchema() { + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(SCHEMA_PASS_EXTENDED_VALIDATION); + Assert.assertSame(coerced, SCHEMA_PASS_EXTENDED_VALIDATION, "Clean schema must be returned by identity"); + } + + @Test + public void testCoerceNumericDefaultsDoesNotFixNonNumericIssues() { + // The walker only fixes numeric default mismatches. Other STRICT violations must remain. + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(SCHEMA_UNION_DEFAULT_NOT_FIRST_BRANCH); + Assert.assertThrows(Exception.class, () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced)); + } + + @Test + public void testCoerceNumericDefaultsHandlesAllNumericTiers() { + String schema = "{\"type\":\"record\",\"name\":\"AllTiers\",\"fields\":[" + + "{\"name\":\"f\",\"type\":\"float\",\"default\":0}," + "{\"name\":\"d\",\"type\":\"double\",\"default\":1}," + + "{\"name\":\"l\",\"type\":\"long\",\"default\":2.0}," + "{\"name\":\"i\",\"type\":\"int\",\"default\":3.0}" + + "]}"; + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schema); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + @Test public void testParseSchemaStrWithDifferentConfigurations() { String schemaStr = "{" + " \"doc\": \"Value in the store\"," + " \"fields\": [" + " {\n" diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/AvroSchemaParseUtils.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/AvroSchemaParseUtils.java index 21220fa173c..8e1285fc853 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/AvroSchemaParseUtils.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/AvroSchemaParseUtils.java @@ -1,8 +1,18 @@ package com.linkedin.venice.schema; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.FloatNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.LongNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.avroutil1.compatibility.SchemaParseConfiguration; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.utils.ObjectMapperFactory; +import java.io.IOException; +import java.util.Iterator; import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,7 +48,109 @@ public static Schema parseSchemaFromJSONStrictValidation(String jsonSchema) { return AvroCompatibilityHelper.parse(jsonSchema, SchemaParseConfiguration.STRICT, null).getMainSchema(); } + /** + * Parses with {@link SchemaParseConfiguration#LOOSE_NUMERICS}, which differs from STRICT only by + * disabling {@code validateNumericDefaultValueTypes}. Numeric defaults whose JSON tier doesn't + * match the field's declared numeric type (e.g. {@code "default": 0} for a {@code float} field) + * are accepted at parse time. All other strict checks (name validation, + * default-actually-matches-type, dangling-content) still apply. + * + * Note: Avro retains the original {@code JsonNode} as the default — it does NOT coerce the in-memory + * default to the declared type. Use {@link #coerceNumericDefaultsToFieldType(String)} if you need a + * string form that subsequently passes STRICT. + */ + public static Schema parseSchemaFromJSONLooseNumericValidation(String jsonSchema) { + return AvroCompatibilityHelper.parse(jsonSchema, SchemaParseConfiguration.LOOSE_NUMERICS, null).getMainSchema(); + } + public static Schema parseSchemaFromJSONLooseValidation(String jsonSchema) { return AvroCompatibilityHelper.parse(jsonSchema, SchemaParseConfiguration.LOOSE, null).getMainSchema(); } + + /** + * Walks the schema JSON and coerces numeric default values to the declared primitive numeric field + * type. For example {@code {"name":"score","type":"float","default":0}} becomes + * {@code {"name":"score","type":"float","default":0.0}} — making the schema strict-parse-clean + * without changing its semantic meaning (Avro fingerprint / parsing canonical form is unaffected + * since defaults are not part of the canonical form). + * + * Only acts on field specs whose {@code type} is a textual primitive numeric name and whose + * {@code default} is a numeric JSON value of a different tier. Does not touch unions, complex + * types, non-numeric defaults, or fields whose default already matches the declared type. + * + * Returns the input unchanged if no coercion was needed. + * + * Intended for use during store migration to rewrite legacy schemas registered before + * {@code validateNumericDefaultValueTypes} was enforced. + */ + public static String coerceNumericDefaultsToFieldType(String jsonSchema) { + try { + ObjectMapper mapper = ObjectMapperFactory.getInstance(); + JsonNode root = mapper.readTree(jsonSchema); + if (coerceInPlace(root)) { + return mapper.writeValueAsString(root); + } + return jsonSchema; + } catch (IOException e) { + throw new VeniceException("Failed to parse JSON for numeric default coercion: " + jsonSchema, e); + } + } + + private static boolean coerceInPlace(JsonNode node) { + boolean changed = false; + if (node.isArray()) { + for (JsonNode child: node) { + if (coerceInPlace(child)) { + changed = true; + } + } + return changed; + } + if (!node.isObject()) { + return false; + } + ObjectNode obj = (ObjectNode) node; + JsonNode typeNode = obj.get("type"); + JsonNode defaultNode = obj.get("default"); + if (typeNode != null && typeNode.isTextual() && defaultNode != null && defaultNode.isNumber()) { + JsonNode coerced = coerceNumber(defaultNode, typeNode.asText()); + if (coerced != null) { + obj.set("default", coerced); + changed = true; + } + } + for (Iterator it = obj.fieldNames(); it.hasNext();) { + if (coerceInPlace(obj.get(it.next()))) { + changed = true; + } + } + return changed; + } + + private static JsonNode coerceNumber(JsonNode value, String declaredType) { + switch (declaredType) { + case "float": + if (value.isFloat() || value.isDouble()) { + return null; + } + return new FloatNode(value.floatValue()); + case "double": + if (value.isDouble()) { + return null; + } + return new DoubleNode(value.doubleValue()); + case "int": + if (value.isInt()) { + return null; + } + return new IntNode(value.intValue()); + case "long": + if (value.isLong()) { + return null; + } + return new LongNode(value.longValue()); + default: + return null; + } + } } diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java new file mode 100644 index 00000000000..6f6fbae7ff2 --- /dev/null +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSchemaParseUtils.java @@ -0,0 +1,177 @@ +package com.linkedin.venice.schema; + +import com.linkedin.venice.exceptions.VeniceException; +import org.apache.avro.Schema; +import org.testng.Assert; +import org.testng.annotations.Test; + + +/** + * Branch-coverage tests for {@link AvroSchemaParseUtils}. + * + * Lives in {@code venice-client-common} (alongside the class under test) so coverage tooling on + * this module sees the exercised branches. There is a similar test file in {@code venice-push-job} + * that covers a few overlapping cases but doesn't count toward this module's diff coverage. + */ +public class TestAvroSchemaParseUtils { + // STRICT-clean record schema. + private static final String CLEAN_RECORD_SCHEMA = + "{\"type\":\"record\",\"name\":\"Clean\",\"fields\":[" + "{\"name\":\"v\",\"type\":\"string\"}]}"; + + // Legacy: int default on a float field — fails STRICT (numeric tier), accepted by LOOSE_NUMERICS. + private static final String LEGACY_INT_ON_FLOAT = "{\"type\":\"record\",\"name\":\"Scores\",\"fields\":[" + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0}]}"; + + // Union default whose JSON tier matches the second branch, not the first — STRICT failure outside + // the numeric-default tier. LOOSE_NUMERICS must still reject this. + private static final String UNION_DEFAULT_NOT_FIRST_BRANCH = "{\"type\":\"record\",\"name\":\"BadUnion\",\"fields\":[" + + "{\"name\":\"f\",\"type\":[\"int\",\"null\"],\"default\":null}]}"; + + // Schema mixing all four numeric tiers with mismatched defaults — exercises every case in the + // coerceNumber switch. + private static final String ALL_NUMERIC_TIERS = "{\"type\":\"record\",\"name\":\"AllTiers\",\"fields\":[" + + "{\"name\":\"f\",\"type\":\"float\",\"default\":0}," + "{\"name\":\"d\",\"type\":\"double\",\"default\":1}," + + "{\"name\":\"l\",\"type\":\"long\",\"default\":2.0}," + "{\"name\":\"i\",\"type\":\"int\",\"default\":3.0}]}"; + + // ----- parseSchemaFromJSONStrictValidation ---------------------------------------------------- + + @Test + public void strictAcceptsCleanSchema() { + Schema s = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(CLEAN_RECORD_SCHEMA); + Assert.assertEquals(s.getType(), Schema.Type.RECORD); + } + + @Test + public void strictRejectsLegacyIntOnFloat() { + Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(LEGACY_INT_ON_FLOAT)); + } + + // ----- parseSchemaFromJSONLooseNumericValidation ---------------------------------------------- + + @Test + public void looseNumericAcceptsLegacyIntOnFloat() { + Schema s = AvroSchemaParseUtils.parseSchemaFromJSONLooseNumericValidation(LEGACY_INT_ON_FLOAT); + Assert.assertNotNull(s); + } + + @Test + public void looseNumericStillRejectsNonNumericStrictViolations() { + // The union-default-not-first-branch case is outside the numeric-default tier this preset + // relaxes — must still throw. + Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONLooseNumericValidation(UNION_DEFAULT_NOT_FIRST_BRANCH)); + } + + // ----- parseSchemaFromJSONLooseValidation ----------------------------------------------------- + + @Test + public void looseAcceptsLegacyIntOnFloat() { + Schema s = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(LEGACY_INT_ON_FLOAT); + Assert.assertNotNull(s); + } + + // ----- parseSchemaFromJSON(str, extendedSchemaValidityCheckEnabled) --------------------------- + + @Test + public void parseSchemaFromJSONStrictPassesThroughForCleanSchema() { + // Clean schema → strict parser succeeds, no fallback triggered. + Schema s = AvroSchemaParseUtils.parseSchemaFromJSON(CLEAN_RECORD_SCHEMA, true); + Assert.assertEquals(s.getType(), Schema.Type.RECORD); + } + + @Test(expectedExceptions = VeniceException.class) + public void parseSchemaFromJSONThrowsWhenExtendedValidationOnAndStrictFails() { + AvroSchemaParseUtils.parseSchemaFromJSON(LEGACY_INT_ON_FLOAT, true); + } + + @Test + public void parseSchemaFromJSONFallsBackToLooseWhenExtendedValidationOff() { + // Strict fails, but extendedSchemaValidityCheckEnabled=false so we fall back to LOOSE and + // return a parsed schema rather than throwing. + Schema s = AvroSchemaParseUtils.parseSchemaFromJSON(LEGACY_INT_ON_FLOAT, false); + Assert.assertNotNull(s); + } + + // ----- coerceNumericDefaultsToFieldType ------------------------------------------------------- + + @Test + public void doubleDefaultOnFloatFieldRoundTripsCleanly() { + // Jackson parses any JSON decimal literal (e.g. {@code 0.0}) as a DoubleNode regardless of the + // declared field type. The {@code "float"} branch of {@code coerceNumber} short-circuits on + // {@code isDouble()} — i.e. it does NOT rewrite DoubleNode-on-float to FloatNode. This test + // pins down that empirically: avro-util1's STRICT parser accepts DoubleNode-on-float, so the + // walker's short-circuit produces a strict-clean output. If avro-util1 ever tightens the float + // numeric-tier check to be symmetric (rejecting double-typed defaults on float fields), this + // test will fail and the {@code "float"} branch will need to coerce DoubleNode -> FloatNode. + String doubleOnFloat = "{\"type\":\"record\",\"name\":\"Scores\",\"fields\":[" + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0.0}]}"; + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(doubleOnFloat); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + + @Test + public void coerceRewritesIntDefaultOnFloatField() { + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(LEGACY_INT_ON_FLOAT); + Assert.assertNotEquals(coerced, LEGACY_INT_ON_FLOAT, "Walker must rewrite the legacy default"); + // Round-trip: the rewritten output must pass STRICT. + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + + @Test + public void coerceHandlesAllFourNumericTiers() { + // Walks each branch of coerceNumber's switch (float / double / int / long with mismatched JSON tier). + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(ALL_NUMERIC_TIERS); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + + @Test + public void coerceReturnsInputUnchangedWhenCleanSchema() { + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(CLEAN_RECORD_SCHEMA); + Assert.assertEquals(coerced, CLEAN_RECORD_SCHEMA, "Clean schema must short-circuit the walker"); + } + + @Test + public void coerceLeavesNonNumericStrictViolationsAlone() { + // The walker is a numeric-default fixer only. A non-numeric STRICT violation must still trip + // strict parse on the walker's output. + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(UNION_DEFAULT_NOT_FIRST_BRANCH); + Assert.assertThrows(Exception.class, () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced)); + } + + @Test + public void coerceRecursesIntoNestedRecord() { + // Nested record with a legacy numeric default inside — exercises the recursion path through + // a non-array, non-primitive {@code type} value. + String nested = "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":[" + + "{\"name\":\"inner\",\"type\":{\"type\":\"record\",\"name\":\"Inner\",\"fields\":[" + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0}]}}]}"; + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(nested); + Assert.assertNotEquals(coerced, nested); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } + + @Test + public void coerceLeavesNonNumericDefaultsUnchanged() { + // String-typed field with a string default — not a coercion target. + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(CLEAN_RECORD_SCHEMA); + Assert.assertEquals(coerced, CLEAN_RECORD_SCHEMA); + } + + @Test + public void coerceLeavesNonTextualTypeUnchanged() { + // When the {@code type} is itself an object (e.g. a nested record definition) rather than a + // textual primitive name, the field-spec check skips and only recursion fires. + String schema = "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":[" + + "{\"name\":\"items\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}"; + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schema); + Assert.assertEquals(coerced, schema); + } + + @Test(expectedExceptions = VeniceException.class) + public void coerceWrapsInvalidJsonAsVeniceException() { + AvroSchemaParseUtils.coerceNumericDefaultsToFieldType("{not valid json"); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java index db1fb3a870b..0993e2c499f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java @@ -44,6 +44,7 @@ import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.ControllerResponse; +import com.linkedin.venice.controllerapi.MultiSchemaResponse; import com.linkedin.venice.controllerapi.StoreResponse; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.exceptions.VeniceException; @@ -70,6 +71,7 @@ import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; +import com.linkedin.venice.schema.AvroSchemaParseUtils; import com.linkedin.venice.stats.ClientType; import com.linkedin.venice.stats.VeniceMetricsRepository; import com.linkedin.venice.system.store.MetaStoreDataType; @@ -623,6 +625,47 @@ private void readFromStore(AvroGenericStoreClient client) client.get(Integer.toString(key)).get(); } + /** + * Variant of {@link #createAndPushStore} that uses a record-typed value schema (NameV1) instead of + * the default {@code "string"}. Necessary when a subsequent test step appends a record-typed v2 + * value schema — record vs. primitive-string aren't backward-compatible, so the schema-compat check + * rejects a record v2 on top of a string v1. + */ + private Properties createAndPushStoreWithNameRecordV1(String clusterName, String storeName) throws Exception { + File inputDir = getTempDataDirectory(); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + Properties props = + IntegrationTestPushUtils.defaultVPJProps(twoLayerMultiRegionMultiClusterWrapper, inputDirPath, storeName); + props.put(SEND_CONTROL_MESSAGES_DIRECTLY, true); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV1Schema(inputDir, RECORD_COUNT); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + + UpdateStoreQueryParams updateStoreQueryParams = + new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA) + .setHybridRewindSeconds(TEST_TIMEOUT) + .setHybridOffsetLagThreshold(2L) + .setHybridStoreDiskQuotaEnabled(true) + .setLargestUsedRTVersionNumber(2) + .setRealTimeTopicName(Utils.composeRealTimeTopic(props.getProperty(VENICE_STORE_NAME_PROP), 1)) + .setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT) + .setStorageNodeReadQuotaEnabled(true); + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, updateStoreQueryParams) + .close(); + + try (ControllerClient childControllerClient0 = new ControllerClient(clusterName, childControllerUrl0)) { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + StoreResponse response = childControllerClient0.getStore(storeName); + Assert.assertNotNull(response.getStore()); + }); + } + + try (VenicePushJob job = new VenicePushJob("Test push job", props)) { + job.run(); + } + return props; + } + private void batchReadFromStore(AvroGenericStoreClient client) throws ExecutionException, InterruptedException { Set keys = new HashSet<>(); @@ -1023,6 +1066,134 @@ public void testAutoStoreMigration() throws Exception { } + /** + * Exercises the migration path against a legacy value schema with a numeric default that mismatches + * the declared field type ({@code {"type":"float","default":0}}). The legacy schema is injected + * directly into source's schema repos via {@code HelixReadWriteSchemaRepository.addValueSchema} + * (which uses {@code SchemaEntry}'s LOOSE parser, bypassing the controller's STRICT pre-check) — + * simulating a store registered before {@code validateNumericDefaultValueTypes} was enforced. + * + * After migration completes, asserts: + * - reads succeed against the dest cluster (D2 client redirects), and + * - dest's value schema v2 is the coerced ({@code 0} → {@code 0.0}) form that passes STRICT. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStoreMigrationWithLegacyNumericDefaultSchema() throws Exception { + String storeName = Utils.getUniqueString("testMigrationLegacyDefault"); + // v1 must be a record schema so that the legacy v2 (record-with-extra-float-field) is backward- + // compatible with it. The default createAndPushStore uses "string" as the value schema, which is + // not compatible with a record-typed v2. + createAndPushStoreWithNameRecordV1(srcClusterName, storeName); + + // Legacy v2 schema: NameRecord V1's fields + an extra float field with int default. STRICT rejects + // {@code "default":0} on a float; SchemaEntry's LOOSE parser accepts it, mimicking the pre-strict + // legacy state. The schema is backward-compatible with v1 (new field has default). + String legacyValueSchemaStr = "{\"type\":\"record\",\"name\":\"nameRecord\",\"namespace\":\"example.avro\"," + + "\"fields\":[" + "{\"name\":\"firstName\",\"type\":\"string\",\"default\":\"\"}," + + "{\"name\":\"lastName\",\"type\":\"string\",\"default\":\"\"}," + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0}" + "]}"; + + // Bypass controller-level STRICT validation by writing the schema through the underlying + // ReadWriteSchemaRepository on both source's parent and source's child admin instances. + // For parent: getVeniceAdmin() returns VeniceParentHelixAdmin; reach into its inner + // VeniceHelixAdmin to access the schema repo. For child: getVeniceHelixAdmin() works directly. + VeniceHelixAdmin srcParentAdmin = + ((com.linkedin.venice.controller.VeniceParentHelixAdmin) twoLayerMultiRegionMultiClusterWrapper + .getLeaderParentControllerWithRetries(srcClusterName) + .getVeniceAdmin()).getVeniceHelixAdmin(); + VeniceHelixAdmin srcChildAdmin = + multiClusterWrapper.getClusters().get(srcClusterName).getLeaderVeniceController().getVeniceHelixAdmin(); + int legacySchemaId = 2; + srcParentAdmin.getHelixVeniceClusterResources(srcClusterName) + .getSchemaRepository() + .addValueSchema(storeName, legacyValueSchemaStr, legacySchemaId); + srcChildAdmin.getHelixVeniceClusterResources(srcClusterName) + .getSchemaRepository() + .addValueSchema(storeName, legacyValueSchemaStr, legacySchemaId); + + // Sanity-check: source has the legacy schema with the int default (NOT coerced — source is not + // a migration destination, so normalize is a no-op there). + try (ControllerClient srcParentClient = new ControllerClient(srcClusterName, parentControllerUrl)) { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + MultiSchemaResponse resp = srcParentClient.getAllValueSchema(storeName); + Assert.assertFalse(resp.isError(), "Source getAllValueSchema returned error: " + resp.getError()); + MultiSchemaResponse.Schema v2 = findById(resp, legacySchemaId); + assertNotNull(v2, "Source must have legacy v2 schema after injection"); + // Source must still hold the un-coerced legacy form — STRICT parse must fail. normalize + // is gated on this cluster being the migration *destination*, so it's a no-op on source. + Assert.assertThrows( + Exception.class, + () -> AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(v2.getSchemaStr())); + }); + } + + // Drive the migration end-to-end and verify the D2 client transparently redirects. + String srcD2ServiceName = multiClusterWrapper.getClusterToD2().get(srcClusterName); + String destD2ServiceName = multiClusterWrapper.getClusterToD2().get(destClusterName); + D2Client d2Client = + D2TestUtils.getAndStartD2Client(multiClusterWrapper.getClusters().get(srcClusterName).getZk().getAddress()); + ClientConfig clientConfig = + ClientConfig.defaultGenericClientConfig(storeName).setD2ServiceName(srcD2ServiceName).setD2Client(d2Client); + + try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient(clientConfig)) { + // Pre-migration: store reads succeed against the source cluster. + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> readFromStore(client)); + + StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); + StoreMigrationTestUtil + .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0); + + // Read verification: after the discovery flips, the same client transparently routes to dest. + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> { + readFromStore(client); + AbstractAvroStoreClient castClient = + (AbstractAvroStoreClient) ((StatTrackingStoreClient) client) + .getInnerStoreClient(); + Assert.assertTrue( + castClient.toString().contains(destD2ServiceName), + "Client did not pick up dest D2 service name after migration; toString=" + castClient); + }); + } + + // Schema verification on dest: v2 must be the coerced form ({@code 0} → {@code 0.0}) and pass STRICT. + try (ControllerClient destParentClient = new ControllerClient(destClusterName, parentControllerUrl); + ControllerClient destChildClient = new ControllerClient(destClusterName, childControllerUrl0)) { + // Parent. + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { + MultiSchemaResponse resp = destParentClient.getAllValueSchema(storeName); + Assert.assertFalse(resp.isError(), "Dest parent getAllValueSchema returned error: " + resp.getError()); + MultiSchemaResponse.Schema v2 = findById(resp, legacySchemaId); + assertNotNull(v2, "Dest parent must have v2 schema after migration"); + // The whole point of normalize: dest's stored form passes STRICT. + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(v2.getSchemaStr()); + Assert.assertTrue( + v2.getSchemaStr().contains("0.0"), + "Dest parent v2 schema must have coerced default 0.0; got " + v2.getSchemaStr()); + }); + + // Child controller in dc-0 (the actual ingestion fabric). + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { + MultiSchemaResponse resp = destChildClient.getAllValueSchema(storeName); + Assert.assertFalse(resp.isError(), "Dest child getAllValueSchema returned error: " + resp.getError()); + MultiSchemaResponse.Schema v2 = findById(resp, legacySchemaId); + assertNotNull(v2, "Dest child must have v2 schema after migration"); + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(v2.getSchemaStr()); + Assert.assertTrue( + v2.getSchemaStr().contains("0.0"), + "Dest child v2 schema must have coerced default 0.0; got " + v2.getSchemaStr()); + }); + } + } + + private static MultiSchemaResponse.Schema findById(MultiSchemaResponse resp, int id) { + for (MultiSchemaResponse.Schema s: resp.getSchemas()) { + if (s.getId() == id) { + return s; + } + } + return null; + } + private void verifyKillMessageInParticipantStore( VeniceClusterWrapper clusterWrapper, String topic, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 3a2835b338d..1cdb1cedbea 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -1210,6 +1210,7 @@ public void createStore( HelixVeniceClusterResources clusterResources = getHelixVeniceClusterResources(clusterName); LOGGER.info("Start creating store {} in cluster {} with owner {}", storeName, clusterName, owner); try (AutoCloseableLock ignore = clusterResources.getClusterLockManager().createStoreWriteLock(storeName)) { + valueSchema = normalizeSchemaForMigration(clusterName, storeName, valueSchema); checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, true); VeniceControllerClusterConfig config = getHelixVeniceClusterResources(clusterName).getConfig(); Store newStore = new ZKStore( @@ -2410,6 +2411,61 @@ protected void checkPreConditionForCreateStore( new SchemaEntry(SchemaData.INVALID_VALUE_SCHEMA_ID, valueSchema); } + /** + * If {@code storeName} is migrating into {@code clusterName}, accept schemas that fail strict + * parse only because of {@code validateNumericDefaultValueTypes} (e.g. legacy + * {@code {"type":"float","default":0}}) by walking the JSON and coercing numeric defaults to the + * declared field type. The output is strict-parse-clean, which keeps downstream consumers that + * strict-parse (DaVinci's {@code SchemaUtils.annotateValueSchema}, VPJ, Samza producer) working. + * + * Re-strict-parses the coerced output as a defensive check so anything beyond the numeric-default + * tier (bad names, dangling content, union default not first branch) still fails loudly. + * + * For non-migration calls, and for migration calls whose input is already strict-clean, returns + * the input unchanged — so this can be wired into entry points idempotently. + * + * @return possibly-coerced schema string that is guaranteed to pass strict parsing. + */ + String normalizeSchemaForMigration(String clusterName, String storeName, String schemaStr) { + ZkStoreConfigAccessor accessor = getStoreConfigAccessor(clusterName); + if (!accessor.containsConfig(storeName)) { + return schemaStr; + } + StoreConfig cfg = accessor.getStoreConfig(storeName); + if (cfg == null || !clusterName.equals(cfg.getMigrationDestCluster())) { + return schemaStr; + } + // Migration context. If strict already passes, leave the string unchanged so we don't + // introduce gratuitous diffs against the source schema. + try { + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr); + return schemaStr; + } catch (Exception strictFailure) { + LOGGER.info( + "Strict parse failed for store {} migrating into cluster {}; attempting numeric-default coercion.", + storeName, + clusterName, + strictFailure); + String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schemaStr); + // Defensive: anything LOOSE_NUMERICS would have been lenient about (union default not first + // branch, bad names, etc.) is outside the coercion scope and must still fail strict. When it + // does, surface the *original* strict failure too — it's the one the operator needs to see. + try { + AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced); + } catch (Exception coercedFailure) { + coercedFailure.addSuppressed(strictFailure); + throw coercedFailure; + } + if (!coerced.equals(schemaStr)) { + LOGGER.info( + "Coerced numeric default(s) in value schema for store {} migrating into cluster {}.", + storeName, + clusterName); + } + return coerced; + } + } + void checkStoreGraveyardForRecreation(String clusterName, String storeName) { HelixVeniceClusterResources clusterResources = getHelixVeniceClusterResources(clusterName); @@ -7329,6 +7385,7 @@ public SchemaEntry addValueSchema( String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType) { checkControllerLeadershipFor(clusterName); + valueSchemaStr = normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr); ReadWriteSchemaRepository schemaRepository = getHelixVeniceClusterResources(clusterName).getSchemaRepository(); SchemaEntry schemaEntry = schemaRepository.addValueSchema(storeName, valueSchemaStr, expectedCompatibilityType); // For duplicates, addValueSchema returns DUPLICATE_VALUE_SCHEMA_CODE; look up the real id so callers @@ -7352,6 +7409,7 @@ public SchemaEntry addValueSchema( int schemaId, DirectionalSchemaCompatibilityType compatibilityType) { checkControllerLeadershipFor(clusterName); + valueSchemaStr = normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr); ReadWriteSchemaRepository schemaRepository = getHelixVeniceClusterResources(clusterName).getSchemaRepository(); int newValueSchemaId = schemaRepository.preCheckValueSchemaAndGetNextAvailableId(storeName, valueSchemaStr, compatibilityType); @@ -7431,6 +7489,8 @@ public SchemaEntry addSupersetSchema( String supersetSchemaStr, int supersetSchemaId) { checkControllerLeadershipFor(clusterName); + valueSchema = normalizeSchemaForMigration(clusterName, storeName, valueSchema); + supersetSchemaStr = normalizeSchemaForMigration(clusterName, storeName, supersetSchemaStr); ReadWriteSchemaRepository schemaRepository = getHelixVeniceClusterResources(clusterName).getSchemaRepository(); final SchemaEntry existingSupersetSchemaEntry = schemaRepository.getValueSchema(storeName, supersetSchemaId); @@ -7474,6 +7534,7 @@ int checkPreConditionForAddValueSchemaAndGetNewSchemaId( String storeName, String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType) { + valueSchemaStr = normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr); AvroSchemaUtils.validateAvroSchemaStr(valueSchemaStr); AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr); validateValueSchemaUsingRandomGenerator(valueSchemaStr, clusterName, storeName); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 402c1a05836..a2f757b4302 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -1046,6 +1046,7 @@ public void createStore( Optional accessPermissions) { acquireAdminMessageLock(clusterName, storeName); try { + valueSchema = getVeniceHelixAdmin().normalizeSchemaForMigration(clusterName, storeName, valueSchema); getVeniceHelixAdmin() .checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, false); LOGGER.info("Adding store: {} to cluster: {}", storeName, clusterName); @@ -4112,6 +4113,7 @@ public SchemaEntry addValueSchema( DirectionalSchemaCompatibilityType expectedCompatibilityType) { acquireAdminMessageLock(clusterName, storeName); try { + newValueSchemaStr = getVeniceHelixAdmin().normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr); Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(newValueSchemaStr); final Store store = getVeniceHelixAdmin().getStore(clusterName, storeName); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java index cbc06b01b4f..fb6bd04d2dd 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java @@ -97,6 +97,10 @@ public void setupInternalMocks() { internalAdmin = mock(VeniceHelixAdmin.class); when(internalAdmin.isHybrid((HybridStoreConfig) any())).thenCallRealMethod(); + // Default: normalizeSchemaForMigration is a pass-through. Tests exercising the migration + // coercion path can override to return a different string. + when(internalAdmin.normalizeSchemaForMigration(anyString(), anyString(), anyString())) + .thenAnswer(inv -> inv.getArgument(2, String.class)); doReturn(topicManager).when(internalAdmin).getTopicManager(); SchemaEntry mockEntry = new SchemaEntry(0, TEST_SCHEMA); doReturn(mockEntry).when(internalAdmin).getKeySchema(anyString(), anyString()); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java index 3a4853e7f6b..e8458017d83 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java @@ -476,4 +476,103 @@ public void testWriteEndOfPushThrowsForNonExistentStore() throws Exception { admin.writeEndOfPush("cluster", "missing_store", 1, false, null); } + + // ---- normalizeSchemaForMigration --------------------------------------------------------- + + private static final String CLEAN_VALUE_SCHEMA = + "{\"type\":\"record\",\"name\":\"Clean\",\"fields\":[" + "{\"name\":\"v\",\"type\":\"string\"}]}"; + + // STRICT rejects this (validateNumericDefaultValueTypes), LOOSE_NUMERICS accepts and coerces. + private static final String LEGACY_NUMERIC_DEFAULT_SCHEMA = "{\"type\":\"record\",\"name\":\"Scores\",\"fields\":[" + + "{\"name\":\"score\",\"type\":\"float\",\"default\":0}]}"; + + // Union with default whose declared type matches the second branch — STRICT failure that + // LOOSE_NUMERICS does NOT relax (it's outside the numeric-default tier). + private static final String NON_NUMERIC_STRICT_VIOLATION_SCHEMA = "{\"type\":\"record\",\"name\":\"BadUnion\"," + + "\"fields\":[{\"name\":\"f\",\"type\":[\"int\",\"null\"],\"default\":null}]}"; + + private static VeniceHelixAdmin newNormalizeMock(String clusterName, String storeName, StoreConfig storeConfig) { + VeniceHelixAdmin admin = mock(VeniceHelixAdmin.class); + ZkStoreConfigAccessor accessor = mock(ZkStoreConfigAccessor.class); + doReturn(storeConfig != null).when(accessor).containsConfig(storeName); + doReturn(storeConfig).when(accessor).getStoreConfig(storeName); + doReturn(accessor).when(admin).getStoreConfigAccessor(clusterName); + doCallRealMethod().when(admin).normalizeSchemaForMigration(anyString(), anyString(), anyString()); + return admin; + } + + @Test + public void testNormalizeReturnsInputWhenStoreConfigAbsent() { + String cluster = "venice-dest"; + String store = "legacy_store"; + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, null); + Assert.assertSame( + admin.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA), + LEGACY_NUMERIC_DEFAULT_SCHEMA, + "Non-migration context (no storeConfig) must return input by identity to avoid parse cost"); + } + + @Test + public void testNormalizeReturnsInputWhenMigrationDestIsAnotherCluster() { + String cluster = "venice-dest"; + String store = "legacy_store"; + StoreConfig cfg = mock(StoreConfig.class); + doReturn("some_other_dest").when(cfg).getMigrationDestCluster(); + + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg); + Assert.assertSame( + admin.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA), + LEGACY_NUMERIC_DEFAULT_SCHEMA, + "When migrationDestCluster does not match this cluster, input must pass through unchanged"); + } + + @Test + public void testNormalizeReturnsInputWhenStrictAlreadyPasses() { + String cluster = "venice-dest"; + String store = "migrating_store"; + StoreConfig cfg = mock(StoreConfig.class); + doReturn(cluster).when(cfg).getMigrationDestCluster(); + + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg); + Assert.assertSame( + admin.normalizeSchemaForMigration(cluster, store, CLEAN_VALUE_SCHEMA), + CLEAN_VALUE_SCHEMA, + "Strict-clean input under migration context must not be reserialized"); + } + + @Test + public void testNormalizeReserializesLegacyNumericDefault() { + String cluster = "venice-dest"; + String store = "migrating_store"; + StoreConfig cfg = mock(StoreConfig.class); + doReturn(cluster).when(cfg).getMigrationDestCluster(); + + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg); + String normalized = admin.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA); + + Assert.assertNotEquals(normalized, LEGACY_NUMERIC_DEFAULT_SCHEMA, "Legacy schema must be reserialized"); + // The whole point: output must be strict-parse-clean so downstream consumers don't trip. + com.linkedin.venice.schema.AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(normalized); + } + + @Test + public void testNormalizeRejectsNonNumericStrictViolation() { + String cluster = "venice-dest"; + String store = "migrating_store"; + StoreConfig cfg = mock(StoreConfig.class); + doReturn(cluster).when(cfg).getMigrationDestCluster(); + + VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg); + // Migration context, but the violation is outside the numeric-default tier — must propagate. + // The original strict failure must ride along as a suppressed exception so the operator can + // see what was actually wrong with the source schema, not just that post-coercion strict tripped. + try { + admin.normalizeSchemaForMigration(cluster, store, NON_NUMERIC_STRICT_VIOLATION_SCHEMA); + Assert.fail("Expected normalize to throw on non-numeric strict violation"); + } catch (Exception coercedFailure) { + Assert.assertTrue( + coercedFailure.getSuppressed().length >= 1, + "Original strict failure should be attached as a suppressed exception, but none was found"); + } + } }