diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 274cadf6e657..3b5667fb4f79 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -58,12 +58,15 @@ import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.Tuple; @@ -675,7 +678,16 @@ private void applyLookupResult(final Record record, final ProcessContext context // the Lookup Record to the destination Record. However, if the destination Record Path returns // something other than a Record, then we can't add the fields to it. We can only replace it, // because it doesn't make sense to add fields to anything but a Record. - resultPathResult.getSelectedFields().forEach(fieldVal -> { + + // First pass: create any missing intermediate parent Records in the path + resultPathResult.getSelectedFields().forEach(fieldVal -> createMissingParentRecords(fieldVal, record)); + + // Incorporate newly created Records into the schema before re-evaluation, + // otherwise ChildFieldPath treats them as missing (field not in active schema). + record.incorporateInactiveFields(); + + // Second pass: apply updates with fresh FieldValues that reflect created parents + resultPath.evaluate(record).getSelectedFields().forEach(fieldVal -> { final Object destinationValue = fieldVal.getValue(); if (destinationValue instanceof final Record destinationRecord) { @@ -703,13 +715,60 @@ private void applyLookupResult(final Record record, final ProcessContext context }); } else { final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType()); - resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType)); + + // First pass: create any missing intermediate parent Records in the path + resultPathResult.getSelectedFields().forEach(fieldVal -> createMissingParentRecords(fieldVal, record)); + + // Incorporate newly created Records into the schema before re-evaluation, + // otherwise ChildFieldPath treats them as missing (field not in active schema). + record.incorporateInactiveFields(); + + // Second pass: update with fresh FieldValues that reflect created parents + resultPath.evaluate(record).getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType)); } record.incorporateInactiveFields(); } } + /** + * Ensures that every intermediate Record in the FieldValue parent chain exists. + * When a result path such as /enrichment/value is configured and /enrichment does not + * yet exist in the record, this method creates an empty MapRecord for each missing + * intermediate node and sets it on its parent Record, so that the final value can be + * written to the correct location rather than falling back to the root record. + */ + private void createMissingParentRecords(final FieldValue fieldValue, final Record rootRecord) { + // Build a top-down chain: [firstPathSegment, ..., immediateParent, fieldValue] + final List chain = new ArrayList<>(); + FieldValue current = fieldValue; + while (current.getParent().isPresent()) { + chain.add(0, current); + current = current.getParent().get(); + } + + // Process all nodes except the last one (the leaf field itself). + // Navigate from root down, creating empty Records wherever an intermediate is missing. + Record currentRecord = rootRecord; + for (int i = 0; i < chain.size() - 1; i++) { + final FieldValue fv = chain.get(i); + final String fieldName = fv.getField().getFieldName(); + final Object existingValue = currentRecord.getValue(fieldName); + + if (existingValue instanceof Record childRecord) { + currentRecord = childRecord; + } else { + final RecordField field = fv.getField(); + final RecordSchema schema = field.getDataType() instanceof RecordDataType recordDataType + ? recordDataType.getChildSchema() + : new SimpleRecordSchema(Collections.emptyList()); + final MapRecord newRecord = new MapRecord(schema, new HashMap<>()); + currentRecord.setValue(field, newRecord); + currentRecord = newRecord; + } + } + } + @Override public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordPath rootRecordPath, final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final LookupContext lookupContext) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index c68f97dae19b..7226804301f8 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -507,6 +507,89 @@ public void testAllMatchButFirstRouteToSuccess() { matched.assertContentEquals("John Doe,48,\nJane Doe,47,Soccer\nJimmy Doe,14,Football\n"); } + @Test + public void testResultPathNestedMissingParentScalar() throws InitializationException { + // NIFI-16002: when the result path has a parent that does not exist in the record, + // the intermediate Record should be auto-created and the value written inside it, + // not at the root level. + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("jsonReader", jsonReader); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("jsonWriter", jsonWriter); + runner.enableControllerService(jsonWriter); + + runner.setProperty(LookupRecord.RECORD_READER, "jsonReader"); + runner.setProperty(LookupRecord.RECORD_WRITER, "jsonWriter"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/enrichment/sport"); + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED); + + lookupService.addValue("John Doe", "Soccer"); + + runner.enqueue(""" + [ { "name" : "John Doe", "age" : 48 } ] + """); + runner.run(); + + runner.assertTransferCount(LookupRecord.REL_MATCHED, 1); + runner.assertTransferCount(LookupRecord.REL_UNMATCHED, 0); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).getFirst(); + final String content = new String(out.toByteArray()); + + // The value must be inside the auto-created /enrichment Record, not at the root + assertTrue(content.contains("\"enrichment\""), "Expected enrichment object in output: " + content); + assertTrue(content.contains("\"sport\"") && content.contains("\"Soccer\""), "Expected sport inside enrichment: " + content); + // Verify sport is NOT at root level (would appear before enrichment) + assertTrue(content.indexOf("\"enrichment\"") < content.indexOf("\"sport\""), "sport should be nested inside enrichment: " + content); + } + + @Test + public void testResultPathNestedMissingParentRecord() throws InitializationException { + // NIFI-16002: same bug for Record-typed lookup values — intermediate parent must be + // auto-created so the looked-up Record is placed at /enrichment/details, not at /details. + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("jsonReader", jsonReader); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("jsonWriter", jsonWriter); + runner.enableControllerService(jsonWriter); + + final RecordLookup recordLookupService = new RecordLookup(); + final RecordSchema lookupSchema = new SimpleRecordSchema(List.of( + new RecordField("sport", RecordFieldType.STRING.getDataType()), + new RecordField("level", RecordFieldType.STRING.getDataType()))); + recordLookupService.addValue("John Doe", new MapRecord(lookupSchema, Map.of("sport", "Soccer", "level", "Pro"))); + runner.addControllerService("recordLookup", recordLookupService); + runner.enableControllerService(recordLookupService); + + runner.setProperty(LookupRecord.RECORD_READER, "jsonReader"); + runner.setProperty(LookupRecord.RECORD_WRITER, "jsonWriter"); + runner.setProperty(LookupRecord.LOOKUP_SERVICE, "recordLookup"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/enrichment/details"); + runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS); + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED); + runner.setProperty("lookup", "/name"); + + runner.enqueue(""" + [ { "name" : "John Doe" } ] + """); + runner.run(); + + runner.assertTransferCount(LookupRecord.REL_MATCHED, 1); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).getFirst(); + final String content = new String(out.toByteArray()); + + assertTrue(content.contains("\"enrichment\""), "Expected enrichment object in output: " + content); + assertTrue(content.contains("\"details\""), "Expected details inside enrichment: " + content); + assertTrue(content.contains("\"sport\"") && content.contains("\"Soccer\""), "Expected sport field in output: " + content); + // Verify details is NOT at root level (would appear before enrichment) + assertTrue(content.indexOf("\"enrichment\"") < content.indexOf("\"details\""), "details should be nested inside enrichment: " + content); + } + @Test public void testResultPathNotFound() { runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/other");