diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphClientTransientException.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphClientTransientException.java new file mode 100644 index 000000000000..92da39e177a9 --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphClientTransientException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.graph; + +/** + * Indicates a transient graph client failure that should be retried. + */ +public class GraphClientTransientException extends RuntimeException { + public GraphClientTransientException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphElementType.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphElementType.java new file mode 100644 index 000000000000..64a2ed9d68c2 --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphElementType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.graph; + +/** + * Graph element types supported by graph processors and services. + */ +public enum GraphElementType { + NODE, + EDGE +} diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphMutation.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphMutation.java new file mode 100644 index 000000000000..c86525caf1d9 --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphMutation.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.graph; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class GraphMutation { + + private final String query; + private final Map parameters; + + public GraphMutation(final String query, final Map parameters) { + this.query = query; + this.parameters = parameters != null ? new HashMap<>(parameters) : Collections.emptyMap(); + } + + public String getQuery() { + return query; + } + + public Map getParameters() { + return Collections.unmodifiableMap(parameters); + } +} diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphQueryGeneratorService.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphQueryGeneratorService.java new file mode 100644 index 000000000000..582bdabf3813 --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphQueryGeneratorService.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.graph; + +import org.apache.nifi.controller.ControllerService; + +import java.util.Map; + +public interface GraphQueryGeneratorService extends ControllerService { + + /** + * Generates a query statement for setting properties on matched graph elements in the language associated with the graph database. + * + * @param elementType The graph element type to enrich, such as NODE or EDGE + * @param identifiers The identifier property names and values used to match existing elements + * @param elementLabel The graph element label to match on, for example "Person", "Organization", or "KNOWS" + * @param propertyMap The property names and values to set on matched elements + * @return A {@link GraphMutation} containing the generated query and parameters + */ + GraphMutation generateSetPropertiesMutation(GraphElementType elementType, Map identifiers, String elementLabel, Map propertyMap); +} diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/EnrichGraphRecord.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/EnrichGraphRecord.java new file mode 100644 index 000000000000..b6e7f6cfdebb --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/EnrichGraphRecord.java @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.graph; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.graph.GraphClientService; +import org.apache.nifi.graph.GraphElementType; +import org.apache.nifi.graph.GraphClientTransientException; +import org.apache.nifi.graph.GraphMutation; +import org.apache.nifi.graph.GraphQueryGeneratorService; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.RecordPathResult; +import org.apache.nifi.record.path.util.RecordPathCache; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.type.ArrayDataType; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +@Tags({"graph", "gremlin", "cypher", "enrich", "record"}) +@CapabilityDescription("This processor uses fields from FlowFile records to add property values to graph elements. Each record is associated " + + "with an individual graph element using the specified identifier field values. A single FlowFile containing successful graph responses is " + + "written to the response relationship. Failed records are written to a single FlowFile routed to the failure relationship.") +@WritesAttributes({ + @WritesAttribute(attribute = EnrichGraphRecord.GRAPH_OPERATIONS_TIME_SECONDS, description = "The amount of time in seconds that it took to execute all graph operations."), + @WritesAttribute(attribute = EnrichGraphRecord.RECORD_COUNT, description = "The number of records unsuccessfully processed.") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@DynamicProperty(name = "Field(s) containing values to be added to matched elements as properties. If no user-defined properties are added, all fields " + + "except identifier fields are added as element properties.", + value = "The property name to be set in the graph query", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "A dynamic property specifying a RecordPath Expression identifying field(s) whose values are added as properties") +public class EnrichGraphRecord extends AbstractGraphExecutor { + private static final AllowableValue NODE = new AllowableValue( + GraphElementType.NODE.name(), + "Node", + "Enrich nodes in the graph with properties from incoming records." + ); + + private static final AllowableValue EDGE = new AllowableValue( + GraphElementType.EDGE.name(), + "Edge", + "Enrich edges in the graph with properties from incoming records." + ); + + public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("Graph Client Service") + .description("The graph client service for connecting to a graph database.") + .identifiesControllerService(GraphClientService.class) + .addValidator(Validator.VALID) + .required(true) + .build(); + + public static final PropertyDescriptor QUERY_GENERATOR_SERVICE = new PropertyDescriptor.Builder() + .name("Graph Query Generator Service") + .description("The graph query generator service used to build mutation statements for the selected graph implementation.") + .identifiesControllerService(GraphQueryGeneratorService.class) + .addValidator(Validator.VALID) + .required(true) + .build(); + + public static final PropertyDescriptor READER_SERVICE = new PropertyDescriptor.Builder() + .name("Record Reader") + .description("The record reader to use with this processor to read incoming records.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor WRITER_SERVICE = new PropertyDescriptor.Builder() + .name("Failed Record Writer") + .description("The record writer to use for writing failed records.") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor ELEMENT_TYPE = new PropertyDescriptor.Builder() + .name("Element Type") + .description("The graph element type to enrich with properties from incoming records.") + .addValidator(Validator.VALID) + .allowableValues(NODE, EDGE) + .defaultValue(NODE.getValue()) + .required(true) + .build(); + + public static final PropertyDescriptor IDENTIFIER_FIELD = new PropertyDescriptor.Builder() + .name("Identifier Field(s)") + .description("A RecordPath Expression for field(s) in the record used to match identifiers when setting properties.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor ELEMENT_LABEL = new PropertyDescriptor.Builder() + .name("Element Label") + .description("The graph element label used for matching in the graph query. Setting this can result in faster execution.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final Relationship ORIGINAL = new Relationship.Builder() + .name("original") + .description("Original FlowFiles that successfully interacted with graph server.") + .build(); + + public static final Relationship FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that fail to interact with graph server.") + .build(); + + public static final Relationship RESPONSE = new Relationship.Builder() + .name("response") + .description("The response object from the graph server.") + .autoTerminateDefault(true) + .build(); + + private static final List PROPERTY_DESCRIPTORS = List.of( + CLIENT_SERVICE, + QUERY_GENERATOR_SERVICE, + READER_SERVICE, + WRITER_SERVICE, + ELEMENT_TYPE, + IDENTIFIER_FIELD, + ELEMENT_LABEL + ); + + private static final Set RELATIONSHIPS = Set.of( + ORIGINAL, + FAILURE, + RESPONSE + ); + + public static final String RECORD_COUNT = "record.count"; + public static final String GRAPH_OPERATIONS_TIME_SECONDS = "graph.operations.took"; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + private volatile GraphClientService clientService; + private volatile GraphQueryGeneratorService graphQueryGeneratorService; + private volatile RecordReaderFactory recordReaderFactory; + private volatile RecordSetWriterFactory recordSetWriterFactory; + + private volatile RecordPathCache recordPathCache; + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public List getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class); + graphQueryGeneratorService = context.getProperty(QUERY_GENERATOR_SERVICE).asControllerService(GraphQueryGeneratorService.class); + recordReaderFactory = context.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class); + recordSetWriterFactory = context.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class); + + recordPathCache = new RecordPathCache(100); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile input = session.get(); + if (input == null) { + return; + } + + final String identifierRecordPathValue = context.getProperty(IDENTIFIER_FIELD).evaluateAttributeExpressions(input).getValue(); + final RecordPath identifierRecordPath = recordPathCache.getCompiled(identifierRecordPathValue); + final String elementLabel = context.getProperty(ELEMENT_LABEL).evaluateAttributeExpressions(input).getValue(); + final GraphElementType elementType = GraphElementType.valueOf(context.getProperty(ELEMENT_TYPE).getValue()); + + final Map dynamicPropertyRecordPaths = new HashMap<>(); + for (final Map.Entry configuredProperty : context.getProperties().entrySet()) { + final PropertyDescriptor propertyDescriptor = configuredProperty.getKey(); + if (propertyDescriptor.isDynamic()) { + final String recordPathValue = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue(); + dynamicPropertyRecordPaths.put(propertyDescriptor.getName(), recordPathCache.getCompiled(recordPathValue)); + } + } + + Duration graphOperationsDuration = Duration.ZERO; + int successfulRecordCount = 0; + final AtomicBoolean wroteGraphResponse = new AtomicBoolean(false); + WriteResult failedWriteResult; + + FlowFile failedRecords = session.create(input); + FlowFile graphResponse = session.create(input); + try ( + InputStream inputStream = session.read(input); + RecordReader recordReader = recordReaderFactory.createRecordReader(input, inputStream, getLogger()); + + OutputStream failedOutputStream = session.write(failedRecords); + RecordSetWriter failedWriter = recordSetWriterFactory.createWriter(getLogger(), recordReader.getSchema(), failedOutputStream, input.getAttributes()); + + OutputStream graphOutputStream = session.write(graphResponse) + ) { + final long processingStartNanos = System.nanoTime(); + + failedWriter.beginRecordSet(); + + graphOutputStream.write("[".getBytes(StandardCharsets.UTF_8)); + + int recordIndex = 0; + Record record; + while ((record = recordReader.nextRecord()) != null) { + try { + final List identifierFieldValues = getFieldValues(record, identifierRecordPath); + if (identifierFieldValues.isEmpty()) { + throw new IOException("Identifier field(s) not found in record, check the RecordPath expression"); + } + + final LinkedHashMap identifierFieldNameToValue = new LinkedHashMap<>(identifierFieldValues.size()); + final Set identifierFieldNames = new HashSet<>(); + for (final FieldValue identifierFieldValue : identifierFieldValues) { + final Object identifierValue = identifierFieldValue.getValue(); + if (identifierValue == null) { + throw new IOException(String.format("Identifier field '%s' is null for record at index %d", identifierRecordPathValue, recordIndex)); + } + + final String identifierFieldName = identifierFieldValue.getField().getFieldName(); + if (identifierFieldNameToValue.containsKey(identifierFieldName)) { + throw new IOException(String.format("Duplicate identifier field '%s' found for record at index %d", identifierFieldName, recordIndex)); + } + identifierFieldNames.add(identifierFieldName); + identifierFieldNameToValue.put(identifierFieldName, identifierValue); + } + + final Map propertiesToUpdate = getPropertiesToUpdate(record, identifierFieldNames, dynamicPropertyRecordPaths); + final GraphMutation graphMutation = graphQueryGeneratorService.generateSetPropertiesMutation(elementType, identifierFieldNameToValue, elementLabel, propertiesToUpdate); + + final long queryStartNanos = System.nanoTime(); + try { + clientService.executeQuery(graphMutation.getQuery(), graphMutation.getParameters(), (resultMap, hasMore) -> { + try { + if (wroteGraphResponse.get()) { + graphOutputStream.write(",".getBytes(StandardCharsets.UTF_8)); + } + + graphOutputStream.write(objectMapper.writeValueAsBytes(resultMap)); + wroteGraphResponse.set(true); + } catch (final IOException ioException) { + throw new ProcessException("Failed to write graph response", ioException); + } + }); + } finally { + graphOperationsDuration = graphOperationsDuration.plusNanos(System.nanoTime() - queryStartNanos); + } + + successfulRecordCount++; + } catch (final GraphClientTransientException transientException) { + throw transientException; + } catch (final Exception e) { + getLogger().error("Failed to process record at index {}", recordIndex, e); + failedWriter.write(record); + } finally { + recordIndex++; + } + } + + graphOutputStream.write("]".getBytes(StandardCharsets.UTF_8)); + final Duration totalProcessingDuration = Duration.ofNanos(System.nanoTime() - processingStartNanos); + if (getLogger().isDebugEnabled()) { + getLogger().debug("Handled {} records in {} ms, including {} ms spent executing graph queries", + successfulRecordCount, + totalProcessingDuration.toMillis(), + graphOperationsDuration.toMillis()); + } + + failedWriteResult = failedWriter.finishRecordSet(); + failedWriter.flush(); + } catch (final GraphClientTransientException transientException) { + getLogger().error("Transient graph client failure, rolling back session for retry", transientException); + context.yield(); + throw transientException; + } catch (final Exception ex) { + getLogger().error("Error reading records, routing input FlowFile to failure", ex); + session.remove(failedRecords); + session.remove(graphResponse); + session.transfer(input, FAILURE); + context.yield(); + return; + } + + if (successfulRecordCount > 0) { + graphResponse = session.putAttribute(graphResponse, CoreAttributes.MIME_TYPE.key(), "application/json"); + session.transfer(graphResponse, RESPONSE); + } else { + session.remove(graphResponse); + } + + session.getProvenanceReporter().send(input, clientService.getTransitUrl(), graphOperationsDuration.toMillis()); + + if (failedWriteResult != null && failedWriteResult.getRecordCount() < 1) { + session.remove(failedRecords); + input = session.putAttribute(input, GRAPH_OPERATIONS_TIME_SECONDS, String.valueOf(graphOperationsDuration.toSeconds())); + session.transfer(input, ORIGINAL); + } else if (failedWriteResult != null) { + failedRecords = session.putAttribute(failedRecords, RECORD_COUNT, String.valueOf(failedWriteResult.getRecordCount())); + session.transfer(failedRecords, FAILURE); + session.remove(input); + } + } + + private List getFieldValues(final Record record, final RecordPath recordPath) { + final RecordPathResult result = recordPath.evaluate(record); + final List values = result.getSelectedFields().toList(); + return values; + } + + private Map getPropertiesToUpdate(final Record record, final Set identifierFieldNames, + final Map dynamicPropertyRecordPaths) throws IOException { + final Map propertiesToUpdate = new HashMap<>(); + if (dynamicPropertyRecordPaths.isEmpty()) { + final List fieldNames = record.getSchema().getFieldNames(); + for (final String fieldName : fieldNames) { + if (identifierFieldNames.contains(fieldName)) { + continue; + } + + final List fieldValues = getFieldValues(record, recordPathCache.getCompiled("/" + fieldName)); + if (fieldValues.isEmpty()) { + continue; + } + + final FieldValue selectedValue = fieldValues.getFirst(); + final Object rawValue = selectedValue.getValue(); + if (rawValue == null) { + continue; + } + + propertiesToUpdate.put(fieldName, normalizeValue(rawValue, selectedValue.getField().getDataType())); + } + } else { + for (final Map.Entry dynamicPropertyRecordPath : dynamicPropertyRecordPaths.entrySet()) { + final List fieldValues = getFieldValues(record, dynamicPropertyRecordPath.getValue()); + if (fieldValues.isEmpty() || fieldValues.getFirst().getValue() == null) { + throw new IOException("Dynamic property field(s) not found in record, check the RecordPath expression"); + } + + final FieldValue selectedValue = fieldValues.getFirst(); + propertiesToUpdate.put(dynamicPropertyRecordPath.getKey(), normalizeValue(selectedValue.getValue(), selectedValue.getField().getDataType())); + } + } + + return propertiesToUpdate; + } + + private Object normalizeValue(final Object rawValue, final DataType rawDataType) { + final RecordFieldType rawFieldType = rawDataType.getFieldType(); + if (RecordFieldType.ARRAY.equals(rawFieldType)) { + final DataType arrayElementDataType = ((ArrayDataType) rawDataType).getElementType(); + if (RecordFieldType.RECORD.getDataType().equals(arrayElementDataType)) { + final Object[] rawValueArray = (Object[]) rawValue; + final Object[] mappedValueArray = new Object[rawValueArray.length]; + for (int index = 0; index < rawValueArray.length; index++) { + final MapRecord mapRecord = (MapRecord) rawValueArray[index]; + mappedValueArray[index] = mapRecord.toMap(true); + } + return mappedValueArray; + } + + return rawValue; + } + + if (RecordFieldType.RECORD.equals(rawFieldType)) { + final MapRecord mapRecord = (MapRecord) rawValue; + return mapRecord.toMap(true); + } + + return rawValue; + } +} diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 2ab7e95b40d1..bbc89eff6882 100644 --- a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.graph.ExecuteGraphQuery -org.apache.nifi.processors.graph.ExecuteGraphQueryRecord \ No newline at end of file +org.apache.nifi.processors.graph.ExecuteGraphQueryRecord +org.apache.nifi.processors.graph.EnrichGraphRecord \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/MockEnrichGraphClientService.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/MockEnrichGraphClientService.java new file mode 100644 index 000000000000..c2d8a3c028f3 --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/MockEnrichGraphClientService.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.graph; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.graph.GraphClientService; +import org.apache.nifi.graph.GraphClientTransientException; +import org.apache.nifi.graph.GraphQueryResultCallback; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.HashMap; +import java.util.Map; + +public class MockEnrichGraphClientService extends AbstractControllerService implements GraphClientService { + + @Override + public Map executeQuery(final String query, final Map parameters, final GraphQueryResultCallback handler) { + if ("FAIL".equals(query)) { + throw new ProcessException("Generated query failure"); + } + if ("TRANSIENT_FAIL".equals(query)) { + throw new GraphClientTransientException("Generated transient connectivity failure", new RuntimeException("database unavailable")); + } + + final Map response = new HashMap<>(); + response.put("query", query); + response.put("properties", parameters.get("properties")); + response.put("elementType", parameters.get("elementType")); + response.put("elementLabel", parameters.get("elementLabel")); + + handler.process(response, false); + + final Map resultAttributes = new HashMap<>(); + resultAttributes.put(NODES_CREATED, String.valueOf(1)); + resultAttributes.put(RELATIONS_CREATED, String.valueOf(1)); + resultAttributes.put(LABELS_ADDED, String.valueOf(1)); + resultAttributes.put(NODES_DELETED, String.valueOf(0)); + resultAttributes.put(RELATIONS_DELETED, String.valueOf(0)); + resultAttributes.put(PROPERTIES_SET, String.valueOf(1)); + resultAttributes.put(ROWS_RETURNED, String.valueOf(1)); + return resultAttributes; + } + + @Override + public String getTransitUrl() { + return "mock://localhost:12345/fake-database"; + } +} diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/MockGraphQueryGeneratorService.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/MockGraphQueryGeneratorService.java new file mode 100644 index 000000000000..b6410417ca71 --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/MockGraphQueryGeneratorService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.graph; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.graph.GraphElementType; +import org.apache.nifi.graph.GraphMutation; +import org.apache.nifi.graph.GraphQueryGeneratorService; + +import java.util.HashMap; +import java.util.Map; + +public class MockGraphQueryGeneratorService extends AbstractControllerService implements GraphQueryGeneratorService { + + @Override + public GraphMutation generateSetPropertiesMutation(final GraphElementType elementType, final Map identifiers, final String elementLabel, + final Map propertyMap) { + final String query; + if (propertyMap != null && Boolean.TRUE.equals(propertyMap.get("forceTransientFailure"))) { + query = "TRANSIENT_FAIL"; + } else if (propertyMap != null && Boolean.TRUE.equals(propertyMap.get("forceFailure"))) { + query = "FAIL"; + } else { + query = elementType.name() + ":" + (elementLabel == null ? "" : elementLabel); + } + + final Map parameters = new HashMap<>(); + parameters.put("properties", propertyMap == null ? Map.of() : propertyMap); + parameters.put("elementType", elementType.name()); + parameters.put("elementLabel", elementLabel); + parameters.put("identifierCount", identifiers == null ? 0 : identifiers.size()); + + return new GraphMutation(query, parameters); + } +} diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/TestEnrichGraphRecord.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/TestEnrichGraphRecord.java new file mode 100644 index 000000000000..8d2b5b2eacc0 --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/TestEnrichGraphRecord.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.graph; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.graph.GraphElementType; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestEnrichGraphRecord { + private final ObjectMapper objectMapper = new ObjectMapper(); + private TestRunner runner; + + @BeforeEach + void setup() throws Exception { + runner = TestRunners.newTestRunner(EnrichGraphRecord.class); + + final MockEnrichGraphClientService graphClientService = new MockEnrichGraphClientService(); + final MockGraphQueryGeneratorService queryGeneratorService = new MockGraphQueryGeneratorService(); + final JsonTreeReader reader = new JsonTreeReader(); + final MockRecordWriter writer = new MockRecordWriter(); + + runner.addControllerService("graphClient", graphClientService); + runner.addControllerService("queryGenerator", queryGeneratorService); + runner.addControllerService("reader", reader); + runner.addControllerService("writer", writer); + + runner.enableControllerService(graphClientService); + runner.enableControllerService(queryGeneratorService); + runner.enableControllerService(reader); + runner.enableControllerService(writer); + + runner.setProperty(EnrichGraphRecord.CLIENT_SERVICE, "graphClient"); + runner.setProperty(EnrichGraphRecord.QUERY_GENERATOR_SERVICE, "queryGenerator"); + runner.setProperty(EnrichGraphRecord.READER_SERVICE, "reader"); + runner.setProperty(EnrichGraphRecord.WRITER_SERVICE, "writer"); + runner.setProperty(EnrichGraphRecord.IDENTIFIER_FIELD, "/id"); + runner.setProperty(EnrichGraphRecord.ELEMENT_LABEL, "Person"); + } + + @Test + void testSuccessfulNodeProcessingProducesSingleResponseFlowFile() throws Exception { + final String inputContent = "[{\"id\":\"1\",\"price\":100},{\"id\":\"2\",\"age\":\"10\"}]"; + runner.enqueue(inputContent.getBytes()); + + runner.run(); + + runner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 1); + runner.assertTransferCount(EnrichGraphRecord.FAILURE, 0); + runner.assertTransferCount(EnrichGraphRecord.RESPONSE, 1); + + final MockFlowFile responseFlowFile = runner.getFlowFilesForRelationship(EnrichGraphRecord.RESPONSE).getFirst(); + final List> responses = objectMapper.readValue(runner.getContentAsByteArray(responseFlowFile), new TypeReference<>() { + }); + + assertEquals(2, responses.size()); + + final Map firstProperties = (Map) responses.getFirst().get("properties"); + final Map secondProperties = (Map) responses.get(1).get("properties"); + assertEquals(100, firstProperties.get("price")); + assertEquals("10", secondProperties.get("age")); + assertFalse(firstProperties.containsKey("id")); + assertFalse(secondProperties.containsKey("id")); + } + + @Test + void testPartialFailureWritesFailedRecords() throws Exception { + final String inputContent = "[{\"id\":\"1\",\"price\":100},{\"id\":\"2\",\"forceFailure\":true}]"; + runner.enqueue(inputContent.getBytes()); + + runner.run(); + + runner.assertTransferCount(EnrichGraphRecord.RESPONSE, 1); + runner.assertTransferCount(EnrichGraphRecord.FAILURE, 1); + runner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 0); + + final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(EnrichGraphRecord.FAILURE).getFirst(); + assertEquals("1", failedFlowFile.getAttribute(EnrichGraphRecord.RECORD_COUNT)); + } + + @Test + void testDynamicPropertiesAreUsedInsteadOfDefaultFields() throws Exception { + runner.setProperty("cost", "/price"); + final String inputContent = "[{\"id\":\"1\",\"price\":100,\"name\":\"Widget\"}]"; + runner.enqueue(inputContent.getBytes()); + + runner.run(); + + runner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 1); + runner.assertTransferCount(EnrichGraphRecord.FAILURE, 0); + runner.assertTransferCount(EnrichGraphRecord.RESPONSE, 1); + + final MockFlowFile responseFlowFile = runner.getFlowFilesForRelationship(EnrichGraphRecord.RESPONSE).getFirst(); + final List> responses = objectMapper.readValue(runner.getContentAsByteArray(responseFlowFile), new TypeReference<>() { + }); + final Map properties = (Map) responses.getFirst().get("properties"); + assertEquals(100, properties.get("cost")); + assertFalse(properties.containsKey("name")); + } + + @Test + void testEdgeSelectionIsPassedToQueryGenerator() throws Exception { + runner.setProperty(EnrichGraphRecord.ELEMENT_TYPE, GraphElementType.EDGE.name()); + runner.setProperty(EnrichGraphRecord.ELEMENT_LABEL, "ASSOCIATED_WITH"); + final String inputContent = "[{\"id\":\"1\",\"weight\":7}]"; + runner.enqueue(inputContent.getBytes()); + + runner.run(); + + runner.assertTransferCount(EnrichGraphRecord.RESPONSE, 1); + final MockFlowFile responseFlowFile = runner.getFlowFilesForRelationship(EnrichGraphRecord.RESPONSE).getFirst(); + final List> responses = objectMapper.readValue(runner.getContentAsByteArray(responseFlowFile), new TypeReference<>() { + }); + assertEquals("EDGE", responses.getFirst().get("elementType")); + assertEquals("EDGE:ASSOCIATED_WITH", responses.getFirst().get("query")); + assertEquals("ASSOCIATED_WITH", responses.getFirst().get("elementLabel")); + assertTrue(((Map) responses.getFirst().get("properties")).containsKey("weight")); + } + + @Test + void testTransientGraphFailureRollsBackForRetry() { + final String inputContent = "[{\"id\":\"1\",\"forceTransientFailure\":true}]"; + runner.enqueue(inputContent.getBytes()); + + assertThrows(AssertionError.class, () -> runner.run(1, true, true)); + runner.assertTransferCount(EnrichGraphRecord.RESPONSE, 0); + runner.assertTransferCount(EnrichGraphRecord.FAILURE, 0); + runner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 0); + runner.assertQueueNotEmpty(); + assertTrue(runner.isYieldCalled()); + } +} diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/java/org/apache/nifi/graph/Neo4JCypherClientService.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/java/org/apache/nifi/graph/Neo4JCypherClientService.java index a3f0e6a30f15..f52762bc1afc 100644 --- a/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/java/org/apache/nifi/graph/Neo4JCypherClientService.java +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/java/org/apache/nifi/graph/Neo4JCypherClientService.java @@ -37,6 +37,9 @@ import org.neo4j.driver.Record; import org.neo4j.driver.Result; import org.neo4j.driver.Session; +import org.neo4j.driver.exceptions.ServiceUnavailableException; +import org.neo4j.driver.exceptions.SessionExpiredException; +import org.neo4j.driver.exceptions.TransientException; import org.neo4j.driver.internal.InternalNode; import org.neo4j.driver.summary.ResultSummary; import org.neo4j.driver.summary.SummaryCounters; @@ -288,10 +291,25 @@ public Map executeQuery(String query, Map parame return resultAttributes; } catch (Exception ex) { + if (isTransientConnectivityFailure(ex)) { + throw new GraphClientTransientException("Transient query execution failure", ex); + } throw new ProcessException("Query execution failed", ex); } } + private static boolean isTransientConnectivityFailure(final Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof ServiceUnavailableException || current instanceof SessionExpiredException || current instanceof TransientException) { + return true; + } + current = current.getCause(); + } + + return false; + } + @Override public String getTransitUrl() { return connectionUrl; diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/java/org/apache/nifi/graph/Neo4JCypherQueryGeneratorService.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/java/org/apache/nifi/graph/Neo4JCypherQueryGeneratorService.java new file mode 100644 index 000000000000..358c7e75e874 --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/java/org/apache/nifi/graph/Neo4JCypherQueryGeneratorService.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.graph; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.StringUtils; + +import java.util.HashMap; +import java.util.Map; + +@Tags({"graph", "neo4j", "cypher", "query", "generator"}) +@CapabilityDescription("Generates parameterized Cypher upsert statements for graph elements from identifier and property values.") +public class Neo4JCypherQueryGeneratorService extends AbstractControllerService implements GraphQueryGeneratorService { + private static final String PROPERTIES_PARAMETER = "properties"; + private static final String NODE_ALIAS = "n"; + private static final String EDGE_ALIAS = "e"; + + @Override + public GraphMutation generateSetPropertiesMutation(final GraphElementType elementType, final Map identifiers, final String elementLabel, + final Map propertyMap) { + if (elementType == null) { + throw new ProcessException("Graph element type must be specified"); + } + + final Map mutationParameters = new HashMap<>(); + final StringBuilder queryBuilder = new StringBuilder(); + + if (GraphElementType.NODE == elementType) { + queryBuilder.append("MERGE (").append(NODE_ALIAS); + if (!StringUtils.isBlank(elementLabel)) { + queryBuilder.append(":").append(validateCypherToken(elementLabel, "Element Label")); + } + + appendIdentifierClause(queryBuilder, identifiers, mutationParameters); + queryBuilder.append(")\n"); + queryBuilder.append("ON MATCH SET ").append(NODE_ALIAS).append(" += $").append(PROPERTIES_PARAMETER).append("\n"); + queryBuilder.append("ON CREATE SET ").append(NODE_ALIAS).append(" += $").append(PROPERTIES_PARAMETER); + } else if (GraphElementType.EDGE.equals(elementType)) { + if (StringUtils.isBlank(elementLabel)) { + throw new ProcessException("Element Label must be set when enriching edges"); + } + + queryBuilder.append("MATCH ()-[").append(EDGE_ALIAS).append(":").append(validateCypherToken(elementLabel, "Element Label")); + appendIdentifierClause(queryBuilder, identifiers, mutationParameters); + queryBuilder.append("]-()\n"); + queryBuilder.append("SET ").append(EDGE_ALIAS).append(" += $").append(PROPERTIES_PARAMETER); + } else { + throw new ProcessException("Unsupported graph element type: " + elementType); + } + + final Map properties = propertyMap == null ? Map.of() : propertyMap; + mutationParameters.put(PROPERTIES_PARAMETER, properties); + + return new GraphMutation(queryBuilder.toString(), mutationParameters); + } + + private void appendIdentifierClause(final StringBuilder queryBuilder, final Map identifiers, final Map mutationParameters) { + if (identifiers == null || identifiers.isEmpty()) { + return; + } + + queryBuilder.append(" {"); + int index = 0; + for (final Map.Entry identifierEntry : identifiers.entrySet()) { + if (identifierEntry.getKey() == null || identifierEntry.getValue() == null) { + throw new ProcessException("Identifiers and values must not be null"); + } + + if (index > 0) { + queryBuilder.append(", "); + } + + final String identifierName = validateCypherToken(identifierEntry.getKey(), "Identifier name"); + final String parameterName = "identifier_" + index; + queryBuilder.append(identifierName).append(": $").append(parameterName); + mutationParameters.put(parameterName, identifierEntry.getValue()); + index++; + } + queryBuilder.append("}"); + } + + private String validateCypherToken(final String token, final String tokenType) { + if (StringUtils.isBlank(token)) { + throw new ProcessException(tokenType + " must not be blank"); + } + + for (int index = 0; index < token.length(); index++) { + final char character = token.charAt(index); + if (!Character.isLetterOrDigit(character) && character != '_') { + throw new ProcessException(tokenType + " '" + token + "' contains invalid character '" + character + "'. Only [A-Za-z0-9_] are supported."); + } + } + + if (Character.isDigit(token.charAt(0))) { + throw new ProcessException(tokenType + " '" + token + "' must not start with a digit."); + } + + return token; + } +} diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index d5132cf4e751..8d4827716eaf 100644 --- a/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.graph.Neo4JCypherClientService \ No newline at end of file +org.apache.nifi.graph.Neo4JCypherClientService +org.apache.nifi.graph.Neo4JCypherQueryGeneratorService \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/test/java/org/apache/nifi/graph/TestNeo4JCypherQueryGeneratorService.java b/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/test/java/org/apache/nifi/graph/TestNeo4JCypherQueryGeneratorService.java new file mode 100644 index 000000000000..a9fabb727977 --- /dev/null +++ b/nifi-extension-bundles/nifi-graph-bundle/nifi-neo4j-cypher-service/src/test/java/org/apache/nifi/graph/TestNeo4JCypherQueryGeneratorService.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.graph; + +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestNeo4JCypherQueryGeneratorService { + + @Test + void testGenerateSetPropertiesMutationForNode() { + final Neo4JCypherQueryGeneratorService service = new Neo4JCypherQueryGeneratorService(); + final LinkedHashMap identifiers = new LinkedHashMap<>(); + identifiers.put("id", 123L); + final Map properties = Map.of("price", 100, "age", "10"); + + final GraphMutation mutation = service.generateSetPropertiesMutation(GraphElementType.NODE, identifiers, "Person", properties); + + final String expectedQuery = "MERGE (n:Person {id: $identifier_0})\n" + + "ON MATCH SET n += $properties\n" + + "ON CREATE SET n += $properties"; + + assertEquals(expectedQuery, mutation.getQuery()); + assertEquals(123L, mutation.getParameters().get("identifier_0")); + assertEquals(properties, mutation.getParameters().get("properties")); + } + + @Test + void testGenerateSetPropertiesMutationForEdgeUsesMatchQuery() { + final Neo4JCypherQueryGeneratorService service = new Neo4JCypherQueryGeneratorService(); + final LinkedHashMap identifiers = new LinkedHashMap<>(); + identifiers.put("edgeId", "123"); + final Map properties = Map.of("weight", 7); + + final GraphMutation mutation = service.generateSetPropertiesMutation(GraphElementType.EDGE, identifiers, "ASSOCIATED_WITH", properties); + + final String expectedQuery = "MATCH ()-[e:ASSOCIATED_WITH {edgeId: $identifier_0}]-()\n" + + "SET e += $properties"; + + assertEquals(expectedQuery, mutation.getQuery()); + assertEquals("123", mutation.getParameters().get("identifier_0")); + assertEquals(properties, mutation.getParameters().get("properties")); + } + + @Test + void testGenerateSetPropertiesMutationForEdgeRequiresLabel() { + final Neo4JCypherQueryGeneratorService service = new Neo4JCypherQueryGeneratorService(); + + assertThrows(ProcessException.class, () -> service.generateSetPropertiesMutation(GraphElementType.EDGE, new LinkedHashMap<>(), null, Map.of())); + } + + @Test + void testGenerateSetPropertiesMutationRejectsNullIdentifierValue() { + final Neo4JCypherQueryGeneratorService service = new Neo4JCypherQueryGeneratorService(); + final LinkedHashMap identifiers = new LinkedHashMap<>(); + identifiers.put("id", null); + + assertThrows(ProcessException.class, () -> service.generateSetPropertiesMutation(GraphElementType.NODE, identifiers, "Person", Map.of())); + } + + @Test + void testGenerateSetPropertiesMutationRejectsInvalidToken() { + final Neo4JCypherQueryGeneratorService service = new Neo4JCypherQueryGeneratorService(); + final LinkedHashMap identifiers = new LinkedHashMap<>(); + identifiers.put("edge-id", "123"); + + assertThrows(ProcessException.class, () -> service.generateSetPropertiesMutation(GraphElementType.EDGE, identifiers, "ASSOCIATED_WITH", Map.of())); + } +}