Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
Expand All @@ -52,22 +51,22 @@
description = "Adds a named schema using the JSON string representation of an Avro schema",
expressionLanguageScope = ExpressionLanguageScope.NONE)
public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();

static final PropertyDescriptor VALIDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("Validate Field Names")
.description("Whether or not to validate the field names in the Avro schema based on Avro naming rules. If set to true, all field names must be valid Avro names, "
+ "which must begin with [A-Za-z_], and subsequently contain only [A-Za-z0-9_]. If set to false, no validation will be performed on the field names.")
.allowableValues("true", "false")
.defaultValue("true")
static final List<String> OBSOLETE_VALIDATE_FIELD_NAMES = List.of("avro-reg-validated-field-names", "Validate Field Names");
static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder()
.name("Validation Strategy")
.description("Set the strategy for the level of Avro validation required for field names, namespaces and default values")
.required(true)
.allowableValues(ValidationStrategy.class)
.defaultValue(ValidationStrategy.VALIDATE)
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
VALIDATE_FIELD_NAMES
VALIDATION_STRATEGY
);

private static final Set<SchemaField> SCHEMA_FIELDS = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();

@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (descriptor.isDynamic()) {
Expand All @@ -92,16 +91,16 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String

@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
Set<ValidationResult> results = new HashSet<>();
boolean strict = validationContext.getProperty(VALIDATE_FIELD_NAMES).asBoolean();
final Set<ValidationResult> results = new HashSet<>();
final ValidationStrategy validationStrategy = validationContext.getProperty(VALIDATION_STRATEGY).asAllowableValue(ValidationStrategy.class);

// Iterate over dynamic properties, validating the schemas, and adding results
validationContext.getProperties().entrySet().stream().filter(entry -> entry.getKey().isDynamic()).forEach(entry -> {
String subject = entry.getKey().getDisplayName();
String input = entry.getValue();

try {
final Schema.Parser parser = strict
final Schema.Parser parser = ValidationStrategy.VALIDATE == validationStrategy
? new Schema.Parser(NameValidator.STRICT_VALIDATOR).setValidateDefaults(true)
: new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false);
final Schema avroSchema = parser.parse(input);
Expand All @@ -127,7 +126,7 @@ private RecordSchema retrieveSchemaByName(final String schemaName) throws Schema
}

@Override
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
final Optional<String> schemaName = schemaIdentifier.getName();
if (schemaName.isPresent()) {
return retrieveSchemaByName(schemaName.get());
Expand All @@ -137,8 +136,23 @@ public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) thro
}

@Override
public void migrateProperties(PropertyConfiguration config) {
config.renameProperty("avro-reg-validated-field-names", VALIDATE_FIELD_NAMES.getName());
public void migrateProperties(final PropertyConfiguration config) {
// NOTE: Although there are multiple names for OBSOLETE_VALIDATE_FIELD_NAMES, the code in the if statement
// will execute at most once as the assumption is a configuration has one of the obsolete property names but not all.
for (final String obsoletePropertyName : OBSOLETE_VALIDATE_FIELD_NAMES) {
if (config.hasProperty(obsoletePropertyName) && config.isPropertySet(obsoletePropertyName)) {
final String validateFieldNamesRawValue = config.getRawPropertyValue(obsoletePropertyName).orElse(Boolean.TRUE.toString());
final boolean validateFieldNames = Boolean.parseBoolean(validateFieldNamesRawValue);

if (validateFieldNames) {
config.setProperty(VALIDATION_STRATEGY, ValidationStrategy.VALIDATE.getValue());
} else {
config.setProperty(VALIDATION_STRATEGY, ValidationStrategy.NONE.getValue());
}
}

config.removeProperty(obsoletePropertyName);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should removeProperty(obsoletePropertyName) be guarded by hasProperty(obsoletePropertyName) (or the loop broken after the first match), so that the migration result only reports the property that was actually present?

@dan-s1 dan-s1 Jun 1, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc on removeProperty says:
"Removes the property with the given name, if it exists. If the property does not exist, this is a no-op."

Given that, why would the removeProperty(obsoletePropertyName) call need to be guarded by hasProperty(obsoletePropertyName) (or the loop broken after the first match)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvillard31 Are you okay if this is left as is?

}
}

@Override
Expand All @@ -159,6 +173,6 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String

@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return schemaFields;
return SCHEMA_FIELDS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schemaregistry.services;

import org.apache.nifi.components.DescribedValue;

/**
 * Selectable strategies for the level of Avro validation applied to schemas.
 * <p>
 * Each constant carries a display name and a description. The value reported
 * by {@link #getValue()} is the constant's own {@link #name()}.
 */
public enum ValidationStrategy implements DescribedValue {
    /** Validate field names, namespaces and default values. */
    VALIDATE(
            "Validate Schemas",
            // The embedded line breaks are part of the description value and must be kept as-is.
            "Validate incoming Avro schemas. Validation includes field name,\n"
                    + "namespace and default value validation. All field names must be valid Avro names\n"
                    + "that match the following regular expression [A-Za-z_][A-Za-z0-9_]* (i.e. a name that starts with a letter or an underscore, followed by zero or more alphanumeric\n"
                    + "characters or underscores), all namespaces must be one or more valid Avro names each matching the aforementioned regular expression and separated by periods\n"
                    + "and any default value specified must match the field type (e.g. a field with type int must specify a number such as 9)"
    ),

    /** Perform no validation of field names, namespaces or default values. */
    NONE(
            "Do Not Validate Schemas",
            "Do not validate field names, namespaces and default values of incoming schemas."
    );

    private final String displayName;
    private final String description;

    /**
     * Creates a strategy constant.
     *
     * @param strategyDisplayName the name returned by {@link #getDisplayName()}
     * @param strategyDescription the text returned by {@link #getDescription()}
     */
    ValidationStrategy(final String strategyDisplayName, final String strategyDescription) {
        displayName = strategyDisplayName;
        description = strategyDescription;
    }

    /**
     * @return the enum constant name, i.e. {@code VALIDATE} or {@code NONE}
     */
    @Override
    public String getValue() {
        return name();
    }

    /**
     * @return the display name supplied for this constant
     */
    @Override
    public String getDisplayName() {
        return displayName;
    }

    /**
     * @return the description supplied for this constant
     */
    @Override
    public String getDescription() {
        return description;
    }
}
Loading
Loading