on-success-sink
package
diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/blackhole/BlackholeFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/blackhole/BlackholeFactory.java
new file mode 100644
index 00000000..4b0b3a2a
--- /dev/null
+++ b/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/blackhole/BlackholeFactory.java
@@ -0,0 +1,62 @@
+package io.numaproj.numaflow.examples.accumulator.blackhole;
+
+import io.numaproj.numaflow.accumulator.Server;
+import io.numaproj.numaflow.accumulator.model.Accumulator;
+import io.numaproj.numaflow.accumulator.model.AccumulatorFactory;
+import io.numaproj.numaflow.accumulator.model.Datum;
+import io.numaproj.numaflow.accumulator.model.Message;
+import io.numaproj.numaflow.accumulator.model.OutputStreamObserver;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Blackhole is an accumulator that intentionally discards every datum it receives without
+ * forwarding any data downstream.
+ *
+ * A naive implementation would simply read the input stream and emit nothing. However, an
+ * accumulator that never emits anything for the datums it consumes leaves the framework unable to
+ * release the per-datum tracked state, leading to unbounded memory growth.
+ *
+ *
Instead, this example emits a drop message for every datum using {@link Message#toDrop(Datum)}.
+ * A drop message is not forwarded to the next vertex, but it still allows the framework to advance
+ * the watermark and release the tracked state for that datum - giving us "blackhole" semantics
+ * without leaking memory. This pattern is useful for multiplexer-, cross-join-, or filter-style
+ * accumulators that legitimately need to omit some (or all) of their inputs.
+ */
+@Slf4j
+public class BlackholeFactory extends AccumulatorFactory {
+
+ public static void main(String[] args) throws Exception {
+ log.info("Starting blackhole accumulator server..");
+ Server server = new Server(new BlackholeFactory());
+
+ // Start the server
+ server.start();
+
+ // wait for the server to shut down
+ server.awaitTermination();
+ log.info("Blackhole accumulator server exited..");
+ }
+
+ @Override
+ public Blackhole createAccumulator() {
+ return new Blackhole();
+ }
+
+ public static class Blackhole extends Accumulator {
+ @Override
+ public void processMessage(Datum datum, OutputStreamObserver outputStream) {
+ log.info(
+ "Dropping datum with event time: {}, watermark: {}",
+ datum.getEventTime().toEpochMilli(),
+ datum.getWatermark().toEpochMilli());
+ // Emit a drop message: nothing is forwarded downstream, but the framework still
+ // advances the watermark and releases the tracked state for this datum.
+ outputStream.send(Message.toDrop(datum));
+ }
+
+ @Override
+ public void handleEndOfStream(OutputStreamObserver outputStreamObserver) {
+ log.info("End of stream received, nothing to flush for blackhole accumulator");
+ }
+ }
+}
diff --git a/src/main/java/io/numaproj/numaflow/accumulator/model/Message.java b/src/main/java/io/numaproj/numaflow/accumulator/model/Message.java
index 8e06087c..ae63c80f 100644
--- a/src/main/java/io/numaproj/numaflow/accumulator/model/Message.java
+++ b/src/main/java/io/numaproj/numaflow/accumulator/model/Message.java
@@ -7,6 +7,8 @@
/** Message is used to wrap the data returned by Accumulator functions. */
@Getter
public class Message {
+ private static final String[] DROP_TAGS = {"U+005C__DROP__"};
+
private final Instant eventTime;
private final Instant watermark;
private final Map headers;
@@ -32,6 +34,23 @@ public Message(Datum datum) {
this.tags = null;
}
+ /**
+ * Creates a Message from the given Datum with drop tags set, so the message is not forwarded to
+ * the next vertex but still allows the accumulator to advance the watermark and release the
+ * tracked state. It is advised to use the incoming Datum to construct the message, because event
+ * time, watermark, id and headers of the message are derived from the Datum. Only use a custom
+ * implementation of the Datum if you know what you are doing.
+ *
+ * @param datum {@link Datum} object to drop the results for
+ * @return a {@link Message} tagged to be dropped
+ */
+ public static Message toDrop(Datum datum) {
+ Message message = new Message(datum);
+ message.setValue(new byte[0]);
+ message.setTags(DROP_TAGS);
+ return message;
+ }
+
/*
* sets the value of the message
*
diff --git a/src/test/java/io/numaproj/numaflow/accumulator/model/MessageTest.java b/src/test/java/io/numaproj/numaflow/accumulator/model/MessageTest.java
new file mode 100644
index 00000000..57640cd5
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/accumulator/model/MessageTest.java
@@ -0,0 +1,109 @@
+package io.numaproj.numaflow.accumulator.model;
+
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class MessageTest {
+
+ @Test
+ public void testMessageFromDatum() {
+ Datum datum = buildDatum();
+ Message message = new Message(datum);
+
+ assertArrayEquals("hello".getBytes(), message.getValue());
+ assertArrayEquals(new String[]{"key1", "key2"}, message.getKeys());
+ assertArrayEquals(null, message.getTags());
+ assertEquals("test-id", message.getId());
+ assertEquals(Instant.ofEpochMilli(60000), message.getEventTime());
+ assertEquals(Instant.ofEpochMilli(59000), message.getWatermark());
+ }
+
+ @Test
+ public void testToDrop() {
+ Datum datum = buildDatum();
+ Message message = Message.toDrop(datum);
+
+ // The DROP tag must be set so the message is not forwarded downstream.
+ String[] dropTags = {"U+005C__DROP__"};
+ assertArrayEquals(dropTags, message.getTags());
+ // No value is forwarded, but the identifying/watermark fields are carried over so the
+ // accumulator can advance the watermark and release the tracked state.
+ assertArrayEquals(new byte[0], message.getValue());
+ assertArrayEquals(new String[]{"key1", "key2"}, message.getKeys());
+ assertEquals("test-id", message.getId());
+ assertEquals(Instant.ofEpochMilli(60000), message.getEventTime());
+ assertEquals(Instant.ofEpochMilli(59000), message.getWatermark());
+ }
+
+ private Datum buildDatum() {
+ Map headers = new HashMap<>();
+ headers.put("x", "y");
+ return new TestDatum(
+ new String[]{"key1", "key2"},
+ "hello".getBytes(),
+ Instant.ofEpochMilli(59000),
+ Instant.ofEpochMilli(60000),
+ headers,
+ "test-id");
+ }
+
+ private static class TestDatum implements Datum {
+ private final String[] keys;
+ private final byte[] value;
+ private final Instant watermark;
+ private final Instant eventTime;
+ private final Map headers;
+ private final String id;
+
+ TestDatum(
+ String[] keys,
+ byte[] value,
+ Instant watermark,
+ Instant eventTime,
+ Map headers,
+ String id) {
+ this.keys = keys;
+ this.value = value;
+ this.watermark = watermark;
+ this.eventTime = eventTime;
+ this.headers = headers;
+ this.id = id;
+ }
+
+ @Override
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override
+ public String[] getKeys() {
+ return keys;
+ }
+
+ @Override
+ public Instant getEventTime() {
+ return eventTime;
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return watermark;
+ }
+
+ @Override
+ public Map getHeaders() {
+ return headers;
+ }
+
+ @Override
+ public String getID() {
+ return id;
+ }
+ }
+}