From 67b4ea235e6f977ddafdfcdadee0a254b120b4ff Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 6 Feb 2026 17:04:23 -0500 Subject: [PATCH 1/2] NIFI-15565: Code cleanup; improvements to .cursorrules to provide better instructions regarding automated tests --- .cursorrules | 55 ++++ .../connector/util/VersionedFlowUtils.java | 174 ++++++++++- .../util/TestVersionedFlowUtils.java | 174 +++++++++-- .../StandaloneConnectionFacade.java | 23 ++ .../StandaloneControllerServiceFacade.java | 25 ++ .../StandaloneParameterContextFacade.java | 21 ++ .../StandaloneProcessGroupFacade.java | 23 ++ .../standalone/StandaloneProcessorFacade.java | 25 ++ .../AuthorizingConnectionFacade.java | 23 ++ .../AuthorizingControllerServiceFacade.java | 23 ++ .../AuthorizingParameterContextFacade.java | 23 ++ .../AuthorizingProcessGroupFacade.java | 23 ++ .../AuthorizingProcessorFacade.java | 23 ++ .../nifi-system-test-extensions/pom.xml | 5 + .../system/BundleResolutionConnector.java | 75 +---- .../tests/system/CalculateConnector.java | 38 +-- .../system/ComponentLifecycleConnector.java | 281 +++--------------- .../tests/system/DataQueuingConnector.java | 83 +----- .../system/GatedDataQueuingConnector.java | 101 +------ .../system/ParameterContextConnector.java | 239 +++------------ .../tests/system/SelectiveDropConnector.java | 102 ++----- 21 files changed, 750 insertions(+), 809 deletions(-) diff --git a/.cursorrules b/.cursorrules index 1990f9de801b..e083cb45c42d 100644 --- a/.cursorrules +++ b/.cursorrules @@ -72,3 +72,58 @@ In addition to the general rules defined in the Code Style section, follow these 7. Never use pointless assertions such as assertDoesNotThrow - this adds nothing but complexity. Just call the method, and if it throws an Exception, the test will fail. I.e., it is assumed by default that each line does not throw an Exception, so do not use + +Unit Test vs. Integration Test vs. System Test +---------------------------------------------- +Unit tests should be used to test individual classes and methods in isolation. This often will result in mocking dependency classes. +However, if there already exists a Mock implementation of an interface or dependency class, it is preferred to use the existing Mock implementation. +Similarly, for simple classes, it is preferable to make use of the real implementation of a class rather than creating a Mock implementation. +While some may argue that this is not a "true" unit test, we are infinitely more interested in having tests that are fast, reliable, +correct, and easy to maintain than we are in having tests that adhere to strict and arbitrary rules. + +In general, unit tests should be used to verify any sufficiently complex method in a class. We should *NOT* have unit tests for +trivial methods such as getters and setters, or methods that are only a few lines long and are not complex. A good general +rule of thumb is that if a person can understand a method and verify that it is correct in a few seconds, then it is not +necessary to have a unit test for that method. + +Avoid adding multiple tests that are redundant. For example, if you have a method that takes a single `String` argument +it may make sense to test with `null`, a 0-length strength, and a long String. But do not test with 4 different Strings +each with a different value but which test the same lines of code. Avoid over-testing by adding multiple tests whose differences +are conditions that do not truly affect the behavior of the code. + +Unit tests should always focus on the "contract" of the method. That is, given a certain input, the test should assert that it +receives the correct output. The test should NOT be focused on the inner details of how the method works. Focusing on the inner details +of how the method works is an anti-pattern because it results in tests that are more brittle, more difficult to maintain, +and tests that fail when the method is improved or refactored but still adheres to the same contract. + +When producing a Processor or Controller Service, unit tests should always be written, and they should always make use of the `nifi-mock` mocking framework. + +When working in the framework, unit tests are still important, but integration tests and system tests are often more important. +Integration tests are still allowed to use mocks but typically we prefer to use real implementations of classes in order to ensure a +more realistic and holistic test. + +System tests live in the `nifi-system-tests` module and should be used for any changes that make significant changes to +the framework and the interaction between a significant number of classes. They should also be used for any changes that +may be fairly isolated but which are in a critical path of the framework, especially those that affect how data is persisted, +processed, or accessed; or those that affected how components are created, configured, scheduled, or executed. +For example, any change to `ProcessScheduler`, `ProcessorNode`, `ControllerServiceNode`, `FlowController`, +`FlowManager`, changes to how Parameters are handled, flow synchronization, the repositories, etc. is a good candidate for a system test. + +Additionally, any unit test that ends up requiring a large number of mocks is a good candidate for an integration test, +and any integration test that ends up requiring a large number of mocks is a good candidate for a system test. + + +Ending Conditions +----------------- +When you have completed the task, ensure that you have done the following: +1. All code compiles and builds successfully using `mvn`. +2. All relevant unit tests pass successfully using `mvn`. +3. All code adheres to the Code Style rules defined above. +4. Checkstyle and PMD pass successfully using `mvn checkstyle:check pmd:check -T 1C` from the appropriate directory. +5. Unit tests have been added to verify the functionality of any sufficiently complex method. +6. A system test or an integration test has been added if the change makes significant changes to the framework + and the interaction between a significant number of classes. + +Do not consider the task complete until all of the above conditions have been met. +When you do consider the task complete, provide a summary of which tests were added or modified and what the +behavior that they verify is correct. diff --git a/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java b/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java index d4a06b8658ea..739280d16fa1 100644 --- a/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java +++ b/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java @@ -24,6 +24,7 @@ import org.apache.nifi.flow.ComponentType; import org.apache.nifi.flow.ConnectableComponent; import org.apache.nifi.flow.ConnectableComponentType; +import org.apache.nifi.flow.PortType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedConnection; @@ -109,7 +110,8 @@ public static ConnectableComponent createConnectableComponent(final VersionedPor return component; } - public static void addConnection(final VersionedProcessGroup group, final ConnectableComponent source, final ConnectableComponent destination, final Set relationships) { + public static VersionedConnection addConnection(final VersionedProcessGroup group, final ConnectableComponent source, final ConnectableComponent destination, + final Set relationships) { final VersionedConnection connection = new VersionedConnection(); connection.setSource(source); connection.setDestination(destination); @@ -134,6 +136,7 @@ public static void addConnection(final VersionedProcessGroup group, final Connec final String uuid = generateDeterministicUuid(group, ComponentType.CONNECTION); connection.setIdentifier(uuid); + return connection; } public static List findOutboundConnections(final VersionedProcessGroup group, final VersionedProcessor processor) { @@ -263,6 +266,64 @@ public static VersionedControllerService addControllerService(final VersionedPro return controllerService; } + public static VersionedProcessGroup createProcessGroup(final String identifier, final String name) { + final VersionedProcessGroup group = new VersionedProcessGroup(); + group.setIdentifier(identifier); + group.setName(name); + group.setProcessors(new HashSet<>()); + group.setProcessGroups(new HashSet<>()); + group.setConnections(new HashSet<>()); + group.setControllerServices(new HashSet<>()); + group.setInputPorts(new HashSet<>()); + group.setOutputPorts(new HashSet<>()); + group.setFunnels(new HashSet<>()); + group.setLabels(new HashSet<>()); + group.setComponentType(ComponentType.PROCESS_GROUP); + return group; + } + + public static VersionedPort addInputPort(final VersionedProcessGroup group, final String name, final Position position) { + return addPort(group, name, position, PortType.INPUT_PORT); + } + + public static VersionedPort addOutputPort(final VersionedProcessGroup group, final String name, final Position position) { + return addPort(group, name, position, PortType.OUTPUT_PORT); + } + + private static VersionedPort addPort(final VersionedProcessGroup group, final String name, final Position position, final PortType portType) { + final boolean isInput = portType == PortType.INPUT_PORT; + final ComponentType componentType = isInput ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT; + + final VersionedPort port = new VersionedPort(); + port.setIdentifier(generateDeterministicUuid(group, componentType)); + port.setName(name); + port.setPosition(position); + port.setType(portType); + port.setComponentType(componentType); + port.setScheduledState(ScheduledState.ENABLED); + port.setConcurrentlySchedulableTaskCount(1); + port.setAllowRemoteAccess(false); + port.setGroupIdentifier(group.getIdentifier()); + + if (isInput) { + Set inputPorts = group.getInputPorts(); + if (inputPorts == null) { + inputPorts = new HashSet<>(); + group.setInputPorts(inputPorts); + } + inputPorts.add(port); + } else { + Set outputPorts = group.getOutputPorts(); + if (outputPorts == null) { + outputPorts = new HashSet<>(); + group.setOutputPorts(outputPorts); + } + outputPorts.add(port); + } + + return port; + } + public static Set getReferencedControllerServices(final VersionedProcessGroup group) { final Set referencedServices = new HashSet<>(); collectReferencedControllerServices(group, referencedServices); @@ -307,6 +368,105 @@ private static void collectReferencedControllerServices(final VersionedProcessGr } } + /** + * Returns the set of controller services that are transitively referenced by the given processor. + * This includes any services directly referenced by the processor's properties, as well as any services + * that those services reference, and so on. Only services that are accessible to the processor are considered, + * meaning services in the processor's own group and its ancestor groups. + * + * @param rootGroup the root process group to search for controller services + * @param processor the processor whose referenced services should be found + * @return the set of transitively referenced controller services + */ + public static Set getReferencedControllerServices(final VersionedProcessGroup rootGroup, final VersionedProcessor processor) { + return findTransitivelyReferencedServices(rootGroup, processor.getGroupIdentifier(), processor.getProperties()); + } + + /** + * Returns the set of controller services that are transitively referenced by the given controller service. + * This includes any services directly referenced by the service's properties, as well as any services + * that those services reference, and so on. Only services that are accessible to the given service are considered, + * meaning services in the service's own group and its ancestor groups. + * + * @param rootGroup the root process group to search for controller services + * @param controllerService the controller service whose referenced services should be found + * @return the set of transitively referenced controller services + */ + public static Set getReferencedControllerServices(final VersionedProcessGroup rootGroup, final VersionedControllerService controllerService) { + return findTransitivelyReferencedServices(rootGroup, controllerService.getGroupIdentifier(), controllerService.getProperties()); + } + + private static Set findTransitivelyReferencedServices(final VersionedProcessGroup rootGroup, final String componentGroupId, + final Map properties) { + final Map serviceMap = new HashMap<>(); + collectAccessibleControllerServices(rootGroup, componentGroupId, serviceMap); + + final Set referencedServices = new HashSet<>(); + for (final String propertyValue : properties.values()) { + final VersionedControllerService referencedService = serviceMap.get(propertyValue); + if (referencedService != null) { + referencedServices.add(referencedService); + } + } + + resolveTransitiveServiceReferences(referencedServices, serviceMap); + return referencedServices; + } + + /** + * Collects controller services that are accessible from the given target group. In NiFi, a component can reference + * controller services in its own group or any ancestor group. This method traverses from the root group down to the + * target group, collecting services from each group along the path. + * + * @param group the current group being examined + * @param targetGroupId the identifier of the group whose accessible services should be collected + * @param serviceMap the map to populate with accessible service identifiers and their corresponding services + * @return true if the target group was found at or beneath this group, false otherwise + */ + private static boolean collectAccessibleControllerServices(final VersionedProcessGroup group, final String targetGroupId, + final Map serviceMap) { + final boolean isTarget = group.getIdentifier().equals(targetGroupId); + + boolean foundInChild = false; + if (!isTarget) { + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + if (collectAccessibleControllerServices(childGroup, targetGroupId, serviceMap)) { + foundInChild = true; + break; + } + } + } + + if (isTarget || foundInChild) { + for (final VersionedControllerService service : group.getControllerServices()) { + serviceMap.put(service.getIdentifier(), service); + } + return true; + } + + return false; + } + + private static void resolveTransitiveServiceReferences(final Set referencedServices, final Map serviceMap) { + while (true) { + final Set newlyAddedServices = new HashSet<>(); + + for (final VersionedControllerService service : referencedServices) { + for (final String propertyValue : service.getProperties().values()) { + final VersionedControllerService referencedService = serviceMap.get(propertyValue); + if (referencedService != null && !referencedServices.contains(referencedService)) { + newlyAddedServices.add(referencedService); + } + } + } + + referencedServices.addAll(newlyAddedServices); + if (newlyAddedServices.isEmpty()) { + break; + } + } + } + public static void removeControllerServiceReferences(final VersionedProcessGroup processGroup, final String serviceIdentifier) { for (final VersionedProcessor processor : processGroup.getProcessors()) { removeValuesFromMap(processor.getProperties(), serviceIdentifier); @@ -347,6 +507,18 @@ public static void setParameterValue(final VersionedParameterContext parameterCo } } + public static void setParameterValues(final VersionedExternalFlow externalFlow, final Map parameterValues) { + for (final Map.Entry entry : parameterValues.entrySet()) { + setParameterValue(externalFlow, entry.getKey(), entry.getValue()); + } + } + + public static void setParameterValues(final VersionedParameterContext parameterContext, final Map parameterValues) { + for (final Map.Entry entry : parameterValues.entrySet()) { + setParameterValue(parameterContext, entry.getKey(), entry.getValue()); + } + } + public static void removeUnreferencedControllerServices(final VersionedProcessGroup processGroup) { final Set referencedServices = getReferencedControllerServices(processGroup); final Set referencedServiceIds = new HashSet<>(); diff --git a/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java b/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java index f3c9741d8e8a..a7534d9088f5 100644 --- a/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java +++ b/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java @@ -19,7 +19,6 @@ import org.apache.nifi.components.connector.ComponentBundleLookup; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ComponentType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedProcessGroup; @@ -27,8 +26,8 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import java.util.HashSet; import java.util.Optional; +import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -137,39 +136,150 @@ void testUpdateToLatestBundleReturnsFalseWhenNotUpdated() { assertEquals("2.0.0", processor.getBundle().getVersion()); } - private VersionedProcessGroup createProcessGroup() { - final VersionedProcessGroup group = new VersionedProcessGroup(); - group.setIdentifier("test-group-id"); - group.setName("Test Process Group"); - group.setProcessors(new HashSet<>()); - group.setProcessGroups(new HashSet<>()); - group.setControllerServices(new HashSet<>()); - group.setConnections(new HashSet<>()); - group.setInputPorts(new HashSet<>()); - group.setOutputPorts(new HashSet<>()); - group.setFunnels(new HashSet<>()); - group.setLabels(new HashSet<>()); - group.setComponentType(ComponentType.PROCESS_GROUP); - return group; + } + + @Nested + class GetReferencedControllerServices { + private static final String PROCESSOR_TYPE = "org.apache.nifi.processors.TestProcessor"; + private static final String SERVICE_TYPE = "org.apache.nifi.services.TestService"; + private static final Bundle TEST_BUNDLE = new Bundle("group", "artifact", "1.0.0"); + + @Test + void testProcessorWithNoReferences() { + final VersionedProcessGroup group = createProcessGroup(); + VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Unreferenced Service"); + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("some-property", "not-a-service-id"); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, processor); + + assertTrue(result.isEmpty()); + } + + @Test + void testProcessorWithDirectReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService service1 = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service 1"); + final VersionedControllerService service2 = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service 2"); + final VersionedControllerService unreferencedService = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Unreferenced Service"); + + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop-1", service1.getIdentifier()); + processor.getProperties().put("service-prop-2", service2.getIdentifier()); + processor.getProperties().put("non-service-prop", "some-value"); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, processor); + + assertEquals(2, result.size()); + assertTrue(result.contains(service1)); + assertTrue(result.contains(service2)); + assertFalse(result.contains(unreferencedService)); + } + + @Test + void testProcessorWithTransitiveReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService serviceC = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service C"); + final VersionedControllerService serviceB = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service B"); + serviceB.getProperties().put("nested-service", serviceC.getIdentifier()); + final VersionedControllerService serviceA = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service A"); + serviceA.getProperties().put("nested-service", serviceB.getIdentifier()); + + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop", serviceA.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, processor); + + assertEquals(3, result.size()); + assertTrue(result.contains(serviceA)); + assertTrue(result.contains(serviceB)); + assertTrue(result.contains(serviceC)); + } + + @Test + void testProcessorReferencingServiceInAncestorGroup() { + final VersionedProcessGroup rootGroup = createProcessGroup(); + final VersionedControllerService parentService = VersionedFlowUtils.addControllerService(rootGroup, SERVICE_TYPE, TEST_BUNDLE, "Parent Service"); + final VersionedProcessGroup childGroup = createChildProcessGroup(rootGroup, "child-group-id"); + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(childGroup, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop", parentService.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(rootGroup, processor); + + assertEquals(1, result.size()); + assertTrue(result.contains(parentService)); } - private VersionedProcessGroup createChildProcessGroup(final VersionedProcessGroup parent, final String identifier) { - final VersionedProcessGroup childGroup = new VersionedProcessGroup(); - childGroup.setIdentifier(identifier); - childGroup.setName("Child Process Group"); - childGroup.setGroupIdentifier(parent.getIdentifier()); - childGroup.setProcessors(new HashSet<>()); - childGroup.setProcessGroups(new HashSet<>()); - childGroup.setControllerServices(new HashSet<>()); - childGroup.setConnections(new HashSet<>()); - childGroup.setInputPorts(new HashSet<>()); - childGroup.setOutputPorts(new HashSet<>()); - childGroup.setFunnels(new HashSet<>()); - childGroup.setLabels(new HashSet<>()); - childGroup.setComponentType(ComponentType.PROCESS_GROUP); - parent.getProcessGroups().add(childGroup); - return childGroup; + @Test + void testProcessorDoesNotFindServiceInDescendantGroup() { + final VersionedProcessGroup rootGroup = createProcessGroup(); + final VersionedProcessGroup childGroup = createChildProcessGroup(rootGroup, "child-group-id"); + final VersionedControllerService childService = VersionedFlowUtils.addControllerService(childGroup, SERVICE_TYPE, TEST_BUNDLE, "Child Service"); + + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(rootGroup, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop", childService.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(rootGroup, processor); + + assertTrue(result.isEmpty()); + } + + @Test + void testProcessorWithCircularServiceReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService serviceA = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service A"); + final VersionedControllerService serviceB = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service B"); + serviceA.getProperties().put("nested-service", serviceB.getIdentifier()); + serviceB.getProperties().put("nested-service", serviceA.getIdentifier()); + + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop", serviceA.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, processor); + + assertEquals(2, result.size()); + assertTrue(result.contains(serviceA)); + assertTrue(result.contains(serviceB)); } + + @Test + void testControllerServiceWithNoReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService service = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service"); + service.getProperties().put("some-property", "not-a-service-id"); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, service); + + assertTrue(result.isEmpty()); + } + + @Test + void testControllerServiceWithDirectAndTransitiveReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService serviceC = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service C"); + final VersionedControllerService serviceB = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service B"); + serviceB.getProperties().put("nested-service", serviceC.getIdentifier()); + final VersionedControllerService serviceA = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service A"); + serviceA.getProperties().put("nested-service", serviceB.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, serviceA); + + assertEquals(2, result.size()); + assertTrue(result.contains(serviceB)); + assertTrue(result.contains(serviceC)); + assertFalse(result.contains(serviceA)); + } + } + + private static VersionedProcessGroup createProcessGroup() { + return VersionedFlowUtils.createProcessGroup("test-group-id", "Test Process Group"); + } + + private static VersionedProcessGroup createChildProcessGroup(final VersionedProcessGroup parent, final String identifier) { + final VersionedProcessGroup childGroup = VersionedFlowUtils.createProcessGroup(identifier, "Child Process Group"); + childGroup.setGroupIdentifier(parent.getIdentifier()); + parent.getProcessGroups().add(childGroup); + return childGroup; } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneConnectionFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneConnectionFacade.java index a1ecb960867b..cc21353731d5 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneConnectionFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneConnectionFacade.java @@ -25,6 +25,7 @@ import org.apache.nifi.flowfile.FlowFile; import java.io.IOException; +import java.util.Objects; import java.util.function.Predicate; public class StandaloneConnectionFacade implements ConnectionFacade { @@ -56,4 +57,26 @@ public void purge() { public DropFlowFileSummary dropFlowFiles(final Predicate predicate) throws IOException { return connection.getFlowFileQueue().dropFlowFiles(predicate); } + + @Override + public String toString() { + return "StandaloneConnectionFacade[id=" + versionedConnection.getIdentifier() + ", name=" + versionedConnection.getName() + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StandaloneConnectionFacade that = (StandaloneConnectionFacade) o; + return Objects.equals(versionedConnection.getIdentifier(), that.versionedConnection.getIdentifier()); + } + + @Override + public int hashCode() { + return Objects.hashCode(versionedConnection.getIdentifier()); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java index ff507e4fab52..808b5a4747dc 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; public class StandaloneControllerServiceFacade implements ControllerServiceFacade { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() @@ -200,6 +201,30 @@ public T invokeConnectorMethod(final String methodName, final Map serializeArgumentsToJson(final Map arguments) throws InvocationFailedException { final Map jsonArguments = new HashMap<>(); for (final Map.Entry entry : arguments.entrySet()) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneParameterContextFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneParameterContextFacade.java index 00ab88857df0..2599269d3554 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneParameterContextFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneParameterContextFacade.java @@ -205,4 +205,25 @@ public Asset createAsset(final InputStream inputStream) throws IOException { return null; } + @Override + public String toString() { + return "StandaloneParameterContextFacade[id=" + parameterContext.getIdentifier() + ", name=" + parameterContext.getName() + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StandaloneParameterContextFacade that = (StandaloneParameterContextFacade) o; + return Objects.equals(parameterContext.getIdentifier(), that.parameterContext.getIdentifier()); + } + + @Override + public int hashCode() { + return Objects.hashCode(parameterContext.getIdentifier()); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupFacade.java index 3d02ff161b5c..307a786aea86 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupFacade.java @@ -49,6 +49,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; @@ -381,6 +382,28 @@ public ProcessGroupLifecycle getLifecycle() { return lifecycle; } + @Override + public String toString() { + return "StandaloneProcessGroupFacade[id=" + flowDefinition.getIdentifier() + ", name=" + flowDefinition.getName() + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StandaloneProcessGroupFacade that = (StandaloneProcessGroupFacade) o; + return Objects.equals(flowDefinition.getIdentifier(), that.flowDefinition.getIdentifier()); + } + + @Override + public int hashCode() { + return Objects.hashCode(flowDefinition.getIdentifier()); + } + @Override public DropFlowFileSummary dropFlowFiles(final Predicate predicate) throws IOException { DropFlowFileSummary summary = new DropFlowFileSummary(0, 0); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java index 707aa8b67c9f..3de26ad4431a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java @@ -48,6 +48,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; public class StandaloneProcessorFacade implements ProcessorFacade { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() @@ -208,6 +209,30 @@ public T invokeConnectorMethod(final String methodName, final Map serializeArgumentsToJson(final Map arguments) throws InvocationFailedException { final Map jsonArguments = new HashMap<>(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingConnectionFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingConnectionFacade.java index a061bd3b8345..ea72b197f6b6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingConnectionFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingConnectionFacade.java @@ -23,6 +23,7 @@ import org.apache.nifi.flowfile.FlowFile; import java.io.IOException; +import java.util.Objects; import java.util.function.Predicate; /** @@ -62,5 +63,27 @@ public DropFlowFileSummary dropFlowFiles(final Predicate predicate) th authContext.authorizeWrite(); return delegate.dropFlowFiles(predicate); } + + @Override + public String toString() { + return "AuthorizingConnectionFacade[delegate=" + delegate + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AuthorizingConnectionFacade that = (AuthorizingConnectionFacade) o; + return Objects.equals(delegate, that.delegate); + } + + @Override + public int hashCode() { + return Objects.hashCode(delegate); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingControllerServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingControllerServiceFacade.java index 2086c875d495..a85d924079f8 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingControllerServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingControllerServiceFacade.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; /** * A wrapper around {@link ControllerServiceFacade} that enforces authorization before delegating @@ -95,5 +96,27 @@ public T invokeConnectorMethod(final String methodName, final Map predicate) th authContext.authorizeWrite(); return delegate.dropFlowFiles(predicate); } + + @Override + public String toString() { + return "AuthorizingProcessGroupFacade[delegate=" + delegate + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AuthorizingProcessGroupFacade that = (AuthorizingProcessGroupFacade) o; + return Objects.equals(delegate, that.delegate); + } + + @Override + public int hashCode() { + return Objects.hashCode(delegate); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessorFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessorFacade.java index 22fe5622306b..d392d2c2b9f2 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessorFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessorFacade.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; /** * A wrapper around {@link ProcessorFacade} that enforces authorization before delegating @@ -95,5 +96,27 @@ public T invokeConnectorMethod(final String methodName, final Mapcom.fasterxml.jackson.core jackson-databind + + org.apache.nifi + nifi-connector-utils + 2.8.0-SNAPSHOT + org.apache.nifi nifi-system-test-extensions-services-api diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java index e1f6be0cf076..f15d668ef51f 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java @@ -27,18 +27,15 @@ import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.PropertyType; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ComponentType; import org.apache.nifi.flow.Position; -import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedParameter; import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -94,13 +91,7 @@ protected void onStepConfigured(final String stepName, final FlowContext working @Override public VersionedExternalFlow getInitialFlow() { - final VersionedProcessGroup group = new VersionedProcessGroup(); - group.setIdentifier(UUID.randomUUID().toString()); - group.setName("Bundle Resolution Flow"); - group.setProcessors(new HashSet<>()); - group.setProcessGroups(new HashSet<>()); - group.setConnections(new HashSet<>()); - group.setControllerServices(new HashSet<>()); + final VersionedProcessGroup group = VersionedFlowUtils.createProcessGroup(UUID.randomUUID().toString(), "Bundle Resolution Flow"); final VersionedParameter compatabilityParam = new VersionedParameter(); compatabilityParam.setName("BUNDLE_COMPATABILITY"); @@ -120,26 +111,17 @@ public VersionedExternalFlow getInitialFlow() { } private VersionedExternalFlow createFlowWithBundleScenarios() { - final VersionedProcessGroup group = new VersionedProcessGroup(); - group.setIdentifier(UUID.randomUUID().toString()); - group.setName("Bundle Resolution Flow"); - group.setProcessors(new HashSet<>()); - group.setProcessGroups(new HashSet<>()); - group.setConnections(new HashSet<>()); - group.setControllerServices(new HashSet<>()); + final VersionedProcessGroup group = VersionedFlowUtils.createProcessGroup(UUID.randomUUID().toString(), "Bundle Resolution Flow"); // Add a processor with an unavailable bundle (fake version) that should be resolved based on BundleCompatability // Uses the system test GenerateFlowFile processor which is available in the system test extensions bundle - final VersionedProcessor testProcessor = createProcessor( - "test-processor", - "GenerateFlowFile for Bundle Resolution Test", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", - "org.apache.nifi", - "nifi-system-test-extensions-nar", - "0.0.0-NONEXISTENT", - new Position(100, 100) - ); - group.getProcessors().add(testProcessor); + final Bundle nonexistentBundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "0.0.0-NONEXISTENT"); + + final VersionedProcessor testProcessor = VersionedFlowUtils.addProcessor(group, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", nonexistentBundle, + "GenerateFlowFile for Bundle Resolution Test", new Position(100, 100)); + testProcessor.setSchedulingPeriod("1 sec"); + testProcessor.setAutoTerminatedRelationships(Set.of("success")); final VersionedParameterContext parameterContext = new VersionedParameterContext(); parameterContext.setName("Bundle Resolution Parameter Context"); @@ -151,43 +133,6 @@ private VersionedExternalFlow createFlowWithBundleScenarios() { return flow; } - private VersionedProcessor createProcessor(final String id, final String name, final String type, - final String bundleGroup, final String bundleArtifact, - final String bundleVersion, final Position position) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(id); - processor.setName(name); - processor.setType(type); - processor.setPosition(position); - - final Bundle bundle = new Bundle(); - bundle.setGroup(bundleGroup); - bundle.setArtifact(bundleArtifact); - bundle.setVersion(bundleVersion); - processor.setBundle(bundle); - - processor.setProperties(new HashMap<>()); - processor.setPropertyDescriptors(new HashMap<>()); - processor.setStyle(new HashMap<>()); - processor.setSchedulingPeriod("1 sec"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setExecutionNode("ALL"); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setBulletinLevel("WARN"); - processor.setRunDurationMillis(0L); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setAutoTerminatedRelationships(Set.of("success")); - processor.setScheduledState(ScheduledState.ENABLED); - processor.setRetryCount(10); - processor.setRetriedRelationships(new HashSet<>()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - processor.setComponentType(ComponentType.PROCESSOR); - - return processor; - } - @Override public List verifyConfigurationStep(final String stepName, final Map propertyValueOverrides, diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java index f3fb33d80b46..bb767219154d 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java @@ -28,12 +28,11 @@ import org.apache.nifi.components.connector.StepConfigurationContext; import org.apache.nifi.components.connector.components.FlowContext; import org.apache.nifi.components.connector.components.ProcessorFacade; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.Position; -import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; -import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.processor.util.StandardValidators; import java.io.File; @@ -41,7 +40,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Set; /** * A test connector that invokes a ConnectorMethod on a CalculateProcessor using its own POJO types. @@ -163,36 +161,10 @@ public void setResult(final int result) { @Override public VersionedExternalFlow getInitialFlow() { - final VersionedProcessGroup group = new VersionedProcessGroup(); - group.setName("Calculate Flow"); - group.setIdentifier("calculate-flow-id"); - - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0-SNAPSHOT"); - - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier("calculate-processor-id"); - processor.setName("Calculate Processor"); - processor.setType("org.apache.nifi.processors.tests.system.Calculate"); - processor.setBundle(bundle); - processor.setProperties(Map.of()); - processor.setPropertyDescriptors(Map.of()); - processor.setScheduledState(ScheduledState.ENABLED); - processor.setBulletinLevel("WARN"); - processor.setPosition(new Position(0D, 0D)); - processor.setPenaltyDuration("30 sec"); - processor.setAutoTerminatedRelationships(Set.of()); - processor.setExecutionNode("ALL"); - processor.setGroupIdentifier(group.getIdentifier()); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setRunDurationMillis(0L); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setYieldDuration("1 sec"); - processor.setSchedulingPeriod("0 sec"); - processor.setStyle(Map.of()); - group.setProcessors(Set.of(processor)); + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0-SNAPSHOT"); + final VersionedProcessGroup group = VersionedFlowUtils.createProcessGroup("calculate-flow-id", "Calculate Flow"); + + VersionedFlowUtils.addProcessor(group, "org.apache.nifi.processors.tests.system.Calculate", bundle, "Calculate Processor", new Position(0, 0)); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(group); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java index 40b4b41475e5..81c6af171b13 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java @@ -22,16 +22,12 @@ import org.apache.nifi.components.connector.ConfigurationStep; import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ComponentType; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; import org.apache.nifi.flow.ControllerServiceAPI; import org.apache.nifi.flow.ExecutionEngine; -import org.apache.nifi.flow.PortType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; -import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedPort; @@ -57,18 +53,7 @@ */ public class ComponentLifecycleConnector extends AbstractConnector { - public static final String ROOT_PROCESSOR_ID = "root-processor-id"; - public static final String CHILD_GROUP_ID = "child-group-id"; - public static final String CHILD_INPUT_PORT_ID = "child-input-port-id"; - public static final String CHILD_OUTPUT_PORT_ID = "child-output-port-id"; - public static final String CHILD_PROCESSOR_ID = "child-processor-id"; - public static final String STATELESS_GROUP_ID = "stateless-group-id"; - public static final String STATELESS_PROCESSOR_ID = "stateless-processor-id"; - public static final String STATELESS_INPUT_PORT_ID = "stateless-input-port-id"; - public static final String ROOT_CONTROLLER_SERVICE_ID = "root-controller-service-id"; - public static final String CHILD_CONTROLLER_SERVICE_ID = "child-controller-service-id"; - - private static final Bundle SYSTEM_TEST_EXTENSIONS_BUNDLE = createBundle(); + private static final Bundle SYSTEM_TEST_EXTENSIONS_BUNDLE = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0-SNAPSHOT"); @Override protected void onStepConfigured(final String stepName, final FlowContext workingContext) { @@ -83,272 +68,98 @@ public VersionedExternalFlow getInitialFlow() { } private VersionedProcessGroup createRootGroup() { - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setIdentifier(UUID.randomUUID().toString()); - rootGroup.setName("Component Lifecycle Root"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup(UUID.randomUUID().toString(), "Component Lifecycle Root"); rootGroup.setPosition(new Position(0, 0)); - rootGroup.setProcessors(new HashSet<>()); - rootGroup.setProcessGroups(new HashSet<>()); - rootGroup.setConnections(new HashSet<>()); - rootGroup.setInputPorts(new HashSet<>()); - rootGroup.setOutputPorts(new HashSet<>()); - rootGroup.setControllerServices(new HashSet<>()); - rootGroup.setLabels(new HashSet<>()); - rootGroup.setFunnels(new HashSet<>()); rootGroup.setRemoteProcessGroups(new HashSet<>()); rootGroup.setScheduledState(ScheduledState.ENABLED); rootGroup.setExecutionEngine(ExecutionEngine.STANDARD); - rootGroup.setComponentType(ComponentType.PROCESS_GROUP); - // Create root-level Controller Service - final VersionedControllerService rootControllerService = createControllerService(ROOT_CONTROLLER_SERVICE_ID, "Root Count Service", rootGroup.getIdentifier()); - rootGroup.getControllerServices().add(rootControllerService); + final VersionedControllerService rootControllerService = VersionedFlowUtils.addControllerService(rootGroup, + "org.apache.nifi.cs.tests.system.StandardCountService", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Root Count Service"); + rootControllerService.setScheduledState(ScheduledState.ENABLED); + final ControllerServiceAPI rootServiceApi = new ControllerServiceAPI(); + rootServiceApi.setType("org.apache.nifi.cs.tests.system.CountService"); + rootServiceApi.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); + rootControllerService.setControllerServiceApis(Collections.singletonList(rootServiceApi)); - // Create root-level processor (GenerateFlowFile) - final VersionedProcessor rootProcessor = createProcessor(ROOT_PROCESSOR_ID, "Root GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", new Position(100, 100)); - rootProcessor.setGroupIdentifier(rootGroup.getIdentifier()); + final VersionedProcessor rootProcessor = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Root GenerateFlowFile", new Position(100, 100)); rootProcessor.setSchedulingPeriod("10 sec"); - rootGroup.getProcessors().add(rootProcessor); - // Create root-level processor (TerminateFlowFile) - final VersionedProcessor rootTerminateProcessor = createProcessor("root-terminate-processor-id", "Root TerminateFlowFile", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", new Position(300, 100)); - rootTerminateProcessor.setGroupIdentifier(rootGroup.getIdentifier()); - rootGroup.getProcessors().add(rootTerminateProcessor); + final VersionedProcessor rootTerminateProcessor = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Root TerminateFlowFile", new Position(300, 100)); - // Create child process group with ports and processor final VersionedProcessGroup childGroup = createChildGroup(rootGroup.getIdentifier()); rootGroup.getProcessGroups().add(childGroup); - // Create connection from root processor to child group's input port - final VersionedConnection rootToChildConnection = createConnection( - createConnectableComponent(ROOT_PROCESSOR_ID, "Root GenerateFlowFile", ConnectableComponentType.PROCESSOR, rootGroup.getIdentifier()), - Set.of("success"), - createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input", ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID), - rootGroup.getIdentifier() - ); - rootGroup.getConnections().add(rootToChildConnection); + final VersionedPort childInputPort = childGroup.getInputPorts().iterator().next(); + final VersionedPort childOutputPort = childGroup.getOutputPorts().iterator().next(); - // Create connection from child group's output port to root terminate processor - final VersionedConnection childToRootConnection = createConnection( - createConnectableComponent(CHILD_OUTPUT_PORT_ID, "Child Output", ConnectableComponentType.OUTPUT_PORT, CHILD_GROUP_ID), - Set.of(""), - createConnectableComponent("root-terminate-processor-id", "Root TerminateFlowFile", ConnectableComponentType.PROCESSOR, rootGroup.getIdentifier()), - rootGroup.getIdentifier() - ); - rootGroup.getConnections().add(childToRootConnection); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(rootProcessor), + VersionedFlowUtils.createConnectableComponent(childInputPort), Set.of("success")); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(childOutputPort), + VersionedFlowUtils.createConnectableComponent(rootTerminateProcessor), Set.of("")); return rootGroup; } private VersionedProcessGroup createChildGroup(final String parentGroupId) { - final VersionedProcessGroup childGroup = new VersionedProcessGroup(); - childGroup.setIdentifier(CHILD_GROUP_ID); - childGroup.setName("Child Group"); + final VersionedProcessGroup childGroup = VersionedFlowUtils.createProcessGroup("child-group-id", "Child Group"); childGroup.setPosition(new Position(100, 300)); - childGroup.setProcessors(new HashSet<>()); - childGroup.setProcessGroups(new HashSet<>()); - childGroup.setConnections(new HashSet<>()); - childGroup.setInputPorts(new HashSet<>()); - childGroup.setOutputPorts(new HashSet<>()); - childGroup.setControllerServices(new HashSet<>()); - childGroup.setLabels(new HashSet<>()); - childGroup.setFunnels(new HashSet<>()); childGroup.setRemoteProcessGroups(new HashSet<>()); childGroup.setScheduledState(ScheduledState.ENABLED); childGroup.setExecutionEngine(ExecutionEngine.STANDARD); - childGroup.setComponentType(ComponentType.PROCESS_GROUP); childGroup.setGroupIdentifier(parentGroupId); - // Create Controller Service in child group - final VersionedControllerService childControllerService = createControllerService(CHILD_CONTROLLER_SERVICE_ID, "Child Count Service", CHILD_GROUP_ID); - childGroup.getControllerServices().add(childControllerService); - - // Create input port - final VersionedPort inputPort = createPort(CHILD_INPUT_PORT_ID, "Child Input", true, CHILD_GROUP_ID); - childGroup.getInputPorts().add(inputPort); + final VersionedControllerService childControllerService = VersionedFlowUtils.addControllerService(childGroup, + "org.apache.nifi.cs.tests.system.StandardCountService", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Child Count Service"); + childControllerService.setScheduledState(ScheduledState.ENABLED); + final ControllerServiceAPI childServiceApi = new ControllerServiceAPI(); + childServiceApi.setType("org.apache.nifi.cs.tests.system.CountService"); + childServiceApi.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); + childControllerService.setControllerServiceApis(Collections.singletonList(childServiceApi)); - // Create output port - final VersionedPort outputPort = createPort(CHILD_OUTPUT_PORT_ID, "Child Output", false, CHILD_GROUP_ID); - childGroup.getOutputPorts().add(outputPort); + final VersionedPort inputPort = VersionedFlowUtils.addInputPort(childGroup, "Child Input", new Position(0, 0)); + final VersionedPort outputPort = VersionedFlowUtils.addOutputPort(childGroup, "Child Output", new Position(200, 0)); - // Create processor in child group - final VersionedProcessor childProcessor = createProcessor(CHILD_PROCESSOR_ID, "Child Terminate", - "org.apache.nifi.processors.tests.system.PassThrough", new Position(100, 100)); - childProcessor.setGroupIdentifier(CHILD_GROUP_ID); - childGroup.getProcessors().add(childProcessor); + final VersionedProcessor childProcessor = VersionedFlowUtils.addProcessor(childGroup, + "org.apache.nifi.processors.tests.system.PassThrough", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Child Terminate", new Position(100, 100)); - // Create stateless group - final VersionedProcessGroup statelessGroup = createStatelessGroup(CHILD_GROUP_ID); + final VersionedProcessGroup statelessGroup = createStatelessGroup(childGroup.getIdentifier()); childGroup.getProcessGroups().add(statelessGroup); - // Connection: input port -> child processor - final VersionedConnection inputToProcessor = createConnection( - createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input", ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID), - Set.of(""), - createConnectableComponent(CHILD_PROCESSOR_ID, "Child Terminate", ConnectableComponentType.PROCESSOR, CHILD_GROUP_ID), - CHILD_GROUP_ID - ); - childGroup.getConnections().add(inputToProcessor); - - // Connection: input port -> stateless group - final VersionedConnection inputToStateless = createConnection( - createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input", ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID), - Set.of(""), - createConnectableComponent(STATELESS_INPUT_PORT_ID, "Stateless Input", ConnectableComponentType.INPUT_PORT, STATELESS_GROUP_ID), - CHILD_GROUP_ID - ); - childGroup.getConnections().add(inputToStateless); + final VersionedPort statelessInputPort = statelessGroup.getInputPorts().iterator().next(); - // Connection: child processor -> output port - final VersionedConnection processorToOutput = createConnection( - createConnectableComponent(CHILD_PROCESSOR_ID, "Child Terminate", ConnectableComponentType.PROCESSOR, CHILD_GROUP_ID), - Set.of("success"), - createConnectableComponent(CHILD_OUTPUT_PORT_ID, "Child Output", ConnectableComponentType.OUTPUT_PORT, CHILD_GROUP_ID), - CHILD_GROUP_ID - ); - childGroup.getConnections().add(processorToOutput); + VersionedFlowUtils.addConnection(childGroup, VersionedFlowUtils.createConnectableComponent(inputPort), + VersionedFlowUtils.createConnectableComponent(childProcessor), Set.of("")); + VersionedFlowUtils.addConnection(childGroup, VersionedFlowUtils.createConnectableComponent(inputPort), + VersionedFlowUtils.createConnectableComponent(statelessInputPort), Set.of("")); + VersionedFlowUtils.addConnection(childGroup, VersionedFlowUtils.createConnectableComponent(childProcessor), + VersionedFlowUtils.createConnectableComponent(outputPort), Set.of("success")); return childGroup; } private VersionedProcessGroup createStatelessGroup(final String parentGroupId) { - final VersionedProcessGroup statelessGroup = new VersionedProcessGroup(); - statelessGroup.setIdentifier(STATELESS_GROUP_ID); - statelessGroup.setName("Stateless Group"); + final VersionedProcessGroup statelessGroup = VersionedFlowUtils.createProcessGroup("stateless-group-id", "Stateless Group"); statelessGroup.setPosition(new Position(400, 100)); - statelessGroup.setProcessors(new HashSet<>()); - statelessGroup.setProcessGroups(new HashSet<>()); - statelessGroup.setConnections(new HashSet<>()); - statelessGroup.setInputPorts(new HashSet<>()); - statelessGroup.setOutputPorts(new HashSet<>()); - statelessGroup.setControllerServices(new HashSet<>()); - statelessGroup.setLabels(new HashSet<>()); - statelessGroup.setFunnels(new HashSet<>()); statelessGroup.setRemoteProcessGroups(new HashSet<>()); statelessGroup.setScheduledState(ScheduledState.ENABLED); statelessGroup.setExecutionEngine(ExecutionEngine.STATELESS); statelessGroup.setStatelessFlowTimeout("1 min"); - statelessGroup.setComponentType(ComponentType.PROCESS_GROUP); statelessGroup.setGroupIdentifier(parentGroupId); - // Create input port for stateless group - final VersionedPort statelessInput = createPort(STATELESS_INPUT_PORT_ID, "Stateless Input", true, STATELESS_GROUP_ID); - statelessGroup.getInputPorts().add(statelessInput); + final VersionedPort statelessInput = VersionedFlowUtils.addInputPort(statelessGroup, "Stateless Input", new Position(0, 0)); - // Create processor in stateless group - final VersionedProcessor statelessProcessor = createProcessor(STATELESS_PROCESSOR_ID, "Stateless Terminate", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", new Position(100, 100)); - statelessProcessor.setGroupIdentifier(STATELESS_GROUP_ID); - statelessGroup.getProcessors().add(statelessProcessor); + final VersionedProcessor statelessProcessor = VersionedFlowUtils.addProcessor(statelessGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Stateless Terminate", new Position(100, 100)); - // Connection: input port -> processor - final VersionedConnection inputToProcessor = createConnection( - createConnectableComponent(STATELESS_INPUT_PORT_ID, "Stateless Input", ConnectableComponentType.INPUT_PORT, STATELESS_GROUP_ID), - Set.of(""), - createConnectableComponent(STATELESS_PROCESSOR_ID, "Stateless Terminate", ConnectableComponentType.PROCESSOR, STATELESS_GROUP_ID), - STATELESS_GROUP_ID - ); - statelessGroup.getConnections().add(inputToProcessor); + VersionedFlowUtils.addConnection(statelessGroup, VersionedFlowUtils.createConnectableComponent(statelessInput), + VersionedFlowUtils.createConnectableComponent(statelessProcessor), Set.of("")); return statelessGroup; } - private VersionedProcessor createProcessor(final String id, final String name, final String type, final Position position) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(id); - processor.setName(name); - processor.setType(type); - processor.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); - processor.setPosition(position); - processor.setProperties(Map.of()); - processor.setPropertyDescriptors(Map.of()); - processor.setSchedulingPeriod("0 sec"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setExecutionNode("ALL"); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setBulletinLevel("WARN"); - processor.setRunDurationMillis(0L); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setAutoTerminatedRelationships(Set.of()); - processor.setScheduledState(ScheduledState.ENABLED); - processor.setRetryCount(0); - processor.setRetriedRelationships(Set.of()); - processor.setComponentType(ComponentType.PROCESSOR); - return processor; - } - - private VersionedPort createPort(final String id, final String name, final boolean isInput, final String groupId) { - final VersionedPort port = new VersionedPort(); - port.setIdentifier(id); - port.setName(name); - port.setPosition(new Position(isInput ? 0 : 200, 0)); - port.setType(isInput ? PortType.INPUT_PORT : PortType.OUTPUT_PORT); - port.setComponentType(isInput ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT); - port.setConcurrentlySchedulableTaskCount(1); - port.setScheduledState(ScheduledState.ENABLED); - port.setAllowRemoteAccess(false); - port.setGroupIdentifier(groupId); - return port; - } - - private VersionedControllerService createControllerService(final String id, final String name, final String groupId) { - final VersionedControllerService service = new VersionedControllerService(); - service.setIdentifier(id); - service.setName(name); - service.setType("org.apache.nifi.cs.tests.system.StandardCountService"); - service.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); - service.setGroupIdentifier(groupId); - service.setProperties(Map.of()); - service.setPropertyDescriptors(Map.of()); - service.setScheduledState(ScheduledState.ENABLED); - service.setBulletinLevel("WARN"); - - final ControllerServiceAPI serviceApi = new ControllerServiceAPI(); - serviceApi.setType("org.apache.nifi.cs.tests.system.CountService"); - serviceApi.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); - service.setControllerServiceApis(Collections.singletonList(serviceApi)); - - return service; - } - - private VersionedConnection createConnection(final ConnectableComponent source, final Set relationships, - final ConnectableComponent destination, final String groupId) { - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier(UUID.randomUUID().toString()); - connection.setName(""); - connection.setSource(source); - connection.setDestination(destination); - connection.setSelectedRelationships(relationships); - connection.setBackPressureObjectThreshold(10000L); - connection.setBackPressureDataSizeThreshold("1 GB"); - connection.setFlowFileExpiration("0 sec"); - connection.setLabelIndex(0); - connection.setzIndex(0L); - connection.setComponentType(ComponentType.CONNECTION); - connection.setGroupIdentifier(groupId); - return connection; - } - - private ConnectableComponent createConnectableComponent(final String id, final String name, final ConnectableComponentType type, final String groupId) { - final ConnectableComponent component = new ConnectableComponent(); - component.setId(id); - component.setName(name); - component.setType(type); - component.setGroupId(groupId); - return component; - } - - private static Bundle createBundle() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0-SNAPSHOT"); - return bundle; - } - @Override public List verifyConfigurationStep(final String stepName, final Map propertyValueOverrides, final FlowContext flowContext) { return List.of(); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java index 987b7e812db8..1c50941c8371 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java @@ -21,12 +21,10 @@ import org.apache.nifi.components.connector.AbstractConnector; import org.apache.nifi.components.connector.ConfigurationStep; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; -import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; @@ -44,47 +42,20 @@ protected void onStepConfigured(final String stepName, final FlowContext working @Override public VersionedExternalFlow getInitialFlow() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0-SNAPSHOT"); + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0-SNAPSHOT"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup("1234", "Data Queuing Connector"); - final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, Map.of("File Size", "1 KB"), ScheduledState.ENABLED); + final VersionedProcessor generate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, "GenerateFlowFile", new Position(0, 0)); + generate.getProperties().put("File Size", "1 KB"); generate.setSchedulingPeriod("100 millis"); - final VersionedProcessor terminate = createVersionedProcessor("term-1", "1234", "TerminateFlowFile", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, Collections.emptyMap(), ScheduledState.DISABLED); + final VersionedProcessor terminate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, "TerminateFlowFile", new Position(0, 0)); + terminate.setScheduledState(ScheduledState.DISABLED); - final ConnectableComponent source = new ConnectableComponent(); - source.setId(generate.getIdentifier()); - source.setType(ConnectableComponentType.PROCESSOR); - source.setGroupId("1234"); - - final ConnectableComponent destination = new ConnectableComponent(); - destination.setId(terminate.getIdentifier()); - destination.setType(ConnectableComponentType.PROCESSOR); - destination.setGroupId("1234"); - - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier("generate-to-terminate-1"); - connection.setSource(source); - connection.setDestination(destination); - connection.setGroupIdentifier("1234"); - connection.setSelectedRelationships(Set.of("success")); - connection.setBackPressureDataSizeThreshold("1 GB"); - connection.setBackPressureObjectThreshold(10_000L); - connection.setBends(Collections.emptyList()); - connection.setLabelIndex(1); - connection.setFlowFileExpiration("0 sec"); - connection.setPrioritizers(Collections.emptyList()); - connection.setzIndex(1L); - - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setName("Data Queuing Connector"); - rootGroup.setIdentifier("1234"); - rootGroup.setProcessors(Set.of(generate, terminate)); - rootGroup.setConnections(Set.of(connection)); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(generate), + VersionedFlowUtils.createConnectableComponent(terminate), Set.of("success")); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -105,36 +76,4 @@ public List getConfigurationSteps() { @Override public void applyUpdate(final FlowContext workingFlowContext, final FlowContext activeFlowContext) { } - - private VersionedProcessor createVersionedProcessor(final String identifier, final String groupIdentifier, final String name, - final String type, final Bundle bundle, final Map properties, - final ScheduledState scheduledState) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(identifier); - processor.setGroupIdentifier(groupIdentifier); - processor.setName(name); - processor.setType(type); - processor.setBundle(bundle); - processor.setProperties(properties); - processor.setPropertyDescriptors(Collections.emptyMap()); - processor.setScheduledState(scheduledState); - - processor.setBulletinLevel("WARN"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setSchedulingPeriod("0 sec"); - processor.setExecutionNode("ALL"); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setRunDurationMillis(0L); - processor.setPosition(new Position(0, 0)); - - processor.setAutoTerminatedRelationships(Collections.emptySet()); - processor.setRetryCount(10); - processor.setRetriedRelationships(Collections.emptySet()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - - return processor; - } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java index 9d8b9a9508ff..8c7f680d86ba 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java @@ -26,19 +26,15 @@ import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.PropertyType; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; import org.apache.nifi.flow.Position; -import org.apache.nifi.flow.ScheduledState; -import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.processor.util.StandardValidators; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,8 +49,6 @@ */ public class GatedDataQueuingConnector extends AbstractConnector { - private static final String TERMINATE_PROCESSOR_ID = "term-1"; - static final ConnectorPropertyDescriptor GATE_FILE_PATH = new ConnectorPropertyDescriptor.Builder() .name("Gate File Path") .description("The path to the gate file. When this file exists, the TerminateFlowFile processor " + @@ -80,49 +74,19 @@ protected void onStepConfigured(final String stepName, final FlowContext working @Override public VersionedExternalFlow getInitialFlow() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0"); - - final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, - Map.of("File Size", "1 KB"), ScheduledState.ENABLED); + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup("1234", "Gated Data Queuing Connector"); + + final VersionedProcessor generate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, "GenerateFlowFile", new Position(0, 0)); + generate.getProperties().put("File Size", "1 KB"); generate.setSchedulingPeriod("100 millis"); - final VersionedProcessor terminate = createVersionedProcessor(TERMINATE_PROCESSOR_ID, "1234", "TerminateFlowFile", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, - Collections.emptyMap(), ScheduledState.ENABLED); - - final ConnectableComponent source = new ConnectableComponent(); - source.setId(generate.getIdentifier()); - source.setType(ConnectableComponentType.PROCESSOR); - source.setGroupId("1234"); - - final ConnectableComponent destination = new ConnectableComponent(); - destination.setId(terminate.getIdentifier()); - destination.setType(ConnectableComponentType.PROCESSOR); - destination.setGroupId("1234"); - - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier("generate-to-terminate-1"); - connection.setSource(source); - connection.setDestination(destination); - connection.setGroupIdentifier("1234"); - connection.setSelectedRelationships(Set.of("success")); - connection.setBackPressureDataSizeThreshold("1 GB"); - connection.setBackPressureObjectThreshold(10_000L); - connection.setBends(Collections.emptyList()); - connection.setLabelIndex(1); - connection.setFlowFileExpiration("0 sec"); - connection.setPrioritizers(Collections.emptyList()); - connection.setzIndex(1L); - - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setName("Gated Data Queuing Connector"); - rootGroup.setIdentifier("1234"); - rootGroup.setProcessors(Set.of(generate, terminate)); - rootGroup.setConnections(Set.of(connection)); + final VersionedProcessor terminate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, "TerminateFlowFile", new Position(0, 0)); + + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(generate), + VersionedFlowUtils.createConnectableComponent(terminate), Set.of("success")); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -150,46 +114,9 @@ public void applyUpdate(final FlowContext workingFlowContext, final FlowContext final VersionedExternalFlow flow = getInitialFlow(); final VersionedProcessGroup rootGroup = flow.getFlowContents(); - for (final VersionedProcessor processor : rootGroup.getProcessors()) { - if (TERMINATE_PROCESSOR_ID.equals(processor.getIdentifier())) { - final Map properties = new HashMap<>(processor.getProperties()); - properties.put("Gate File", gateFilePath); - processor.setProperties(properties); - } - } + VersionedFlowUtils.findProcessor(rootGroup, p -> p.getType().endsWith("TerminateFlowFile")) + .ifPresent(processor -> processor.getProperties().put("Gate File", gateFilePath)); getInitializationContext().updateFlow(activeFlowContext, flow, BundleCompatibility.RESOLVE_BUNDLE); } - - private VersionedProcessor createVersionedProcessor(final String identifier, final String groupIdentifier, final String name, - final String type, final Bundle bundle, final Map properties, - final ScheduledState scheduledState) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(identifier); - processor.setGroupIdentifier(groupIdentifier); - processor.setName(name); - processor.setType(type); - processor.setBundle(bundle); - processor.setProperties(properties); - processor.setPropertyDescriptors(Collections.emptyMap()); - processor.setScheduledState(scheduledState); - - processor.setBulletinLevel("WARN"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setSchedulingPeriod("0 sec"); - processor.setExecutionNode("ALL"); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setRunDurationMillis(0L); - processor.setPosition(new Position(0, 0)); - - processor.setAutoTerminatedRelationships(Collections.emptySet()); - processor.setRetryCount(10); - processor.setRetriedRelationships(Collections.emptySet()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - - return processor; - } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java index 8ba055dda93b..9d160a11f1c2 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java @@ -26,13 +26,9 @@ import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.PropertyType; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; -import org.apache.nifi.flow.PortType; import org.apache.nifi.flow.Position; -import org.apache.nifi.flow.ScheduledState; -import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedParameter; import org.apache.nifi.flow.VersionedParameterContext; @@ -43,7 +39,6 @@ import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -75,15 +70,8 @@ public class ParameterContextConnector extends AbstractConnector { private static final String CONFIGURATION_STEP_NAME = "Parameter Context Configuration"; private static final String ROOT_GROUP_ID = "root-group"; - private static final String GENERATE_PROCESSOR_ID = "generate-flowfile"; private static final String GROUP_A_ID = "process-group-a"; private static final String GROUP_B_ID = "process-group-b"; - private static final String INPUT_PORT_A_ID = "input-port-a"; - private static final String INPUT_PORT_B_ID = "input-port-b"; - private static final String UPDATE_CONTENT_ID = "update-content"; - private static final String REPLACE_WITH_FILE_ID = "replace-with-file"; - private static final String WRITE_SENSITIVE_ID = "write-sensitive"; - private static final String WRITE_ASSET_ID = "write-asset"; private static final String PARENT_CONTEXT_NAME = "Parent Parameter Context"; private static final String CHILD_CONTEXT_A_NAME = "Child Context A"; @@ -147,15 +135,7 @@ public VersionedExternalFlow getInitialFlow() { } private VersionedExternalFlow createEmptyFlow() { - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setIdentifier(ROOT_GROUP_ID); - rootGroup.setName("Parameter Context Test Flow"); - rootGroup.setProcessors(new HashSet<>()); - rootGroup.setProcessGroups(new HashSet<>()); - rootGroup.setConnections(new HashSet<>()); - rootGroup.setInputPorts(new HashSet<>()); - rootGroup.setOutputPorts(new HashSet<>()); - rootGroup.setControllerServices(new HashSet<>()); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup(ROOT_GROUP_ID, "Parameter Context Test Flow"); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -190,46 +170,30 @@ public void applyUpdate(final FlowContext workingContext, final FlowContext acti private VersionedExternalFlow createFlow(final String sensitiveValue, final String assetFilePath, final String sensitiveOutputFile, final String assetOutputFile) { - final Bundle bundle = createBundle(); - - // Create Parameter Contexts with inheritance + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0-SNAPSHOT"); final Map parameterContexts = createParameterContexts(sensitiveValue, assetFilePath); - // Create root group - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setIdentifier(ROOT_GROUP_ID); - rootGroup.setName("Parameter Context Test Flow"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup(ROOT_GROUP_ID, "Parameter Context Test Flow"); rootGroup.setParameterContextName(PARENT_CONTEXT_NAME); - // Create GenerateFlowFile at root level - final VersionedProcessor generateProcessor = createProcessor(GENERATE_PROCESSOR_ID, ROOT_GROUP_ID, "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, - Map.of("Max FlowFiles", "1", "File Size", "0 B"), ScheduledState.ENABLED); + final VersionedProcessor generateProcessor = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, "GenerateFlowFile", new Position(0, 0)); + generateProcessor.getProperties().putAll(Map.of("Max FlowFiles", "1", "File Size", "0 B")); generateProcessor.setSchedulingPeriod("60 sec"); - // Create Process Group A (sensitive value path) final VersionedProcessGroup groupA = createProcessGroupA(bundle, sensitiveOutputFile); + rootGroup.getProcessGroups().add(groupA); - // Create Process Group B (asset path) final VersionedProcessGroup groupB = createProcessGroupB(bundle, assetOutputFile); + rootGroup.getProcessGroups().add(groupB); - // Create connections from GenerateFlowFile to both process group input ports - final VersionedConnection connectionToA = createConnection("conn-to-group-a", ROOT_GROUP_ID, - GENERATE_PROCESSOR_ID, ConnectableComponentType.PROCESSOR, - INPUT_PORT_A_ID, ConnectableComponentType.INPUT_PORT, GROUP_A_ID, - Set.of("success")); - - final VersionedConnection connectionToB = createConnection("conn-to-group-b", ROOT_GROUP_ID, - GENERATE_PROCESSOR_ID, ConnectableComponentType.PROCESSOR, - INPUT_PORT_B_ID, ConnectableComponentType.INPUT_PORT, GROUP_B_ID, - Set.of("success")); + final VersionedPort inputPortA = groupA.getInputPorts().iterator().next(); + final VersionedPort inputPortB = groupB.getInputPorts().iterator().next(); - rootGroup.setProcessors(Set.of(generateProcessor)); - rootGroup.setProcessGroups(Set.of(groupA, groupB)); - rootGroup.setConnections(Set.of(connectionToA, connectionToB)); - rootGroup.setInputPorts(new HashSet<>()); - rootGroup.setOutputPorts(new HashSet<>()); - rootGroup.setControllerServices(new HashSet<>()); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(generateProcessor), + VersionedFlowUtils.createConnectableComponent(inputPortA), Set.of("success")); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(generateProcessor), + VersionedFlowUtils.createConnectableComponent(inputPortB), Set.of("success")); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -238,7 +202,6 @@ private VersionedExternalFlow createFlow(final String sensitiveValue, final Stri } private Map createParameterContexts(final String sensitiveValue, final String assetFilePath) { - // Child Context A - sensitive parameter final VersionedParameter sensitiveParam = new VersionedParameter(); sensitiveParam.setName(SENSITIVE_PARAM_NAME); sensitiveParam.setSensitive(true); @@ -250,7 +213,6 @@ private Map createParameterContexts(final Str childContextA.setName(CHILD_CONTEXT_A_NAME); childContextA.setParameters(Set.of(sensitiveParam)); - // Child Context B - asset parameter final VersionedParameter assetParam = new VersionedParameter(); assetParam.setName(ASSET_PARAM_NAME); assetParam.setSensitive(false); @@ -262,7 +224,6 @@ private Map createParameterContexts(final Str childContextB.setName(CHILD_CONTEXT_B_NAME); childContextB.setParameters(Set.of(assetParam)); - // Parent Context - inherits from both child contexts final VersionedParameterContext parentContext = new VersionedParameterContext(); parentContext.setName(PARENT_CONTEXT_NAME); parentContext.setParameters(Set.of()); @@ -276,172 +237,50 @@ private Map createParameterContexts(final Str } private VersionedProcessGroup createProcessGroupA(final Bundle bundle, final String outputFile) { - final VersionedProcessGroup groupA = new VersionedProcessGroup(); - groupA.setIdentifier(GROUP_A_ID); + final VersionedProcessGroup groupA = VersionedFlowUtils.createProcessGroup(GROUP_A_ID, "Process Group A - Sensitive Value"); groupA.setGroupIdentifier(ROOT_GROUP_ID); - groupA.setName("Process Group A - Sensitive Value"); groupA.setParameterContextName(PARENT_CONTEXT_NAME); - // Input Port - final VersionedPort inputPortA = createInputPort(INPUT_PORT_A_ID, GROUP_A_ID, "Input Port A"); + final VersionedPort inputPortA = VersionedFlowUtils.addInputPort(groupA, "Input Port A", new Position(0, 0)); - // UpdateContent processor using sensitive parameter - final VersionedProcessor updateContent = createProcessor(UPDATE_CONTENT_ID, GROUP_A_ID, "UpdateContent", - "org.apache.nifi.processors.tests.system.UpdateContent", bundle, - Map.of("Sensitive Content", "#{" + SENSITIVE_PARAM_NAME + "}", "Update Strategy", "Replace"), - ScheduledState.ENABLED); + final VersionedProcessor updateContent = VersionedFlowUtils.addProcessor(groupA, + "org.apache.nifi.processors.tests.system.UpdateContent", bundle, "UpdateContent", new Position(0, 0)); + updateContent.getProperties().putAll(Map.of("Sensitive Content", "#{" + SENSITIVE_PARAM_NAME + "}", "Update Strategy", "Replace")); - // WriteToFile processor - final VersionedProcessor writeToFile = createProcessor(WRITE_SENSITIVE_ID, GROUP_A_ID, "WriteToFile", - "org.apache.nifi.processors.tests.system.WriteToFile", bundle, - Map.of("Filename", outputFile), ScheduledState.ENABLED); + final VersionedProcessor writeToFile = VersionedFlowUtils.addProcessor(groupA, + "org.apache.nifi.processors.tests.system.WriteToFile", bundle, "WriteToFile", new Position(0, 0)); + writeToFile.getProperties().put("Filename", outputFile); writeToFile.setAutoTerminatedRelationships(Set.of("success", "failure")); - // Connections within Group A - final VersionedConnection inputToUpdate = createConnection("input-to-update", GROUP_A_ID, - INPUT_PORT_A_ID, ConnectableComponentType.INPUT_PORT, - UPDATE_CONTENT_ID, ConnectableComponentType.PROCESSOR, null, - Set.of()); - - final VersionedConnection updateToWrite = createConnection("update-to-write", GROUP_A_ID, - UPDATE_CONTENT_ID, ConnectableComponentType.PROCESSOR, - WRITE_SENSITIVE_ID, ConnectableComponentType.PROCESSOR, null, - Set.of("success")); - - groupA.setInputPorts(Set.of(inputPortA)); - groupA.setOutputPorts(new HashSet<>()); - groupA.setProcessors(Set.of(updateContent, writeToFile)); - groupA.setConnections(Set.of(inputToUpdate, updateToWrite)); - groupA.setProcessGroups(new HashSet<>()); - groupA.setControllerServices(new HashSet<>()); + VersionedFlowUtils.addConnection(groupA, VersionedFlowUtils.createConnectableComponent(inputPortA), + VersionedFlowUtils.createConnectableComponent(updateContent), Set.of()); + VersionedFlowUtils.addConnection(groupA, VersionedFlowUtils.createConnectableComponent(updateContent), + VersionedFlowUtils.createConnectableComponent(writeToFile), Set.of("success")); return groupA; } private VersionedProcessGroup createProcessGroupB(final Bundle bundle, final String outputFile) { - final VersionedProcessGroup groupB = new VersionedProcessGroup(); - groupB.setIdentifier(GROUP_B_ID); + final VersionedProcessGroup groupB = VersionedFlowUtils.createProcessGroup(GROUP_B_ID, "Process Group B - Asset Value"); groupB.setGroupIdentifier(ROOT_GROUP_ID); - groupB.setName("Process Group B - Asset Value"); groupB.setParameterContextName(PARENT_CONTEXT_NAME); - // Input Port - final VersionedPort inputPortB = createInputPort(INPUT_PORT_B_ID, GROUP_B_ID, "Input Port B"); + final VersionedPort inputPortB = VersionedFlowUtils.addInputPort(groupB, "Input Port B", new Position(0, 0)); - // ReplaceWithFile processor using asset parameter - final VersionedProcessor replaceWithFile = createProcessor(REPLACE_WITH_FILE_ID, GROUP_B_ID, "ReplaceWithFile", - "org.apache.nifi.processors.tests.system.ReplaceWithFile", bundle, - Map.of("Filename", "#{" + ASSET_PARAM_NAME + "}"), ScheduledState.ENABLED); + final VersionedProcessor replaceWithFile = VersionedFlowUtils.addProcessor(groupB, + "org.apache.nifi.processors.tests.system.ReplaceWithFile", bundle, "ReplaceWithFile", new Position(0, 0)); + replaceWithFile.getProperties().put("Filename", "#{" + ASSET_PARAM_NAME + "}"); - // WriteToFile processor - final VersionedProcessor writeToFile = createProcessor(WRITE_ASSET_ID, GROUP_B_ID, "WriteToFile", - "org.apache.nifi.processors.tests.system.WriteToFile", bundle, - Map.of("Filename", outputFile), ScheduledState.ENABLED); + final VersionedProcessor writeToFile = VersionedFlowUtils.addProcessor(groupB, + "org.apache.nifi.processors.tests.system.WriteToFile", bundle, "WriteToFile", new Position(0, 0)); + writeToFile.getProperties().put("Filename", outputFile); writeToFile.setAutoTerminatedRelationships(Set.of("success", "failure")); - // Connections within Group B - final VersionedConnection inputToReplace = createConnection("input-to-replace", GROUP_B_ID, - INPUT_PORT_B_ID, ConnectableComponentType.INPUT_PORT, - REPLACE_WITH_FILE_ID, ConnectableComponentType.PROCESSOR, null, - Set.of()); - - final VersionedConnection replaceToWrite = createConnection("replace-to-write", GROUP_B_ID, - REPLACE_WITH_FILE_ID, ConnectableComponentType.PROCESSOR, - WRITE_ASSET_ID, ConnectableComponentType.PROCESSOR, null, - Set.of("success")); - - groupB.setInputPorts(Set.of(inputPortB)); - groupB.setOutputPorts(new HashSet<>()); - groupB.setProcessors(Set.of(replaceWithFile, writeToFile)); - groupB.setConnections(Set.of(inputToReplace, replaceToWrite)); - groupB.setProcessGroups(new HashSet<>()); - groupB.setControllerServices(new HashSet<>()); + VersionedFlowUtils.addConnection(groupB, VersionedFlowUtils.createConnectableComponent(inputPortB), + VersionedFlowUtils.createConnectableComponent(replaceWithFile), Set.of()); + VersionedFlowUtils.addConnection(groupB, VersionedFlowUtils.createConnectableComponent(replaceWithFile), + VersionedFlowUtils.createConnectableComponent(writeToFile), Set.of("success")); return groupB; } - - private Bundle createBundle() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0-SNAPSHOT"); - return bundle; - } - - private VersionedPort createInputPort(final String identifier, final String groupIdentifier, final String name) { - final VersionedPort port = new VersionedPort(); - port.setIdentifier(identifier); - port.setGroupIdentifier(groupIdentifier); - port.setName(name); - port.setType(PortType.INPUT_PORT); - port.setScheduledState(ScheduledState.ENABLED); - port.setConcurrentlySchedulableTaskCount(1); - port.setPosition(new Position(0, 0)); - port.setAllowRemoteAccess(false); - return port; - } - - private VersionedProcessor createProcessor(final String identifier, final String groupIdentifier, final String name, - final String type, final Bundle bundle, final Map properties, - final ScheduledState scheduledState) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(identifier); - processor.setGroupIdentifier(groupIdentifier); - processor.setName(name); - processor.setType(type); - processor.setBundle(bundle); - processor.setProperties(properties); - processor.setPropertyDescriptors(Collections.emptyMap()); - processor.setScheduledState(scheduledState); - - processor.setBulletinLevel("WARN"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setSchedulingPeriod("0 sec"); - processor.setExecutionNode("ALL"); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setRunDurationMillis(0L); - processor.setPosition(new Position(0, 0)); - - processor.setAutoTerminatedRelationships(Collections.emptySet()); - processor.setRetryCount(10); - processor.setRetriedRelationships(Collections.emptySet()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - - return processor; - } - - private VersionedConnection createConnection(final String identifier, final String groupIdentifier, - final String sourceId, final ConnectableComponentType sourceType, - final String destinationId, final ConnectableComponentType destinationType, - final String destinationGroupId, - final Set selectedRelationships) { - final ConnectableComponent source = new ConnectableComponent(); - source.setId(sourceId); - source.setType(sourceType); - source.setGroupId(groupIdentifier); - - final ConnectableComponent destination = new ConnectableComponent(); - destination.setId(destinationId); - destination.setType(destinationType); - destination.setGroupId(destinationGroupId != null ? destinationGroupId : groupIdentifier); - - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier(identifier); - connection.setGroupIdentifier(groupIdentifier); - connection.setSource(source); - connection.setDestination(destination); - connection.setSelectedRelationships(selectedRelationships); - connection.setBackPressureDataSizeThreshold("1 GB"); - connection.setBackPressureObjectThreshold(10_000L); - connection.setBends(Collections.emptyList()); - connection.setLabelIndex(1); - connection.setFlowFileExpiration("0 sec"); - connection.setPrioritizers(Collections.emptyList()); - connection.setzIndex(0L); - - return connection; - } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java index 06ccdd561c92..b1b852b18afa 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java @@ -25,9 +25,8 @@ import org.apache.nifi.components.connector.components.ConnectionFacade; import org.apache.nifi.components.connector.components.FlowContext; import org.apache.nifi.components.connector.components.ProcessGroupFacade; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedConnection; @@ -50,62 +49,33 @@ */ public class SelectiveDropConnector extends AbstractConnector { - private static final String CONNECTION_ID = "generate-to-terminate-connection"; - @Override protected void onStepConfigured(final String stepName, final FlowContext workingContext) { } @Override public VersionedExternalFlow getInitialFlow() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.7.0-SNAPSHOT"); + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.7.0-SNAPSHOT"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup("1234", "Selective Drop Connector"); - // GenerateFlowFile processor configured to generate 1-byte FlowFiles with flowFileIndex attribute - final Map generateProperties = Map.of( + final VersionedProcessor generate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, "GenerateFlowFile", new Position(0, 0)); + generate.getProperties().putAll(Map.of( "File Size", "1 B", "Batch Size", "20000", "Max FlowFiles", "20000", "flowFileIndex", "${nextInt()}" - ); - final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, generateProperties, ScheduledState.ENABLED); + )); generate.setSchedulingPeriod("10 sec"); - final VersionedProcessor terminate = createVersionedProcessor("term-1", "1234", "TerminateFlowFile", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, Collections.emptyMap(), ScheduledState.DISABLED); - - final ConnectableComponent source = new ConnectableComponent(); - source.setId(generate.getIdentifier()); - source.setType(ConnectableComponentType.PROCESSOR); - source.setGroupId("1234"); - - final ConnectableComponent destination = new ConnectableComponent(); - destination.setId(terminate.getIdentifier()); - destination.setType(ConnectableComponentType.PROCESSOR); - destination.setGroupId("1234"); - - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier(CONNECTION_ID); - connection.setSource(source); - connection.setDestination(destination); - connection.setGroupIdentifier("1234"); - connection.setSelectedRelationships(Set.of("success")); + final VersionedProcessor terminate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, "TerminateFlowFile", new Position(0, 0)); + terminate.setScheduledState(ScheduledState.DISABLED); + + final VersionedConnection connection = VersionedFlowUtils.addConnection(rootGroup, + VersionedFlowUtils.createConnectableComponent(generate), VersionedFlowUtils.createConnectableComponent(terminate), Set.of("success")); connection.setBackPressureDataSizeThreshold("100 GB"); connection.setBackPressureObjectThreshold(100_000L); - connection.setBends(Collections.emptyList()); - connection.setLabelIndex(1); - connection.setFlowFileExpiration("0 sec"); - connection.setPrioritizers(Collections.emptyList()); - connection.setzIndex(1L); - - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setName("Selective Drop Connector"); - rootGroup.setIdentifier("1234"); - rootGroup.setProcessors(Set.of(generate, terminate)); - rootGroup.setConnections(Set.of(connection)); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -131,15 +101,13 @@ public void applyUpdate(final FlowContext workingFlowContext, final FlowContext @Override public void stop(final FlowContext context) throws FlowUpdateException { - // First, stop the processors via the parent class super.stop(context); - // Then, drop all FlowFiles where flowFileIndex has an even value final ProcessGroupFacade rootGroup = context.getRootGroup(); - final ConnectionFacade connection = findConnectionById(rootGroup, CONNECTION_ID); + final ConnectionFacade connection = findFirstConnection(rootGroup); if (connection == null) { - getLogger().warn("Could not find connection with ID {} to perform selective drop", CONNECTION_ID); + getLogger().warn("Could not find connection to perform selective drop"); return; } @@ -165,15 +133,13 @@ private boolean hasEvenFlowFileIndex(final FlowFile flowFile) { return Integer.parseInt(flowFile.getAttribute("flowFileIndex")) % 2 == 0; } - private ConnectionFacade findConnectionById(final ProcessGroupFacade group, final String connectionId) { + private ConnectionFacade findFirstConnection(final ProcessGroupFacade group) { for (final ConnectionFacade connection : group.getConnections()) { - if (connectionId.equals(connection.getDefinition().getIdentifier())) { - return connection; - } + return connection; } for (final ProcessGroupFacade childGroup : group.getProcessGroups()) { - final ConnectionFacade found = findConnectionById(childGroup, connectionId); + final ConnectionFacade found = findFirstConnection(childGroup); if (found != null) { return found; } @@ -181,36 +147,4 @@ private ConnectionFacade findConnectionById(final ProcessGroupFacade group, fina return null; } - - private VersionedProcessor createVersionedProcessor(final String identifier, final String groupIdentifier, final String name, - final String type, final Bundle bundle, final Map properties, - final ScheduledState scheduledState) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(identifier); - processor.setGroupIdentifier(groupIdentifier); - processor.setName(name); - processor.setType(type); - processor.setBundle(bundle); - processor.setProperties(properties); - processor.setPropertyDescriptors(Collections.emptyMap()); - processor.setScheduledState(scheduledState); - - processor.setBulletinLevel("WARN"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setSchedulingPeriod("0 sec"); - processor.setExecutionNode("ALL"); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setRunDurationMillis(0L); - processor.setPosition(new Position(0, 0)); - - processor.setAutoTerminatedRelationships(Collections.emptySet()); - processor.setRetryCount(10); - processor.setRetriedRelationships(Collections.emptySet()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - - return processor; - } } From 13143958973256b11268ab40893ae30770f0b522 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Sat, 7 Feb 2026 10:22:43 -0500 Subject: [PATCH 2/2] NIFI-15565: Broke apart cursorrules file into the new format of .cursor/rules/* --- .cursor/rules/building.mdc | 18 +++ .cursor/rules/code-style.mdc | 74 ++++++++++ .cursor/rules/ending-conditions.mdc | 30 ++++ .cursor/rules/extension-development.mdc | 107 +++++++++++++++ .cursor/rules/extension-testing.mdc | 80 +++++++++++ .cursor/rules/framework-testing.mdc | 46 +++++++ .cursor/rules/persona.mdc | 16 +++ .cursor/rules/testing-standards.mdc | 69 ++++++++++ .cursorrules | 129 ------------------ .gitignore | 2 +- ...apache.nifi.components.connector.Connector | 2 +- 11 files changed, 442 insertions(+), 131 deletions(-) create mode 100644 .cursor/rules/building.mdc create mode 100644 .cursor/rules/code-style.mdc create mode 100644 .cursor/rules/ending-conditions.mdc create mode 100644 .cursor/rules/extension-development.mdc create mode 100644 .cursor/rules/extension-testing.mdc create mode 100644 .cursor/rules/framework-testing.mdc create mode 100644 .cursor/rules/persona.mdc create mode 100644 .cursor/rules/testing-standards.mdc delete mode 100644 .cursorrules diff --git a/.cursor/rules/building.mdc b/.cursor/rules/building.mdc new file mode 100644 index 000000000000..af4829a42628 --- /dev/null +++ b/.cursor/rules/building.mdc @@ -0,0 +1,18 @@ +--- +description: Maven build instructions for the NiFi codebase +alwaysApply: true +--- + +# Building + +NiFi is a complex Maven codebase. Never build code (testing or otherwise) using javac. +Always use `mvn` instead, or preferably the `.mvnw` wrapper script. + +Additionally, building a maven module using the also-make flag (`-am`) is often very +expensive and slow. Instead, only build the specific module you are modifying. Assume that +the user has already built the entire codebase and that only the specific module you are +modifying needs to be built again. If this fails, you can prompt the user to build the entire +codebase, but only after you have attempted to build the relevant modules yourself first. +It is important not to run `mvn clean` at the root level or at the `nifi-assembly` level without +the user's express permission, as this may delete a running instance of NiFi, causing permanent +loss of flows and configuration. diff --git a/.cursor/rules/code-style.mdc b/.cursor/rules/code-style.mdc new file mode 100644 index 000000000000..ca7e9cfed5a6 --- /dev/null +++ b/.cursor/rules/code-style.mdc @@ -0,0 +1,74 @@ +--- +description: Java code style conventions for the NiFi codebase +globs: "**/*.java" +alwaysApply: false +--- + +# Code Style + +NiFi adheres to a few code styles that are not necessarily common. Please ensure that you +observe these code styles. + +1. Any variable that can be marked `final` must be marked `final`. This includes + declarations of Exceptions, method arguments, local variables, member variables, etc. +2. Short-hand is highly discouraged in names of variables, classes, methods, etc., as well + as in documentation. Exceptions to this include in the framework, you may see references to + `procNode` for `ProcessorNode` or other such short-hand that is very difficult to confuse with + other terms, and it is used only when clearly defined such as `final ProcessorNode procNode = ...`. + Even though, however, we would not abbreviate `ControllerService` as `cs` because `cs` is too vague + and easily misunderstood. Instead, a value of `serviceNode` might be used. +3. Private / helper methods should not be placed before the first public/protected method + that calls it. +4. Unless the method is to be heavily reused, avoid creating trivial 1-2 line methods and + instead just place the code inline. +5. Code is allowed to be up to 200 characters wide. Avoid breaking lines into many short lines. +6. Avoid creating private methods that are called only once unless they are at least 10 + lines long or are complex. +7. It is never acceptable to use star imports. Import each individual class that is to be used. +8. Never use underscores in class names, variables, or filenames. +9. Never use System.out.println but instead use SLF4J Loggers. +10. Avoid excessive whitespace in method invocations. For example, instead of writing: + +```java +myObject.doSomething( + arg1, + arg2, + arg3, + arg4, + arg5 +); +``` + +Write this instead: + +```java +myObject.doSomething(arg1, arg2, arg3, arg4, arg5); +``` + +It is okay to use many newlines in a builder pattern, such as: +```java +final MyObject myObject = MyObject.builder() + .arg1(arg1) + .arg2(arg2) + .arg3(arg3) + .build(); +``` + +It is also acceptable when chaining methods in a functional style such as: +```java +final List result = myList.stream() + .filter(s -> s.startsWith("A")) + .map(String::toUpperCase) + .toList(); +``` + +11. When possible, prefer importing a class, rather than using fully qualified classname + inline in the code. +12. Avoid statically importing methods, except in methods that are frequently used in testing + frameworks, such as the `Assertions` and `Mockito` classes. +13. Avoid trailing whitespace at the end of lines, especially in blank lines. +14. The `var` keyword is never allowed in the codebase. Always explicitly declare the type of variables. +15. Prefer procedural code over functional code. For example, prefer using a for loop instead of a stream + when the logic is not simple and straightforward. The stream API is powerful but can be difficult to + read when overused or used in complex scenarios. Functional style is best used when the logic is simple + and chains together no more than 3-4 operations. diff --git a/.cursor/rules/ending-conditions.mdc b/.cursor/rules/ending-conditions.mdc new file mode 100644 index 000000000000..aa2b607801a1 --- /dev/null +++ b/.cursor/rules/ending-conditions.mdc @@ -0,0 +1,30 @@ +--- +description: Task completion checklist that must be verified before considering any task done +alwaysApply: true +--- + +# Ending Conditions + +When you have completed a task, ensure that you have verified the following: + +1. All code compiles and builds successfully using `mvn`. +2. All relevant unit tests pass successfully using `mvn`. +3. All code adheres to the Code Style rules. +4. Checkstyle and PMD pass successfully using + `mvn checkstyle:check pmd:check -T 1C` from the appropriate directory. +5. Unit tests have been added to verify the functionality of any sufficiently complex method. +6. A system test or an integration test has been added if the change makes significant + changes to the framework and the interaction between a significant number of classes. +7. You have performed a full review of the code to ensure that there are no logical errors + and that the code is not duplicative or difficult to understand. If you find any code that + is in need of refactoring due to clarity or duplication, you should report this to the user + and offer to make those changes as well. +8. If creating a new Processor or Controller Service, ensure that all relevant annotations + have been added, including `@Tags`, `@CapabilityDescription`, `@UseCase`, and + `@MultiProcessorUseCase` as appropriate. + + +Do not consider the task complete until all of the above conditions have been met. When you +do consider the task complete, provide a summary of what you changed and which tests were +added or modified and what the behavior is that they verify. Additionally, provide any feedback +about your work that may need further review or that is not entirely complete. diff --git a/.cursor/rules/extension-development.mdc b/.cursor/rules/extension-development.mdc new file mode 100644 index 000000000000..74e53f34d5f1 --- /dev/null +++ b/.cursor/rules/extension-development.mdc @@ -0,0 +1,107 @@ +--- +description: Development patterns for NiFi extensions (Processors, Controller Services, Connectors). Covers Property Descriptors, Relationships, and common patterns. +alwaysApply: false +--- + +# Extension Development + +This rule applies when developing NiFi extensions: Processors, Controller Services, and Connectors. + +## Property Descriptors + +Property Descriptors are defined as `static final` fields on the component class using +`PropertyDescriptor.Builder`. + +- **Naming:** Use clear, descriptive names. The `displayName` field should never be used. Make the + name itself clear and concise. Use Title Case for property names. +- **Required vs. optional:** Mark properties as `.required(true)` when the component cannot + function without them. Prefer sensible defaults via `.defaultValue(...)` when possible. + When a default value is provided, the property will always have a value. The `required` flag in this + case is more of a documentation aid to indicate the importance of the property. +- **Validators:** Always attach an appropriate `Validator` (e.g., `StandardValidators.NON_EMPTY_VALIDATOR`, + `StandardValidators.POSITIVE_INTEGER_VALIDATOR`). The Validator can be left off only when Allowable Values + are provided. In this case, do not include a Validator because it is redundant and confusing. +- **Expression Language:** If a property should support Expression Language, add + `.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)` or the + appropriate scope. Always document when Expression Language is supported in the property + description. Some developers tend to go overboard here and feel like Expression Language should be supported + everywhere, but this is a mistake! The default assumption should be that Expression Language is not supported + unless the value is expected to be different for every FlowFile that is processed. +- **Dependencies:** Use `.dependsOn(...)` to conditionally show properties based on the + values of other properties. This keeps the configuration UI clean and avoids exposing + irrelevant properties. If there is a dependency, it is important to understand that `.required(true)` means that + this property is required IF AND ONLY IF the dependency condition is met. + +## Processors + +- The `onTrigger` method should be focused on processing FlowFiles. Keep setup and teardown + logic in lifecycle methods when possible. +- Prefer `session.read()` and `session.write()` with callbacks over directly working with + streams to ensure proper resource management. +- Prefer `session.commitAsync()` over `session.commit()`. The `commit` method was the original implementation, + but it has now been deprecated in favor of `commitAsync`. The `commitAsync` call provide a clearer, cleaner + interface for handling post-commit actions including success and failure callbacks. In addition, the async + method allows Processors to be used much more efficiently in a Stateless NiFi flow. + +### Processor Lifecycle Annotations + +- Use `@OnScheduled` for setup that should happen once before the processor starts + running (e.g., creating clients, compiling patterns). +- Use `@OnStopped` for cleanup (e.g., closing clients, releasing resources). +- `@OnUnscheduled` is rarely used but can be used to interrupt long-running processes when the Processor is stopped. + Generally, though, it is preferable to write the Processor in such a way that long-running processes check `isScheduled()` + and stop gracefully if the return value is `false`. + +### Relationships +- **Declaration**: Relationships are defined as `static final` fields using `new Relationship.Builder()`. + Relationship names should generally be lowercase. +- **Success and Failure:** Most processors define at least a `success` and `failure` + relationship. Use `REL_SUCCESS` and `REL_FAILURE` as constant names. +- **Original relationship:** Processors that enrich or fork FlowFiles often include an + `original` relationship for the unmodified input FlowFile. + +### Use Case Documentation +The `@UseCase` and `@MultiProcessorUseCase` annotations help document common usage patterns for Processors. +This is helpful for users to understand when and how to use the component effectively. It is equally important +for Agents that can determine which components should be used for a given task. + +- Use `@UseCase` to document common use cases for the Processor. This helps users understand + when and how to use the component effectively. This is unnecessary for Processors that serve a single use case + that is clearly described by the component name and description. For example, a Processor that consumes messages + from a specific service likely does not need a `@UseCase` annotation because its purpose is clear. +- Use `@MultiProcessorUseCase` to document well-known patterns that involve multiple Processors working + together to achieve a common goal. Examples include List/Fetch patterns, Fork/Join patterns, etc. + The `@MultiProcessorUseCase` annotation should not be added to each individual Processor involved in the pattern. + Rather, the convention is to add the annotation to the last Processor in the flow that completes the pattern. + Some Processors will have one or more `@UseCase` annotations and no `@MultiProcessorUseCase` annotations, + while some will have one or more `@MultiProcessorUseCase` annotations and no `@UseCase` annotations. + + +## Controller Services + +Controller Services are objects that can be shared across multiple components. This is typically done for +clients that connect to external systems in order to avoid creating many connections, or in order to share +configuration across multiple components without the user having to duplicate configuration. Controller Services +can also be helpful for abstracting away some piece of functionality into a separate extension point so that the +implementation can be swapped out by the user. For example, Record Readers and Writers are implemented as Controller +Services so that the user can simply choose which format they want to read and write in a flexible and reusable way. + +That said, Controller Services can be more onerous to configure and maintain for the user, so they should +be used sparingly and only when there is a clear benefit to doing so. + +### Controller Service Lifecycle Annotations + +- Use `@OnScheduled` for setup that should happen once before the service is enabled (e.g., creating clients, compiling patterns). +- Use `@OnDisabled` for cleanup (e.g., closing clients, releasing resources). + + +## General Patterns + +- Use `ComponentLog` (obtained via `getLogger()`) for all logging, not SLF4J directly. + This ensures log messages are associated with the component instance and that they generate Bulletins. +- Use `@CapabilityDescription` to provide a clear and concise description of what the component does. This should not + be used for configuration details. +- Use `@Tags` to provide relevant keywords that help users find the component. +- Use `@SeeAlso` to reference related components. +- Use `@WritesAttributes` and `@ReadsAttributes` to document which FlowFile attributes are read and written by the component. +- Use `@DynamicProperty` to document any dynamic properties supported by the component. diff --git a/.cursor/rules/extension-testing.mdc b/.cursor/rules/extension-testing.mdc new file mode 100644 index 000000000000..97692657289b --- /dev/null +++ b/.cursor/rules/extension-testing.mdc @@ -0,0 +1,80 @@ +--- +description: Testing guidance for NiFi extensions (Processors, Controller Services, Connectors). Covers nifi-mock and TestRunner usage. +alwaysApply: false +--- + +# Extension Testing + +This rule applies when writing tests for NiFi extensions: Processors, Controller Services, and Connectors. + +## Unit Tests + +Unit tests should be used to test individual classes and methods in isolation. This often +will result in mocking dependency classes. However, if there already exists a Mock +implementation of an interface or dependency class, it is preferred to use the existing +Mock implementation. Similarly, for simple classes, it is preferable to make use of the +real implementation of a class rather than creating a Mock implementation. We are infinitely +more interested in having tests that are fast, reliable, correct, and easy to maintain than +we are in having tests that adhere to strict and arbitrary definitions of what constitutes +a "unit test." + +## Use nifi-mock + +Tests for extensions should always make use of the `nifi-mock` mocking framework. This is +done through the `TestRunner` interface and its standard implementation, obtained via +`TestRunners.newTestRunner(processor)`. + +The `TestRunner` provides methods for: +- Setting property values (`setProperty`) +- Enqueueing FlowFiles (`enqueue`) +- Running the processor (`run`) +- Asserting transfer to relationships (`assertTransferCount`, `assertAllFlowFilesTransferred`) +- Validating processor configuration (`assertValid`, `assertNotValid`) +- Asserting content and attributes of FlowFiles (`assertContentEquals`, `assertAttributeEquals`, etc.) + +## No System Tests for Extensions + +System tests are not expected for extensions. Extensions are tested at the unit level using +`nifi-mock`. The `nifi-mock` framework provides sufficient isolation and simulation of the +NiFi runtime environment. + +## What to Test + +- **Property validation:** If the extension has a custom Validator, it +- **customValidate:** If the extension overrides the `customValidate` method, test that it correctly + validates the configuration and produces appropriate validation results. +- **Relationship routing:** Verify that FlowFiles are routed to the correct relationship + based on input and configuration. +- **Content transformation:** For processors that modify FlowFile content, verify that + output content matches expectations. +- **Attribute handling:** Verify that expected attributes are set on output FlowFiles. +- **Error handling:** Verify that error conditions (bad input, misconfiguration, simulated + failures) are handled correctly, typically by routing to a failure relationship. + +## What NOT to Test + +- **NiFi framework behavior:** Do not attempt to test the behavior of the NiFi framework itself. + For example, do not test that `session.commitAsync()` actually commits a transaction. Instead, + focus on testing that your extension behaves correctly when `commitAsync` is called, and trust + that the NiFi framework will handle the commit correctly. +- **Validator behavior:** If a custom validator is used by an extension, that custom validator should + be tested separate as a unit test for the validator itself. However, if the extension point provides + a `customValidate` method, that should absolutely be tested as part of the extension's unit tests. +- **The PropertyDescriptors that are returned:** Do not test that the `getSupportedPropertyDescriptors` + method returns the expected PropertyDescriptors. This is an anti-pattern because it does not properly + test that the extension abides by the contract of the API. For example, if a new PropertyDescriptor is + added whose default is to behave the same way as the old behavior, the test should absolutely pass. + However, if the test is written to expect a specific set of PropertyDescriptors, then the test will fail, + leading to confusion and unnecessary maintenance. + +## Controller Service Testing + +When a processor depends on a Controller Service, use `TestRunner.addControllerService` +and `TestRunner.enableControllerService` to wire up either a real or mock implementation +of the service for testing. + +## TestContainers + +For Processors that interact with external systems, it can be helpful to use TestContainers to spin up +a temporary instance of the external system for testing. This allows for more realistic integration tests +without requiring the user to have the external system installed and running on their machine. diff --git a/.cursor/rules/framework-testing.mdc b/.cursor/rules/framework-testing.mdc new file mode 100644 index 000000000000..d25680b5021d --- /dev/null +++ b/.cursor/rules/framework-testing.mdc @@ -0,0 +1,46 @@ +--- +description: Testing guidance for NiFi framework code (not extensions). Covers when to use unit, integration, and system tests for framework classes. +alwaysApply: false +--- + +# Framework Testing + +This rule applies when working on NiFi framework code (not Processors, Controller +Services, or Connectors). + +## Unit Tests + +Unit tests should be used to test individual classes and methods in isolation. This often +will result in mocking dependency classes. However, if there already exists a Mock +implementation of an interface or dependency class, it is preferred to use the existing +Mock implementation. Similarly, for simple classes, it is preferable to make use of the +real implementation of a class rather than creating a Mock implementation. We are infinitely +more interested in having tests that are fast, reliable, correct, and easy to maintain than +we are in having tests that adhere to strict and arbitrary definitions of what constitutes +a "unit test." + +## Integration Tests + +When working in the framework, unit tests are still important, but integration tests and +system tests are often more important. Integration tests are still allowed to use mocks but +typically we prefer to use real implementations of classes in order to ensure a more +realistic and holistic test. + +## System Tests + +System tests live in the `nifi-system-tests` module and should be used for any changes +that make significant changes to the framework and the interaction between a significant +number of classes. They should also be used for any changes that may be fairly isolated but +which are in a critical path of the framework, especially those that affect how data is +persisted, processed, or accessed; or those that affect how components are created, +configured, scheduled, or executed. + +Good candidates for system tests include changes to `ProcessScheduler`, `ProcessorNode`, +`ControllerServiceNode`, `FlowController`, `FlowManager`, how Parameters are handled, flow +synchronization, the repositories, etc. + +## Escalation + +Any unit test that ends up requiring a large number of mocks is a good candidate for an +integration test, and any integration test that ends up requiring a large number of mocks +is a good candidate for a system test. diff --git a/.cursor/rules/persona.mdc b/.cursor/rules/persona.mdc new file mode 100644 index 000000000000..3e4a66ed6455 --- /dev/null +++ b/.cursor/rules/persona.mdc @@ -0,0 +1,16 @@ +--- +description: AI persona and general approach for working on the Apache NiFi codebase +alwaysApply: true +--- + +# AI Persona + +Act as an experienced Java software engineer. When considering how to implement a task, +first consider the big picture of what is being asked. Then determine which classes will +need to be updated. + +Quite often, a single request will require manipulating many different classes. Generally +speaking, it is best to avoid changing established interfaces, especially those in nifi-api. +It is acceptable when necessary, but any change in nifi-api needs to be backward compatible. +For example, you might introduce a new method with a default implementation, or add a new method +and deprecate an old one without removing it. diff --git a/.cursor/rules/testing-standards.mdc b/.cursor/rules/testing-standards.mdc new file mode 100644 index 000000000000..09a543a967de --- /dev/null +++ b/.cursor/rules/testing-standards.mdc @@ -0,0 +1,69 @@ +--- +description: Shared test coding conventions for all NiFi automated tests (framework and extensions) +globs: "**/src/test/**/*.java" +alwaysApply: false +--- + +# Testing Standards + +In addition to the general rules defined in the Code Style rule, follow these rules when +creating or manipulating automated tests. + +## Test Code Conventions + +1. NEVER add comments such as `// Given`, `// When`, `// Then`. These comments are + considered an anti-pattern and should be removed or replaced whenever they are + encountered. Instead, leave them out all together (preferred) or use comments that + clearly articulate what is happening, such as `// Setup`, `// Invoke method`, + `// Assert expected results`. One of the reasons that this is considered an anti-pattern + (in addition to the fact that the given/when/then nomenclature itself provides no + meaning) is that it assumes a very specific pattern in unit tests, that we will create a + bunch of objects, invoke the method we care about, make assertions, and then end. This + often results in many tests that are extremely repetitive. Instead, whenever it makes + sense to do so, create the prerequisite objects, invoke the method we care about with + appropriate arguments, make assertions, and then invoke again with a different set of + arguments, make assertions, etc. There is no need to have many repetitive methods that + each create many repetitive objects. + +2. Unit tests are Java. They are not English. As such, they should be written like Java. + Frameworks such as assertj that strive to make the unit tests look more "English-like" + should be avoided. Use of these frameworks sometimes works well but often quickly + devolves into automated tests that read like neither English nor Java. + +3. Like any other code, unit tests should be created using reusable methods where + appropriate. Do not create 15 methods that are all very similar and repetitive. Instead, + create reusable methods that can be called from each of the methods. + +4. Never use the `assert` keyword. Use JUnit assertions instead. + +5. Never create a test file that is centered around testing a method or capability. Unit + tests must always be named after the class they are testing. It is okay if a given unit + test class is very, very long. + +6. This is a Java project using the Maven structure. Java test files must always fall under + src/test/java of the appropriate sub-module. + +7. Never use pointless assertions such as assertDoesNotThrow. This adds nothing but + complexity. Just call the method, and if it throws an Exception, the test will fail. It + is assumed by default that each line does not throw an Exception. + +## General Testing Philosophy + +- Unit tests should be used to verify any sufficiently complex method in a class. We should + *NOT* have unit tests for trivial methods such as getters and setters, or methods that are + only a few lines long and are not complex. A good general rule of thumb is that if a + person can understand a method and verify that it is correct in a few seconds, then it is + not necessary to have a unit test for that method. + +- Avoid adding multiple tests that are redundant. For example, if you have a method that + takes a single `String` argument it may make sense to test with `null`, a 0-length + string, and a long String. But do not test with 4 different Strings each with a different + value but which test the same lines of code. Avoid over-testing by adding multiple tests + whose differences are conditions that do not truly affect the behavior of the code. + +- Unit tests should always focus on the "contract" of the method. That is, given a certain + input, the test should assert that it receives the correct output. The test should NOT be + focused on the inner details of how the method works. Focusing on the inner details of + how the method works is an anti-pattern because it results in tests that are more brittle, + more difficult to maintain, and tests that fail when the method is improved or refactored + but still adheres to the same contract. diff --git a/.cursorrules b/.cursorrules deleted file mode 100644 index e083cb45c42d..000000000000 --- a/.cursorrules +++ /dev/null @@ -1,129 +0,0 @@ -AI persona ----------- -Act as an experienced Java software engineer. When considering how to implement a task, first consider the big picture of what is being asked. Then determine which classes will need to be udpated. -Quite often, a single request will require manipulating many different classes. Generally speaking, it is best to avoid changing established interfaces, but it is often acceptable because the public API -that NiFi adheres to is not in this module. - - -API ---- -When it is necessary to lookup the API for reference, the API can generally be found at ../nifi-api, where .. is relative to the project's root directory. - -Building --------- -NiFi is a complex Maven codebase. Never build code (testing or otherwise) using javac. Always use `mvn` instead. -Additionally, building a maven module using the also-make flag (`-am`) is often very expensive and slow. -Instead, only build the specific module you are modifying. Assume that the user has already built the entire -codebase and that only the specific module you are modifying needs to be built again. - -Code Style ----------- -NiFi adheres to a few code styles that are not necessarily common. Please ensure that you observe these code styles. -1. Any variable that can be marked `final` must be marked `final`. This includes declarations of Exceptions, method arguments, local variables, member variables, etc. -2. Short-hand is highly discouraged in names of variables, classes, methods, etc., as well as in documentation. -3. Private / helper methods should not be placed before the first public/protected method that calls it. -4. Unless the method is to be heavily reused, avoid creating trivial 1-2 line methods and instead just place the code inline. -5. Code is allowed to be up to 200 characters wide. Avoid breaking lines into many short lines. -6. Avoid creating private methods that are called only once unless they are at least 10 lines long or are complex. -7. It is never acceptable to use star imports. Import each individual class that is to be used. -8. Never use underscores in class names, variables, or filenames. -9. Never use System.out.println but instead use SLF4J Loggers -10. Avoid excessive whitespace in method invocations. For example, instead of writing: -``` -myObject.doSomething( - arg1, - arg2, - arg3, - arg4, - arg5 -); -``` -Write this instead: -``` -myObject.doSomething(arg1, arg2, arg3, arg4, arg5); -``` - -11. When possible, prefer importing a class, rather than using fully qualified classname inline in the code. -12. Avoid statically importing methods unless they are used frequently such as methods in the `Assertions` and `Mockito` classes. -13. Avoid trailing whitespace at the end of lines, especially in blank lines. - -Unit / Automated Testing ------------------------- -In addition to the general rules defined in the Code Style section, follow these rules when creating or manipulating automated tests. -1. NEVER add comments such as `// Given`, `// When`, `// Then`. These comments are considered an anti-pattern and should be removed or replaced whenever they are encountered. - Instead, leave them out all together (preferred) or use comments that clearly articulate what is happening, such as `// Setup`, `// Invoke method`, `// Assert expected results.` - One of the reasons that this is considered an anti-pattern (in addition to the fact that the given/when/then nomeclature itself provides no meaning) is that it assumes a very specific pattern - in unit tests, that we will create a bunch of objects, invoke the method we care about, make assertions, and then end. This often results in many tests that are extremely repetitive. - Instead, whenever it makes sense to do so, create the prerequisitive objects, invoke the method we care about with appropriate arguments, make assertions, and then invoke again with a different - set of arguments, make assertions, etc. There is no need to have many repetitive methods that each create many repetitive objects. - -2. Unit tests are Java. They are not English. As such, they should be written like Java. Frameworks such as assertj that strive to make the unit tests look more "English-like" should be avoided. - Use of these frameworks sometimes works well but often quickly devolves into automated tests that read like neither English nor Java. - -3. Like any other code, unit tests should be created using reusable methods where appropriate. Do not create 15 methods that are all very similar and repetitive. Instead, create reusable methods - that can be called from each of the methods. - -4. Never use the `assert` keyword. Use JUnit assertions instead. - -5. Never create a test file that is centered around testing a method or capability. Unit tests must always be named after the class they are testing. It is okay if a given unit test class is very, very long. - -6. This is a Java project using the Maven structure. Java test files must always fall under src/test/java of the appropriate sub-module. - -7. Never use pointless assertions such as assertDoesNotThrow - this adds nothing but complexity. Just call the method, and if it throws an Exception, the test will fail. I.e., - it is assumed by default that each line does not throw an Exception, so do not use - - -Unit Test vs. Integration Test vs. System Test ----------------------------------------------- -Unit tests should be used to test individual classes and methods in isolation. This often will result in mocking dependency classes. -However, if there already exists a Mock implementation of an interface or dependency class, it is preferred to use the existing Mock implementation. -Similarly, for simple classes, it is preferable to make use of the real implementation of a class rather than creating a Mock implementation. -While some may argue that this is not a "true" unit test, we are infinitely more interested in having tests that are fast, reliable, -correct, and easy to maintain than we are in having tests that adhere to strict and arbitrary rules. - -In general, unit tests should be used to verify any sufficiently complex method in a class. We should *NOT* have unit tests for -trivial methods such as getters and setters, or methods that are only a few lines long and are not complex. A good general -rule of thumb is that if a person can understand a method and verify that it is correct in a few seconds, then it is not -necessary to have a unit test for that method. - -Avoid adding multiple tests that are redundant. For example, if you have a method that takes a single `String` argument -it may make sense to test with `null`, a 0-length strength, and a long String. But do not test with 4 different Strings -each with a different value but which test the same lines of code. Avoid over-testing by adding multiple tests whose differences -are conditions that do not truly affect the behavior of the code. - -Unit tests should always focus on the "contract" of the method. That is, given a certain input, the test should assert that it -receives the correct output. The test should NOT be focused on the inner details of how the method works. Focusing on the inner details -of how the method works is an anti-pattern because it results in tests that are more brittle, more difficult to maintain, -and tests that fail when the method is improved or refactored but still adheres to the same contract. - -When producing a Processor or Controller Service, unit tests should always be written, and they should always make use of the `nifi-mock` mocking framework. - -When working in the framework, unit tests are still important, but integration tests and system tests are often more important. -Integration tests are still allowed to use mocks but typically we prefer to use real implementations of classes in order to ensure a -more realistic and holistic test. - -System tests live in the `nifi-system-tests` module and should be used for any changes that make significant changes to -the framework and the interaction between a significant number of classes. They should also be used for any changes that -may be fairly isolated but which are in a critical path of the framework, especially those that affect how data is persisted, -processed, or accessed; or those that affected how components are created, configured, scheduled, or executed. -For example, any change to `ProcessScheduler`, `ProcessorNode`, `ControllerServiceNode`, `FlowController`, -`FlowManager`, changes to how Parameters are handled, flow synchronization, the repositories, etc. is a good candidate for a system test. - -Additionally, any unit test that ends up requiring a large number of mocks is a good candidate for an integration test, -and any integration test that ends up requiring a large number of mocks is a good candidate for a system test. - - -Ending Conditions ------------------ -When you have completed the task, ensure that you have done the following: -1. All code compiles and builds successfully using `mvn`. -2. All relevant unit tests pass successfully using `mvn`. -3. All code adheres to the Code Style rules defined above. -4. Checkstyle and PMD pass successfully using `mvn checkstyle:check pmd:check -T 1C` from the appropriate directory. -5. Unit tests have been added to verify the functionality of any sufficiently complex method. -6. A system test or an integration test has been added if the change makes significant changes to the framework - and the interaction between a significant number of classes. - -Do not consider the task complete until all of the above conditions have been met. -When you do consider the task complete, provide a summary of which tests were added or modified and what the -behavior that they verify is correct. diff --git a/.gitignore b/.gitignore index ff8a118e9a59..e299e15e43c4 100644 --- a/.gitignore +++ b/.gitignore @@ -21,7 +21,7 @@ nb-configuration.xml .java-version /nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/dist/ __pycache__ -.cursor/ +.cursor/debug.log # Develocity .mvn/.develocity/ diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector index 7f2959091e31..341890a36ffb 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector +++ b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector @@ -14,4 +14,4 @@ # limitations under the License. org.apache.nifi.mock.connectors.GenerateAndLog -org.apache.nifi.mock.connectors.MissingBundleConnector +org.apache.nifi.mock.connectors.MissingBundleConnector \ No newline at end of file