diff --git a/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java index 70f74eb91cea..c131221c0cfd 100644 --- a/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java +++ b/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java @@ -34,7 +34,6 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; -import java.io.IOException; import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; @@ -52,22 +51,22 @@ description = "Adds a named schema using the JSON string representation of an Avro schema", expressionLanguageScope = ExpressionLanguageScope.NONE) public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry { - private static final Set schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); - private final ConcurrentMap recordSchemas = new ConcurrentHashMap<>(); - - static final PropertyDescriptor VALIDATE_FIELD_NAMES = new PropertyDescriptor.Builder() - .name("Validate Field Names") - .description("Whether or not to validate the field names in the Avro schema based on Avro naming rules. If set to true, all field names must be valid Avro names, " - + "which must begin with [A-Za-z_], and subsequently contain only [A-Za-z0-9_]. If set to false, no validation will be performed on the field names.") - .allowableValues("true", "false") - .defaultValue("true") + static final List OBSOLETE_VALIDATE_FIELD_NAMES = List.of("avro-reg-validated-field-names", "Validate Field Names"); + static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder() + .name("Validation Strategy") + .description("Set the strategy for the level of Avro validation required for field names, namespaces and default values") .required(true) + .allowableValues(ValidationStrategy.class) + .defaultValue(ValidationStrategy.VALIDATE) .build(); private static final List PROPERTY_DESCRIPTORS = List.of( - VALIDATE_FIELD_NAMES + VALIDATION_STRATEGY ); + private static final Set SCHEMA_FIELDS = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); + private final ConcurrentMap recordSchemas = new ConcurrentHashMap<>(); + @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { if (descriptor.isDynamic()) { @@ -92,8 +91,8 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String @Override protected Collection customValidate(ValidationContext validationContext) { - Set results = new HashSet<>(); - boolean strict = validationContext.getProperty(VALIDATE_FIELD_NAMES).asBoolean(); + final Set results = new HashSet<>(); + final ValidationStrategy validationStrategy = validationContext.getProperty(VALIDATION_STRATEGY).asAllowableValue(ValidationStrategy.class); // Iterate over dynamic properties, validating the schemas, and adding results validationContext.getProperties().entrySet().stream().filter(entry -> entry.getKey().isDynamic()).forEach(entry -> { @@ -101,7 +100,7 @@ protected Collection customValidate(ValidationContext validati String input = entry.getValue(); try { - final Schema.Parser parser = strict + final Schema.Parser parser = ValidationStrategy.VALIDATE == validationStrategy ? new Schema.Parser(NameValidator.STRICT_VALIDATOR).setValidateDefaults(true) : new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false); final Schema avroSchema = parser.parse(input); @@ -127,7 +126,7 @@ private RecordSchema retrieveSchemaByName(final String schemaName) throws Schema } @Override - public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException { final Optional schemaName = schemaIdentifier.getName(); if (schemaName.isPresent()) { return retrieveSchemaByName(schemaName.get()); @@ -137,8 +136,23 @@ public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) thro } @Override - public void migrateProperties(PropertyConfiguration config) { - config.renameProperty("avro-reg-validated-field-names", VALIDATE_FIELD_NAMES.getName()); + public void migrateProperties(final PropertyConfiguration config) { + // NOTE: Although there are multiple names for OBSOLETE_VALIDATE_FIELD_NAMES, the code in the if statement + // will execute at most once as the assumption is a configuration has one of the obsolete property names but not all. + for (final String obsoletePropertyName : OBSOLETE_VALIDATE_FIELD_NAMES) { + if (config.hasProperty(obsoletePropertyName) && config.isPropertySet(obsoletePropertyName)) { + final String validateFieldNamesRawValue = config.getRawPropertyValue(obsoletePropertyName).orElse(Boolean.TRUE.toString()); + final boolean validateFieldNames = Boolean.parseBoolean(validateFieldNamesRawValue); + + if (validateFieldNames) { + config.setProperty(VALIDATION_STRATEGY, ValidationStrategy.VALIDATE.getValue()); + } else { + config.setProperty(VALIDATION_STRATEGY, ValidationStrategy.NONE.getValue()); + } + } + + config.removeProperty(obsoletePropertyName); + } } @Override @@ -159,6 +173,6 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String @Override public Set getSuppliedSchemaFields() { - return schemaFields; + return SCHEMA_FIELDS; } } diff --git a/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/ValidationStrategy.java b/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/ValidationStrategy.java new file mode 100644 index 000000000000..e7528173c939 --- /dev/null +++ b/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/ValidationStrategy.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import org.apache.nifi.components.DescribedValue; + +public enum ValidationStrategy implements DescribedValue { + VALIDATE("Validate Schemas", """ + Validate incoming Avro schemas. Validation includes field name, + namespace and default value validation. All field names must be valid Avro names + that match the following regular expression [A-Za-z_][A-Za-z0-9_]* (i.e. a name that starts with a letter or an underscore, followed by zero or more alphanumeric + characters or underscores), all namespaces must be one or more valid Avro names each matching the aforementioned regular expression and separated by periods + and any default value specified must match the field type (e.g. a field with type int must specify a number such as 9)"""), + NONE("Do Not Validate Schemas", "Do not validate field names, namespaces and default values of incoming schemas."); + + private final String displayName; + private final String description; + + ValidationStrategy(String displayName, String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java index b2851df6f2b5..00df7566335f 100644 --- a/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java +++ b/nifi-extension-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java @@ -17,93 +17,186 @@ package org.apache.nifi.schemaregistry.services; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.util.MockPropertyConfiguration; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.PropertyMigrationResult; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; -import java.util.Collection; -import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TestAvroSchemaRegistry { - - @Test - public void validateSchemaRegistrationFromrDynamicProperties() throws Exception { - String schemaName = "fooSchema"; - - PropertyDescriptor fooSchema = new PropertyDescriptor.Builder() - .name(schemaName) + private static final String SCHEMA_NAME = "fooSchema"; + private static final PropertyDescriptor SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR = new PropertyDescriptor.Builder() + .name(SCHEMA_NAME) + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .dynamic(true) - .build(); - String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " - + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, " - + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, " - + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, " - + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}"; - PropertyDescriptor barSchema = new PropertyDescriptor.Builder() - .name("barSchema") - .dynamic(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); - AvroSchemaRegistry delegate = new AvroSchemaRegistry(); - delegate.onPropertyModified(fooSchema, null, fooSchemaText); - delegate.onPropertyModified(barSchema, null, ""); + private static final String NON_PERIOD_NAMESPACE_SEPARATOR = """ + {"namespace": "example-avro", "type": "record", "name": "User", + "fields": [ {"name": "name", "type": ["string", "null"]}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "foo", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} ]}"""; - SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build(); - RecordSchema locatedSchema = delegate.retrieveSchema(schemaIdentifier); - assertEquals(fooSchemaText, locatedSchema.getSchemaText().get()); - assertThrows(SchemaNotFoundException.class, () -> delegate.retrieveSchema(SchemaIdentifier.builder().name("barSchema").build())); + private static final String ILLEGAL_CHARACTER_IN_RECORD_NAME = """ + {"namespace": "example.avro", "type": "record", "name": "$User", + "fields": [ {"name": "name", "type": ["string", "null"]}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "foo", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} ]}"""; + + private static final String ILLEGAL_CHARACTER_IN_FIELD_NAME = """ + {"namespace": "example.avro", "type": "record", "name": "User", + "fields": [ {"name": "@name", "type": ["string", "null"]}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "foo", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} ]}"""; + + private static final String NON_MATCHING_DEFAULT_TYPE = """ + {"namespace": "example.avro", "type": "record", "name": "User", + "fields": [ {"name": "name", "type": ["string", "null"]}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "foo", "type": "int", "default": "NAN"}, + {"name": "favorite_color", "type": ["string", "null"]} ]}"""; + + private static final String NON_MATCHING_UNION_DEFAULT_TYPE = """ + {"namespace": "example.avro", "type": "record", "name": "User", + "fields": [ {"name": "name", "type": ["string", "null"]}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "foo", "type": ["int", "null"], "default": "NAN"}, + {"name": "favorite_color", "type": ["string", "null"]} ]}"""; + + private TestRunner runner; + private AvroSchemaRegistry avroSchemaRegistry; + + @BeforeEach + void setup() throws Exception { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + avroSchemaRegistry = new AvroSchemaRegistry(); + runner.addControllerService("avroSchemaRegistry", avroSchemaRegistry); } @Test - public void validateStrictAndNonStrictSchemaRegistrationFromDynamicProperties() { + public void testRetrievalOfNonExistentSchema() { + runner.assertValid(avroSchemaRegistry); + runner.enableControllerService(avroSchemaRegistry); + final SchemaIdentifier nonExistentSchemaIdentifier = SchemaIdentifier.builder().name("barSchema").build(); + + assertThrows(SchemaNotFoundException.class, () -> avroSchemaRegistry.retrieveSchema(nonExistentSchemaIdentifier)); + } + + @Test + public void validateSchemaRegistration() throws Exception { String schemaName = "fooSchema"; - ConfigurationContext configContext = mock(ConfigurationContext.class); - Map properties = new HashMap<>(); - PropertyDescriptor fooSchema = new PropertyDescriptor.Builder() - .name(schemaName) - .dynamic(true) - .build(); - // NOTE: name of record and name of first field are not Avro-compliant, verified below - String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"$User\", " - + "\"fields\": [ " + "{\"name\": \"@name\", \"type\": [\"string\", \"null\"]}, " - + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, " - + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, " - + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}"; - PropertyDescriptor barSchema = new PropertyDescriptor.Builder() - .name("barSchema") - .dynamic(false) - .build(); - properties.put(fooSchema, fooSchemaText); - properties.put(barSchema, ""); - AvroSchemaRegistry delegate = new AvroSchemaRegistry(); - delegate.getSupportedPropertyDescriptors().forEach(prop -> properties.put(prop, prop.getDisplayName())); - when(configContext.getProperties()).thenReturn(properties); - - ValidationContext validationContext = mock(ValidationContext.class); - when(validationContext.getProperties()).thenReturn(properties); - PropertyValue propertyValue = mock(PropertyValue.class); - when(validationContext.getProperty(AvroSchemaRegistry.VALIDATE_FIELD_NAMES)).thenReturn(propertyValue); - - // Strict parsing - when(propertyValue.asBoolean()).thenReturn(true); - Collection results = delegate.customValidate(validationContext); - assertTrue(results.stream().anyMatch(result -> !result.isValid())); - - // Non-strict parsing - when(propertyValue.asBoolean()).thenReturn(false); - results = delegate.customValidate(validationContext); - results.forEach(result -> assertTrue(result.isValid())); + String fooSchemaText = """ + {"namespace": "example.avro", "type": "record", "name": "User", + "fields": [ {"name": "name", "type": ["string", "null"]}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "foo", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} ]}"""; + + runner.setProperty(avroSchemaRegistry, SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, fooSchemaText); + runner.assertValid(avroSchemaRegistry); + runner.enableControllerService(avroSchemaRegistry); + + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build(); + final RecordSchema locatedSchema = avroSchemaRegistry.retrieveSchema(schemaIdentifier); + assertTrue(locatedSchema.getSchemaText().isPresent()); + assertEquals(fooSchemaText, locatedSchema.getSchemaText().get()); + } + + @ParameterizedTest + @MethodSource("invalidSchemas") + public void testWhereSchemasValidated(String schema) { + runner.setProperty(avroSchemaRegistry, SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, schema); + runner.setProperty(avroSchemaRegistry, AvroSchemaRegistry.VALIDATION_STRATEGY, ValidationStrategy.VALIDATE.getValue()); + + runner.assertNotValid(avroSchemaRegistry); + } + + @ParameterizedTest + @MethodSource("invalidSchemas") + public void testWhereSchemasNotValidated(String schema) { + runner.setProperty(avroSchemaRegistry, SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, schema); + runner.setProperty(avroSchemaRegistry, AvroSchemaRegistry.VALIDATION_STRATEGY, ValidationStrategy.NONE.getValue()); + + runner.assertValid(avroSchemaRegistry); + } + + @ParameterizedTest + @EnumSource(ValidationStrategy.class) + public void testDuplicateFieldsInvalidRegardlessOfValidation(ValidationStrategy validationStrategy) { + final String schema = """ + {"namespace": "example.avro", "type": "record", "name": "User", + "fields": [ {"name": "name", "type": ["string", "null"]}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "foo", "type": ["int", "null"]}, + {"name": "foo", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} ]}"""; + + runner.setProperty(avroSchemaRegistry, SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, schema); + runner.setProperty(avroSchemaRegistry, AvroSchemaRegistry.VALIDATION_STRATEGY, validationStrategy.getValue()); + + runner.assertNotValid(avroSchemaRegistry); + } + + @ParameterizedTest + @MethodSource("migrationConfigurations") + void testMigrateProperties(MockPropertyConfiguration configuration) { + final Set expectedRemoved = new HashSet<>(AvroSchemaRegistry.OBSOLETE_VALIDATE_FIELD_NAMES); + final Set expectedUpdated = Set.of(AvroSchemaRegistry.VALIDATION_STRATEGY.getName()); + + final AvroSchemaRegistry service = new AvroSchemaRegistry(); + service.migrateProperties(configuration); + + final PropertyMigrationResult result = configuration.toPropertyMigrationResult(); + final Set propertiesRemoved = result.getPropertiesRemoved(); + assertEquals(expectedRemoved, propertiesRemoved); + assertEquals(expectedUpdated, result.getPropertiesUpdated()); + } + + private static Stream invalidSchemas() { + return Stream.of( + Arguments.argumentSet("Namespace separator other than a period", NON_PERIOD_NAMESPACE_SEPARATOR), + Arguments.argumentSet("Illegal character(s) in record name", ILLEGAL_CHARACTER_IN_RECORD_NAME), + Arguments.argumentSet("Illegal character(s) in field name", ILLEGAL_CHARACTER_IN_FIELD_NAME), + Arguments.argumentSet("Non matching default type", NON_MATCHING_DEFAULT_TYPE), + Arguments.argumentSet("Non matching union default type", NON_MATCHING_UNION_DEFAULT_TYPE) + ); + } + + private static Stream migrationConfigurations() { + return Stream.of( + Arguments.argumentSet(AvroSchemaRegistry.OBSOLETE_VALIDATE_FIELD_NAMES.getFirst() + " Validate", + new MockPropertyConfiguration(Map.of(AvroSchemaRegistry.OBSOLETE_VALIDATE_FIELD_NAMES.getFirst(), Boolean.TRUE.toString()))), + Arguments.argumentSet(AvroSchemaRegistry.OBSOLETE_VALIDATE_FIELD_NAMES.getFirst() + " Do Not Validate", + new MockPropertyConfiguration(Map.of(AvroSchemaRegistry.OBSOLETE_VALIDATE_FIELD_NAMES.getFirst(), Boolean.FALSE.toString()))), + Arguments.argumentSet(AvroSchemaRegistry.OBSOLETE_VALIDATE_FIELD_NAMES.get(1) + " Validate", + new MockPropertyConfiguration(Map.of(AvroSchemaRegistry.OBSOLETE_VALIDATE_FIELD_NAMES.get(1), Boolean.TRUE.toString()))), + Arguments.argumentSet(AvroSchemaRegistry.OBSOLETE_VALIDATE_FIELD_NAMES.get(1) + " Do Not Validate", + new MockPropertyConfiguration(Map.of(AvroSchemaRegistry.OBSOLETE_VALIDATE_FIELD_NAMES.get(1), Boolean.FALSE.toString()))) + ); } }