feat: Mvtx Short circuit dataplane changes #3123

Closed
vaibhavtiwari33 wants to merge 32 commits into main from short-circuit-dataplane2

Contributor

@vaibhavtiwari33 vaibhavtiwari33 commented Dec 22, 2025

What this PR does / why we need it

This PR implements the dataplane side of short-circuiting (bypass) in MonoVertex.
The implementation follows the design doc.

Related issues

Fixes #3101

Testing

Spec testing

✅ Deploy bypass spec with all currently supported bypass sinks

Screenshot 2025-12-18 at 4 50 33 PM

✅ Error when setting bypass config for a sink that doesn't exist:

Error from server (BadRequest): error when applying patch:
{"metadata":{"annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"numaflow.numaproj.io/v1alpha1\",\"kind\":\"MonoVertex\",\"metadata\":{\"annotations\":{},\"name\":\"simple-mono-vertex\",\"namespace\":\"numaflow-system\"},\"spec\":{\"bypass\":{\"fallback\":{\"tags\":{\"operator\":\"or\",\"values\":[\"fallback\"]}},\"onSuccess\":{\"tags\":{\"operator\":\"or\",\"values\":[\"onSuccess\"]}},\"sink\":{\"tags\":{\"operator\":\"or\",\"values\":[\"sinker\"]}}},\"sink\":{\"fallback\":{\"udsink\":{\"container\":{\"image\":\"quay.io/numaio/numaflow-go/fb-sink-log:stable\"}}},\"udsink\":{\"container\":{\"image\":\"quay.io/numaio/numaflow-rs/sink-log:stable\"}}},\"source\":{\"transformer\":{\"container\":{\"image\":\"quay.io/numaio/numaflow-rs/source-transformer-now:stable\"}},\"udsource\":{\"container\":{\"image\":\"quay.io/numaio/numaflow-rs/simple-source:stable\"}}},\"udf\":{\"container\":{\"image\":\"quay.io/numaio/numaflow-go/map-cat-bypass:stable\",\"imagePullPolicy\":\"IfNotPresent\"}}}}\n"}},"spec":{"sink":{"onSuccess":null}}}
to:
Resource: "numaflow.numaproj.io/v1alpha1, Resource=monovertices", GroupVersionKind: "numaflow.numaproj.io/v1alpha1, Kind=MonoVertex"
Name: "simple-mono-vertex", Namespace: "numaflow-system"
for: "/Users/vtiwari5/Documents/GitHub/numaflow/numaflow/examples/21-simple-mono-vertex.yaml": error when patching "/Users/vtiwari5/Documents/GitHub/numaflow/numaflow/examples/21-simple-mono-vertex.yaml": admission webhook "webhook.numaflow.numaproj.io" denied the request: new MonoVertex spec is invalid: invalid bypass spec: bypass to on-success sink is defined but on-success sink itself is not defined
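
The rule behind this rejection can be sketched as follows. This is a minimal illustration in Rust, not the actual validator (which lives in pkg/reconciler/validator/mvtx_validate.go): the `SpecView` struct, its field names, the check order, and the fallback error text are all assumptions; only the on-success message is taken from the output above.

```rust
/// Hypothetical, flattened view of the parts of a MonoVertex spec that matter here.
#[derive(Default)]
struct SpecView {
    fallback_sink_defined: bool,
    on_success_sink_defined: bool,
    bypass_to_fallback: bool,
    bypass_to_on_success: bool,
}

/// Reject a bypass rule that targets a sink the spec does not define.
fn validate_bypass(spec: &SpecView) -> Result<(), String> {
    if spec.bypass_to_on_success && !spec.on_success_sink_defined {
        // Message text as reported by the admission webhook above.
        return Err(
            "bypass to on-success sink is defined but on-success sink itself is not defined"
                .to_string(),
        );
    }
    if spec.bypass_to_fallback && !spec.fallback_sink_defined {
        // Assumed wording, by analogy with the on-success case.
        return Err(
            "bypass to fallback sink is defined but fallback sink itself is not defined"
                .to_string(),
        );
    }
    Ok(())
}

fn main() {
    // Bypass to on-success is configured, but no on-success sink exists.
    let spec = SpecView { bypass_to_on_success: true, ..Default::default() };
    match validate_bypass(&spec) {
        Ok(()) => println!("spec is valid"),
        Err(e) => println!("invalid bypass spec: {e}"),
    }
}
```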

Testing Scenarios

1. ✅ Bypass defines all sink types, but the UDF only sets tags for the fallback and on-success sinks

Expected behaviour: the fallback and on-success sinks receive data, but the primary sink receives nothing.

Bypass spec:

  bypass:
    sink:
      tags:
        operator: or
        values:
          - sinker
    fallback:
      tags:
        operator: or
        values:
          - fallback
    onSuccess:
      tags:
        operator: or
        values:
          - onSuccess

Map UDF:

The UDF never sets the tag ('sinker') required for sending data to the primary sink:

func (c *Cat) Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages {
	i, err := strconv.Atoi(string(d.Value()))
	if err != nil {
		log.Printf("Failed to convert %v value to int: %v", string(d.Value()), err)
	}
	responseMessage := mapper.NewMessage(d.Value()).WithKeys(keys)
	if i%3 == 0 {
		log.Printf("Sending %v to fallback", string(d.Value()))
		return mapper.MessagesBuilder().Append(responseMessage.WithTags([]string{"fallback"}))
	} else if i%3 == 1 {
		log.Printf("Sending %v to onSuccess", string(d.Value()))
		return mapper.MessagesBuilder().Append(responseMessage.WithTags([]string{"onSuccess"}))
	}
	log.Printf("Sending %v to sink", string(d.Value()))
	return mapper.MessagesBuilder().Append(responseMessage)
}

Sink logs:

vtiwari5@macos-H29191QYT0 numaflow % klf simple-mono-vertex-mv-0-jdhen -c udsink
2025-12-18T21:49:40.856646Z  INFO write_info_file{server_info=ServerInfo { protocol: "uds", language: "rust", minimum_numaflow_version: "1.4.0-z", version: "0.4.0", metadata: Some({}) } path="/var/run/numaflow/sinker-server-info"}: numaflow::shared::server: Writing to file content="{\"protocol\":\"uds\",\"language\":\"rust\",\"minimum_numaflow_version\":\"1.4.0-z\",\"version\":\"0.4.0\",\"metadata\":{}}U+005C__END__"

Fallback sink logs:

vtiwari5@macos-H29191QYT0 numaflow % klf simple-mono-vertex-mv-0-jdhen -c fb-udsink
Primary sink under maintenance, writing to fallback sink -  15
Primary sink under maintenance, writing to fallback sink -  0
Primary sink under maintenance, writing to fallback sink -  3
Primary sink under maintenance, writing to fallback sink -  9
Primary sink under maintenance, writing to fallback sink -  6
Primary sink under maintenance, writing to fallback sink -  12
Primary sink under maintenance, writing to fallback sink -  18
Primary sink under maintenance, writing to fallback sink -  21

Similar logs are observed for the on-success sink.

2. ✅ Bypass defines only the fallback and on-success sink types, and the UDF only sets tags for the fallback and on-success sinks

Expected behaviour: all sinks receive their respective data.

Explanation: if no bypass conditions are defined for the primary sink, we don't want to block users from sending data to it.
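
The routing rule implied by scenarios 1 to 3 can be sketched as below. This is a simplified model in Rust, inferred from the expected behaviours in this description rather than taken from the dataplane code: `BypassRules`, `Destination`, and `route_after_udf` are hypothetical names, only the `or` operator is modelled, and the order in which the rules are checked is an assumption.

```rust
/// Hypothetical view of the `bypass` block: each entry holds the `values` of an
/// `operator: or` tag condition, or None when that rule is absent from the spec.
#[derive(Default)]
struct BypassRules {
    sink: Option<Vec<String>>,
    fallback: Option<Vec<String>>,
    on_success: Option<Vec<String>>,
}

#[derive(Debug, PartialEq)]
enum Destination {
    Primary,
    Fallback,
    OnSuccess,
    Dropped,
}

/// `or` semantics: the condition holds if any configured value is among the tags.
fn matches_any(values: &[String], tags: &[String]) -> bool {
    values.iter().any(|v| tags.contains(v))
}

fn rule_matches(rule: &Option<Vec<String>>, tags: &[String]) -> bool {
    rule.as_ref().map_or(false, |values| matches_any(values, tags))
}

/// Decide where a message leaving the Map UDF ends up.
fn route_after_udf(rules: &BypassRules, tags: &[String]) -> Destination {
    if rule_matches(&rules.fallback, tags) {
        return Destination::Fallback;
    }
    if rule_matches(&rules.on_success, tags) {
        return Destination::OnSuccess;
    }
    match &rules.sink {
        // No rule for the primary sink: do not block the message.
        None => Destination::Primary,
        Some(values) if matches_any(values, tags) => Destination::Primary,
        // A primary-sink rule exists and the message does not satisfy it.
        Some(_) => Destination::Dropped,
    }
}

fn main() {
    let tags = |t: &str| vec![t.to_string()];

    // Scenario 2: only fallback and on-success rules are defined.
    let rules = BypassRules {
        fallback: Some(tags("fallback")),
        on_success: Some(tags("onSuccess")),
        ..Default::default()
    };
    println!("{:?}", route_after_udf(&rules, &tags("fallback")));
    println!("{:?}", route_after_udf(&rules, &[]));

    // Scenario 1: a primary-sink rule is added, and the untagged message no longer passes.
    let rules = BypassRules { sink: Some(tags("sinker")), ..rules };
    println!("{:?}", route_after_udf(&rules, &[]));
}
```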

Bypass spec:

  bypass:
    fallback:
      tags:
        operator: or
        values:
          - fallback
    onSuccess:
      tags:
        operator: or
        values:
          - onSuccess

Map UDF:

The UDF doesn't set any tag for sending data to the primary sink; the implementation is the same as above.

Primary Sink logs:

vtiwari5@macos-H29191QYT0 numaflow % klf simple-mono-vertex-mv-0-dnf1w -c udsink
2025-12-18T22:13:10.077913Z  INFO write_info_file{server_info=ServerInfo { protocol: "uds", language: "rust", minimum_numaflow_version: "1.4.0-z", version: "0.4.0", metadata: Some({}) } path="/var/run/numaflow/sinker-server-info"}: numaflow::shared::server: Writing to file content="{\"protocol\":\"uds\",\"language\":\"rust\",\"minimum_numaflow_version\":\"1.4.0-z\",\"version\":\"0.4.0\",\"metadata\":{}}U+005C__END__"
2
5
8
53
11
14
17
20
23
26
29

Fallback sink logs:

Primary sink under maintenance, writing to fallback sink -  2112393
Primary sink under maintenance, writing to fallback sink -  2112027
Primary sink under maintenance, writing to fallback sink -  2112399
Primary sink under maintenance, writing to fallback sink -  2112024
Primary sink under maintenance, writing to fallback sink -  2112030
Primary sink under maintenance, writing to fallback sink -  2112033
Primary sink under maintenance, writing to fallback sink -  2112432

Similar logs are observed for the on-success sink.

3. ✅ Bypass rules are not defined

Expected behaviour: all messages should end up in the primary sink, and none in any other sink.

Map UDF:

Same implementation as above is used to add tags to messages.

Primary Sink logs:

vtiwari5@macos-H29191QYT0 rust % klf simple-mono-vertex-mv-0-mtrnp -c udsink
1922009
1922015
1922007
1922000
1922001

Fallback and on-success sink logs:

vtiwari5@macos-H29191QYT0 rust % klf simple-mono-vertex-mv-0-mtrnp -c fb-udsink
^C
vtiwari5@macos-H29191QYT0 rust % klf simple-mono-vertex-mv-0-mtrnp -c ons-udsink
2025-12-18T22:22:44.997025Z  INFO write_info_file{server_info=ServerInfo { protocol: "uds", language: "rust", minimum_numaflow_version: "1.4.0-z", version: "0.4.0", metadata: Some({}) } path="/var/run/numaflow/ons-sinker-server-info"}: numaflow::shared::server: Writing to file content="{\"protocol\":\"uds\",\"language\":\"rust\",\"minimum_numaflow_version\":\"1.4.0-z\",\"version\":\"0.4.0\",\"metadata\":{}}U+005C__END__"
^C

4. ✅ Bypass rules are defined for the fallback and on-success sinks. Both Transformer and Map route messages.

Expected behaviour: messages tagged for fallback or onSuccess by either the transformer or the UDF should end up in the corresponding sink.

Bypass spec:

  bypass:
    fallback:
      tags:
        operator: or
        values:
          - fallback
    onSuccess:
      tags:
        operator: or
        values:
          - onSuccess

Source Transformer:

impl sourcetransform::SourceTransformer for NowCat {
    async fn transform(
        &self,
        input: sourcetransform::SourceTransformRequest,
    ) -> Vec<sourcetransform::Message> {
        let integer_value = String::from_utf8_lossy(&input.value)
            .trim()
            .parse()
            .unwrap_or(0);
        let new_message = (String::from("From Source Transformer: ") + &integer_value.to_string())
            .as_bytes()
            .to_vec();

        let base_message = sourcetransform::Message::new(new_message, chrono::offset::Utc::now()).with_keys(input.keys.clone());

        return vec![match integer_value % 7 {
            0 => base_message.with_tags(vec!["fallback".to_string()]),
            1 => base_message.with_tags(vec!["onSuccess".to_string()]),
            2 => base_message.with_tags(vec!["sink".to_string()]),
            _ => base_message,
        }];
    }
}

Map UDF:

func (c *Cat) Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages {
	originalValue, err := strconv.Atoi(string(d.Value()))
	if err != nil {
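		// Transformer output looks like "From Source Transformer: <n>"; guard the
		// index so a value without ":" is logged instead of causing a panic.
		if parts := strings.Split(string(d.Value()), ":"); len(parts) > 1 {
			originalValue, err = strconv.Atoi(strings.TrimSpace(parts[1]))
		}
		if err != nil {
			log.Printf("Failed to convert %v value to int: %v", string(d.Value()), err)
		}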
	}
	routedResponseMessage := mapper.NewMessage([]byte("From UDF: " + strconv.Itoa(originalValue))).WithKeys(keys)
	forwardedResponseMessage := mapper.NewMessage([]byte(strconv.Itoa(originalValue))).WithKeys(keys)
	
	if originalValue%7 == 3 {
		log.Printf("Sending %v to fallback", string(d.Value()))
		return mapper.MessagesBuilder().Append(routedResponseMessage.WithTags([]string{"fallback"}))
	} else if originalValue%7 == 4 {
		log.Printf("Sending %v to onSuccess", string(d.Value()))
		return mapper.MessagesBuilder().Append(routedResponseMessage.WithTags([]string{"onSuccess"}))
	} else if originalValue%7 == 5 {
		log.Printf("Sending %v to sink", string(d.Value()))
		return mapper.MessagesBuilder().Append(routedResponseMessage.WithTags([]string{"sink"}))
	}
	log.Printf("Sending %v without tags", string(d.Value()))
	return mapper.MessagesBuilder().Append(forwardedResponseMessage)
}

Fallback/OnSuccess sink logs:

Explanation - the 'From UDF' and 'From Source Transformer' prefixes show that these messages were written directly by the UDF or the transformer, bypassing the components in between.

From UDF: 1947460
From UDF: 1947488
From UDF: 1947495
From Source Transformer: 1947505
From Source Transformer: 1947511
From Source Transformer: 1947517
From Source Transformer: 1947523
From Source Transformer: 1947529
From Source Transformer: 1947535
From Source Transformer: 1947541
From Source Transformer: 1947547
From Source Transformer: 1947553
From Source Transformer: 1947559
From Source Transformer: 1947565
From UDF: 1947502

Primary Sink logs:

Explanation - 'From UDF' messages are the ones the UDF tagged with the 'sink' tag; the others are messages it left untagged.

13050620
From UDF: 13050623
From UDF: 13050609
13050626
13050627
13050617
13050632

5. ✅ Bypass rules are defined for all sinks. Both Transformer and Map route messages.

Expected behaviour: messages tagged for fallback, onSuccess, or sink by either the transformer or the UDF should end up in the corresponding sink. Messages that are not tagged for any sink should be dropped.
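
As a worked example of this scenario, the sketch below replays the `% 7` tagging used by the transformer and Map UDF above and reports where each value would land. It is a hand-written model in Rust under the stated expectations, not the forwarder itself; `simulate`, `sink_for`, and the `Sink` enum are hypothetical names.

```rust
#[derive(Debug, PartialEq)]
enum Sink {
    Primary,
    Fallback,
    OnSuccess,
}

/// Tag attached by the source transformer (values 0, 1, 2 modulo 7).
fn transformer_tag(v: i64) -> Option<&'static str> {
    match v % 7 {
        0 => Some("fallback"),
        1 => Some("onSuccess"),
        2 => Some("sink"),
        _ => None,
    }
}

/// Tag attached by the Map UDF (values 3, 4, 5 modulo 7).
fn udf_tag(v: i64) -> Option<&'static str> {
    match v % 7 {
        3 => Some("fallback"),
        4 => Some("onSuccess"),
        5 => Some("sink"),
        _ => None,
    }
}

/// Bypass spec of this scenario: one tag value per sink.
fn sink_for(tag: &str) -> Option<Sink> {
    match tag {
        "sink" => Some(Sink::Primary),
        "fallback" => Some(Sink::Fallback),
        "onSuccess" => Some(Sink::OnSuccess),
        _ => None,
    }
}

/// Returns the sink and payload for a value, or None when the message is dropped.
fn simulate(v: i64) -> Option<(Sink, String)> {
    // Stage 1: a transformer tag that matches a bypass rule skips the UDF entirely.
    if let Some(sink) = transformer_tag(v).and_then(sink_for) {
        return Some((sink, format!("From Source Transformer: {v}")));
    }
    // Stage 2: the UDF tags the message; with a rule defined for every sink,
    // a message matching none of them is dropped.
    let sink = udf_tag(v).and_then(sink_for)?;
    Some((sink, format!("From UDF: {v}")))
}

fn main() {
    for v in 0..7 {
        match simulate(v) {
            Some((sink, payload)) => println!("{v} -> {sink:?}: {payload}"),
            None => println!("{v} -> dropped"),
        }
    }
}
```

Under this model only values with remainder 6 are dropped, which matches the absence of untagged numbers in the sink logs below.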

Bypass spec:

  bypass:
    sink:
      tags:
        operator: or
        values:
          - sink
    fallback:
      tags:
        operator: or
        values:
          - fallback
    onSuccess:
      tags:
        operator: or
        values:
          - onSuccess

The Map UDF and source transformer are the same as above.

Fallback sink logs (onSuccess sink has similar logs):

writing to fallback sink -  From UDF: 60707
writing to fallback sink -  From UDF: 60721
writing to fallback sink -  From Source Transformer: 60956
writing to fallback sink -  From Source Transformer: 60963
writing to fallback sink -  From Source Transformer: 60970
writing to fallback sink -  From Source Transformer: 60977
writing to fallback sink -  From Source Transformer: 60984
writing to fallback sink -  From Source Transformer: 60991
writing to fallback sink -  From Source Transformer: 60998
writing to fallback sink -  From UDF: 60728

Primary Sink logs:

From UDF: 94582
From UDF: 94589
From Source Transformer: 94761
From Source Transformer: 94768
From Source Transformer: 94775
From Source Transformer: 94782
From Source Transformer: 94789
From Source Transformer: 94796
From UDF: 94603
From UDF: 94617

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
… put them as part of monovertex

…former spec

…condition spec

… splitter

…ss rules are set in spec. Add a check in dataplane code to only send messages when corresponding sub-sinks are defined

@vaibhavtiwari33 vaibhavtiwari33 changed the title Short circuit dataplane2 feat: Mvtx Short circuit dataplane changes Dec 22, 2025
codecov Bot commented Dec 22, 2025

Codecov Report

❌ Patch coverage is 6.43087% with 291 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.39%. Comparing base (241cac9) to head (9d3fbf1).
⚠️ Report is 2 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| rust/numaflow-core/src/sinker/sink.rs | 0.00% | 115 Missing ⚠️ |
| rust/numaflow-core/src/monovertex/bypass.rs | 0.00% | 87 Missing ⚠️ |
| rust/numaflow-core/src/monovertex/forwarder.rs | 14.60% | 76 Missing ⚠️ |
| pkg/reconciler/validator/mvtx_validate.go | 22.22% | 5 Missing and 2 partials ⚠️ |
| rust/numaflow-core/src/monovertex.rs | 37.50% | 5 Missing ⚠️ |
| rust/numaflow-core/src/config/monovertex.rs | 0.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3123      +/-   ##
==========================================
- Coverage   79.75%   79.39%   -0.36%     
==========================================
  Files         291      292       +1     
  Lines       65142    65441     +299     
==========================================
+ Hits        51951    51958       +7     
- Misses      12638    12928     +290     
- Partials      553      555       +2     


@vaibhavtiwari33
Contributor Author

Requesting review of the implementation outline before writing tests for the final implementation.

@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as ready for review December 22, 2025 19:16
vigith and others added 2 commits December 23, 2025 10:11
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Contributor

@yhl25 yhl25 left a comment

Should we consider injecting the bypass router (with sink handle) directly into the component start methods?

This way, components could write directly to the sink handle when bypass conditions match, rather than writing to an intermediate stream that then gets routed. It would reduce stream hops and cloning, simplify the forwarder, and give the components more control over routing.

Comment on lines +85 to +87
// Read from a chunked stream of messages
let chunked_stream = input_stream.chunks_timeout(batch_size, chunk_timeout);
tokio::pin!(chunked_stream);
Contributor

Why do we need to chunk the stream?

Contributor Author

The write, write_to_fallback and write_to_on_success methods each accept a batch of messages, so I build that batch by chunking the stream.
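
The batching idea can be illustrated without an async runtime. The sketch below is a standard-library analogy in Rust, not the `chunks_timeout` call from the diff: `next_chunk` is a hypothetical helper over `std::sync::mpsc`, and it assumes `batch_size` is at least 1 and that the timeout starts counting once the first item of a batch has arrived.

```rust
use std::sync::mpsc::{self, Receiver};
use std::time::{Duration, Instant};

/// Collect up to `batch_size` items from the channel into one batch.
/// Blocks for the first item, then waits at most `timeout` for the rest.
/// Returns None once the channel is closed and fully drained.
fn next_chunk<T>(rx: &Receiver<T>, batch_size: usize, timeout: Duration) -> Option<Vec<T>> {
    // A closed, empty channel ends the stream of batches.
    let first = rx.recv().ok()?;
    let mut batch = vec![first];
    let deadline = Instant::now() + timeout;
    while batch.len() < batch_size {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Timed out or all senders are gone: emit the partial batch.
            Err(_) => break,
        }
    }
    Some(batch)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..5 {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so the loop below terminates

    // Each batch would be handed to a batch-oriented sink write.
    while let Some(batch) = next_chunk(&rx, 2, Duration::from_millis(50)) {
        println!("writing batch {batch:?}");
    }
}
```

A partial batch is emitted when the timeout expires, so a slow source delays a write by at most the timeout rather than stalling until the batch fills.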

// for each message read from the chunked stream determine which sink it should be sent to
// based on the bypass conditions and wrap it in respective MessageToSink enum value
for msg in msgs {
let msg_clone = msg.clone();
Contributor

Do we really need to clone the complete message?

Contributor Author

This was done for brevity in the rest of the while loop. Without the clone, we can't prove to the compiler that the original msg wasn't moved into the enum in the None case of the subsequent match.
The clone can certainly be avoided by moving the match logic into these if/else statements.
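
One way to avoid the clone is to decide the destination from a borrow first and move the message exactly once afterwards. The sketch below shows that pattern in Rust with simplified stand-in types: `Message`, `Target`, and `classify` are hypothetical, `MessageToSink` only mirrors the enum name mentioned in the diff comment, and the fixed tag values stand in for real bypass conditions.

```rust
/// Simplified stand-in for the dataplane message type.
#[derive(Debug, PartialEq)]
struct Message {
    tags: Vec<String>,
    payload: Vec<u8>,
}

#[derive(Debug, PartialEq)]
enum MessageToSink {
    Primary(Message),
    Fallback(Message),
    OnSuccess(Message),
}

#[derive(Debug, Clone, Copy)]
enum Target {
    Primary,
    Fallback,
    OnSuccess,
}

/// Inspect the tags by reference; the result carries no borrow of the message.
fn target_for(tags: &[String]) -> Option<Target> {
    tags.iter().find_map(|t| match t.as_str() {
        "sink" => Some(Target::Primary),
        "fallback" => Some(Target::Fallback),
        "onSuccess" => Some(Target::OnSuccess),
        _ => None,
    })
}

/// Wrap the message for its sink, or hand it back untouched when nothing matches.
fn classify(msg: Message) -> Result<MessageToSink, Message> {
    // The borrow of `msg.tags` ends here, before `msg` is moved below.
    let target = target_for(&msg.tags);
    match target {
        Some(Target::Primary) => Ok(MessageToSink::Primary(msg)),
        Some(Target::Fallback) => Ok(MessageToSink::Fallback(msg)),
        Some(Target::OnSuccess) => Ok(MessageToSink::OnSuccess(msg)),
        // The None case returns ownership to the caller, so no clone is needed.
        None => Err(msg),
    }
}

fn main() {
    let msg = Message { tags: vec!["fallback".to_string()], payload: b"42".to_vec() };
    match classify(msg) {
        Ok(routed) => println!("bypass: {routed:?}"),
        Err(original) => println!("forward downstream: {original:?}"),
    }
}
```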

@vaibhavtiwari33
Contributor Author

> Should we consider injecting the bypass router (with sink handle) directly into the component start methods?
>
> This way, components could write directly to the sink handle when bypass conditions match, rather than writing to an intermediate stream that then gets routed. It would reduce stream hops and cloning, simplify the forwarder, and give the components more control over routing.

We should be able to do that. My only concern is that these components are also used by the pipeline, so changing component behaviour for the one specific case of monovertex might introduce unwarranted complexity in the component logic.

@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as draft January 4, 2026 21:27

Development

Successfully merging this pull request may close these issues.

Dataplane changes for conditionally writing messages directly to fallback/onsuccess from transformer/map
