Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,15 @@
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.Tuple;

Expand Down Expand Up @@ -675,7 +678,16 @@ private void applyLookupResult(final Record record, final ProcessContext context
// the Lookup Record to the destination Record. However, if the destination Record Path returns
// something other than a Record, then we can't add the fields to it. We can only replace it,
// because it doesn't make sense to add fields to anything but a Record.
resultPathResult.getSelectedFields().forEach(fieldVal -> {

// First pass: create any missing intermediate parent Records in the path
resultPathResult.getSelectedFields().forEach(fieldVal -> createMissingParentRecords(fieldVal, record));

// Incorporate newly created Records into the schema before re-evaluation,
// otherwise ChildFieldPath treats them as missing (field not in active schema).
record.incorporateInactiveFields();

// Second pass: apply updates with fresh FieldValues that reflect created parents
resultPath.evaluate(record).getSelectedFields().forEach(fieldVal -> {
final Object destinationValue = fieldVal.getValue();

if (destinationValue instanceof final Record destinationRecord) {
Expand Down Expand Up @@ -703,13 +715,60 @@ private void applyLookupResult(final Record record, final ProcessContext context
});
} else {
final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType));

// First pass: create any missing intermediate parent Records in the path
resultPathResult.getSelectedFields().forEach(fieldVal -> createMissingParentRecords(fieldVal, record));

// Incorporate newly created Records into the schema before re-evaluation,
// otherwise ChildFieldPath treats them as missing (field not in active schema).
record.incorporateInactiveFields();

// Second pass: update with fresh FieldValues that reflect created parents
resultPath.evaluate(record).getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType));
}

record.incorporateInactiveFields();
}
}

/**
* Ensures that every intermediate Record in the FieldValue parent chain exists.
* When a result path such as /enrichment/value is configured and /enrichment does not
* yet exist in the record, this method creates an empty MapRecord for each missing
* intermediate node and sets it on its parent Record, so that the final value can be
* written to the correct location rather than falling back to the root record.
*/
private void createMissingParentRecords(final FieldValue fieldValue, final Record rootRecord) {
// Build a top-down chain: [firstPathSegment, ..., immediateParent, fieldValue]
final List<FieldValue> chain = new ArrayList<>();
FieldValue current = fieldValue;
while (current.getParent().isPresent()) {
chain.add(0, current);
current = current.getParent().get();
}

// Process all nodes except the last one (the leaf field itself).
// Navigate from root down, creating empty Records wherever an intermediate is missing.
Record currentRecord = rootRecord;
for (int i = 0; i < chain.size() - 1; i++) {
final FieldValue fv = chain.get(i);
final String fieldName = fv.getField().getFieldName();
final Object existingValue = currentRecord.getValue(fieldName);

if (existingValue instanceof Record childRecord) {
currentRecord = childRecord;
} else {
final RecordField field = fv.getField();
final RecordSchema schema = field.getDataType() instanceof RecordDataType recordDataType
? recordDataType.getChildSchema()
: new SimpleRecordSchema(Collections.emptyList());
final MapRecord newRecord = new MapRecord(schema, new HashMap<>());
currentRecord.setValue(field, newRecord);
currentRecord = newRecord;
}
}
}

@Override
public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordPath rootRecordPath, final ProcessContext context, final ProcessSession session,
final FlowFile flowFile, final LookupContext lookupContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,89 @@ public void testAllMatchButFirstRouteToSuccess() {
matched.assertContentEquals("John Doe,48,\nJane Doe,47,Soccer\nJimmy Doe,14,Football\n");
}

@Test
public void testResultPathNestedMissingParentScalar() throws InitializationException {
// NIFI-16002: when the result path has a parent that does not exist in the record,
// the intermediate Record should be auto-created and the value written inside it,
// not at the root level.
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("jsonReader", jsonReader);
runner.enableControllerService(jsonReader);

final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("jsonWriter", jsonWriter);
runner.enableControllerService(jsonWriter);

runner.setProperty(LookupRecord.RECORD_READER, "jsonReader");
runner.setProperty(LookupRecord.RECORD_WRITER, "jsonWriter");
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/enrichment/sport");
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);

lookupService.addValue("John Doe", "Soccer");

runner.enqueue("""
[ { "name" : "John Doe", "age" : 48 } ]
""");
runner.run();

runner.assertTransferCount(LookupRecord.REL_MATCHED, 1);
runner.assertTransferCount(LookupRecord.REL_UNMATCHED, 0);

final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).getFirst();
final String content = new String(out.toByteArray());

// The value must be inside the auto-created /enrichment Record, not at the root
assertTrue(content.contains("\"enrichment\""), "Expected enrichment object in output: " + content);
assertTrue(content.contains("\"sport\"") && content.contains("\"Soccer\""), "Expected sport inside enrichment: " + content);
// Verify sport is NOT at root level (would appear before enrichment)
assertTrue(content.indexOf("\"enrichment\"") < content.indexOf("\"sport\""), "sport should be nested inside enrichment: " + content);
}

@Test
public void testResultPathNestedMissingParentRecord() throws InitializationException {
// NIFI-16002: same bug for Record-typed lookup values — intermediate parent must be
// auto-created so the looked-up Record is placed at /enrichment/details, not at /details.
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("jsonReader", jsonReader);
runner.enableControllerService(jsonReader);

final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("jsonWriter", jsonWriter);
runner.enableControllerService(jsonWriter);

final RecordLookup recordLookupService = new RecordLookup();
final RecordSchema lookupSchema = new SimpleRecordSchema(List.of(
new RecordField("sport", RecordFieldType.STRING.getDataType()),
new RecordField("level", RecordFieldType.STRING.getDataType())));
recordLookupService.addValue("John Doe", new MapRecord(lookupSchema, Map.of("sport", "Soccer", "level", "Pro")));
runner.addControllerService("recordLookup", recordLookupService);
runner.enableControllerService(recordLookupService);

runner.setProperty(LookupRecord.RECORD_READER, "jsonReader");
runner.setProperty(LookupRecord.RECORD_WRITER, "jsonWriter");
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "recordLookup");
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/enrichment/details");
runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
runner.setProperty("lookup", "/name");

runner.enqueue("""
[ { "name" : "John Doe" } ]
""");
runner.run();

runner.assertTransferCount(LookupRecord.REL_MATCHED, 1);

final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).getFirst();
final String content = new String(out.toByteArray());

assertTrue(content.contains("\"enrichment\""), "Expected enrichment object in output: " + content);
assertTrue(content.contains("\"details\""), "Expected details inside enrichment: " + content);
assertTrue(content.contains("\"sport\"") && content.contains("\"Soccer\""), "Expected sport field in output: " + content);
// Verify details is NOT at root level (would appear before enrichment)
assertTrue(content.indexOf("\"enrichment\"") < content.indexOf("\"details\""), "details should be nested inside enrichment: " + content);
}

@Test
public void testResultPathNotFound() {
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/other");
Expand Down
Loading