Skip to content

Commit 7107ba5

Browse files
authored
NIFI-15480: Added DropFlowFileSummary, ability to drop FlowFiles from a ConnectionFacade / ProcessGroupFacade (#62)
1 parent 80de11f commit 7107ba5

3 files changed

Lines changed: 97 additions & 0 deletions

File tree

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.components.connector;
19+
20+
/**
21+
* A summary of FlowFiles that were dropped from a FlowFile Queue.
22+
* This class provides information about the number of FlowFiles dropped
23+
* and the total aggregate size in bytes of those FlowFiles.
24+
*/
25+
public class DropFlowFileSummary {
26+
27+
private final int droppedCount;
28+
private final long droppedBytes;
29+
30+
/**
31+
* Creates a new DropFlowFileSummary with the given count and byte size.
32+
*
33+
* @param droppedCount the number of FlowFiles that were dropped
34+
* @param droppedBytes the total size in bytes of all dropped FlowFiles
35+
*/
36+
public DropFlowFileSummary(final int droppedCount, final long droppedBytes) {
37+
this.droppedCount = droppedCount;
38+
this.droppedBytes = droppedBytes;
39+
}
40+
41+
/**
42+
* @return the number of FlowFiles that were dropped
43+
*/
44+
public int getDroppedCount() {
45+
return droppedCount;
46+
}
47+
48+
/**
49+
* @return the total size in bytes of all dropped FlowFiles
50+
*/
51+
public long getDroppedBytes() {
52+
return droppedBytes;
53+
}
54+
55+
/**
56+
* Creates a new DropFlowFileSummary that represents the combination of this summary and the given summary.
57+
*
58+
* @param other the other summary to add to this one
59+
* @return a new DropFlowFileSummary representing the combined totals
60+
*/
61+
public DropFlowFileSummary add(final DropFlowFileSummary other) {
62+
return new DropFlowFileSummary(this.droppedCount + other.droppedCount, this.droppedBytes + other.droppedBytes);
63+
}
64+
65+
@Override
66+
public String toString() {
67+
return "DropFlowFileSummary[droppedCount=" + droppedCount + ", droppedBytes=" + droppedBytes + "]";
68+
}
69+
}

src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@
1717

1818
package org.apache.nifi.components.connector.components;
1919

20+
import org.apache.nifi.components.connector.DropFlowFileSummary;
2021
import org.apache.nifi.controller.queue.QueueSize;
2122
import org.apache.nifi.flow.VersionedConnection;
23+
import org.apache.nifi.flowfile.FlowFile;
24+
25+
import java.io.IOException;
26+
import java.util.function.Predicate;
2227

2328
public interface ConnectionFacade {
2429

@@ -35,4 +40,13 @@ public interface ConnectionFacade {
3540
*/
3641
void purge();
3742

43+
/**
44+
* Drops all FlowFiles from the connection that match the given predicate.
45+
*
46+
* @param predicate the predicate to use to determine which FlowFiles to drop
47+
* @return a summary of the FlowFiles that were dropped
48+
* @throws IOException if an I/O error occurs while dropping FlowFiles
49+
*/
50+
DropFlowFileSummary dropFlowFiles(Predicate<FlowFile> predicate) throws IOException;
51+
3852
}

src/main/java/org/apache/nifi/components/connector/components/ProcessGroupFacade.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717

1818
package org.apache.nifi.components.connector.components;
1919

20+
import org.apache.nifi.components.connector.DropFlowFileSummary;
2021
import org.apache.nifi.controller.queue.QueueSize;
2122
import org.apache.nifi.flow.VersionedProcessGroup;
23+
import org.apache.nifi.flowfile.FlowFile;
2224

25+
import java.io.IOException;
2326
import java.util.Set;
27+
import java.util.function.Predicate;
2428

2529
public interface ProcessGroupFacade {
2630

@@ -52,4 +56,14 @@ public interface ProcessGroupFacade {
5256

5357
ProcessGroupLifecycle getLifecycle();
5458

59+
/**
60+
* Drops all FlowFiles from all connections in this ProcessGroup and its child ProcessGroups
61+
* that match the given predicate.
62+
*
63+
* @param predicate the predicate to test each FlowFile against
64+
* @return a summary of the dropped FlowFiles
65+
* @throws IOException if an I/O error occurs while dropping FlowFiles
66+
*/
67+
DropFlowFileSummary dropFlowFiles(Predicate<FlowFile> predicate) throws IOException;
68+
5569
}

0 commit comments

Comments
 (0)