Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions nifi-commons/nifi-connector-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ public class NiFiProperties extends ApplicationProperties {
public static final String STATUS_REPOSITORY_QUESTDB_PERSIST_BATCH_SIZE = "nifi.status.repository.questdb.persist.batchsize";
public static final String STATUS_REPOSITORY_QUESTDB_PERSIST_FREQUENCY = "nifi.status.repository.questdb.persist.frequency";

// Connector Repository properties
// Connector Manager properties
public static final String CONNECTOR_MANAGER_IMPLEMENTATION = "nifi.components.connectors.manager.implementation";
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Remove CONNECTOR_MANAGER_IMPLEMENTATION as that is no longer intended to be configurable via properties

public static final String CONNECTOR_REPOSITORY_IMPLEMENTATION = "nifi.components.connectors.repository.implementation";
public static final String CONNECTOR_LIFECYCLE_MANAGER_IMPLEMENTATION = "nifi.components.connectors.lifecycle.manager.implementation";

// Secrets Manager properties
public static final String SECRETS_MANAGER_IMPLEMENTATION = "nifi.secrets.manager.implementation";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.nifi.mock.connector.server;

import org.apache.nifi.components.connector.FrameworkConnectorInitializationContextBuilder;
import org.apache.nifi.components.connector.StandardConnectorRepository;
import org.apache.nifi.components.connector.StandardConnectorManager;

public class MockConnectorRepository extends StandardConnectorRepository {
public class MockConnectorManager extends StandardConnectorManager {

private volatile MockExtensionMapper mockExtensionMapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.connector.AssetReference;
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorManager;
import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.components.connector.ConnectorValueReference;
import org.apache.nifi.components.connector.FlowUpdateException;
Expand Down Expand Up @@ -82,7 +82,7 @@ public class StandardConnectorMockServer implements ConnectorMockServer {
private FlowController flowController;
private MockExtensionDiscoveringManager extensionManager;
private ConnectorNode connectorNode;
private ConnectorRepository connectorRepository;
private ConnectorManager connectorManager;
private FlowEngine flowEngine;
private MockExtensionMapper mockExtensionMapper;
private FlowFileTransferCounts initialFlowFileTransferCounts = new FlowFileTransferCounts(0L, 0L, 0L, 0L);
Expand Down Expand Up @@ -122,13 +122,13 @@ public void start() {
throw new RuntimeException("Failed to initialize FlowFile Repository", e);
}

connectorRepository = flowController.getConnectorRepository();
if (!(connectorRepository instanceof MockConnectorRepository)) {
throw new IllegalStateException("Connector Repository is not an instance of MockConnectorRepository");
connectorManager = flowController.getConnectorManager();
if (!(connectorManager instanceof MockConnectorManager)) {
throw new IllegalStateException("Connector Manager is not an instance of MockConnectorManager");
}

mockExtensionMapper = new MockExtensionMapper();
((MockConnectorRepository) connectorRepository).setMockExtensionMapper(mockExtensionMapper);
((MockConnectorManager) connectorManager).setMockExtensionMapper(mockExtensionMapper);

flowEngine = new FlowEngine(4, "Connector Threads");
}
Expand Down Expand Up @@ -257,7 +257,7 @@ public ConnectorConfigVerificationResult verifyConfiguration(final String stepNa

@Override
public void addSecret(final String name, final String value) {
final SecretsManager secretsManager = connectorRepository.getSecretsManager();
final SecretsManager secretsManager = connectorManager.getSecretsManager();
if (!(secretsManager instanceof final ConnectorTestRunnerSecretsManager testRunnerSecretsManager)) {
throw new IllegalStateException("Secrets Manager is not an instance of ConnectorTestRunnerSecretsManager");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.nifi.mock.connector.server.MockConnectorRepository
org.apache.nifi.mock.connector.server.MockConnectorManager
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.components.connector;

import org.apache.nifi.flow.Bundle;

import java.util.List;
import java.util.Optional;

/**
* Provides the ability to look up available bundles for a given component type.
*/
public interface ComponentBundleLookup {

/**
* Returns the available bundles that provide the given component type.
*
* @param componentType the fully qualified class name of the component type
* @return the list of bundles that provide the component type
*/
List<Bundle> getAvailableBundles(String componentType);

/**
* Returns the latest version of a bundle that provides the given component type.
*
* @param componentType the fully qualified class name of the component type
* @return an Optional containing the latest bundle, or empty if no bundles are available
*/
Optional<Bundle> getLatestBundle(String componentType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.components.connector;

import java.io.IOException;
import java.time.Duration;
import java.util.Set;

/**
* <p>
* Provides cluster coordination capabilities for connector operations. This interface
* is implemented by the framework and provided to {@link ConnectorLifecycleManager}
* implementations to enable coordination across cluster nodes.
* </p>
*
* <p>
* This interface is not intended to be implemented by third parties. The framework
* provides a standard implementation that handles cluster communication.
* </p>
*/
public interface ConnectorClusterCoordinator {

/**
* Gets the connector state as reported by the cluster. This queries other nodes
* in the cluster to determine the consensus state of the connector.
*
* @param connectorId the identifier of the connector
* @return the cluster-wide connector state
* @throws IOException if an I/O error occurs during cluster communication
*/
ConnectorState getClusterState(String connectorId) throws IOException;

/**
* Waits for the cluster to reach one of the desired states for the specified connector.
* This method polls the cluster nodes until all nodes report a state in the desired set,
* or until the timeout is reached.
*
* @param connectorId the identifier of the connector
* @param desiredStates the set of acceptable final states to wait for
* @param allowableIntermediateStates the set of acceptable intermediate states while waiting
* @param timeout the maximum time to wait for the cluster to reach the desired state
* @throws IOException if an I/O error occurs during cluster communication
* @throws InterruptedException if the wait is interrupted
* @throws ConnectorLifecycleException if the cluster fails to reach the desired state
* within the timeout or if an unexpected state is encountered
*/
void awaitClusterState(String connectorId, Set<ConnectorState> desiredStates,
Set<ConnectorState> allowableIntermediateStates,
Duration timeout) throws IOException, InterruptedException, ConnectorLifecycleException;

/**
* Indicates whether this node is currently connected to a cluster.
*
* @return true if this node is clustered and connected, false otherwise
*/
boolean isClustered();

/**
* Indicates whether this node is the primary node in the cluster.
*
* @return true if this node is the primary node, false otherwise
*/
boolean isPrimaryNode();

/**
* Returns a no-op implementation of ConnectorClusterCoordinator for standalone mode.
*
* @return a standalone (no-op) cluster coordinator
*/
static ConnectorClusterCoordinator standalone() {
return StandaloneConnectorClusterCoordinator.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.components.connector;

import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.events.EventReporter;

import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

/**
* Provides context information for initializing a {@link ConnectorLifecycleManager}.
*/
public interface ConnectorLifecycleContext {

/**
* Returns the configuration properties for the lifecycle manager.
* These properties are typically specified in nifi.properties.
*
* @return the configuration properties
*/
Map<String, String> getProperties();

/**
* Returns the event reporter that can be used to report events and create bulletins.
*
* @return the event reporter
*/
EventReporter getEventReporter();

/**
* Returns the node type provider that provides information about the node's
* cluster status (e.g., whether it is clustered, primary, etc.).
*
* @return the node type provider
*/
NodeTypeProvider getNodeTypeProvider();

/**
* Returns the cluster coordinator that provides cluster coordination capabilities.
* May be null if running in standalone mode or if cluster coordination is not available.
*
* @return the cluster coordinator, or null if not available
*/
ConnectorClusterCoordinator getClusterCoordinator();

/**
* Returns the connector repository for persistence operations.
*
* @return the connector repository
*/
ConnectorRepository getConnectorRepository();

/**
* Returns a scheduled executor service that can be used for scheduling
* lifecycle-related tasks.
*
* @return the scheduled executor service
*/
ScheduledExecutorService getScheduledExecutorService();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@

package org.apache.nifi.components.connector;

public enum ConnectorState {
STARTING,
RUNNING,
STOPPING,
STOPPED,
DRAINING,
PURGING,
PREPARING_FOR_UPDATE,
UPDATING,
UPDATE_FAILED,
UPDATED;
/**
* Exception thrown when a connector lifecycle operation fails.
*/
public class ConnectorLifecycleException extends Exception {

public ConnectorLifecycleException(final String message) {
super(message);
}

public ConnectorLifecycleException(final String message, final Throwable cause) {
super(message, cause);
}

public ConnectorLifecycleException(final Throwable cause) {
super(cause);
}
}
Loading
Loading