diff --git a/.cursor/rules/building.mdc b/.cursor/rules/building.mdc new file mode 100644 index 000000000000..af4829a42628 --- /dev/null +++ b/.cursor/rules/building.mdc @@ -0,0 +1,18 @@ +--- +description: Maven build instructions for the NiFi codebase +alwaysApply: true +--- + +# Building + +NiFi is a complex Maven codebase. Never build code (testing or otherwise) using javac. +Always use `mvn` instead, or preferably the `.mvnw` wrapper script. + +Additionally, building a maven module using the also-make flag (`-am`) is often very +expensive and slow. Instead, only build the specific module you are modifying. Assume that +the user has already built the entire codebase and that only the specific module you are +modifying needs to be built again. If this fails, you can prompt the user to build the entire +codebase, but only after you have attempted to build the relevant modules yourself first. +It is important not to run `mvn clean` at the root level or at the `nifi-assembly` level without +the user's express permission, as this may delete a running instance of NiFi, causing permanent +loss of flows and configuration. diff --git a/.cursor/rules/code-style.mdc b/.cursor/rules/code-style.mdc new file mode 100644 index 000000000000..ca7e9cfed5a6 --- /dev/null +++ b/.cursor/rules/code-style.mdc @@ -0,0 +1,74 @@ +--- +description: Java code style conventions for the NiFi codebase +globs: "**/*.java" +alwaysApply: false +--- + +# Code Style + +NiFi adheres to a few code styles that are not necessarily common. Please ensure that you +observe these code styles. + +1. Any variable that can be marked `final` must be marked `final`. This includes + declarations of Exceptions, method arguments, local variables, member variables, etc. +2. Short-hand is highly discouraged in names of variables, classes, methods, etc., as well + as in documentation. Exceptions to this include in the framework, you may see references to + `procNode` for `ProcessorNode` or other such short-hand that is very difficult to confuse with + other terms, and it is used only when clearly defined such as `final ProcessorNode procNode = ...`. + Even though, however, we would not abbreviate `ControllerService` as `cs` because `cs` is too vague + and easily misunderstood. Instead, a value of `serviceNode` might be used. +3. Private / helper methods should not be placed before the first public/protected method + that calls it. +4. Unless the method is to be heavily reused, avoid creating trivial 1-2 line methods and + instead just place the code inline. +5. Code is allowed to be up to 200 characters wide. Avoid breaking lines into many short lines. +6. Avoid creating private methods that are called only once unless they are at least 10 + lines long or are complex. +7. It is never acceptable to use star imports. Import each individual class that is to be used. +8. Never use underscores in class names, variables, or filenames. +9. Never use System.out.println but instead use SLF4J Loggers. +10. Avoid excessive whitespace in method invocations. For example, instead of writing: + +```java +myObject.doSomething( + arg1, + arg2, + arg3, + arg4, + arg5 +); +``` + +Write this instead: + +```java +myObject.doSomething(arg1, arg2, arg3, arg4, arg5); +``` + +It is okay to use many newlines in a builder pattern, such as: +```java +final MyObject myObject = MyObject.builder() + .arg1(arg1) + .arg2(arg2) + .arg3(arg3) + .build(); +``` + +It is also acceptable when chaining methods in a functional style such as: +```java +final List result = myList.stream() + .filter(s -> s.startsWith("A")) + .map(String::toUpperCase) + .toList(); +``` + +11. When possible, prefer importing a class, rather than using fully qualified classname + inline in the code. +12. Avoid statically importing methods, except in methods that are frequently used in testing + frameworks, such as the `Assertions` and `Mockito` classes. +13. Avoid trailing whitespace at the end of lines, especially in blank lines. +14. The `var` keyword is never allowed in the codebase. Always explicitly declare the type of variables. +15. Prefer procedural code over functional code. For example, prefer using a for loop instead of a stream + when the logic is not simple and straightforward. The stream API is powerful but can be difficult to + read when overused or used in complex scenarios. Functional style is best used when the logic is simple + and chains together no more than 3-4 operations. diff --git a/.cursor/rules/ending-conditions.mdc b/.cursor/rules/ending-conditions.mdc new file mode 100644 index 000000000000..aa2b607801a1 --- /dev/null +++ b/.cursor/rules/ending-conditions.mdc @@ -0,0 +1,30 @@ +--- +description: Task completion checklist that must be verified before considering any task done +alwaysApply: true +--- + +# Ending Conditions + +When you have completed a task, ensure that you have verified the following: + +1. All code compiles and builds successfully using `mvn`. +2. All relevant unit tests pass successfully using `mvn`. +3. All code adheres to the Code Style rules. +4. Checkstyle and PMD pass successfully using + `mvn checkstyle:check pmd:check -T 1C` from the appropriate directory. +5. Unit tests have been added to verify the functionality of any sufficiently complex method. +6. A system test or an integration test has been added if the change makes significant + changes to the framework and the interaction between a significant number of classes. +7. You have performed a full review of the code to ensure that there are no logical errors + and that the code is not duplicative or difficult to understand. If you find any code that + is in need of refactoring due to clarity or duplication, you should report this to the user + and offer to make those changes as well. +8. If creating a new Processor or Controller Service, ensure that all relevant annotations + have been added, including `@Tags`, `@CapabilityDescription`, `@UseCase`, and + `@MultiProcessorUseCase` as appropriate. + + +Do not consider the task complete until all of the above conditions have been met. When you +do consider the task complete, provide a summary of what you changed and which tests were +added or modified and what the behavior is that they verify. Additionally, provide any feedback +about your work that may need further review or that is not entirely complete. diff --git a/.cursor/rules/extension-development.mdc b/.cursor/rules/extension-development.mdc new file mode 100644 index 000000000000..74e53f34d5f1 --- /dev/null +++ b/.cursor/rules/extension-development.mdc @@ -0,0 +1,107 @@ +--- +description: Development patterns for NiFi extensions (Processors, Controller Services, Connectors). Covers Property Descriptors, Relationships, and common patterns. +alwaysApply: false +--- + +# Extension Development + +This rule applies when developing NiFi extensions: Processors, Controller Services, and Connectors. + +## Property Descriptors + +Property Descriptors are defined as `static final` fields on the component class using +`PropertyDescriptor.Builder`. + +- **Naming:** Use clear, descriptive names. The `displayName` field should never be used. Make the + name itself clear and concise. Use Title Case for property names. +- **Required vs. optional:** Mark properties as `.required(true)` when the component cannot + function without them. Prefer sensible defaults via `.defaultValue(...)` when possible. + When a default value is provided, the property will always have a value. The `required` flag in this + case is more of a documentation aid to indicate the importance of the property. +- **Validators:** Always attach an appropriate `Validator` (e.g., `StandardValidators.NON_EMPTY_VALIDATOR`, + `StandardValidators.POSITIVE_INTEGER_VALIDATOR`). The Validator can be left off only when Allowable Values + are provided. In this case, do not include a Validator because it is redundant and confusing. +- **Expression Language:** If a property should support Expression Language, add + `.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)` or the + appropriate scope. Always document when Expression Language is supported in the property + description. Some developers tend to go overboard here and feel like Expression Language should be supported + everywhere, but this is a mistake! The default assumption should be that Expression Language is not supported + unless the value is expected to be different for every FlowFile that is processed. +- **Dependencies:** Use `.dependsOn(...)` to conditionally show properties based on the + values of other properties. This keeps the configuration UI clean and avoids exposing + irrelevant properties. If there is a dependency, it is important to understand that `.required(true)` means that + this property is required IF AND ONLY IF the dependency condition is met. + +## Processors + +- The `onTrigger` method should be focused on processing FlowFiles. Keep setup and teardown + logic in lifecycle methods when possible. +- Prefer `session.read()` and `session.write()` with callbacks over directly working with + streams to ensure proper resource management. +- Prefer `session.commitAsync()` over `session.commit()`. The `commit` method was the original implementation, + but it has now been deprecated in favor of `commitAsync`. The `commitAsync` call provide a clearer, cleaner + interface for handling post-commit actions including success and failure callbacks. In addition, the async + method allows Processors to be used much more efficiently in a Stateless NiFi flow. + +### Processor Lifecycle Annotations + +- Use `@OnScheduled` for setup that should happen once before the processor starts + running (e.g., creating clients, compiling patterns). +- Use `@OnStopped` for cleanup (e.g., closing clients, releasing resources). +- `@OnUnscheduled` is rarely used but can be used to interrupt long-running processes when the Processor is stopped. + Generally, though, it is preferable to write the Processor in such a way that long-running processes check `isScheduled()` + and stop gracefully if the return value is `false`. + +### Relationships +- **Declaration**: Relationships are defined as `static final` fields using `new Relationship.Builder()`. + Relationship names should generally be lowercase. +- **Success and Failure:** Most processors define at least a `success` and `failure` + relationship. Use `REL_SUCCESS` and `REL_FAILURE` as constant names. +- **Original relationship:** Processors that enrich or fork FlowFiles often include an + `original` relationship for the unmodified input FlowFile. + +### Use Case Documentation +The `@UseCase` and `@MultiProcessorUseCase` annotations help document common usage patterns for Processors. +This is helpful for users to understand when and how to use the component effectively. It is equally important +for Agents that can determine which components should be used for a given task. + +- Use `@UseCase` to document common use cases for the Processor. This helps users understand + when and how to use the component effectively. This is unnecessary for Processors that serve a single use case + that is clearly described by the component name and description. For example, a Processor that consumes messages + from a specific service likely does not need a `@UseCase` annotation because its purpose is clear. +- Use `@MultiProcessorUseCase` to document well-known patterns that involve multiple Processors working + together to achieve a common goal. Examples include List/Fetch patterns, Fork/Join patterns, etc. + The `@MultiProcessorUseCase` annotation should not be added to each individual Processor involved in the pattern. + Rather, the convention is to add the annotation to the last Processor in the flow that completes the pattern. + Some Processors will have one or more `@UseCase` annotations and no `@MultiProcessorUseCase` annotations, + while some will have one or more `@MultiProcessorUseCase` annotations and no `@UseCase` annotations. + + +## Controller Services + +Controller Services are objects that can be shared across multiple components. This is typically done for +clients that connect to external systems in order to avoid creating many connections, or in order to share +configuration across multiple components without the user having to duplicate configuration. Controller Services +can also be helpful for abstracting away some piece of functionality into a separate extension point so that the +implementation can be swapped out by the user. For example, Record Readers and Writers are implemented as Controller +Services so that the user can simply choose which format they want to read and write in a flexible and reusable way. + +That said, Controller Services can be more onerous to configure and maintain for the user, so they should +be used sparingly and only when there is a clear benefit to doing so. + +### Controller Service Lifecycle Annotations + +- Use `@OnScheduled` for setup that should happen once before the service is enabled (e.g., creating clients, compiling patterns). +- Use `@OnDisabled` for cleanup (e.g., closing clients, releasing resources). + + +## General Patterns + +- Use `ComponentLog` (obtained via `getLogger()`) for all logging, not SLF4J directly. + This ensures log messages are associated with the component instance and that they generate Bulletins. +- Use `@CapabilityDescription` to provide a clear and concise description of what the component does. This should not + be used for configuration details. +- Use `@Tags` to provide relevant keywords that help users find the component. +- Use `@SeeAlso` to reference related components. +- Use `@WritesAttributes` and `@ReadsAttributes` to document which FlowFile attributes are read and written by the component. +- Use `@DynamicProperty` to document any dynamic properties supported by the component. diff --git a/.cursor/rules/extension-testing.mdc b/.cursor/rules/extension-testing.mdc new file mode 100644 index 000000000000..97692657289b --- /dev/null +++ b/.cursor/rules/extension-testing.mdc @@ -0,0 +1,80 @@ +--- +description: Testing guidance for NiFi extensions (Processors, Controller Services, Connectors). Covers nifi-mock and TestRunner usage. +alwaysApply: false +--- + +# Extension Testing + +This rule applies when writing tests for NiFi extensions: Processors, Controller Services, and Connectors. + +## Unit Tests + +Unit tests should be used to test individual classes and methods in isolation. This often +will result in mocking dependency classes. However, if there already exists a Mock +implementation of an interface or dependency class, it is preferred to use the existing +Mock implementation. Similarly, for simple classes, it is preferable to make use of the +real implementation of a class rather than creating a Mock implementation. We are infinitely +more interested in having tests that are fast, reliable, correct, and easy to maintain than +we are in having tests that adhere to strict and arbitrary definitions of what constitutes +a "unit test." + +## Use nifi-mock + +Tests for extensions should always make use of the `nifi-mock` mocking framework. This is +done through the `TestRunner` interface and its standard implementation, obtained via +`TestRunners.newTestRunner(processor)`. + +The `TestRunner` provides methods for: +- Setting property values (`setProperty`) +- Enqueueing FlowFiles (`enqueue`) +- Running the processor (`run`) +- Asserting transfer to relationships (`assertTransferCount`, `assertAllFlowFilesTransferred`) +- Validating processor configuration (`assertValid`, `assertNotValid`) +- Asserting content and attributes of FlowFiles (`assertContentEquals`, `assertAttributeEquals`, etc.) + +## No System Tests for Extensions + +System tests are not expected for extensions. Extensions are tested at the unit level using +`nifi-mock`. The `nifi-mock` framework provides sufficient isolation and simulation of the +NiFi runtime environment. + +## What to Test + +- **Property validation:** If the extension has a custom Validator, it +- **customValidate:** If the extension overrides the `customValidate` method, test that it correctly + validates the configuration and produces appropriate validation results. +- **Relationship routing:** Verify that FlowFiles are routed to the correct relationship + based on input and configuration. +- **Content transformation:** For processors that modify FlowFile content, verify that + output content matches expectations. +- **Attribute handling:** Verify that expected attributes are set on output FlowFiles. +- **Error handling:** Verify that error conditions (bad input, misconfiguration, simulated + failures) are handled correctly, typically by routing to a failure relationship. + +## What NOT to Test + +- **NiFi framework behavior:** Do not attempt to test the behavior of the NiFi framework itself. + For example, do not test that `session.commitAsync()` actually commits a transaction. Instead, + focus on testing that your extension behaves correctly when `commitAsync` is called, and trust + that the NiFi framework will handle the commit correctly. +- **Validator behavior:** If a custom validator is used by an extension, that custom validator should + be tested separate as a unit test for the validator itself. However, if the extension point provides + a `customValidate` method, that should absolutely be tested as part of the extension's unit tests. +- **The PropertyDescriptors that are returned:** Do not test that the `getSupportedPropertyDescriptors` + method returns the expected PropertyDescriptors. This is an anti-pattern because it does not properly + test that the extension abides by the contract of the API. For example, if a new PropertyDescriptor is + added whose default is to behave the same way as the old behavior, the test should absolutely pass. + However, if the test is written to expect a specific set of PropertyDescriptors, then the test will fail, + leading to confusion and unnecessary maintenance. + +## Controller Service Testing + +When a processor depends on a Controller Service, use `TestRunner.addControllerService` +and `TestRunner.enableControllerService` to wire up either a real or mock implementation +of the service for testing. + +## TestContainers + +For Processors that interact with external systems, it can be helpful to use TestContainers to spin up +a temporary instance of the external system for testing. This allows for more realistic integration tests +without requiring the user to have the external system installed and running on their machine. diff --git a/.cursor/rules/framework-testing.mdc b/.cursor/rules/framework-testing.mdc new file mode 100644 index 000000000000..d25680b5021d --- /dev/null +++ b/.cursor/rules/framework-testing.mdc @@ -0,0 +1,46 @@ +--- +description: Testing guidance for NiFi framework code (not extensions). Covers when to use unit, integration, and system tests for framework classes. +alwaysApply: false +--- + +# Framework Testing + +This rule applies when working on NiFi framework code (not Processors, Controller +Services, or Connectors). + +## Unit Tests + +Unit tests should be used to test individual classes and methods in isolation. This often +will result in mocking dependency classes. However, if there already exists a Mock +implementation of an interface or dependency class, it is preferred to use the existing +Mock implementation. Similarly, for simple classes, it is preferable to make use of the +real implementation of a class rather than creating a Mock implementation. We are infinitely +more interested in having tests that are fast, reliable, correct, and easy to maintain than +we are in having tests that adhere to strict and arbitrary definitions of what constitutes +a "unit test." + +## Integration Tests + +When working in the framework, unit tests are still important, but integration tests and +system tests are often more important. Integration tests are still allowed to use mocks but +typically we prefer to use real implementations of classes in order to ensure a more +realistic and holistic test. + +## System Tests + +System tests live in the `nifi-system-tests` module and should be used for any changes +that make significant changes to the framework and the interaction between a significant +number of classes. They should also be used for any changes that may be fairly isolated but +which are in a critical path of the framework, especially those that affect how data is +persisted, processed, or accessed; or those that affect how components are created, +configured, scheduled, or executed. + +Good candidates for system tests include changes to `ProcessScheduler`, `ProcessorNode`, +`ControllerServiceNode`, `FlowController`, `FlowManager`, how Parameters are handled, flow +synchronization, the repositories, etc. + +## Escalation + +Any unit test that ends up requiring a large number of mocks is a good candidate for an +integration test, and any integration test that ends up requiring a large number of mocks +is a good candidate for a system test. diff --git a/.cursor/rules/persona.mdc b/.cursor/rules/persona.mdc new file mode 100644 index 000000000000..3e4a66ed6455 --- /dev/null +++ b/.cursor/rules/persona.mdc @@ -0,0 +1,16 @@ +--- +description: AI persona and general approach for working on the Apache NiFi codebase +alwaysApply: true +--- + +# AI Persona + +Act as an experienced Java software engineer. When considering how to implement a task, +first consider the big picture of what is being asked. Then determine which classes will +need to be updated. + +Quite often, a single request will require manipulating many different classes. Generally +speaking, it is best to avoid changing established interfaces, especially those in nifi-api. +It is acceptable when necessary, but any change in nifi-api needs to be backward compatible. +For example, you might introduce a new method with a default implementation, or add a new method +and deprecate an old one without removing it. diff --git a/.cursor/rules/testing-standards.mdc b/.cursor/rules/testing-standards.mdc new file mode 100644 index 000000000000..09a543a967de --- /dev/null +++ b/.cursor/rules/testing-standards.mdc @@ -0,0 +1,69 @@ +--- +description: Shared test coding conventions for all NiFi automated tests (framework and extensions) +globs: "**/src/test/**/*.java" +alwaysApply: false +--- + +# Testing Standards + +In addition to the general rules defined in the Code Style rule, follow these rules when +creating or manipulating automated tests. + +## Test Code Conventions + +1. NEVER add comments such as `// Given`, `// When`, `// Then`. These comments are + considered an anti-pattern and should be removed or replaced whenever they are + encountered. Instead, leave them out all together (preferred) or use comments that + clearly articulate what is happening, such as `// Setup`, `// Invoke method`, + `// Assert expected results`. One of the reasons that this is considered an anti-pattern + (in addition to the fact that the given/when/then nomenclature itself provides no + meaning) is that it assumes a very specific pattern in unit tests, that we will create a + bunch of objects, invoke the method we care about, make assertions, and then end. This + often results in many tests that are extremely repetitive. Instead, whenever it makes + sense to do so, create the prerequisite objects, invoke the method we care about with + appropriate arguments, make assertions, and then invoke again with a different set of + arguments, make assertions, etc. There is no need to have many repetitive methods that + each create many repetitive objects. + +2. Unit tests are Java. They are not English. As such, they should be written like Java. + Frameworks such as assertj that strive to make the unit tests look more "English-like" + should be avoided. Use of these frameworks sometimes works well but often quickly + devolves into automated tests that read like neither English nor Java. + +3. Like any other code, unit tests should be created using reusable methods where + appropriate. Do not create 15 methods that are all very similar and repetitive. Instead, + create reusable methods that can be called from each of the methods. + +4. Never use the `assert` keyword. Use JUnit assertions instead. + +5. Never create a test file that is centered around testing a method or capability. Unit + tests must always be named after the class they are testing. It is okay if a given unit + test class is very, very long. + +6. This is a Java project using the Maven structure. Java test files must always fall under + src/test/java of the appropriate sub-module. + +7. Never use pointless assertions such as assertDoesNotThrow. This adds nothing but + complexity. Just call the method, and if it throws an Exception, the test will fail. It + is assumed by default that each line does not throw an Exception. + +## General Testing Philosophy + +- Unit tests should be used to verify any sufficiently complex method in a class. We should + *NOT* have unit tests for trivial methods such as getters and setters, or methods that are + only a few lines long and are not complex. A good general rule of thumb is that if a + person can understand a method and verify that it is correct in a few seconds, then it is + not necessary to have a unit test for that method. + +- Avoid adding multiple tests that are redundant. For example, if you have a method that + takes a single `String` argument it may make sense to test with `null`, a 0-length + string, and a long String. But do not test with 4 different Strings each with a different + value but which test the same lines of code. Avoid over-testing by adding multiple tests + whose differences are conditions that do not truly affect the behavior of the code. + +- Unit tests should always focus on the "contract" of the method. That is, given a certain + input, the test should assert that it receives the correct output. The test should NOT be + focused on the inner details of how the method works. Focusing on the inner details of + how the method works is an anti-pattern because it results in tests that are more brittle, + more difficult to maintain, and tests that fail when the method is improved or refactored + but still adheres to the same contract. diff --git a/.cursorrules b/.cursorrules deleted file mode 100644 index 1990f9de801b..000000000000 --- a/.cursorrules +++ /dev/null @@ -1,74 +0,0 @@ -AI persona ----------- -Act as an experienced Java software engineer. When considering how to implement a task, first consider the big picture of what is being asked. Then determine which classes will need to be udpated. -Quite often, a single request will require manipulating many different classes. Generally speaking, it is best to avoid changing established interfaces, but it is often acceptable because the public API -that NiFi adheres to is not in this module. - - -API ---- -When it is necessary to lookup the API for reference, the API can generally be found at ../nifi-api, where .. is relative to the project's root directory. - -Building --------- -NiFi is a complex Maven codebase. Never build code (testing or otherwise) using javac. Always use `mvn` instead. -Additionally, building a maven module using the also-make flag (`-am`) is often very expensive and slow. -Instead, only build the specific module you are modifying. Assume that the user has already built the entire -codebase and that only the specific module you are modifying needs to be built again. - -Code Style ----------- -NiFi adheres to a few code styles that are not necessarily common. Please ensure that you observe these code styles. -1. Any variable that can be marked `final` must be marked `final`. This includes declarations of Exceptions, method arguments, local variables, member variables, etc. -2. Short-hand is highly discouraged in names of variables, classes, methods, etc., as well as in documentation. -3. Private / helper methods should not be placed before the first public/protected method that calls it. -4. Unless the method is to be heavily reused, avoid creating trivial 1-2 line methods and instead just place the code inline. -5. Code is allowed to be up to 200 characters wide. Avoid breaking lines into many short lines. -6. Avoid creating private methods that are called only once unless they are at least 10 lines long or are complex. -7. It is never acceptable to use star imports. Import each individual class that is to be used. -8. Never use underscores in class names, variables, or filenames. -9. Never use System.out.println but instead use SLF4J Loggers -10. Avoid excessive whitespace in method invocations. For example, instead of writing: -``` -myObject.doSomething( - arg1, - arg2, - arg3, - arg4, - arg5 -); -``` -Write this instead: -``` -myObject.doSomething(arg1, arg2, arg3, arg4, arg5); -``` - -11. When possible, prefer importing a class, rather than using fully qualified classname inline in the code. -12. Avoid statically importing methods unless they are used frequently such as methods in the `Assertions` and `Mockito` classes. -13. Avoid trailing whitespace at the end of lines, especially in blank lines. - -Unit / Automated Testing ------------------------- -In addition to the general rules defined in the Code Style section, follow these rules when creating or manipulating automated tests. -1. NEVER add comments such as `// Given`, `// When`, `// Then`. These comments are considered an anti-pattern and should be removed or replaced whenever they are encountered. - Instead, leave them out all together (preferred) or use comments that clearly articulate what is happening, such as `// Setup`, `// Invoke method`, `// Assert expected results.` - One of the reasons that this is considered an anti-pattern (in addition to the fact that the given/when/then nomeclature itself provides no meaning) is that it assumes a very specific pattern - in unit tests, that we will create a bunch of objects, invoke the method we care about, make assertions, and then end. This often results in many tests that are extremely repetitive. - Instead, whenever it makes sense to do so, create the prerequisitive objects, invoke the method we care about with appropriate arguments, make assertions, and then invoke again with a different - set of arguments, make assertions, etc. There is no need to have many repetitive methods that each create many repetitive objects. - -2. Unit tests are Java. They are not English. As such, they should be written like Java. Frameworks such as assertj that strive to make the unit tests look more "English-like" should be avoided. - Use of these frameworks sometimes works well but often quickly devolves into automated tests that read like neither English nor Java. - -3. Like any other code, unit tests should be created using reusable methods where appropriate. Do not create 15 methods that are all very similar and repetitive. Instead, create reusable methods - that can be called from each of the methods. - -4. Never use the `assert` keyword. Use JUnit assertions instead. - -5. Never create a test file that is centered around testing a method or capability. Unit tests must always be named after the class they are testing. It is okay if a given unit test class is very, very long. - -6. This is a Java project using the Maven structure. Java test files must always fall under src/test/java of the appropriate sub-module. - -7. Never use pointless assertions such as assertDoesNotThrow - this adds nothing but complexity. Just call the method, and if it throws an Exception, the test will fail. I.e., - it is assumed by default that each line does not throw an Exception, so do not use - diff --git a/.gitignore b/.gitignore index ff8a118e9a59..e299e15e43c4 100644 --- a/.gitignore +++ b/.gitignore @@ -21,7 +21,7 @@ nb-configuration.xml .java-version /nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/dist/ __pycache__ -.cursor/ +.cursor/debug.log # Develocity .mvn/.develocity/ diff --git a/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java b/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java index d4a06b8658ea..739280d16fa1 100644 --- a/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java +++ b/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java @@ -24,6 +24,7 @@ import org.apache.nifi.flow.ComponentType; import org.apache.nifi.flow.ConnectableComponent; import org.apache.nifi.flow.ConnectableComponentType; +import org.apache.nifi.flow.PortType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedConnection; @@ -109,7 +110,8 @@ public static ConnectableComponent createConnectableComponent(final VersionedPor return component; } - public static void addConnection(final VersionedProcessGroup group, final ConnectableComponent source, final ConnectableComponent destination, final Set relationships) { + public static VersionedConnection addConnection(final VersionedProcessGroup group, final ConnectableComponent source, final ConnectableComponent destination, + final Set relationships) { final VersionedConnection connection = new VersionedConnection(); connection.setSource(source); connection.setDestination(destination); @@ -134,6 +136,7 @@ public static void addConnection(final VersionedProcessGroup group, final Connec final String uuid = generateDeterministicUuid(group, ComponentType.CONNECTION); connection.setIdentifier(uuid); + return connection; } public static List findOutboundConnections(final VersionedProcessGroup group, final VersionedProcessor processor) { @@ -263,6 +266,64 @@ public static VersionedControllerService addControllerService(final VersionedPro return controllerService; } + public static VersionedProcessGroup createProcessGroup(final String identifier, final String name) { + final VersionedProcessGroup group = new VersionedProcessGroup(); + group.setIdentifier(identifier); + group.setName(name); + group.setProcessors(new HashSet<>()); + group.setProcessGroups(new HashSet<>()); + group.setConnections(new HashSet<>()); + group.setControllerServices(new HashSet<>()); + group.setInputPorts(new HashSet<>()); + group.setOutputPorts(new HashSet<>()); + group.setFunnels(new HashSet<>()); + group.setLabels(new HashSet<>()); + group.setComponentType(ComponentType.PROCESS_GROUP); + return group; + } + + public static VersionedPort addInputPort(final VersionedProcessGroup group, final String name, final Position position) { + return addPort(group, name, position, PortType.INPUT_PORT); + } + + public static VersionedPort addOutputPort(final VersionedProcessGroup group, final String name, final Position position) { + return addPort(group, name, position, PortType.OUTPUT_PORT); + } + + private static VersionedPort addPort(final VersionedProcessGroup group, final String name, final Position position, final PortType portType) { + final boolean isInput = portType == PortType.INPUT_PORT; + final ComponentType componentType = isInput ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT; + + final VersionedPort port = new VersionedPort(); + port.setIdentifier(generateDeterministicUuid(group, componentType)); + port.setName(name); + port.setPosition(position); + port.setType(portType); + port.setComponentType(componentType); + port.setScheduledState(ScheduledState.ENABLED); + port.setConcurrentlySchedulableTaskCount(1); + port.setAllowRemoteAccess(false); + port.setGroupIdentifier(group.getIdentifier()); + + if (isInput) { + Set inputPorts = group.getInputPorts(); + if (inputPorts == null) { + inputPorts = new HashSet<>(); + group.setInputPorts(inputPorts); + } + inputPorts.add(port); + } else { + Set outputPorts = group.getOutputPorts(); + if (outputPorts == null) { + outputPorts = new HashSet<>(); + group.setOutputPorts(outputPorts); + } + outputPorts.add(port); + } + + return port; + } + public static Set getReferencedControllerServices(final VersionedProcessGroup group) { final Set referencedServices = new HashSet<>(); collectReferencedControllerServices(group, referencedServices); @@ -307,6 +368,105 @@ private static void collectReferencedControllerServices(final VersionedProcessGr } } + /** + * Returns the set of controller services that are transitively referenced by the given processor. + * This includes any services directly referenced by the processor's properties, as well as any services + * that those services reference, and so on. Only services that are accessible to the processor are considered, + * meaning services in the processor's own group and its ancestor groups. + * + * @param rootGroup the root process group to search for controller services + * @param processor the processor whose referenced services should be found + * @return the set of transitively referenced controller services + */ + public static Set getReferencedControllerServices(final VersionedProcessGroup rootGroup, final VersionedProcessor processor) { + return findTransitivelyReferencedServices(rootGroup, processor.getGroupIdentifier(), processor.getProperties()); + } + + /** + * Returns the set of controller services that are transitively referenced by the given controller service. + * This includes any services directly referenced by the service's properties, as well as any services + * that those services reference, and so on. Only services that are accessible to the given service are considered, + * meaning services in the service's own group and its ancestor groups. + * + * @param rootGroup the root process group to search for controller services + * @param controllerService the controller service whose referenced services should be found + * @return the set of transitively referenced controller services + */ + public static Set getReferencedControllerServices(final VersionedProcessGroup rootGroup, final VersionedControllerService controllerService) { + return findTransitivelyReferencedServices(rootGroup, controllerService.getGroupIdentifier(), controllerService.getProperties()); + } + + private static Set findTransitivelyReferencedServices(final VersionedProcessGroup rootGroup, final String componentGroupId, + final Map properties) { + final Map serviceMap = new HashMap<>(); + collectAccessibleControllerServices(rootGroup, componentGroupId, serviceMap); + + final Set referencedServices = new HashSet<>(); + for (final String propertyValue : properties.values()) { + final VersionedControllerService referencedService = serviceMap.get(propertyValue); + if (referencedService != null) { + referencedServices.add(referencedService); + } + } + + resolveTransitiveServiceReferences(referencedServices, serviceMap); + return referencedServices; + } + + /** + * Collects controller services that are accessible from the given target group. In NiFi, a component can reference + * controller services in its own group or any ancestor group. This method traverses from the root group down to the + * target group, collecting services from each group along the path. + * + * @param group the current group being examined + * @param targetGroupId the identifier of the group whose accessible services should be collected + * @param serviceMap the map to populate with accessible service identifiers and their corresponding services + * @return true if the target group was found at or beneath this group, false otherwise + */ + private static boolean collectAccessibleControllerServices(final VersionedProcessGroup group, final String targetGroupId, + final Map serviceMap) { + final boolean isTarget = group.getIdentifier().equals(targetGroupId); + + boolean foundInChild = false; + if (!isTarget) { + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + if (collectAccessibleControllerServices(childGroup, targetGroupId, serviceMap)) { + foundInChild = true; + break; + } + } + } + + if (isTarget || foundInChild) { + for (final VersionedControllerService service : group.getControllerServices()) { + serviceMap.put(service.getIdentifier(), service); + } + return true; + } + + return false; + } + + private static void resolveTransitiveServiceReferences(final Set referencedServices, final Map serviceMap) { + while (true) { + final Set newlyAddedServices = new HashSet<>(); + + for (final VersionedControllerService service : referencedServices) { + for (final String propertyValue : service.getProperties().values()) { + final VersionedControllerService referencedService = serviceMap.get(propertyValue); + if (referencedService != null && !referencedServices.contains(referencedService)) { + newlyAddedServices.add(referencedService); + } + } + } + + referencedServices.addAll(newlyAddedServices); + if (newlyAddedServices.isEmpty()) { + break; + } + } + } + public static void removeControllerServiceReferences(final VersionedProcessGroup processGroup, final String serviceIdentifier) { for (final VersionedProcessor processor : processGroup.getProcessors()) { removeValuesFromMap(processor.getProperties(), serviceIdentifier); @@ -347,6 +507,18 @@ public static void setParameterValue(final VersionedParameterContext parameterCo } } + public static void setParameterValues(final VersionedExternalFlow externalFlow, final Map parameterValues) { + for (final Map.Entry entry : parameterValues.entrySet()) { + setParameterValue(externalFlow, entry.getKey(), entry.getValue()); + } + } + + public static void setParameterValues(final VersionedParameterContext parameterContext, final Map parameterValues) { + for (final Map.Entry entry : parameterValues.entrySet()) { + setParameterValue(parameterContext, entry.getKey(), entry.getValue()); + } + } + public static void removeUnreferencedControllerServices(final VersionedProcessGroup processGroup) { final Set referencedServices = getReferencedControllerServices(processGroup); final Set referencedServiceIds = new HashSet<>(); diff --git a/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java b/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java index f3c9741d8e8a..a7534d9088f5 100644 --- a/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java +++ b/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java @@ -19,7 +19,6 @@ import org.apache.nifi.components.connector.ComponentBundleLookup; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ComponentType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedProcessGroup; @@ -27,8 +26,8 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import java.util.HashSet; import java.util.Optional; +import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -137,39 +136,150 @@ void testUpdateToLatestBundleReturnsFalseWhenNotUpdated() { assertEquals("2.0.0", processor.getBundle().getVersion()); } - private VersionedProcessGroup createProcessGroup() { - final VersionedProcessGroup group = new VersionedProcessGroup(); - group.setIdentifier("test-group-id"); - group.setName("Test Process Group"); - group.setProcessors(new HashSet<>()); - group.setProcessGroups(new HashSet<>()); - group.setControllerServices(new HashSet<>()); - group.setConnections(new HashSet<>()); - group.setInputPorts(new HashSet<>()); - group.setOutputPorts(new HashSet<>()); - group.setFunnels(new HashSet<>()); - group.setLabels(new HashSet<>()); - group.setComponentType(ComponentType.PROCESS_GROUP); - return group; + } + + @Nested + class GetReferencedControllerServices { + private static final String PROCESSOR_TYPE = "org.apache.nifi.processors.TestProcessor"; + private static final String SERVICE_TYPE = "org.apache.nifi.services.TestService"; + private static final Bundle TEST_BUNDLE = new Bundle("group", "artifact", "1.0.0"); + + @Test + void testProcessorWithNoReferences() { + final VersionedProcessGroup group = createProcessGroup(); + VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Unreferenced Service"); + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("some-property", "not-a-service-id"); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, processor); + + assertTrue(result.isEmpty()); + } + + @Test + void testProcessorWithDirectReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService service1 = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service 1"); + final VersionedControllerService service2 = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service 2"); + final VersionedControllerService unreferencedService = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Unreferenced Service"); + + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop-1", service1.getIdentifier()); + processor.getProperties().put("service-prop-2", service2.getIdentifier()); + processor.getProperties().put("non-service-prop", "some-value"); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, processor); + + assertEquals(2, result.size()); + assertTrue(result.contains(service1)); + assertTrue(result.contains(service2)); + assertFalse(result.contains(unreferencedService)); + } + + @Test + void testProcessorWithTransitiveReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService serviceC = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service C"); + final VersionedControllerService serviceB = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service B"); + serviceB.getProperties().put("nested-service", serviceC.getIdentifier()); + final VersionedControllerService serviceA = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service A"); + serviceA.getProperties().put("nested-service", serviceB.getIdentifier()); + + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop", serviceA.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, processor); + + assertEquals(3, result.size()); + assertTrue(result.contains(serviceA)); + assertTrue(result.contains(serviceB)); + assertTrue(result.contains(serviceC)); + } + + @Test + void testProcessorReferencingServiceInAncestorGroup() { + final VersionedProcessGroup rootGroup = createProcessGroup(); + final VersionedControllerService parentService = VersionedFlowUtils.addControllerService(rootGroup, SERVICE_TYPE, TEST_BUNDLE, "Parent Service"); + final VersionedProcessGroup childGroup = createChildProcessGroup(rootGroup, "child-group-id"); + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(childGroup, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop", parentService.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(rootGroup, processor); + + assertEquals(1, result.size()); + assertTrue(result.contains(parentService)); } - private VersionedProcessGroup createChildProcessGroup(final VersionedProcessGroup parent, final String identifier) { - final VersionedProcessGroup childGroup = new VersionedProcessGroup(); - childGroup.setIdentifier(identifier); - childGroup.setName("Child Process Group"); - childGroup.setGroupIdentifier(parent.getIdentifier()); - childGroup.setProcessors(new HashSet<>()); - childGroup.setProcessGroups(new HashSet<>()); - childGroup.setControllerServices(new HashSet<>()); - childGroup.setConnections(new HashSet<>()); - childGroup.setInputPorts(new HashSet<>()); - childGroup.setOutputPorts(new HashSet<>()); - childGroup.setFunnels(new HashSet<>()); - childGroup.setLabels(new HashSet<>()); - childGroup.setComponentType(ComponentType.PROCESS_GROUP); - parent.getProcessGroups().add(childGroup); - return childGroup; + @Test + void testProcessorDoesNotFindServiceInDescendantGroup() { + final VersionedProcessGroup rootGroup = createProcessGroup(); + final VersionedProcessGroup childGroup = createChildProcessGroup(rootGroup, "child-group-id"); + final VersionedControllerService childService = VersionedFlowUtils.addControllerService(childGroup, SERVICE_TYPE, TEST_BUNDLE, "Child Service"); + + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(rootGroup, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop", childService.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(rootGroup, processor); + + assertTrue(result.isEmpty()); + } + + @Test + void testProcessorWithCircularServiceReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService serviceA = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service A"); + final VersionedControllerService serviceB = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service B"); + serviceA.getProperties().put("nested-service", serviceB.getIdentifier()); + serviceB.getProperties().put("nested-service", serviceA.getIdentifier()); + + final VersionedProcessor processor = VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, TEST_BUNDLE, "Processor", new Position(0, 0)); + processor.getProperties().put("service-prop", serviceA.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, processor); + + assertEquals(2, result.size()); + assertTrue(result.contains(serviceA)); + assertTrue(result.contains(serviceB)); } + + @Test + void testControllerServiceWithNoReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService service = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service"); + service.getProperties().put("some-property", "not-a-service-id"); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, service); + + assertTrue(result.isEmpty()); + } + + @Test + void testControllerServiceWithDirectAndTransitiveReferences() { + final VersionedProcessGroup group = createProcessGroup(); + final VersionedControllerService serviceC = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service C"); + final VersionedControllerService serviceB = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service B"); + serviceB.getProperties().put("nested-service", serviceC.getIdentifier()); + final VersionedControllerService serviceA = VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, TEST_BUNDLE, "Service A"); + serviceA.getProperties().put("nested-service", serviceB.getIdentifier()); + + final Set result = VersionedFlowUtils.getReferencedControllerServices(group, serviceA); + + assertEquals(2, result.size()); + assertTrue(result.contains(serviceB)); + assertTrue(result.contains(serviceC)); + assertFalse(result.contains(serviceA)); + } + } + + private static VersionedProcessGroup createProcessGroup() { + return VersionedFlowUtils.createProcessGroup("test-group-id", "Test Process Group"); + } + + private static VersionedProcessGroup createChildProcessGroup(final VersionedProcessGroup parent, final String identifier) { + final VersionedProcessGroup childGroup = VersionedFlowUtils.createProcessGroup(identifier, "Child Process Group"); + childGroup.setGroupIdentifier(parent.getIdentifier()); + parent.getProcessGroups().add(childGroup); + return childGroup; } } diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector index 7f2959091e31..341890a36ffb 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector +++ b/nifi-connector-mock-bundle/nifi-connector-mock-test-bundle/nifi-connector-mock-test-connectors/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector @@ -14,4 +14,4 @@ # limitations under the License. org.apache.nifi.mock.connectors.GenerateAndLog -org.apache.nifi.mock.connectors.MissingBundleConnector +org.apache.nifi.mock.connectors.MissingBundleConnector \ No newline at end of file diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneConnectionFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneConnectionFacade.java index a1ecb960867b..cc21353731d5 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneConnectionFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneConnectionFacade.java @@ -25,6 +25,7 @@ import org.apache.nifi.flowfile.FlowFile; import java.io.IOException; +import java.util.Objects; import java.util.function.Predicate; public class StandaloneConnectionFacade implements ConnectionFacade { @@ -56,4 +57,26 @@ public void purge() { public DropFlowFileSummary dropFlowFiles(final Predicate predicate) throws IOException { return connection.getFlowFileQueue().dropFlowFiles(predicate); } + + @Override + public String toString() { + return "StandaloneConnectionFacade[id=" + versionedConnection.getIdentifier() + ", name=" + versionedConnection.getName() + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StandaloneConnectionFacade that = (StandaloneConnectionFacade) o; + return Objects.equals(versionedConnection.getIdentifier(), that.versionedConnection.getIdentifier()); + } + + @Override + public int hashCode() { + return Objects.hashCode(versionedConnection.getIdentifier()); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java index ff507e4fab52..808b5a4747dc 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneControllerServiceFacade.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; public class StandaloneControllerServiceFacade implements ControllerServiceFacade { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() @@ -200,6 +201,30 @@ public T invokeConnectorMethod(final String methodName, final Map serializeArgumentsToJson(final Map arguments) throws InvocationFailedException { final Map jsonArguments = new HashMap<>(); for (final Map.Entry entry : arguments.entrySet()) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneParameterContextFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneParameterContextFacade.java index 00ab88857df0..2599269d3554 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneParameterContextFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneParameterContextFacade.java @@ -205,4 +205,25 @@ public Asset createAsset(final InputStream inputStream) throws IOException { return null; } + @Override + public String toString() { + return "StandaloneParameterContextFacade[id=" + parameterContext.getIdentifier() + ", name=" + parameterContext.getName() + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StandaloneParameterContextFacade that = (StandaloneParameterContextFacade) o; + return Objects.equals(parameterContext.getIdentifier(), that.parameterContext.getIdentifier()); + } + + @Override + public int hashCode() { + return Objects.hashCode(parameterContext.getIdentifier()); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupFacade.java index 3d02ff161b5c..307a786aea86 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupFacade.java @@ -49,6 +49,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; @@ -381,6 +382,28 @@ public ProcessGroupLifecycle getLifecycle() { return lifecycle; } + @Override + public String toString() { + return "StandaloneProcessGroupFacade[id=" + flowDefinition.getIdentifier() + ", name=" + flowDefinition.getName() + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StandaloneProcessGroupFacade that = (StandaloneProcessGroupFacade) o; + return Objects.equals(flowDefinition.getIdentifier(), that.flowDefinition.getIdentifier()); + } + + @Override + public int hashCode() { + return Objects.hashCode(flowDefinition.getIdentifier()); + } + @Override public DropFlowFileSummary dropFlowFiles(final Predicate predicate) throws IOException { DropFlowFileSummary summary = new DropFlowFileSummary(0, 0); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java index 707aa8b67c9f..3de26ad4431a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessorFacade.java @@ -48,6 +48,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; public class StandaloneProcessorFacade implements ProcessorFacade { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() @@ -208,6 +209,30 @@ public T invokeConnectorMethod(final String methodName, final Map serializeArgumentsToJson(final Map arguments) throws InvocationFailedException { final Map jsonArguments = new HashMap<>(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingConnectionFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingConnectionFacade.java index a061bd3b8345..ea72b197f6b6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingConnectionFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingConnectionFacade.java @@ -23,6 +23,7 @@ import org.apache.nifi.flowfile.FlowFile; import java.io.IOException; +import java.util.Objects; import java.util.function.Predicate; /** @@ -62,5 +63,27 @@ public DropFlowFileSummary dropFlowFiles(final Predicate predicate) th authContext.authorizeWrite(); return delegate.dropFlowFiles(predicate); } + + @Override + public String toString() { + return "AuthorizingConnectionFacade[delegate=" + delegate + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AuthorizingConnectionFacade that = (AuthorizingConnectionFacade) o; + return Objects.equals(delegate, that.delegate); + } + + @Override + public int hashCode() { + return Objects.hashCode(delegate); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingControllerServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingControllerServiceFacade.java index 2086c875d495..a85d924079f8 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingControllerServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingControllerServiceFacade.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; /** * A wrapper around {@link ControllerServiceFacade} that enforces authorization before delegating @@ -95,5 +96,27 @@ public T invokeConnectorMethod(final String methodName, final Map predicate) th authContext.authorizeWrite(); return delegate.dropFlowFiles(predicate); } + + @Override + public String toString() { + return "AuthorizingProcessGroupFacade[delegate=" + delegate + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AuthorizingProcessGroupFacade that = (AuthorizingProcessGroupFacade) o; + return Objects.equals(delegate, that.delegate); + } + + @Override + public int hashCode() { + return Objects.hashCode(delegate); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessorFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessorFacade.java index 22fe5622306b..d392d2c2b9f2 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessorFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessorFacade.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; /** * A wrapper around {@link ProcessorFacade} that enforces authorization before delegating @@ -95,5 +96,27 @@ public T invokeConnectorMethod(final String methodName, final Mapcom.fasterxml.jackson.core jackson-databind + + org.apache.nifi + nifi-connector-utils + 2.8.0-SNAPSHOT + org.apache.nifi nifi-system-test-extensions-services-api diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java index e1f6be0cf076..f15d668ef51f 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java @@ -27,18 +27,15 @@ import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.PropertyType; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ComponentType; import org.apache.nifi.flow.Position; -import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedParameter; import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -94,13 +91,7 @@ protected void onStepConfigured(final String stepName, final FlowContext working @Override public VersionedExternalFlow getInitialFlow() { - final VersionedProcessGroup group = new VersionedProcessGroup(); - group.setIdentifier(UUID.randomUUID().toString()); - group.setName("Bundle Resolution Flow"); - group.setProcessors(new HashSet<>()); - group.setProcessGroups(new HashSet<>()); - group.setConnections(new HashSet<>()); - group.setControllerServices(new HashSet<>()); + final VersionedProcessGroup group = VersionedFlowUtils.createProcessGroup(UUID.randomUUID().toString(), "Bundle Resolution Flow"); final VersionedParameter compatabilityParam = new VersionedParameter(); compatabilityParam.setName("BUNDLE_COMPATABILITY"); @@ -120,26 +111,17 @@ public VersionedExternalFlow getInitialFlow() { } private VersionedExternalFlow createFlowWithBundleScenarios() { - final VersionedProcessGroup group = new VersionedProcessGroup(); - group.setIdentifier(UUID.randomUUID().toString()); - group.setName("Bundle Resolution Flow"); - group.setProcessors(new HashSet<>()); - group.setProcessGroups(new HashSet<>()); - group.setConnections(new HashSet<>()); - group.setControllerServices(new HashSet<>()); + final VersionedProcessGroup group = VersionedFlowUtils.createProcessGroup(UUID.randomUUID().toString(), "Bundle Resolution Flow"); // Add a processor with an unavailable bundle (fake version) that should be resolved based on BundleCompatability // Uses the system test GenerateFlowFile processor which is available in the system test extensions bundle - final VersionedProcessor testProcessor = createProcessor( - "test-processor", - "GenerateFlowFile for Bundle Resolution Test", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", - "org.apache.nifi", - "nifi-system-test-extensions-nar", - "0.0.0-NONEXISTENT", - new Position(100, 100) - ); - group.getProcessors().add(testProcessor); + final Bundle nonexistentBundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "0.0.0-NONEXISTENT"); + + final VersionedProcessor testProcessor = VersionedFlowUtils.addProcessor(group, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", nonexistentBundle, + "GenerateFlowFile for Bundle Resolution Test", new Position(100, 100)); + testProcessor.setSchedulingPeriod("1 sec"); + testProcessor.setAutoTerminatedRelationships(Set.of("success")); final VersionedParameterContext parameterContext = new VersionedParameterContext(); parameterContext.setName("Bundle Resolution Parameter Context"); @@ -151,43 +133,6 @@ private VersionedExternalFlow createFlowWithBundleScenarios() { return flow; } - private VersionedProcessor createProcessor(final String id, final String name, final String type, - final String bundleGroup, final String bundleArtifact, - final String bundleVersion, final Position position) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(id); - processor.setName(name); - processor.setType(type); - processor.setPosition(position); - - final Bundle bundle = new Bundle(); - bundle.setGroup(bundleGroup); - bundle.setArtifact(bundleArtifact); - bundle.setVersion(bundleVersion); - processor.setBundle(bundle); - - processor.setProperties(new HashMap<>()); - processor.setPropertyDescriptors(new HashMap<>()); - processor.setStyle(new HashMap<>()); - processor.setSchedulingPeriod("1 sec"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setExecutionNode("ALL"); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setBulletinLevel("WARN"); - processor.setRunDurationMillis(0L); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setAutoTerminatedRelationships(Set.of("success")); - processor.setScheduledState(ScheduledState.ENABLED); - processor.setRetryCount(10); - processor.setRetriedRelationships(new HashSet<>()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - processor.setComponentType(ComponentType.PROCESSOR); - - return processor; - } - @Override public List verifyConfigurationStep(final String stepName, final Map propertyValueOverrides, diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java index f3fb33d80b46..bb767219154d 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java @@ -28,12 +28,11 @@ import org.apache.nifi.components.connector.StepConfigurationContext; import org.apache.nifi.components.connector.components.FlowContext; import org.apache.nifi.components.connector.components.ProcessorFacade; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.Position; -import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; -import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.processor.util.StandardValidators; import java.io.File; @@ -41,7 +40,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Set; /** * A test connector that invokes a ConnectorMethod on a CalculateProcessor using its own POJO types. @@ -163,36 +161,10 @@ public void setResult(final int result) { @Override public VersionedExternalFlow getInitialFlow() { - final VersionedProcessGroup group = new VersionedProcessGroup(); - group.setName("Calculate Flow"); - group.setIdentifier("calculate-flow-id"); - - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0-SNAPSHOT"); - - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier("calculate-processor-id"); - processor.setName("Calculate Processor"); - processor.setType("org.apache.nifi.processors.tests.system.Calculate"); - processor.setBundle(bundle); - processor.setProperties(Map.of()); - processor.setPropertyDescriptors(Map.of()); - processor.setScheduledState(ScheduledState.ENABLED); - processor.setBulletinLevel("WARN"); - processor.setPosition(new Position(0D, 0D)); - processor.setPenaltyDuration("30 sec"); - processor.setAutoTerminatedRelationships(Set.of()); - processor.setExecutionNode("ALL"); - processor.setGroupIdentifier(group.getIdentifier()); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setRunDurationMillis(0L); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setYieldDuration("1 sec"); - processor.setSchedulingPeriod("0 sec"); - processor.setStyle(Map.of()); - group.setProcessors(Set.of(processor)); + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0-SNAPSHOT"); + final VersionedProcessGroup group = VersionedFlowUtils.createProcessGroup("calculate-flow-id", "Calculate Flow"); + + VersionedFlowUtils.addProcessor(group, "org.apache.nifi.processors.tests.system.Calculate", bundle, "Calculate Processor", new Position(0, 0)); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(group); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java index 40b4b41475e5..81c6af171b13 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java @@ -22,16 +22,12 @@ import org.apache.nifi.components.connector.ConfigurationStep; import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ComponentType; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; import org.apache.nifi.flow.ControllerServiceAPI; import org.apache.nifi.flow.ExecutionEngine; -import org.apache.nifi.flow.PortType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; -import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedPort; @@ -57,18 +53,7 @@ */ public class ComponentLifecycleConnector extends AbstractConnector { - public static final String ROOT_PROCESSOR_ID = "root-processor-id"; - public static final String CHILD_GROUP_ID = "child-group-id"; - public static final String CHILD_INPUT_PORT_ID = "child-input-port-id"; - public static final String CHILD_OUTPUT_PORT_ID = "child-output-port-id"; - public static final String CHILD_PROCESSOR_ID = "child-processor-id"; - public static final String STATELESS_GROUP_ID = "stateless-group-id"; - public static final String STATELESS_PROCESSOR_ID = "stateless-processor-id"; - public static final String STATELESS_INPUT_PORT_ID = "stateless-input-port-id"; - public static final String ROOT_CONTROLLER_SERVICE_ID = "root-controller-service-id"; - public static final String CHILD_CONTROLLER_SERVICE_ID = "child-controller-service-id"; - - private static final Bundle SYSTEM_TEST_EXTENSIONS_BUNDLE = createBundle(); + private static final Bundle SYSTEM_TEST_EXTENSIONS_BUNDLE = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0-SNAPSHOT"); @Override protected void onStepConfigured(final String stepName, final FlowContext workingContext) { @@ -83,272 +68,98 @@ public VersionedExternalFlow getInitialFlow() { } private VersionedProcessGroup createRootGroup() { - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setIdentifier(UUID.randomUUID().toString()); - rootGroup.setName("Component Lifecycle Root"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup(UUID.randomUUID().toString(), "Component Lifecycle Root"); rootGroup.setPosition(new Position(0, 0)); - rootGroup.setProcessors(new HashSet<>()); - rootGroup.setProcessGroups(new HashSet<>()); - rootGroup.setConnections(new HashSet<>()); - rootGroup.setInputPorts(new HashSet<>()); - rootGroup.setOutputPorts(new HashSet<>()); - rootGroup.setControllerServices(new HashSet<>()); - rootGroup.setLabels(new HashSet<>()); - rootGroup.setFunnels(new HashSet<>()); rootGroup.setRemoteProcessGroups(new HashSet<>()); rootGroup.setScheduledState(ScheduledState.ENABLED); rootGroup.setExecutionEngine(ExecutionEngine.STANDARD); - rootGroup.setComponentType(ComponentType.PROCESS_GROUP); - // Create root-level Controller Service - final VersionedControllerService rootControllerService = createControllerService(ROOT_CONTROLLER_SERVICE_ID, "Root Count Service", rootGroup.getIdentifier()); - rootGroup.getControllerServices().add(rootControllerService); + final VersionedControllerService rootControllerService = VersionedFlowUtils.addControllerService(rootGroup, + "org.apache.nifi.cs.tests.system.StandardCountService", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Root Count Service"); + rootControllerService.setScheduledState(ScheduledState.ENABLED); + final ControllerServiceAPI rootServiceApi = new ControllerServiceAPI(); + rootServiceApi.setType("org.apache.nifi.cs.tests.system.CountService"); + rootServiceApi.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); + rootControllerService.setControllerServiceApis(Collections.singletonList(rootServiceApi)); - // Create root-level processor (GenerateFlowFile) - final VersionedProcessor rootProcessor = createProcessor(ROOT_PROCESSOR_ID, "Root GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", new Position(100, 100)); - rootProcessor.setGroupIdentifier(rootGroup.getIdentifier()); + final VersionedProcessor rootProcessor = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Root GenerateFlowFile", new Position(100, 100)); rootProcessor.setSchedulingPeriod("10 sec"); - rootGroup.getProcessors().add(rootProcessor); - // Create root-level processor (TerminateFlowFile) - final VersionedProcessor rootTerminateProcessor = createProcessor("root-terminate-processor-id", "Root TerminateFlowFile", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", new Position(300, 100)); - rootTerminateProcessor.setGroupIdentifier(rootGroup.getIdentifier()); - rootGroup.getProcessors().add(rootTerminateProcessor); + final VersionedProcessor rootTerminateProcessor = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Root TerminateFlowFile", new Position(300, 100)); - // Create child process group with ports and processor final VersionedProcessGroup childGroup = createChildGroup(rootGroup.getIdentifier()); rootGroup.getProcessGroups().add(childGroup); - // Create connection from root processor to child group's input port - final VersionedConnection rootToChildConnection = createConnection( - createConnectableComponent(ROOT_PROCESSOR_ID, "Root GenerateFlowFile", ConnectableComponentType.PROCESSOR, rootGroup.getIdentifier()), - Set.of("success"), - createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input", ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID), - rootGroup.getIdentifier() - ); - rootGroup.getConnections().add(rootToChildConnection); + final VersionedPort childInputPort = childGroup.getInputPorts().iterator().next(); + final VersionedPort childOutputPort = childGroup.getOutputPorts().iterator().next(); - // Create connection from child group's output port to root terminate processor - final VersionedConnection childToRootConnection = createConnection( - createConnectableComponent(CHILD_OUTPUT_PORT_ID, "Child Output", ConnectableComponentType.OUTPUT_PORT, CHILD_GROUP_ID), - Set.of(""), - createConnectableComponent("root-terminate-processor-id", "Root TerminateFlowFile", ConnectableComponentType.PROCESSOR, rootGroup.getIdentifier()), - rootGroup.getIdentifier() - ); - rootGroup.getConnections().add(childToRootConnection); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(rootProcessor), + VersionedFlowUtils.createConnectableComponent(childInputPort), Set.of("success")); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(childOutputPort), + VersionedFlowUtils.createConnectableComponent(rootTerminateProcessor), Set.of("")); return rootGroup; } private VersionedProcessGroup createChildGroup(final String parentGroupId) { - final VersionedProcessGroup childGroup = new VersionedProcessGroup(); - childGroup.setIdentifier(CHILD_GROUP_ID); - childGroup.setName("Child Group"); + final VersionedProcessGroup childGroup = VersionedFlowUtils.createProcessGroup("child-group-id", "Child Group"); childGroup.setPosition(new Position(100, 300)); - childGroup.setProcessors(new HashSet<>()); - childGroup.setProcessGroups(new HashSet<>()); - childGroup.setConnections(new HashSet<>()); - childGroup.setInputPorts(new HashSet<>()); - childGroup.setOutputPorts(new HashSet<>()); - childGroup.setControllerServices(new HashSet<>()); - childGroup.setLabels(new HashSet<>()); - childGroup.setFunnels(new HashSet<>()); childGroup.setRemoteProcessGroups(new HashSet<>()); childGroup.setScheduledState(ScheduledState.ENABLED); childGroup.setExecutionEngine(ExecutionEngine.STANDARD); - childGroup.setComponentType(ComponentType.PROCESS_GROUP); childGroup.setGroupIdentifier(parentGroupId); - // Create Controller Service in child group - final VersionedControllerService childControllerService = createControllerService(CHILD_CONTROLLER_SERVICE_ID, "Child Count Service", CHILD_GROUP_ID); - childGroup.getControllerServices().add(childControllerService); - - // Create input port - final VersionedPort inputPort = createPort(CHILD_INPUT_PORT_ID, "Child Input", true, CHILD_GROUP_ID); - childGroup.getInputPorts().add(inputPort); + final VersionedControllerService childControllerService = VersionedFlowUtils.addControllerService(childGroup, + "org.apache.nifi.cs.tests.system.StandardCountService", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Child Count Service"); + childControllerService.setScheduledState(ScheduledState.ENABLED); + final ControllerServiceAPI childServiceApi = new ControllerServiceAPI(); + childServiceApi.setType("org.apache.nifi.cs.tests.system.CountService"); + childServiceApi.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); + childControllerService.setControllerServiceApis(Collections.singletonList(childServiceApi)); - // Create output port - final VersionedPort outputPort = createPort(CHILD_OUTPUT_PORT_ID, "Child Output", false, CHILD_GROUP_ID); - childGroup.getOutputPorts().add(outputPort); + final VersionedPort inputPort = VersionedFlowUtils.addInputPort(childGroup, "Child Input", new Position(0, 0)); + final VersionedPort outputPort = VersionedFlowUtils.addOutputPort(childGroup, "Child Output", new Position(200, 0)); - // Create processor in child group - final VersionedProcessor childProcessor = createProcessor(CHILD_PROCESSOR_ID, "Child Terminate", - "org.apache.nifi.processors.tests.system.PassThrough", new Position(100, 100)); - childProcessor.setGroupIdentifier(CHILD_GROUP_ID); - childGroup.getProcessors().add(childProcessor); + final VersionedProcessor childProcessor = VersionedFlowUtils.addProcessor(childGroup, + "org.apache.nifi.processors.tests.system.PassThrough", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Child Terminate", new Position(100, 100)); - // Create stateless group - final VersionedProcessGroup statelessGroup = createStatelessGroup(CHILD_GROUP_ID); + final VersionedProcessGroup statelessGroup = createStatelessGroup(childGroup.getIdentifier()); childGroup.getProcessGroups().add(statelessGroup); - // Connection: input port -> child processor - final VersionedConnection inputToProcessor = createConnection( - createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input", ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID), - Set.of(""), - createConnectableComponent(CHILD_PROCESSOR_ID, "Child Terminate", ConnectableComponentType.PROCESSOR, CHILD_GROUP_ID), - CHILD_GROUP_ID - ); - childGroup.getConnections().add(inputToProcessor); - - // Connection: input port -> stateless group - final VersionedConnection inputToStateless = createConnection( - createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input", ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID), - Set.of(""), - createConnectableComponent(STATELESS_INPUT_PORT_ID, "Stateless Input", ConnectableComponentType.INPUT_PORT, STATELESS_GROUP_ID), - CHILD_GROUP_ID - ); - childGroup.getConnections().add(inputToStateless); + final VersionedPort statelessInputPort = statelessGroup.getInputPorts().iterator().next(); - // Connection: child processor -> output port - final VersionedConnection processorToOutput = createConnection( - createConnectableComponent(CHILD_PROCESSOR_ID, "Child Terminate", ConnectableComponentType.PROCESSOR, CHILD_GROUP_ID), - Set.of("success"), - createConnectableComponent(CHILD_OUTPUT_PORT_ID, "Child Output", ConnectableComponentType.OUTPUT_PORT, CHILD_GROUP_ID), - CHILD_GROUP_ID - ); - childGroup.getConnections().add(processorToOutput); + VersionedFlowUtils.addConnection(childGroup, VersionedFlowUtils.createConnectableComponent(inputPort), + VersionedFlowUtils.createConnectableComponent(childProcessor), Set.of("")); + VersionedFlowUtils.addConnection(childGroup, VersionedFlowUtils.createConnectableComponent(inputPort), + VersionedFlowUtils.createConnectableComponent(statelessInputPort), Set.of("")); + VersionedFlowUtils.addConnection(childGroup, VersionedFlowUtils.createConnectableComponent(childProcessor), + VersionedFlowUtils.createConnectableComponent(outputPort), Set.of("success")); return childGroup; } private VersionedProcessGroup createStatelessGroup(final String parentGroupId) { - final VersionedProcessGroup statelessGroup = new VersionedProcessGroup(); - statelessGroup.setIdentifier(STATELESS_GROUP_ID); - statelessGroup.setName("Stateless Group"); + final VersionedProcessGroup statelessGroup = VersionedFlowUtils.createProcessGroup("stateless-group-id", "Stateless Group"); statelessGroup.setPosition(new Position(400, 100)); - statelessGroup.setProcessors(new HashSet<>()); - statelessGroup.setProcessGroups(new HashSet<>()); - statelessGroup.setConnections(new HashSet<>()); - statelessGroup.setInputPorts(new HashSet<>()); - statelessGroup.setOutputPorts(new HashSet<>()); - statelessGroup.setControllerServices(new HashSet<>()); - statelessGroup.setLabels(new HashSet<>()); - statelessGroup.setFunnels(new HashSet<>()); statelessGroup.setRemoteProcessGroups(new HashSet<>()); statelessGroup.setScheduledState(ScheduledState.ENABLED); statelessGroup.setExecutionEngine(ExecutionEngine.STATELESS); statelessGroup.setStatelessFlowTimeout("1 min"); - statelessGroup.setComponentType(ComponentType.PROCESS_GROUP); statelessGroup.setGroupIdentifier(parentGroupId); - // Create input port for stateless group - final VersionedPort statelessInput = createPort(STATELESS_INPUT_PORT_ID, "Stateless Input", true, STATELESS_GROUP_ID); - statelessGroup.getInputPorts().add(statelessInput); + final VersionedPort statelessInput = VersionedFlowUtils.addInputPort(statelessGroup, "Stateless Input", new Position(0, 0)); - // Create processor in stateless group - final VersionedProcessor statelessProcessor = createProcessor(STATELESS_PROCESSOR_ID, "Stateless Terminate", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", new Position(100, 100)); - statelessProcessor.setGroupIdentifier(STATELESS_GROUP_ID); - statelessGroup.getProcessors().add(statelessProcessor); + final VersionedProcessor statelessProcessor = VersionedFlowUtils.addProcessor(statelessGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", SYSTEM_TEST_EXTENSIONS_BUNDLE, "Stateless Terminate", new Position(100, 100)); - // Connection: input port -> processor - final VersionedConnection inputToProcessor = createConnection( - createConnectableComponent(STATELESS_INPUT_PORT_ID, "Stateless Input", ConnectableComponentType.INPUT_PORT, STATELESS_GROUP_ID), - Set.of(""), - createConnectableComponent(STATELESS_PROCESSOR_ID, "Stateless Terminate", ConnectableComponentType.PROCESSOR, STATELESS_GROUP_ID), - STATELESS_GROUP_ID - ); - statelessGroup.getConnections().add(inputToProcessor); + VersionedFlowUtils.addConnection(statelessGroup, VersionedFlowUtils.createConnectableComponent(statelessInput), + VersionedFlowUtils.createConnectableComponent(statelessProcessor), Set.of("")); return statelessGroup; } - private VersionedProcessor createProcessor(final String id, final String name, final String type, final Position position) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(id); - processor.setName(name); - processor.setType(type); - processor.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); - processor.setPosition(position); - processor.setProperties(Map.of()); - processor.setPropertyDescriptors(Map.of()); - processor.setSchedulingPeriod("0 sec"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setExecutionNode("ALL"); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setBulletinLevel("WARN"); - processor.setRunDurationMillis(0L); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setAutoTerminatedRelationships(Set.of()); - processor.setScheduledState(ScheduledState.ENABLED); - processor.setRetryCount(0); - processor.setRetriedRelationships(Set.of()); - processor.setComponentType(ComponentType.PROCESSOR); - return processor; - } - - private VersionedPort createPort(final String id, final String name, final boolean isInput, final String groupId) { - final VersionedPort port = new VersionedPort(); - port.setIdentifier(id); - port.setName(name); - port.setPosition(new Position(isInput ? 0 : 200, 0)); - port.setType(isInput ? PortType.INPUT_PORT : PortType.OUTPUT_PORT); - port.setComponentType(isInput ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT); - port.setConcurrentlySchedulableTaskCount(1); - port.setScheduledState(ScheduledState.ENABLED); - port.setAllowRemoteAccess(false); - port.setGroupIdentifier(groupId); - return port; - } - - private VersionedControllerService createControllerService(final String id, final String name, final String groupId) { - final VersionedControllerService service = new VersionedControllerService(); - service.setIdentifier(id); - service.setName(name); - service.setType("org.apache.nifi.cs.tests.system.StandardCountService"); - service.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); - service.setGroupIdentifier(groupId); - service.setProperties(Map.of()); - service.setPropertyDescriptors(Map.of()); - service.setScheduledState(ScheduledState.ENABLED); - service.setBulletinLevel("WARN"); - - final ControllerServiceAPI serviceApi = new ControllerServiceAPI(); - serviceApi.setType("org.apache.nifi.cs.tests.system.CountService"); - serviceApi.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE); - service.setControllerServiceApis(Collections.singletonList(serviceApi)); - - return service; - } - - private VersionedConnection createConnection(final ConnectableComponent source, final Set relationships, - final ConnectableComponent destination, final String groupId) { - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier(UUID.randomUUID().toString()); - connection.setName(""); - connection.setSource(source); - connection.setDestination(destination); - connection.setSelectedRelationships(relationships); - connection.setBackPressureObjectThreshold(10000L); - connection.setBackPressureDataSizeThreshold("1 GB"); - connection.setFlowFileExpiration("0 sec"); - connection.setLabelIndex(0); - connection.setzIndex(0L); - connection.setComponentType(ComponentType.CONNECTION); - connection.setGroupIdentifier(groupId); - return connection; - } - - private ConnectableComponent createConnectableComponent(final String id, final String name, final ConnectableComponentType type, final String groupId) { - final ConnectableComponent component = new ConnectableComponent(); - component.setId(id); - component.setName(name); - component.setType(type); - component.setGroupId(groupId); - return component; - } - - private static Bundle createBundle() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0-SNAPSHOT"); - return bundle; - } - @Override public List verifyConfigurationStep(final String stepName, final Map propertyValueOverrides, final FlowContext flowContext) { return List.of(); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java index 987b7e812db8..1c50941c8371 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java @@ -21,12 +21,10 @@ import org.apache.nifi.components.connector.AbstractConnector; import org.apache.nifi.components.connector.ConfigurationStep; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; -import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; @@ -44,47 +42,20 @@ protected void onStepConfigured(final String stepName, final FlowContext working @Override public VersionedExternalFlow getInitialFlow() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0-SNAPSHOT"); + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0-SNAPSHOT"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup("1234", "Data Queuing Connector"); - final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, Map.of("File Size", "1 KB"), ScheduledState.ENABLED); + final VersionedProcessor generate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, "GenerateFlowFile", new Position(0, 0)); + generate.getProperties().put("File Size", "1 KB"); generate.setSchedulingPeriod("100 millis"); - final VersionedProcessor terminate = createVersionedProcessor("term-1", "1234", "TerminateFlowFile", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, Collections.emptyMap(), ScheduledState.DISABLED); + final VersionedProcessor terminate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, "TerminateFlowFile", new Position(0, 0)); + terminate.setScheduledState(ScheduledState.DISABLED); - final ConnectableComponent source = new ConnectableComponent(); - source.setId(generate.getIdentifier()); - source.setType(ConnectableComponentType.PROCESSOR); - source.setGroupId("1234"); - - final ConnectableComponent destination = new ConnectableComponent(); - destination.setId(terminate.getIdentifier()); - destination.setType(ConnectableComponentType.PROCESSOR); - destination.setGroupId("1234"); - - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier("generate-to-terminate-1"); - connection.setSource(source); - connection.setDestination(destination); - connection.setGroupIdentifier("1234"); - connection.setSelectedRelationships(Set.of("success")); - connection.setBackPressureDataSizeThreshold("1 GB"); - connection.setBackPressureObjectThreshold(10_000L); - connection.setBends(Collections.emptyList()); - connection.setLabelIndex(1); - connection.setFlowFileExpiration("0 sec"); - connection.setPrioritizers(Collections.emptyList()); - connection.setzIndex(1L); - - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setName("Data Queuing Connector"); - rootGroup.setIdentifier("1234"); - rootGroup.setProcessors(Set.of(generate, terminate)); - rootGroup.setConnections(Set.of(connection)); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(generate), + VersionedFlowUtils.createConnectableComponent(terminate), Set.of("success")); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -105,36 +76,4 @@ public List getConfigurationSteps() { @Override public void applyUpdate(final FlowContext workingFlowContext, final FlowContext activeFlowContext) { } - - private VersionedProcessor createVersionedProcessor(final String identifier, final String groupIdentifier, final String name, - final String type, final Bundle bundle, final Map properties, - final ScheduledState scheduledState) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(identifier); - processor.setGroupIdentifier(groupIdentifier); - processor.setName(name); - processor.setType(type); - processor.setBundle(bundle); - processor.setProperties(properties); - processor.setPropertyDescriptors(Collections.emptyMap()); - processor.setScheduledState(scheduledState); - - processor.setBulletinLevel("WARN"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setSchedulingPeriod("0 sec"); - processor.setExecutionNode("ALL"); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setRunDurationMillis(0L); - processor.setPosition(new Position(0, 0)); - - processor.setAutoTerminatedRelationships(Collections.emptySet()); - processor.setRetryCount(10); - processor.setRetriedRelationships(Collections.emptySet()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - - return processor; - } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java index 9d8b9a9508ff..8c7f680d86ba 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java @@ -26,19 +26,15 @@ import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.PropertyType; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; import org.apache.nifi.flow.Position; -import org.apache.nifi.flow.ScheduledState; -import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.processor.util.StandardValidators; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,8 +49,6 @@ */ public class GatedDataQueuingConnector extends AbstractConnector { - private static final String TERMINATE_PROCESSOR_ID = "term-1"; - static final ConnectorPropertyDescriptor GATE_FILE_PATH = new ConnectorPropertyDescriptor.Builder() .name("Gate File Path") .description("The path to the gate file. When this file exists, the TerminateFlowFile processor " + @@ -80,49 +74,19 @@ protected void onStepConfigured(final String stepName, final FlowContext working @Override public VersionedExternalFlow getInitialFlow() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0"); - - final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, - Map.of("File Size", "1 KB"), ScheduledState.ENABLED); + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup("1234", "Gated Data Queuing Connector"); + + final VersionedProcessor generate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, "GenerateFlowFile", new Position(0, 0)); + generate.getProperties().put("File Size", "1 KB"); generate.setSchedulingPeriod("100 millis"); - final VersionedProcessor terminate = createVersionedProcessor(TERMINATE_PROCESSOR_ID, "1234", "TerminateFlowFile", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, - Collections.emptyMap(), ScheduledState.ENABLED); - - final ConnectableComponent source = new ConnectableComponent(); - source.setId(generate.getIdentifier()); - source.setType(ConnectableComponentType.PROCESSOR); - source.setGroupId("1234"); - - final ConnectableComponent destination = new ConnectableComponent(); - destination.setId(terminate.getIdentifier()); - destination.setType(ConnectableComponentType.PROCESSOR); - destination.setGroupId("1234"); - - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier("generate-to-terminate-1"); - connection.setSource(source); - connection.setDestination(destination); - connection.setGroupIdentifier("1234"); - connection.setSelectedRelationships(Set.of("success")); - connection.setBackPressureDataSizeThreshold("1 GB"); - connection.setBackPressureObjectThreshold(10_000L); - connection.setBends(Collections.emptyList()); - connection.setLabelIndex(1); - connection.setFlowFileExpiration("0 sec"); - connection.setPrioritizers(Collections.emptyList()); - connection.setzIndex(1L); - - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setName("Gated Data Queuing Connector"); - rootGroup.setIdentifier("1234"); - rootGroup.setProcessors(Set.of(generate, terminate)); - rootGroup.setConnections(Set.of(connection)); + final VersionedProcessor terminate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, "TerminateFlowFile", new Position(0, 0)); + + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(generate), + VersionedFlowUtils.createConnectableComponent(terminate), Set.of("success")); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -150,46 +114,9 @@ public void applyUpdate(final FlowContext workingFlowContext, final FlowContext final VersionedExternalFlow flow = getInitialFlow(); final VersionedProcessGroup rootGroup = flow.getFlowContents(); - for (final VersionedProcessor processor : rootGroup.getProcessors()) { - if (TERMINATE_PROCESSOR_ID.equals(processor.getIdentifier())) { - final Map properties = new HashMap<>(processor.getProperties()); - properties.put("Gate File", gateFilePath); - processor.setProperties(properties); - } - } + VersionedFlowUtils.findProcessor(rootGroup, p -> p.getType().endsWith("TerminateFlowFile")) + .ifPresent(processor -> processor.getProperties().put("Gate File", gateFilePath)); getInitializationContext().updateFlow(activeFlowContext, flow, BundleCompatibility.RESOLVE_BUNDLE); } - - private VersionedProcessor createVersionedProcessor(final String identifier, final String groupIdentifier, final String name, - final String type, final Bundle bundle, final Map properties, - final ScheduledState scheduledState) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(identifier); - processor.setGroupIdentifier(groupIdentifier); - processor.setName(name); - processor.setType(type); - processor.setBundle(bundle); - processor.setProperties(properties); - processor.setPropertyDescriptors(Collections.emptyMap()); - processor.setScheduledState(scheduledState); - - processor.setBulletinLevel("WARN"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setSchedulingPeriod("0 sec"); - processor.setExecutionNode("ALL"); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setRunDurationMillis(0L); - processor.setPosition(new Position(0, 0)); - - processor.setAutoTerminatedRelationships(Collections.emptySet()); - processor.setRetryCount(10); - processor.setRetriedRelationships(Collections.emptySet()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - - return processor; - } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java index 8ba055dda93b..9d160a11f1c2 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java @@ -26,13 +26,9 @@ import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.PropertyType; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; -import org.apache.nifi.flow.PortType; import org.apache.nifi.flow.Position; -import org.apache.nifi.flow.ScheduledState; -import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedParameter; import org.apache.nifi.flow.VersionedParameterContext; @@ -43,7 +39,6 @@ import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -75,15 +70,8 @@ public class ParameterContextConnector extends AbstractConnector { private static final String CONFIGURATION_STEP_NAME = "Parameter Context Configuration"; private static final String ROOT_GROUP_ID = "root-group"; - private static final String GENERATE_PROCESSOR_ID = "generate-flowfile"; private static final String GROUP_A_ID = "process-group-a"; private static final String GROUP_B_ID = "process-group-b"; - private static final String INPUT_PORT_A_ID = "input-port-a"; - private static final String INPUT_PORT_B_ID = "input-port-b"; - private static final String UPDATE_CONTENT_ID = "update-content"; - private static final String REPLACE_WITH_FILE_ID = "replace-with-file"; - private static final String WRITE_SENSITIVE_ID = "write-sensitive"; - private static final String WRITE_ASSET_ID = "write-asset"; private static final String PARENT_CONTEXT_NAME = "Parent Parameter Context"; private static final String CHILD_CONTEXT_A_NAME = "Child Context A"; @@ -147,15 +135,7 @@ public VersionedExternalFlow getInitialFlow() { } private VersionedExternalFlow createEmptyFlow() { - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setIdentifier(ROOT_GROUP_ID); - rootGroup.setName("Parameter Context Test Flow"); - rootGroup.setProcessors(new HashSet<>()); - rootGroup.setProcessGroups(new HashSet<>()); - rootGroup.setConnections(new HashSet<>()); - rootGroup.setInputPorts(new HashSet<>()); - rootGroup.setOutputPorts(new HashSet<>()); - rootGroup.setControllerServices(new HashSet<>()); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup(ROOT_GROUP_ID, "Parameter Context Test Flow"); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -190,46 +170,30 @@ public void applyUpdate(final FlowContext workingContext, final FlowContext acti private VersionedExternalFlow createFlow(final String sensitiveValue, final String assetFilePath, final String sensitiveOutputFile, final String assetOutputFile) { - final Bundle bundle = createBundle(); - - // Create Parameter Contexts with inheritance + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0-SNAPSHOT"); final Map parameterContexts = createParameterContexts(sensitiveValue, assetFilePath); - // Create root group - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setIdentifier(ROOT_GROUP_ID); - rootGroup.setName("Parameter Context Test Flow"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup(ROOT_GROUP_ID, "Parameter Context Test Flow"); rootGroup.setParameterContextName(PARENT_CONTEXT_NAME); - // Create GenerateFlowFile at root level - final VersionedProcessor generateProcessor = createProcessor(GENERATE_PROCESSOR_ID, ROOT_GROUP_ID, "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, - Map.of("Max FlowFiles", "1", "File Size", "0 B"), ScheduledState.ENABLED); + final VersionedProcessor generateProcessor = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, "GenerateFlowFile", new Position(0, 0)); + generateProcessor.getProperties().putAll(Map.of("Max FlowFiles", "1", "File Size", "0 B")); generateProcessor.setSchedulingPeriod("60 sec"); - // Create Process Group A (sensitive value path) final VersionedProcessGroup groupA = createProcessGroupA(bundle, sensitiveOutputFile); + rootGroup.getProcessGroups().add(groupA); - // Create Process Group B (asset path) final VersionedProcessGroup groupB = createProcessGroupB(bundle, assetOutputFile); + rootGroup.getProcessGroups().add(groupB); - // Create connections from GenerateFlowFile to both process group input ports - final VersionedConnection connectionToA = createConnection("conn-to-group-a", ROOT_GROUP_ID, - GENERATE_PROCESSOR_ID, ConnectableComponentType.PROCESSOR, - INPUT_PORT_A_ID, ConnectableComponentType.INPUT_PORT, GROUP_A_ID, - Set.of("success")); - - final VersionedConnection connectionToB = createConnection("conn-to-group-b", ROOT_GROUP_ID, - GENERATE_PROCESSOR_ID, ConnectableComponentType.PROCESSOR, - INPUT_PORT_B_ID, ConnectableComponentType.INPUT_PORT, GROUP_B_ID, - Set.of("success")); + final VersionedPort inputPortA = groupA.getInputPorts().iterator().next(); + final VersionedPort inputPortB = groupB.getInputPorts().iterator().next(); - rootGroup.setProcessors(Set.of(generateProcessor)); - rootGroup.setProcessGroups(Set.of(groupA, groupB)); - rootGroup.setConnections(Set.of(connectionToA, connectionToB)); - rootGroup.setInputPorts(new HashSet<>()); - rootGroup.setOutputPorts(new HashSet<>()); - rootGroup.setControllerServices(new HashSet<>()); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(generateProcessor), + VersionedFlowUtils.createConnectableComponent(inputPortA), Set.of("success")); + VersionedFlowUtils.addConnection(rootGroup, VersionedFlowUtils.createConnectableComponent(generateProcessor), + VersionedFlowUtils.createConnectableComponent(inputPortB), Set.of("success")); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -238,7 +202,6 @@ private VersionedExternalFlow createFlow(final String sensitiveValue, final Stri } private Map createParameterContexts(final String sensitiveValue, final String assetFilePath) { - // Child Context A - sensitive parameter final VersionedParameter sensitiveParam = new VersionedParameter(); sensitiveParam.setName(SENSITIVE_PARAM_NAME); sensitiveParam.setSensitive(true); @@ -250,7 +213,6 @@ private Map createParameterContexts(final Str childContextA.setName(CHILD_CONTEXT_A_NAME); childContextA.setParameters(Set.of(sensitiveParam)); - // Child Context B - asset parameter final VersionedParameter assetParam = new VersionedParameter(); assetParam.setName(ASSET_PARAM_NAME); assetParam.setSensitive(false); @@ -262,7 +224,6 @@ private Map createParameterContexts(final Str childContextB.setName(CHILD_CONTEXT_B_NAME); childContextB.setParameters(Set.of(assetParam)); - // Parent Context - inherits from both child contexts final VersionedParameterContext parentContext = new VersionedParameterContext(); parentContext.setName(PARENT_CONTEXT_NAME); parentContext.setParameters(Set.of()); @@ -276,172 +237,50 @@ private Map createParameterContexts(final Str } private VersionedProcessGroup createProcessGroupA(final Bundle bundle, final String outputFile) { - final VersionedProcessGroup groupA = new VersionedProcessGroup(); - groupA.setIdentifier(GROUP_A_ID); + final VersionedProcessGroup groupA = VersionedFlowUtils.createProcessGroup(GROUP_A_ID, "Process Group A - Sensitive Value"); groupA.setGroupIdentifier(ROOT_GROUP_ID); - groupA.setName("Process Group A - Sensitive Value"); groupA.setParameterContextName(PARENT_CONTEXT_NAME); - // Input Port - final VersionedPort inputPortA = createInputPort(INPUT_PORT_A_ID, GROUP_A_ID, "Input Port A"); + final VersionedPort inputPortA = VersionedFlowUtils.addInputPort(groupA, "Input Port A", new Position(0, 0)); - // UpdateContent processor using sensitive parameter - final VersionedProcessor updateContent = createProcessor(UPDATE_CONTENT_ID, GROUP_A_ID, "UpdateContent", - "org.apache.nifi.processors.tests.system.UpdateContent", bundle, - Map.of("Sensitive Content", "#{" + SENSITIVE_PARAM_NAME + "}", "Update Strategy", "Replace"), - ScheduledState.ENABLED); + final VersionedProcessor updateContent = VersionedFlowUtils.addProcessor(groupA, + "org.apache.nifi.processors.tests.system.UpdateContent", bundle, "UpdateContent", new Position(0, 0)); + updateContent.getProperties().putAll(Map.of("Sensitive Content", "#{" + SENSITIVE_PARAM_NAME + "}", "Update Strategy", "Replace")); - // WriteToFile processor - final VersionedProcessor writeToFile = createProcessor(WRITE_SENSITIVE_ID, GROUP_A_ID, "WriteToFile", - "org.apache.nifi.processors.tests.system.WriteToFile", bundle, - Map.of("Filename", outputFile), ScheduledState.ENABLED); + final VersionedProcessor writeToFile = VersionedFlowUtils.addProcessor(groupA, + "org.apache.nifi.processors.tests.system.WriteToFile", bundle, "WriteToFile", new Position(0, 0)); + writeToFile.getProperties().put("Filename", outputFile); writeToFile.setAutoTerminatedRelationships(Set.of("success", "failure")); - // Connections within Group A - final VersionedConnection inputToUpdate = createConnection("input-to-update", GROUP_A_ID, - INPUT_PORT_A_ID, ConnectableComponentType.INPUT_PORT, - UPDATE_CONTENT_ID, ConnectableComponentType.PROCESSOR, null, - Set.of()); - - final VersionedConnection updateToWrite = createConnection("update-to-write", GROUP_A_ID, - UPDATE_CONTENT_ID, ConnectableComponentType.PROCESSOR, - WRITE_SENSITIVE_ID, ConnectableComponentType.PROCESSOR, null, - Set.of("success")); - - groupA.setInputPorts(Set.of(inputPortA)); - groupA.setOutputPorts(new HashSet<>()); - groupA.setProcessors(Set.of(updateContent, writeToFile)); - groupA.setConnections(Set.of(inputToUpdate, updateToWrite)); - groupA.setProcessGroups(new HashSet<>()); - groupA.setControllerServices(new HashSet<>()); + VersionedFlowUtils.addConnection(groupA, VersionedFlowUtils.createConnectableComponent(inputPortA), + VersionedFlowUtils.createConnectableComponent(updateContent), Set.of()); + VersionedFlowUtils.addConnection(groupA, VersionedFlowUtils.createConnectableComponent(updateContent), + VersionedFlowUtils.createConnectableComponent(writeToFile), Set.of("success")); return groupA; } private VersionedProcessGroup createProcessGroupB(final Bundle bundle, final String outputFile) { - final VersionedProcessGroup groupB = new VersionedProcessGroup(); - groupB.setIdentifier(GROUP_B_ID); + final VersionedProcessGroup groupB = VersionedFlowUtils.createProcessGroup(GROUP_B_ID, "Process Group B - Asset Value"); groupB.setGroupIdentifier(ROOT_GROUP_ID); - groupB.setName("Process Group B - Asset Value"); groupB.setParameterContextName(PARENT_CONTEXT_NAME); - // Input Port - final VersionedPort inputPortB = createInputPort(INPUT_PORT_B_ID, GROUP_B_ID, "Input Port B"); + final VersionedPort inputPortB = VersionedFlowUtils.addInputPort(groupB, "Input Port B", new Position(0, 0)); - // ReplaceWithFile processor using asset parameter - final VersionedProcessor replaceWithFile = createProcessor(REPLACE_WITH_FILE_ID, GROUP_B_ID, "ReplaceWithFile", - "org.apache.nifi.processors.tests.system.ReplaceWithFile", bundle, - Map.of("Filename", "#{" + ASSET_PARAM_NAME + "}"), ScheduledState.ENABLED); + final VersionedProcessor replaceWithFile = VersionedFlowUtils.addProcessor(groupB, + "org.apache.nifi.processors.tests.system.ReplaceWithFile", bundle, "ReplaceWithFile", new Position(0, 0)); + replaceWithFile.getProperties().put("Filename", "#{" + ASSET_PARAM_NAME + "}"); - // WriteToFile processor - final VersionedProcessor writeToFile = createProcessor(WRITE_ASSET_ID, GROUP_B_ID, "WriteToFile", - "org.apache.nifi.processors.tests.system.WriteToFile", bundle, - Map.of("Filename", outputFile), ScheduledState.ENABLED); + final VersionedProcessor writeToFile = VersionedFlowUtils.addProcessor(groupB, + "org.apache.nifi.processors.tests.system.WriteToFile", bundle, "WriteToFile", new Position(0, 0)); + writeToFile.getProperties().put("Filename", outputFile); writeToFile.setAutoTerminatedRelationships(Set.of("success", "failure")); - // Connections within Group B - final VersionedConnection inputToReplace = createConnection("input-to-replace", GROUP_B_ID, - INPUT_PORT_B_ID, ConnectableComponentType.INPUT_PORT, - REPLACE_WITH_FILE_ID, ConnectableComponentType.PROCESSOR, null, - Set.of()); - - final VersionedConnection replaceToWrite = createConnection("replace-to-write", GROUP_B_ID, - REPLACE_WITH_FILE_ID, ConnectableComponentType.PROCESSOR, - WRITE_ASSET_ID, ConnectableComponentType.PROCESSOR, null, - Set.of("success")); - - groupB.setInputPorts(Set.of(inputPortB)); - groupB.setOutputPorts(new HashSet<>()); - groupB.setProcessors(Set.of(replaceWithFile, writeToFile)); - groupB.setConnections(Set.of(inputToReplace, replaceToWrite)); - groupB.setProcessGroups(new HashSet<>()); - groupB.setControllerServices(new HashSet<>()); + VersionedFlowUtils.addConnection(groupB, VersionedFlowUtils.createConnectableComponent(inputPortB), + VersionedFlowUtils.createConnectableComponent(replaceWithFile), Set.of()); + VersionedFlowUtils.addConnection(groupB, VersionedFlowUtils.createConnectableComponent(replaceWithFile), + VersionedFlowUtils.createConnectableComponent(writeToFile), Set.of("success")); return groupB; } - - private Bundle createBundle() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.8.0-SNAPSHOT"); - return bundle; - } - - private VersionedPort createInputPort(final String identifier, final String groupIdentifier, final String name) { - final VersionedPort port = new VersionedPort(); - port.setIdentifier(identifier); - port.setGroupIdentifier(groupIdentifier); - port.setName(name); - port.setType(PortType.INPUT_PORT); - port.setScheduledState(ScheduledState.ENABLED); - port.setConcurrentlySchedulableTaskCount(1); - port.setPosition(new Position(0, 0)); - port.setAllowRemoteAccess(false); - return port; - } - - private VersionedProcessor createProcessor(final String identifier, final String groupIdentifier, final String name, - final String type, final Bundle bundle, final Map properties, - final ScheduledState scheduledState) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(identifier); - processor.setGroupIdentifier(groupIdentifier); - processor.setName(name); - processor.setType(type); - processor.setBundle(bundle); - processor.setProperties(properties); - processor.setPropertyDescriptors(Collections.emptyMap()); - processor.setScheduledState(scheduledState); - - processor.setBulletinLevel("WARN"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setSchedulingPeriod("0 sec"); - processor.setExecutionNode("ALL"); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setRunDurationMillis(0L); - processor.setPosition(new Position(0, 0)); - - processor.setAutoTerminatedRelationships(Collections.emptySet()); - processor.setRetryCount(10); - processor.setRetriedRelationships(Collections.emptySet()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - - return processor; - } - - private VersionedConnection createConnection(final String identifier, final String groupIdentifier, - final String sourceId, final ConnectableComponentType sourceType, - final String destinationId, final ConnectableComponentType destinationType, - final String destinationGroupId, - final Set selectedRelationships) { - final ConnectableComponent source = new ConnectableComponent(); - source.setId(sourceId); - source.setType(sourceType); - source.setGroupId(groupIdentifier); - - final ConnectableComponent destination = new ConnectableComponent(); - destination.setId(destinationId); - destination.setType(destinationType); - destination.setGroupId(destinationGroupId != null ? destinationGroupId : groupIdentifier); - - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier(identifier); - connection.setGroupIdentifier(groupIdentifier); - connection.setSource(source); - connection.setDestination(destination); - connection.setSelectedRelationships(selectedRelationships); - connection.setBackPressureDataSizeThreshold("1 GB"); - connection.setBackPressureObjectThreshold(10_000L); - connection.setBends(Collections.emptyList()); - connection.setLabelIndex(1); - connection.setFlowFileExpiration("0 sec"); - connection.setPrioritizers(Collections.emptyList()); - connection.setzIndex(0L); - - return connection; - } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java index 06ccdd561c92..b1b852b18afa 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java @@ -25,9 +25,8 @@ import org.apache.nifi.components.connector.components.ConnectionFacade; import org.apache.nifi.components.connector.components.FlowContext; import org.apache.nifi.components.connector.components.ProcessGroupFacade; +import org.apache.nifi.components.connector.util.VersionedFlowUtils; import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.ConnectableComponent; -import org.apache.nifi.flow.ConnectableComponentType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedConnection; @@ -50,62 +49,33 @@ */ public class SelectiveDropConnector extends AbstractConnector { - private static final String CONNECTION_ID = "generate-to-terminate-connection"; - @Override protected void onStepConfigured(final String stepName, final FlowContext workingContext) { } @Override public VersionedExternalFlow getInitialFlow() { - final Bundle bundle = new Bundle(); - bundle.setGroup("org.apache.nifi"); - bundle.setArtifact("nifi-system-test-extensions-nar"); - bundle.setVersion("2.7.0-SNAPSHOT"); + final Bundle bundle = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.7.0-SNAPSHOT"); + final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup("1234", "Selective Drop Connector"); - // GenerateFlowFile processor configured to generate 1-byte FlowFiles with flowFileIndex attribute - final Map generateProperties = Map.of( + final VersionedProcessor generate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, "GenerateFlowFile", new Position(0, 0)); + generate.getProperties().putAll(Map.of( "File Size", "1 B", "Batch Size", "20000", "Max FlowFiles", "20000", "flowFileIndex", "${nextInt()}" - ); - final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile", - "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, generateProperties, ScheduledState.ENABLED); + )); generate.setSchedulingPeriod("10 sec"); - final VersionedProcessor terminate = createVersionedProcessor("term-1", "1234", "TerminateFlowFile", - "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, Collections.emptyMap(), ScheduledState.DISABLED); - - final ConnectableComponent source = new ConnectableComponent(); - source.setId(generate.getIdentifier()); - source.setType(ConnectableComponentType.PROCESSOR); - source.setGroupId("1234"); - - final ConnectableComponent destination = new ConnectableComponent(); - destination.setId(terminate.getIdentifier()); - destination.setType(ConnectableComponentType.PROCESSOR); - destination.setGroupId("1234"); - - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier(CONNECTION_ID); - connection.setSource(source); - connection.setDestination(destination); - connection.setGroupIdentifier("1234"); - connection.setSelectedRelationships(Set.of("success")); + final VersionedProcessor terminate = VersionedFlowUtils.addProcessor(rootGroup, + "org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, "TerminateFlowFile", new Position(0, 0)); + terminate.setScheduledState(ScheduledState.DISABLED); + + final VersionedConnection connection = VersionedFlowUtils.addConnection(rootGroup, + VersionedFlowUtils.createConnectableComponent(generate), VersionedFlowUtils.createConnectableComponent(terminate), Set.of("success")); connection.setBackPressureDataSizeThreshold("100 GB"); connection.setBackPressureObjectThreshold(100_000L); - connection.setBends(Collections.emptyList()); - connection.setLabelIndex(1); - connection.setFlowFileExpiration("0 sec"); - connection.setPrioritizers(Collections.emptyList()); - connection.setzIndex(1L); - - final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); - rootGroup.setName("Selective Drop Connector"); - rootGroup.setIdentifier("1234"); - rootGroup.setProcessors(Set.of(generate, terminate)); - rootGroup.setConnections(Set.of(connection)); final VersionedExternalFlow flow = new VersionedExternalFlow(); flow.setFlowContents(rootGroup); @@ -131,15 +101,13 @@ public void applyUpdate(final FlowContext workingFlowContext, final FlowContext @Override public void stop(final FlowContext context) throws FlowUpdateException { - // First, stop the processors via the parent class super.stop(context); - // Then, drop all FlowFiles where flowFileIndex has an even value final ProcessGroupFacade rootGroup = context.getRootGroup(); - final ConnectionFacade connection = findConnectionById(rootGroup, CONNECTION_ID); + final ConnectionFacade connection = findFirstConnection(rootGroup); if (connection == null) { - getLogger().warn("Could not find connection with ID {} to perform selective drop", CONNECTION_ID); + getLogger().warn("Could not find connection to perform selective drop"); return; } @@ -165,15 +133,13 @@ private boolean hasEvenFlowFileIndex(final FlowFile flowFile) { return Integer.parseInt(flowFile.getAttribute("flowFileIndex")) % 2 == 0; } - private ConnectionFacade findConnectionById(final ProcessGroupFacade group, final String connectionId) { + private ConnectionFacade findFirstConnection(final ProcessGroupFacade group) { for (final ConnectionFacade connection : group.getConnections()) { - if (connectionId.equals(connection.getDefinition().getIdentifier())) { - return connection; - } + return connection; } for (final ProcessGroupFacade childGroup : group.getProcessGroups()) { - final ConnectionFacade found = findConnectionById(childGroup, connectionId); + final ConnectionFacade found = findFirstConnection(childGroup); if (found != null) { return found; } @@ -181,36 +147,4 @@ private ConnectionFacade findConnectionById(final ProcessGroupFacade group, fina return null; } - - private VersionedProcessor createVersionedProcessor(final String identifier, final String groupIdentifier, final String name, - final String type, final Bundle bundle, final Map properties, - final ScheduledState scheduledState) { - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(identifier); - processor.setGroupIdentifier(groupIdentifier); - processor.setName(name); - processor.setType(type); - processor.setBundle(bundle); - processor.setProperties(properties); - processor.setPropertyDescriptors(Collections.emptyMap()); - processor.setScheduledState(scheduledState); - - processor.setBulletinLevel("WARN"); - processor.setSchedulingStrategy("TIMER_DRIVEN"); - processor.setSchedulingPeriod("0 sec"); - processor.setExecutionNode("ALL"); - processor.setConcurrentlySchedulableTaskCount(1); - processor.setPenaltyDuration("30 sec"); - processor.setYieldDuration("1 sec"); - processor.setRunDurationMillis(0L); - processor.setPosition(new Position(0, 0)); - - processor.setAutoTerminatedRelationships(Collections.emptySet()); - processor.setRetryCount(10); - processor.setRetriedRelationships(Collections.emptySet()); - processor.setBackoffMechanism("PENALIZE_FLOWFILE"); - processor.setMaxBackoffPeriod("10 mins"); - - return processor; - } }