Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,28 @@
</to>
</configuration>
</execution>
<execution>
<id>accumulator-blackhole</id>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.accumulator.blackhole.BlackholeFactory
</mainClass>
</container>
<to>
<image>
numaflow-java-examples/accumulator-blackhole:${docker.tag}
</image>
</to>
</configuration>
</execution>
<execution>
<id>on-success-sink</id>
<phase>package</phase>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.numaproj.numaflow.examples.accumulator.blackhole;

import io.numaproj.numaflow.accumulator.Server;
import io.numaproj.numaflow.accumulator.model.Accumulator;
import io.numaproj.numaflow.accumulator.model.AccumulatorFactory;
import io.numaproj.numaflow.accumulator.model.Datum;
import io.numaproj.numaflow.accumulator.model.Message;
import io.numaproj.numaflow.accumulator.model.OutputStreamObserver;
import lombok.extern.slf4j.Slf4j;

/**
* Blackhole is an accumulator that intentionally discards every datum it receives without
* forwarding any data downstream.
*
* <p>A naive implementation would simply read the input stream and emit nothing. However, an
* accumulator that never emits anything for the datums it consumes leaves the framework unable to
* release the per-datum tracked state, leading to unbounded memory growth.
*
* <p>Instead, this example emits a drop message for every datum using {@link Message#toDrop(Datum)}.
* A drop message is not forwarded to the next vertex, but it still allows the framework to advance
* the watermark and release the tracked state for that datum - giving us "blackhole" semantics
* without leaking memory. This pattern is useful for multiplexer-, cross-join-, or filter-style
* accumulators that legitimately need to omit some (or all) of their inputs.
*/
@Slf4j
public class BlackholeFactory extends AccumulatorFactory<BlackholeFactory.Blackhole> {

public static void main(String[] args) throws Exception {
log.info("Starting blackhole accumulator server..");
Server server = new Server(new BlackholeFactory());

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
log.info("Blackhole accumulator server exited..");
}

@Override
public Blackhole createAccumulator() {
return new Blackhole();
}

public static class Blackhole extends Accumulator {
@Override
public void processMessage(Datum datum, OutputStreamObserver outputStream) {
log.info(
"Dropping datum with event time: {}, watermark: {}",
datum.getEventTime().toEpochMilli(),
datum.getWatermark().toEpochMilli());
// Emit a drop message: nothing is forwarded downstream, but the framework still
// advances the watermark and releases the tracked state for this datum.
outputStream.send(Message.toDrop(datum));
}

@Override
public void handleEndOfStream(OutputStreamObserver outputStreamObserver) {
log.info("End of stream received, nothing to flush for blackhole accumulator");
}
}
}
19 changes: 19 additions & 0 deletions src/main/java/io/numaproj/numaflow/accumulator/model/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
/** Message is used to wrap the data returned by Accumulator functions. */
@Getter
public class Message {
private static final String[] DROP_TAGS = {"U+005C__DROP__"};

private final Instant eventTime;
private final Instant watermark;
private final Map<String, String> headers;
Expand All @@ -32,6 +34,23 @@ public Message(Datum datum) {
this.tags = null;
}

/**
* Creates a Message from the given Datum with drop tags set, so the message is not forwarded to
* the next vertex but still allows the accumulator to advance the watermark and release the
* tracked state. It is advised to use the incoming Datum to construct the message, because event
* time, watermark, id and headers of the message are derived from the Datum. Only use a custom
* implementation of the Datum if you know what you are doing.
*
* @param datum {@link Datum} object to drop the results for
* @return a {@link Message} tagged to be dropped
*/
public static Message toDrop(Datum datum) {
Message message = new Message(datum);
message.setValue(new byte[0]);
message.setTags(DROP_TAGS);
return message;
}

/*
* sets the value of the message
*
Expand Down
109 changes: 109 additions & 0 deletions src/test/java/io/numaproj/numaflow/accumulator/model/MessageTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package io.numaproj.numaflow.accumulator.model;

import org.junit.Test;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

public class MessageTest {

@Test
public void testMessageFromDatum() {
Datum datum = buildDatum();
Message message = new Message(datum);

assertArrayEquals("hello".getBytes(), message.getValue());
assertArrayEquals(new String[]{"key1", "key2"}, message.getKeys());
assertArrayEquals(null, message.getTags());
assertEquals("test-id", message.getId());
assertEquals(Instant.ofEpochMilli(60000), message.getEventTime());
assertEquals(Instant.ofEpochMilli(59000), message.getWatermark());
}

@Test
public void testToDrop() {
Datum datum = buildDatum();
Message message = Message.toDrop(datum);

// The DROP tag must be set so the message is not forwarded downstream.
String[] dropTags = {"U+005C__DROP__"};
assertArrayEquals(dropTags, message.getTags());
// No value is forwarded, but the identifying/watermark fields are carried over so the
// accumulator can advance the watermark and release the tracked state.
assertArrayEquals(new byte[0], message.getValue());
assertArrayEquals(new String[]{"key1", "key2"}, message.getKeys());
assertEquals("test-id", message.getId());
assertEquals(Instant.ofEpochMilli(60000), message.getEventTime());
assertEquals(Instant.ofEpochMilli(59000), message.getWatermark());
}

private Datum buildDatum() {
Map<String, String> headers = new HashMap<>();
headers.put("x", "y");
return new TestDatum(
new String[]{"key1", "key2"},
"hello".getBytes(),
Instant.ofEpochMilli(59000),
Instant.ofEpochMilli(60000),
headers,
"test-id");
}

private static class TestDatum implements Datum {
private final String[] keys;
private final byte[] value;
private final Instant watermark;
private final Instant eventTime;
private final Map<String, String> headers;
private final String id;

TestDatum(
String[] keys,
byte[] value,
Instant watermark,
Instant eventTime,
Map<String, String> headers,
String id) {
this.keys = keys;
this.value = value;
this.watermark = watermark;
this.eventTime = eventTime;
this.headers = headers;
this.id = id;
}

@Override
public byte[] getValue() {
return value;
}

@Override
public String[] getKeys() {
return keys;
}

@Override
public Instant getEventTime() {
return eventTime;
}

@Override
public Instant getWatermark() {
return watermark;
}

@Override
public Map<String, String> getHeaders() {
return headers;
}

@Override
public String getID() {
return id;
}
}
}
Loading