From 6ee918b3864a671f3d57b1c82d431d73833eb4df Mon Sep 17 00:00:00 2001
From: Rakesh Kumar Singh
Date: Fri, 12 Jun 2026 17:22:31 +0530
Subject: [PATCH] NIFI-16002 Fix LookupRecord writing to wrong level when parent path is missing

When the configured Result RecordPath contains intermediate nodes that do not
yet exist in the record (e.g. /enrichment/value where /enrichment is absent),
the RecordPath evaluator returns a missing-child FieldValue with a null value.
When updateValue is later called, getParentRecord() skips the null intermediate
node and finds the root record instead, writing the result at the wrong nesting
level.

The fix adds a helper method createMissingParentRecords that walks the
FieldValue parent chain and creates an empty MapRecord for each missing
intermediate node, setting it on its parent before the actual value is applied.
After creating the intermediate Records, incorporateInactiveFields is called so
the new fields appear in the active schema and the re-evaluated path resolves
them correctly.

This fix applies to both scalar lookup values and Record-typed lookup values.
---
 .../processors/standard/LookupRecord.java     | 74 ++++++++++++++++-
 .../processors/standard/TestLookupRecord.java | 83 +++++++++++++++++++
 2 files changed, 155 insertions(+), 2 deletions(-)

diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 274cadf6e657..3b5667fb4f79 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -58,12 +58,15 @@
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.Tuple;
 
@@ -675,7 +678,16 @@ private void applyLookupResult(final Record record, final ProcessContext context
                     // the Lookup Record to the destination Record. However, if the destination Record Path returns
                     // something other than a Record, then we can't add the fields to it. We can only replace it,
                     // because it doesn't make sense to add fields to anything but a Record.
-                    resultPathResult.getSelectedFields().forEach(fieldVal -> {
+
+                    // First pass: create any missing intermediate parent Records in the path
+                    resultPathResult.getSelectedFields().forEach(fieldVal -> createMissingParentRecords(fieldVal, record));
+
+                    // Incorporate newly created Records into the schema before re-evaluation,
+                    // otherwise ChildFieldPath treats them as missing (field not in active schema).
+                    record.incorporateInactiveFields();
+
+                    // Second pass: apply updates with fresh FieldValues that reflect created parents
+                    resultPath.evaluate(record).getSelectedFields().forEach(fieldVal -> {
                         final Object destinationValue = fieldVal.getValue();
 
                         if (destinationValue instanceof final Record destinationRecord) {
@@ -703,13 +715,71 @@ private void applyLookupResult(final Record record, final ProcessContext context
                     });
                 } else {
                     final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
-                    resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType));
+
+                    // First pass: create any missing intermediate parent Records in the path
+                    resultPathResult.getSelectedFields().forEach(fieldVal -> createMissingParentRecords(fieldVal, record));
+
+                    // Incorporate newly created Records into the schema before re-evaluation,
+                    // otherwise ChildFieldPath treats them as missing (field not in active schema).
+                    record.incorporateInactiveFields();
+
+                    // Second pass: update with fresh FieldValues that reflect created parents
+                    resultPath.evaluate(record).getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType));
                 }
 
                 record.incorporateInactiveFields();
             }
         }
 
+        /**
+         * Ensures that every intermediate Record in the FieldValue parent chain exists.
+         * When a result path such as /enrichment/value is configured and /enrichment does not
+         * yet exist in the record, this method creates an empty MapRecord for each missing
+         * intermediate node and sets it on its parent Record, so that the final value can be
+         * written to the correct location rather than falling back to the root record.
+         * <p>
+         * Only absent (null) intermediates are created. If an intermediate already holds a
+         * non-Record value (for example an array, a map or a scalar), it is left untouched and
+         * the walk stops, so existing data is never replaced by an empty Record.
+         *
+         * @param fieldValue the FieldValue selected by the Result RecordPath
+         * @param rootRecord the Record against which the Result RecordPath was evaluated
+         */
+        private void createMissingParentRecords(final FieldValue fieldValue, final Record rootRecord) {
+            // Build a top-down chain: [firstPathSegment, ..., immediateParent, fieldValue]
+            final List<FieldValue> chain = new ArrayList<>();
+            FieldValue current = fieldValue;
+            while (current.getParent().isPresent()) {
+                chain.add(0, current);
+                current = current.getParent().get();
+            }
+
+            // Process all nodes except the last one (the leaf field itself).
+            // Navigate from root down, creating empty Records wherever an intermediate is missing.
+            Record currentRecord = rootRecord;
+            for (int i = 0; i < chain.size() - 1; i++) {
+                final RecordField field = chain.get(i).getField();
+                final Object existingValue = currentRecord.getValue(field.getFieldName());
+
+                if (existingValue instanceof Record childRecord) {
+                    currentRecord = childRecord;
+                    continue;
+                }
+                if (existingValue != null) {
+                    // The intermediate holds a non-Record value (array, map, scalar, ...). Replacing it
+                    // with an empty Record would destroy existing data, so leave it alone and stop.
+                    return;
+                }
+
+                final RecordSchema schema = field.getDataType() instanceof RecordDataType recordDataType
+                        ? recordDataType.getChildSchema()
+                        : new SimpleRecordSchema(Collections.emptyList());
+                final MapRecord newRecord = new MapRecord(schema, new HashMap<>());
+                currentRecord.setValue(field, newRecord);
+                currentRecord = newRecord;
+            }
+        }
+
         @Override
         public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordPath rootRecordPath,
                                                   final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final LookupContext lookupContext)
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index c68f97dae19b..7226804301f8 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -507,6 +507,89 @@ public void testAllMatchButFirstRouteToSuccess() {
         matched.assertContentEquals("John Doe,48,\nJane Doe,47,Soccer\nJimmy Doe,14,Football\n");
     }
 
+    @Test
+    public void testResultPathNestedMissingParentScalar() throws InitializationException {
+        // NIFI-16002: when the result path has a parent that does not exist in the record,
+        // the intermediate Record should be auto-created and the value written inside it,
+        // not at the root level.
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("jsonReader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("jsonWriter", jsonWriter);
+        runner.enableControllerService(jsonWriter);
+
+        runner.setProperty(LookupRecord.RECORD_READER, "jsonReader");
+        runner.setProperty(LookupRecord.RECORD_WRITER, "jsonWriter");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/enrichment/sport");
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
+
+        lookupService.addValue("John Doe", "Soccer");
+
+        runner.enqueue("""
+                [ { "name" : "John Doe", "age" : 48 } ]
+                """);
+        runner.run();
+
+        runner.assertTransferCount(LookupRecord.REL_MATCHED, 1);
+        runner.assertTransferCount(LookupRecord.REL_UNMATCHED, 0);
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).getFirst();
+        final String content = new String(out.toByteArray());
+
+        // The value must be inside the auto-created /enrichment Record, not at the root
+        assertTrue(content.contains("\"enrichment\""), "Expected enrichment object in output: " + content);
+        assertTrue(content.contains("\"sport\"") && content.contains("\"Soccer\""), "Expected sport inside enrichment: " + content);
+        // Verify sport is NOT at root level (would appear before enrichment)
+        assertTrue(content.indexOf("\"enrichment\"") < content.indexOf("\"sport\""), "sport should be nested inside enrichment: " + content);
+    }
+
+    @Test
+    public void testResultPathNestedMissingParentRecord() throws InitializationException {
+        // NIFI-16002: same bug for Record-typed lookup values — intermediate parent must be
+        // auto-created so the looked-up Record is placed at /enrichment/details, not at /details.
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("jsonReader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("jsonWriter", jsonWriter);
+        runner.enableControllerService(jsonWriter);
+
+        final RecordLookup recordLookupService = new RecordLookup();
+        final RecordSchema lookupSchema = new SimpleRecordSchema(List.of(
+                new RecordField("sport", RecordFieldType.STRING.getDataType()),
+                new RecordField("level", RecordFieldType.STRING.getDataType())));
+        recordLookupService.addValue("John Doe", new MapRecord(lookupSchema, Map.of("sport", "Soccer", "level", "Pro")));
+        runner.addControllerService("recordLookup", recordLookupService);
+        runner.enableControllerService(recordLookupService);
+
+        runner.setProperty(LookupRecord.RECORD_READER, "jsonReader");
+        runner.setProperty(LookupRecord.RECORD_WRITER, "jsonWriter");
+        runner.setProperty(LookupRecord.LOOKUP_SERVICE, "recordLookup");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/enrichment/details");
+        runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
+        runner.setProperty("lookup", "/name");
+
+        runner.enqueue("""
+                [ { "name" : "John Doe" } ]
+                """);
+        runner.run();
+
+        runner.assertTransferCount(LookupRecord.REL_MATCHED, 1);
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).getFirst();
+        final String content = new String(out.toByteArray());
+
+        assertTrue(content.contains("\"enrichment\""), "Expected enrichment object in output: " + content);
+        assertTrue(content.contains("\"details\""), "Expected details inside enrichment: " + content);
+        assertTrue(content.contains("\"sport\"") && content.contains("\"Soccer\""), "Expected sport field in output: " + content);
+        // Verify details is NOT at root level (would appear before enrichment)
+        assertTrue(content.indexOf("\"enrichment\"") < content.indexOf("\"details\""), "details should be nested inside enrichment: " + content);
+    }
+
     @Test
     public void testResultPathNotFound() {
         runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/other");