Conversation

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Nov 18, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project              Deployment  Preview  Comments  Updated (UTC)
rivetkit-serverless  Error       Error              Dec 2, 2025 7:11pm

3 Skipped Deployments

Project          Deployment  Preview  Comments  Updated (UTC)
rivet-cloud      Ignored     Ignored            Dec 2, 2025 7:11pm
rivet-inspector  Ignored     Ignored  Preview   Dec 2, 2025 7:11pm
rivet-site       Ignored     Ignored  Preview   Dec 2, 2025 7:11pm

Contributor Author

MasterPtato commented Nov 18, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@pkg-pr-new

pkg-pr-new bot commented Nov 18, 2025

More templates

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3480

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3480

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3480

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3480

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3480

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3480

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3480

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3480

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3480

commit: 4eabd07

@claude

claude bot commented Nov 18, 2025

Code Review: Rewrite runner workflow to handle batch signals

Summary

This PR introduces runner2.rs as a rewrite of the runner workflow to handle batch signal processing. The key improvement is switching from processing signals one-at-a-time to processing them in batches using listen_n_with_timeout instead of listen_with_timeout.

Positive Aspects

Architecture & Design

  • Batch Processing: The switch to listen_n_with_timeout with a batch size of 1024 is excellent for performance and throughput
  • State Separation: Good separation between State (persistent workflow state) and LifecycleState (loop iteration state)
  • Signal Batching: Well-designed batching logic that accumulates signals before processing (lines 94-183)
  • Message Batching: Efficient batching of outbound messages before sending (line 185)

Error Handling

  • Graceful Degradation: Proper use of .graceful_not_found() when signaling actor workflows (lines 163, 247, 362)
  • Race Condition Handling: Good handling of draining runners receiving start actor commands (lines 139-172)

Code Quality

  • Clear Comments: Good inline documentation explaining non-obvious behavior (lines 236, 819, 967, 985)
  • Consistent Patterns: Activities follow consistent naming and structure
  • Logging: Appropriate use of structured logging with tracing::warn!

Issues & Concerns

Critical Issues

1. Unused Import (line 11)

use vbare::OwnedVersionedData;

This import is not used anywhere in the file and should be removed. The project uses workspace dependencies, so unnecessary imports should be cleaned up.

Fix: Remove line 11

2. Potential Division by Zero (lines 620, 1057)

let remaining_millislots = (remaining_slots * 1000) / input.total_slots;

If input.total_slots is 0, this will panic. While this may be prevented by validation elsewhere, defensive coding suggests adding a check or assertion.

Recommendation: Add validation or use checked_div with proper error handling
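
A minimal sketch of the defensive check, assuming an anyhow-style bail! as used elsewhere in this file (checked_div is the Option-returning alternative):

// Make the zero case explicit instead of panicking on division.
if input.total_slots == 0 {
    bail!("total_slots must be non-zero");
}
let remaining_millislots = (remaining_slots * 1000) / input.total_slots;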

3. Missing Signal Handler?

The Main signal enum at line 1170 includes CheckQueue, Command, Forward, and Stop, and the batch processing loop handles each of these. However, nothing validates that all signal variants are handled: if a new variant is added to Main, the compiler won't force an update here because the match isn't exhaustive over the enum.

Recommendation: Consider if this is intentional or if the signal handling should be refactored for better compile-time safety

Performance Considerations

4. Sequential Signal Processing (lines 239-256)

// NOTE: This should not be parallelized because signals should be sent in order
// Forward to actor workflows
// Process events
for event in &events {
    // ... sends signal to actor workflow
}

While the comment explains this must be sequential, this could become a bottleneck with many events. Each signal send is an async operation that must complete before the next begins.

Consider:

  • Is strict ordering truly required for all events, or just events for the same actor?
  • Could you batch events by actor_id and parallelize across different actors while maintaining order per-actor?

5. Sequential Allocation Signals (lines 315-321)

for alloc in res.allocations {
    ctx.signal(alloc.signal)
        .to_workflow::<crate::workflows::actor::Workflow>()
        .tag("actor_id", alloc.actor_id)
        .send()
        .await?;
}

Similar to #4, these allocations are sent sequentially but could potentially be parallelized since they're going to different actors.

Recommendation: Use futures::future::try_join_all or similar to parallelize these independent operations
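
A hedged sketch of what that could look like, assuming the workflow context permits concurrent sends (API shapes mirror the snippet above):

// Build one send future per allocation, then await them together.
// Cross-actor ordering is assumed not to matter here.
let sends = res.allocations.into_iter().map(|alloc| {
    ctx.signal(alloc.signal)
        .to_workflow::<crate::workflows::actor::Workflow>()
        .tag("actor_id", alloc.actor_id)
        .send()
});
futures::future::try_join_all(sends).await?;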

6. Message-by-Message Publishing (lines 1140-1147)

for message in &input.messages {
    let message_serialized = versioned::ToClient::wrap_latest(message.clone())
        .serialize_with_embedded_version(PROTOCOL_VERSION)?;
    
    ctx.ups()?
        .publish(&receiver_subject, &message_serialized, PublishOpts::one())
        .await?;
}

Each message is published individually in a loop. If the pubsub system supports batch publishing, this could be optimized.

Recommendation: Check if ups() supports batch publishing and utilize it
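
If no batch-publish API exists, a hedged alternative is to serialize everything up front and publish concurrently; this sketch assumes publishes may overlap and that per-subject ordering is not required:

// Serialize all messages first so errors surface before any publish.
let serialized = input
    .messages
    .iter()
    .map(|message| {
        versioned::ToClient::wrap_latest(message.clone())
            .serialize_with_embedded_version(PROTOCOL_VERSION)
    })
    .collect::<Result<Vec<_>, _>>()?;

// Publish concurrently; fall back to the sequential loop if ordering matters.
let ups = ctx.ups()?;
futures::future::try_join_all(
    serialized
        .iter()
        .map(|buf| ups.publish(&receiver_subject, buf, PublishOpts::one())),
)
.await?;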

Code Quality Issues

7. Commented Out Code (lines 36, 819-825)

// events: Vec<EventRow>,
// TODO: Storing events is disabled for now, otherwise state will grow indefinitely

While the TODO explains why events aren't stored, the commented code should either be removed or the decision should be finalized.

Recommendation: Either implement a proper solution (e.g., event truncation/archival) or remove the dead code

8. Deprecated Field (line 455-456)

#[derive(Debug, Serialize, Deserialize)]
struct InitOutput {
    /// Deprecated.
    evict_workflow_id: Option<Id>,
}

If this field is deprecated, there should be a plan to remove it. Is this for backwards compatibility? Should it have a timeline for removal?

Recommendation: Add context on when this can be removed or if it's needed for backward compatibility

9. Clone Overhead (line 70, 284)

let input = input.clone();  // line 70
commands: commands.clone(),  // line 284

The input is cloned on every loop iteration. While Input is relatively small, commands could grow large and is also cloned even when it might not be needed (the clone happens before the activity runs, and the activity might not use all fields).

Recommendation: Consider if these clones are necessary or if references could be used

Testing & Documentation

10. No Tests Included

This is a significant rewrite (1176 lines) that changes core behavior from single-signal to batch-signal processing, but no tests are included in the PR.

Critical Recommendation: Add tests covering:

  • Batch signal processing with various batch sizes
  • Timeout behavior when receiving fewer signals than batch size
  • Race conditions (draining runner receiving start commands)
  • Event ordering guarantees
  • Command acknowledgment logic
  • State persistence and recovery

11. Missing Migration Path

The file is named runner2.rs suggesting this runs alongside the old runner.rs.

Questions:

  • How will existing runner workflows migrate from runner to runner2?
  • Are both implementations expected to run concurrently?
  • Is there a feature flag or rollout plan?
  • Will runner.rs be deprecated?

Recommendation: Document the migration strategy

Minor Issues

12. Inconsistent Naming (lines 427-442)

LifecycleState::new() could use Default trait instead of a custom constructor since it just initializes with default values.

Recommendation:

impl Default for LifecycleState {
    fn default() -> Self { ... }
}

13. Magic Number (line 16)

const EVENT_ACK_BATCH_SIZE: i64 = 500;

Good use of a constant, but consider if this should be configurable via the config system like other thresholds.

Security Concerns

14. Unbounded Batch Size

While the batch size is limited to 1024 signals (line 77), each signal could contain variable-sized data (e.g., prepopulate_actor_names in init, metadata). This could potentially lead to memory issues if many large signals arrive.

Recommendation: Consider adding size-based limits in addition to count-based limits

15. Error Message Information Disclosure (lines 130-133)

bail!(
    "received message that should not be sent to runner workflow: {:?}",
    sig.inner
)

Logging the full message content in an error could potentially expose sensitive information.

Recommendation: Log message type only, not full content, or ensure sensitive fields are redacted
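
A hedged sketch of the redaction; the message_kind helper is hypothetical, with variant names taken from elsewhere in this review:

// Hypothetical helper: expose only the variant name, never the payload.
fn message_kind(msg: &protocol::ToServer) -> &'static str {
    match msg {
        protocol::ToServer::ToServerInit(_) => "ToServerInit",
        protocol::ToServer::ToServerEvents(_) => "ToServerEvents",
        protocol::ToServer::ToServerStopping => "ToServerStopping",
        // ...remaining variants elided
        _ => "other",
    }
}

bail!(
    "received message that should not be sent to runner workflow: {}",
    message_kind(&sig.inner)
);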

Performance Metrics

The batch processing approach should provide significant performance improvements:

  • Reduced syscalls: Batching reduces context switches
  • Better throughput: Processing 1024 signals vs 1 signal per iteration
  • Reduced activity overhead: Batching messages reduces workflow activity invocations

Recommendation: Add metrics to track:

  • Average batch sizes received
  • Processing latency per batch
  • Time spent waiting vs processing
  • Signal queue depths

Summary & Recommendations

Must Fix Before Merge

  1. Remove unused import vbare::OwnedVersionedData (line 11)
  2. Add division-by-zero safety for total_slots calculations (lines 620, 1057)
  3. Add comprehensive tests for batch signal processing behavior
  4. Document migration strategy from runner.rs to runner2.rs

Should Fix Before Merge

  1. Parallelize independent signal sends (allocation signals, lines 315-321)
  2. Review event processing ordering requirements - consider per-actor parallelization
  3. Resolve the commented-out event storage code - implement or remove
  4. Add performance metrics for batch processing

Consider for Future

  1. Make EVENT_ACK_BATCH_SIZE configurable
  2. Implement batch publishing if supported by pubsub system
  3. Add size-based limits in addition to count-based batch limits
  4. Use Default trait for LifecycleState

Conclusion

This is a well-architected improvement to the runner workflow that should significantly improve performance through batch processing. The code quality is generally good with clear separation of concerns and proper error handling. However, the lack of tests for such a critical component is concerning, and the migration strategy needs clarification. With the critical issues addressed, this will be a solid improvement to the system.

Great work on the batch processing implementation! The signal accumulation logic is particularly well done.

@MasterPtato MasterPtato force-pushed the 11-17-fix_pb_rewrite_runner_wf_to_handle_batch_signals branch from b180eca to 4eabd07 on November 18, 2025 23:50
@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from 0ca2fa2 to 321b2c3 on November 18, 2025 23:50
@claude

claude bot commented Nov 18, 2025

Code Review

I've reviewed PR #3480 which rewrites the runner workflow to handle batch signals. Here's my detailed feedback:

Overview

This PR introduces a new runner2.rs workflow that processes signals in batches instead of one-by-one, which should significantly improve performance and reduce database transaction overhead. The changes update all references from runner to runner2 throughout the codebase.


🎯 Positive Changes

  1. Batch Signal Processing - The core improvement from listen_with_timeout to listen_n_with_timeout(runner_lost_threshold, 1024) is excellent for performance. Processing up to 1024 signals per iteration will dramatically reduce workflow state transitions.

  2. Consolidated Message Sending - The new SendMessagesToRunner activity (plural) batches multiple messages together instead of sending them individually, reducing pubsub overhead.

  3. Efficient Command/Event Batching - Events and commands are accumulated during signal processing and sent in a single activity call, which is much more efficient.

  4. Combined Activity InsertCommands - The new version combines inserting and acknowledging commands into a single activity, reducing state updates.


⚠️ Issues & Concerns

1. Missing Activity Removal in runner.rs

The old runner.rs still has the AckCommands activity (lines 797-806) which is no longer needed in runner2.rs. This suggests that runner.rs should eventually be deleted once runner2 is proven stable. Is there a migration plan?

Recommendation: Document the migration strategy in the PR description or add a TODO comment.

2. Potential Division by Zero

In runner2.rs:620:

let remaining_millislots = (remaining_slots * 1000) / input.total_slots;

If input.total_slots is 0, this will panic. While this is likely validated elsewhere, defensive programming suggests adding an assertion or early return.

Recommendation:

assert!(input.total_slots > 0, "total_slots must be greater than 0");
let remaining_millislots = (remaining_slots * 1000) / input.total_slots;

3. Duplicate Division by Zero Risk

Same issue exists in runner2.rs:1051:

let new_remaining_millislots = (new_remaining_slots * 1000) / old_runner_alloc_key_data.total_slots;

4. Event Batching Logic Issue

In runner2.rs:113-117:

events.extend(
    new_events
        .into_iter()
        .filter(|event| event.index > state.last_event_idx),
);

The filtering uses state.last_event_idx which is only updated AFTER processing all signals. If multiple ToServerEvents signals arrive in the same batch with overlapping indices, duplicates could slip through.

Recommendation: Track the max event index during batch accumulation:

let mut max_event_idx = state.last_event_idx;
// ... in the loop:
events.extend(
    new_events
        .into_iter()
        .filter(|event| {
            if event.index > max_event_idx {
                max_event_idx = event.index;
                true
            } else {
                false
            }
        }),
);

5. Inconsistent Error Handling for Missing Actor Workflows

The code uses graceful_not_found() and logs a warning when actor workflows aren't found (lines 166-170, 250-255, etc.). This is good, but the warning messages are inconsistent:

  • Some say "likely already stopped"
  • The pattern is repeated multiple times

Recommendation: Extract to a helper function to ensure consistent logging and reduce duplication.

6. Unused Imports

Line 5 imports PROTOCOL_VERSION and versioned, but PROTOCOL_VERSION is only used in the send_messages_to_runner activity. Same in runner.rs. Minor issue but affects code cleanliness.

7. State Management During Batch Processing

The batch processing accumulates ack_last_command_idx using max() (line 122), which is correct. However, if the workflow crashes mid-batch, some signals might be lost. This is probably acceptable given workflow semantics, but worth documenting.


🔒 Security Considerations

  1. JSON Deserialization with unwrap_or_default (lines 752-755, 766-770): Using unwrap_or_default() for invalid JSON is reasonable, but consider logging when invalid metadata is encountered for debugging purposes (see the sketch after this list).

  2. No apparent security vulnerabilities - The changes don't introduce new attack vectors.
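
A hedged sketch of the logging fallback; Metadata and raw_metadata are illustrative names, not the actual code:

// Log invalid metadata instead of silently defaulting, then continue.
let metadata: Metadata = serde_json::from_str(&raw_metadata).unwrap_or_else(|err| {
    tracing::warn!(?err, "invalid runner metadata json, falling back to default");
    Metadata::default()
});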


🚀 Performance Considerations

Major Improvements:

  • Batching signals reduces workflow state flushes by up to 1024x
  • Consolidated message sending reduces pubsub calls
  • Combined command insert/ack reduces activity invocations

Potential Concerns:

  • Processing 1024 signals in memory could increase memory usage, but this should be negligible for the signal types involved
  • The batch size of 1024 seems reasonable, but might need tuning based on production metrics

🧪 Test Coverage

Missing: The PR doesn't include tests for the new batching behavior. Critical test cases:

  1. Multiple signals of the same type in one batch
  2. Mixed signal types in one batch
  3. Duplicate event indices across multiple signals in the same batch (see issue #4 above)
  4. Batch with CommandStartActor to a draining runner
  5. Edge case: exactly 1024 signals vs 1025 signals

Recommendation: Add integration tests for runner2 before replacing runner.


📝 Code Quality & Style

  1. Good: Follows existing patterns and Rust conventions
  2. Good: Structured logging with tracing and proper use of ?x syntax
  3. Good: Comprehensive comments explaining complex logic
  4. Minor: The // events: Vec<EventRow> commented field (line 36) should probably be removed if events won't be stored

✅ Recommendations Summary

Critical:

  1. Fix potential duplicate event processing in batch accumulation (issue #4 above)
  2. Add assertions for division by zero (issues #2 & #3 above)
  3. Add test coverage for batch signal processing

Important:
4. Document migration plan from runner to runner2
5. Consider extracting repeated "actor not found" logging to a helper

Nice to have:
6. Log when invalid metadata is encountered
7. Remove commented events field if not needed
8. Clean up unused imports


📊 Overall Assessment

This is a well-designed performance optimization with significant benefits. The batch processing approach is sound and should dramatically reduce workflow overhead. However, the duplicate event filtering issue needs to be addressed before merging, and comprehensive tests are essential given the critical nature of the runner workflow.

Recommendation: Request Changes - Address the critical issues above, particularly the event batching logic and test coverage.

@claude

claude bot commented Nov 18, 2025

PR Review: Rewrite Runner Workflow to Handle Batch Signals

Summary

This PR rewrites the pegboard runner workflow (runner2.rs) to batch-process signals instead of handling them one at a time. This is a significant performance optimization that should reduce database transaction overhead and improve throughput.


✅ Strengths

1. Performance Optimization

  • Batch signal processing: The key improvement is using listen_n_with_timeout() to fetch up to 1024 signals at once (line 76-78), versus the old listen_with_timeout() that processed one signal at a time
  • Reduced activity calls: By batching multiple commands/events, the workflow makes fewer database transactions and pubsub calls
  • Message batching: Multiple messages are accumulated and sent together (lines 324-330), reducing network overhead

2. Improved State Management

  • The batch processing logic cleanly separates signal collection (lines 94-183) from processing (lines 185-330)
  • State updates are properly managed within the loop

3. Code Quality

  • Clear separation of concerns with well-named variables (init, events, commands, check_queue)
  • Proper filtering of duplicate events (lines 113-117)
  • Comments explain non-obvious behavior (e.g., line 236: "should not be parallelized")

⚠️ Issues & Concerns

Critical: Potential Signal Ordering Issue

Location: Lines 101-183

Issue: When batching signals, the order of processing different signal types may not match the order they were received. The current code processes all signals in a single loop and separates them by type into different collections:

for signal in signals {
    match signal {
        Main::Forward(sig) => { /* collect into init/events/ack_last_command_idx */ }
        Main::Command(command) => { /* collect into commands */ }
        Main::CheckQueue(_) => { check_queue = true; }
        Main::Stop(sig) => { handle_stopping(...).await?; }
    }
}

Then processes them in this order:

  1. Init (lines 187-232)
  2. Events (lines 239-256)
  3. Commands (lines 280-303)
  4. Check queue (lines 305-322)

Problem: If a Command signal arrives before a Forward::ToServerInit signal in the batch, the command will be processed first (added to the commands vec), but conceptually init should happen first. Similarly, commands and events may be processed out of order relative to when they arrived.

Recommendation: Consider whether signal ordering matters for correctness. If it does, you may need to:

  • Process signals in arrival order, OR
  • Document why out-of-order processing is safe, OR
  • Ensure init is always processed first if present before processing other signals

Bug: Missing Return After handle_stopping

Location: Line 125 and Line 180

Issue: When ToServerStopping or Stop signal is received, handle_stopping() is called but the code continues to process other signals in the batch. This might not be the intended behavior.

protocol::ToServer::ToServerStopping => {
    handle_stopping(ctx, &input, state, false).await?;
    // No return or break here - continues processing other signals
}

Current behavior: If stopping signal appears in the middle of a batch, subsequent signals in that batch will still be processed.

Recommendation: Consider if you should break out of the signal processing loop when stopping is initiated, or document why continued processing is safe.


Minor: Inconsistent Activity Input Naming

Location: Line 325

Issue: The activity is called SendMessagesToRunnerInput (plural "Messages"), but previously it was SendMessageToRunnerInput (singular). The new version takes Vec<protocol::ToClient> which is appropriate for batching.

Finding: This is actually correct - the new version properly uses plural naming. Good work! ✅


Code Smell: Duplicate Init Handling

Location: Lines 105-109

Issue: The code only keeps the first init signal if multiple arrive:

protocol::ToServer::ToServerInit(init_sig) => {
    if init.is_none() {
        init = Some(init_sig);
        check_queue = true;
    }
}

Questions:

  • Is it possible to receive multiple init signals in a batch?
  • Should subsequent inits be logged as warnings?
  • Could this hide bugs where runners incorrectly send multiple inits?

Recommendation: Add a warning log if a duplicate init is received.
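
A minimal sketch of that warning, extending the snippet above:

protocol::ToServer::ToServerInit(init_sig) => {
    if init.is_none() {
        init = Some(init_sig);
        check_queue = true;
    } else {
        // A duplicate init in one batch is almost certainly a runner bug;
        // surface it instead of dropping it silently.
        tracing::warn!("received duplicate init signal in batch, ignoring");
    }
}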


Performance: Sequential Event Processing

Location: Lines 239-256

Issue: Events are still processed sequentially with individual await calls for each actor workflow signal:

for event in &events {
    let actor_id = crate::utils::event_actor_id(&event.inner).to_string();
    let res = ctx
        .signal(crate::workflows::actor::Event { inner: event.inner.clone() })
        .to_workflow::<crate::workflows::actor::Workflow>()
        .tag("actor_id", &actor_id)
        .graceful_not_found()
        .send()
        .await?;
    // ...
}

Recommendation: Consider whether these signals could be sent in parallel (e.g., using futures::future::join_all) if they target different actors. The comment on line 236 says "should not be parallelized" but it's unclear if this applies to events targeting different actors or just events for the same actor.

Impact: If you have 100 events in a batch, this still makes 100 sequential network calls which may negate some of the batching benefits.
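
If ordering only matters per actor, a hedged sketch of grouping events by actor and running the per-actor loops concurrently (assumes anyhow-style errors and that the context tolerates concurrent sends; API shapes mirror the snippet above):

use std::collections::HashMap;

// Group events by actor so per-actor order is preserved...
let mut by_actor: HashMap<String, Vec<_>> = HashMap::new();
for event in &events {
    let actor_id = crate::utils::event_actor_id(&event.inner).to_string();
    by_actor.entry(actor_id).or_default().push(event);
}

// ...then run each actor's sequential loop concurrently with the others.
futures::future::try_join_all(by_actor.into_iter().map(|(actor_id, events)| async move {
    for event in events {
        ctx.signal(crate::workflows::actor::Event { inner: event.inner.clone() })
            .to_workflow::<crate::workflows::actor::Workflow>()
            .tag("actor_id", &actor_id)
            .graceful_not_found()
            .send()
            .await?;
    }
    anyhow::Ok(())
}))
.await?;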


Minor: Magic Number in Event Ack Logic

Location: Lines 265-277

Issue: The batching logic for event acks is correct but could be clearer:

if last_event_idx > state.last_event_ack_idx.saturating_add(EVENT_ACK_BATCH_SIZE) {

Recommendation: Add a comment explaining why we batch event acks (to avoid sending an ack message for every single event).
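
For example, something like this (a suggested comment, not the actual source):

// Ack events in batches of EVENT_ACK_BATCH_SIZE rather than per event:
// this trades a little ack latency for far fewer ack messages to the runner.
if last_event_idx > state.last_event_ack_idx.saturating_add(EVENT_ACK_BATCH_SIZE) {
    // ...send the ack message
}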


🔒 Security Concerns

No major security issues identified. The changes are primarily performance-oriented and don't introduce new security vulnerabilities.


🧪 Test Coverage

Concerns:

  1. No tests visible in this PR - This is a significant rewrite of core functionality
  2. Critical scenarios to test:
    • Batch processing with mixed signal types
    • Signal ordering semantics
    • Handling stop/stopping signals mid-batch
    • Behavior when batch size limit (1024) is reached
    • Race conditions with draining state and incoming commands
    • Event deduplication logic

Recommendation: Add integration tests that verify:

  • Batches of various sizes are processed correctly
  • Commands are correctly indexed when batched
  • Events are properly deduplicated
  • Init signal handling with concurrent signals
  • Draining behavior with pending signals

📝 Additional Observations

Good Practices

  1. Proper use of graceful_not_found() for actor workflow signals
  2. Clear logging with structured fields (e.g., ?actor_id)
  3. Batching events acknowledgment to reduce message overhead
  4. Database transaction instrumentation with custom_instrument()

Deprecated Code

Line 456: evict_workflow_id: Option<Id> is marked as deprecated but still present. Consider removing if truly unused.


📊 Performance Impact

Expected improvements:

  • Reduced database transactions: Commands are batched into single activity calls
  • Fewer pubsub messages: Multiple ToClient messages sent together
  • Lower workflow execution overhead: Fewer state transitions

Potential concerns:

  • Latency: Waiting to batch signals adds latency (up to runner_lost_threshold duration)
  • Memory: Batching 1024 signals could increase memory usage, especially if events contain large payloads

✅ Final Verdict

This is a solid performance optimization with good implementation quality. The main concerns are:

  1. 🔴 Critical: Verify signal ordering semantics are correct
  2. 🟡 Important: Consider whether to break/return after stopping signals
  3. 🟡 Important: Add comprehensive tests for batch processing scenarios
  4. 🟢 Nice-to-have: Investigate parallelizing actor signals for different actors

Recommendation: Address the critical signal ordering question before merging. Consider adding tests to prevent regressions.


Great work on this optimization! The batch processing approach should significantly improve throughput for busy runners. 🚀

@claude

claude bot commented Nov 19, 2025

PR Review: Rewrite Runner Workflow to Handle Batch Signals

I've reviewed this PR and have the following feedback:

Overview

This PR rewrites the runner workflow (creating runner2.rs) to handle batch signal processing instead of processing signals one at a time. This is a significant architectural improvement that should reduce workflow overhead and improve performance.


Strengths

  1. Batch Signal Processing: The switch from listen_with_timeout to listen_n_with_timeout (line 76-78) allows processing up to 1024 signals in a single loop iteration, which should significantly reduce workflow execution overhead.

  2. Message Batching: Multiple messages are now accumulated and sent together (line 185, 324-330), reducing the number of activity calls to SendMessagesToRunner.

  3. Efficient Event Acknowledgment: Events are still batched with the same EVENT_ACK_BATCH_SIZE logic but now benefit from the overall batching architecture (lines 264-277).

  4. Proper Signal Deduplication: Command acks are properly aggregated using max() (line 122) to handle multiple ack signals in the same batch.

  5. Consistent Error Handling: The workflow maintains the same error handling patterns and graceful degradation.


Potential Issues

Critical: State Management Concern

Location: process_init activity (line 740-801)

The process_init activity reads from state.last_event_idx and state.commands. The LifecycleState tracks last_event_idx separately from the activity State. When process_init returns missed commands, it compares against State.last_command_idx which might be out of sync with what was actually processed in the loop.

Recommendation: Verify state consistency between State and LifecycleState is maintained correctly.


Medium: Sequential Event Processing

Location: Lines 239-256

Events are still processed sequentially with the comment that they should not be parallelized. However, sending 1024 events one-by-one could become a bottleneck. Consider whether strict ordering is required for all events or if some could be parallelized.


Medium: Message Sending Pattern

Location: Lines 324-330, 1132-1150

The implementation sends messages in a loop without batching at the protocol level. Consider whether the protocol supports sending multiple messages in a single publish operation, or if this activity could publish in parallel.


Test Coverage

Are there tests that verify:

  1. Batch signal processing works correctly when receiving the maximum (1024) signals?
  2. State consistency between State and LifecycleState?
  3. Command/event deduplication works correctly in batch scenarios?
  4. The workflow correctly handles mixed signal types in a single batch?

Performance Considerations

Positive Impact:

  • Significant reduction in workflow loop iterations
  • Fewer activity executions for message sending
  • Better throughput for high-signal workloads

Questions:

  1. What is the expected performance improvement? Has this been load-tested?
  2. Are there any concerns about the 1024 batch size causing timeouts in extreme cases?
  3. Should there be metrics added to track batch sizes and processing times?

Migration Strategy

Since this creates runner2.rs alongside the existing runner.rs:

  • When will the old runner workflow be deprecated?
  • Is there a rollback plan if issues are discovered?
  • Are both versions expected to run concurrently during migration?

Overall, this is a solid architectural improvement that should significantly enhance performance. The main concern is ensuring state consistency is maintained correctly with the new batching approach.

@claude

claude bot commented Nov 19, 2025

Code Review for PR #3480

I've reviewed this pull request which rewrites the runner workflow to handle batch signal processing.

Strengths & Improvements

  1. Batch Signal Processing: Switching from listen_with_timeout to listen_n_with_timeout processes up to 1024 signals in a single loop iteration - significant performance improvement.

  2. Batch Message Sending: Messages are accumulated and sent together via SendMessagesToRunnerInput, reducing activity calls.

  3. Improved Command/Event Batching: Events and commands are accumulated and processed together, reducing database overhead.

  4. Better State Management: Command acknowledgment logic is cleaner - InsertCommandsInput handles both insertions and acknowledgments in a single activity.

Potential Issues

1. Incomplete File (Critical)

The file appears truncated at line 1176. Line 944 shows incomplete struct name. Please verify full file was committed.

2. Backward Compatibility

SendMessageToRunnerInput (singular) replaced by SendMessagesToRunnerInput (plural) - could break workflow replays. Need migration strategy.

3. Duplicate Code

Both runner.rs and runner2.rs exist with nearly identical code - maintenance burden. Need clear deprecation plan.

4. Sequential Actor Signal Processing

Lines 239-256 send signals sequentially. Could become bottleneck under high event volume. Consider adding telemetry.

5. Missing Validation

Line 122 uses .max() for ack indices - silently handles out-of-order acks. Consider logging these cases.
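
A hedged sketch of that logging; sig.last_command_idx is an illustrative field name:

// Surface regressing ack indices instead of silently clamping with max().
if sig.last_command_idx < state.ack_last_command_idx {
    tracing::warn!(
        received = sig.last_command_idx,
        current = state.ack_last_command_idx,
        "out-of-order command ack"
    );
}
state.ack_last_command_idx = state.ack_last_command_idx.max(sig.last_command_idx);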

Code Quality

  • Excellent adherence to logging conventions (structured logging, lowercase messages)
  • Proper error handling with custom error system
  • Code style follows project standards

Testing

Missing:

  • Tests for batch signal processing
  • Performance benchmarks
  • Migration tests for in-flight workflows

Action Items

Must Fix:

  1. Complete truncated file

Should Address:
2. Migration plan for runner.rs to runner2.rs transition
3. Integration tests for batch processing

Nice to Have:
4. Make batch size (1024) configurable
5. Add metrics for batch sizes
6. Deprecation timeline documentation

Overall

Solid performance improvement with good code quality. Main concerns: incomplete file, dual-workflow situation, missing tests. Once addressed, should be ready to merge.


Review generated by Claude Code

@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from aa9ef2a to dfd167c on November 20, 2025 03:07
@MasterPtato MasterPtato marked this pull request as ready for review November 26, 2025 23:36
@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from c35fe5e to d01b6fc on November 27, 2025 00:52
@MasterPtato MasterPtato force-pushed the 11-17-fix_pb_rewrite_runner_wf_to_handle_batch_signals branch from f7de5c8 to 6eecd1a on November 27, 2025 00:52
@claude

claude bot commented Nov 27, 2025

Code Review: Runner Workflow Batch Signal Handling

Overview

This PR rewrites the runner workflow to handle signals in batches (up to 1024 at once) rather than one-at-a-time, which should significantly improve throughput and reduce workflow activity overhead.

Strengths

  • Batch processing using listen_n_with_timeout is well-designed
  • Signal batching logic correctly accumulates events/commands before processing
  • Message batching reduces pubsub overhead
  • Proper event filtering by index to avoid duplicates
  • Consistent structure with original workflow

Critical Issues

1. Protocol Type Mismatch (HIGH PRIORITY)
The new workflow uses protocol::ToClient while the original uses protocol::ToRunner. Verify this is intentional - it could cause serialization failures.

2. Return Type Change
InsertCommands now returns Result<Option<i64>> instead of Result<i64>. Verified the None case is handled correctly at call sites.

Potential Issues

3. Event Processing Still Sequential
Despite batching reception, events are forwarded to actors sequentially (lines 239-256). Comment says this is for ordering. Question: Is ordering per-actor or global? If per-actor, could parallelize across actors.

4. Command Cloning
Commands are cloned at line 283 for activity input. Performance impact likely negligible vs batching gains.

Security

  • No SQL injection risk (typed keys)
  • Input validated (event index filtering)
  • Resource exhaustion prevented (1024 signal cap)

Performance

  • Batch size (1024) is reasonable, consider making configurable
  • Event ack batching (500) reduces overhead
  • Message serialization still in loop - could batch if needed

Test Coverage

CONCERN: No test changes included. This is a significant rewrite - tests should cover:

  • Batch signal processing
  • Init deduplication
  • Command ack batching
  • Event filtering
  • Draining with pending signals

Action Items

  1. CRITICAL: Verify protocol::ToClient vs ToRunner is correct
  2. HIGH: Add test coverage for batch scenarios
  3. MEDIUM: Make batch size configurable
  4. LOW: Document event ordering requirements

Overall Assessment

Code Quality: 4/5
Correctness: 4/5 (pending protocol verification)
Performance: 5/5 (excellent improvement)
Test Coverage: 2/5 (needs tests)

Recommendation: Approve with changes - Verify protocol type and add tests before merging.


Generated by Claude Code

@MasterPtato MasterPtato force-pushed the 11-17-fix_pb_rewrite_runner_wf_to_handle_batch_signals branch from 6eecd1a to 8c26dca on November 27, 2025 00:58
@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from d01b6fc to 5a41109 on November 27, 2025 00:58
@claude

claude bot commented Nov 27, 2025

PR Review: Rewrite Runner Workflow to Handle Batch Signals

Summary

This PR introduces runner2.rs as a rewrite of the Pegboard runner workflow to handle signals in batches rather than one at a time. This is a significant architectural improvement that should reduce workflow execution overhead and improve throughput.


Key Improvements

1. Batch Signal Processing

The main workflow loop now uses listen_n_with_timeout() instead of listen_with_timeout() to fetch up to 1024 signals at once (line 77). This is excellent for performance and reduces workflow churn.

2. Batch Message Sending

Messages to runners are now batched in a Vec<protocol::ToClient> and sent all at once via SendMessagesToRunnerInput (lines 324-330), replacing the old SendMessageToRunnerInput approach. This reduces activity overhead significantly.

3. Signal Aggregation Logic

The code intelligently aggregates signals during batch processing:

  • Multiple Forward signals are collected (lines 103-135)
  • Commands are batched together (lines 137-174)
  • Event acknowledgments use max() to find the highest index (line 122)

Potential Issues

Critical

1. Division by Zero Risk (Line 620)

let remaining_millislots = (remaining_slots * 1000) / input.total_slots;

If input.total_slots is 0, this will panic. Add validation before division.

2. Potential Integer Division Issues (Line 1056)

let new_remaining_millislots = (new_remaining_slots * 1000) / old_runner_alloc_key_data.total_slots;

Same division by zero concern applies here.

Medium Priority

3. Multiple Init Handling (Lines 106-109)
The code silently ignores subsequent init signals in a batch. While this is probably correct behavior, consider logging a warning if multiple init signals are received.

4. Event Cloning (Line 243)
Events are cloned for each actor signal. If event.inner is large, this could be expensive. Consider if you can use references or Arc to avoid cloning.

5. Sequential Signal Sending (Lines 239-256)
The comment says signals should be sent in order, but this means not parallelizing sends. This could become a bottleneck with many events in a batch.

Low Priority

6. TODO Comments

  • Line 667: "TODO: Combine into a single udb txn"
  • Lines 819-825: Events storage is disabled

7. Magic Number (Line 77)
The batch size of 1024 should be a named constant like EVENT_ACK_BATCH_SIZE.
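
A minimal sketch, mirroring the style of the existing constant (the name is a suggestion):

// Name the signal batch cap alongside EVENT_ACK_BATCH_SIZE.
const SIGNAL_BATCH_SIZE: usize = 1024;

// ...then at the call site (sketch):
// listen_n_with_timeout(runner_lost_threshold, SIGNAL_BATCH_SIZE)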

8. Command Clone (Lines 173, 283-284)
Commands are cloned when passing to the activity. Consider if you can move instead of clone.


Good Practices

  1. Structured logging - Consistently uses tracing with structured fields
  2. Graceful handling - Uses graceful_not_found() when signaling actors that may have stopped
  3. Transaction instrumentation - All database transactions have custom spans
  4. Clear comments - Good inline documentation explaining logic
  5. Consistent error handling - Uses Result<T> and ? operator throughout
  6. State management - Properly separates workflow State and loop LifecycleState

Security Considerations

  1. JSON Parsing - Lines 752-755 and 766-769 use unwrap_or_default() when parsing metadata JSON, which is safe
  2. No SQL injection risks - Uses typed database keys throughout
  3. Resource exhaustion - Batch size is capped at 1024, preventing unbounded memory growth

Testing Recommendations

  1. Test batch signal processing - Verify behavior when receiving 1024+ signals
  2. Test duplicate event handling - Ensure events with duplicate indices are handled correctly
  3. Test division edge cases - Verify behavior when total_slots is 0 or 1 (see the test sketch after this list)
  4. Test init signal duplicates - Verify behavior with multiple init signals in same batch
  5. Load testing - Measure performance improvement vs. old runner workflow
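
As a starting point, the millislot math is easy to unit-test in isolation. A hedged sketch, where remaining_millislots is a hypothetical extraction of the expression at line 620:

// Hypothetical extraction of the millislot calculation for testing.
fn remaining_millislots(remaining_slots: u64, total_slots: u64) -> Option<u64> {
    (remaining_slots * 1000).checked_div(total_slots)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn zero_total_slots_is_handled() {
        assert_eq!(remaining_millislots(5, 0), None);
    }

    #[test]
    fn full_capacity_is_1000_millislots() {
        assert_eq!(remaining_millislots(10, 10), Some(1000));
    }
}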

Performance Considerations

Positive:

  • Batch processing reduces workflow wake-ups
  • Batched message sending reduces activity overhead
  • Should significantly improve throughput for high-signal-volume runners

Watch out for:

  • Large batches could increase latency for individual signals (acceptable tradeoff)
  • Memory usage scales with batch size × signal size
  • Sequential event forwarding could be a bottleneck with many actors

Migration Notes

The PR correctly updates all references from workflows::runner to workflows::runner2 in:

  • pegboard-runner/src/conn.rs
  • pegboard-runner/src/ping_task.rs
  • pegboard-runner/src/ws_to_tunnel_task.rs
  • pegboard/src/workflows/actor/mod.rs
  • pegboard/src/workflows/actor/runtime.rs

Question: Is there a migration plan to eventually remove the old runner.rs workflow, or will both coexist?


Recommendation

Approve with minor changes requested:

  1. Must fix: Add division-by-zero protection (line 620, 1056)
  2. Should fix: Extract magic number 1024 to named constant
  3. Consider: Add warning log for duplicate init signals
  4. Consider: Review if event cloning can be optimized

Overall this is a well-structured improvement to the runner workflow. The batch processing approach is sound and should provide significant performance benefits. The code follows project conventions and has good error handling.

Great work on this refactor!

@MasterPtato MasterPtato force-pushed the 11-17-fix_pb_rewrite_runner_wf_to_handle_batch_signals branch from 8c26dca to 9e20345 on December 2, 2025 19:10
@MasterPtato MasterPtato force-pushed the 11-17-fix_gas_fix_batch_listen_fix_history_for_graceful_signal_send_in_workflows branch from 5a41109 to 5b7c52f on December 2, 2025 19:10
@claude

claude bot commented Dec 2, 2025

PR Review: Rewrite Runner Workflow to Handle Batch Signals

Summary

This PR rewrites the Pegboard runner workflow to handle batch signal processing instead of processing signals one at a time. The main change is the introduction of runner2.rs (1176 new lines) with batch signal handling via listen_n_with_timeout, while the original runner.rs uses listen_with_timeout.

Architecture & Design

Strengths:

  • Batch Processing: The new approach uses listen_n_with_timeout to collect up to 1024 signals at once, which should significantly improve throughput under high load
  • Message Coalescing: Multiple messages are batched into a single SendMessagesToRunner call (runner2.rs:324-330), reducing pubsub overhead
  • Efficient Command/Event Handling: Commands and events are collected during signal processing and sent in batches

Concerns:

  • Parallel Deployment Risk: Both runner.rs and runner2.rs coexist in the codebase with all callsites updated to use runner2. This suggests a migration strategy, but there's no deprecation marker or TODO for removing the old version
  • Signal Protocol Change: The ToRunner protocol type is replaced with ToClient in runner2. Verify this is intentional and backward compatible

Code Quality

Good Practices:

  • Structured logging follows conventions (e.g., runner2.rs:148-150)
  • Proper error handling with context
  • Clear separation of concerns with activity functions
  • Transaction instrumentation with custom spans

Issues Found:

  1. Type Inconsistency (runner2.rs:205-210):

    • Uses protocol::ToClient::ToClientInit directly
    • Original uses protocol::ToRunner::ToClientInit (runner.rs:100)
    • This might be a protocol version change, but needs verification
  2. Activity Signature Change (runner2.rs:1127-1149):

    • runner2 takes Vec<protocol::ToClient>
    • runner takes a single protocol::ToRunner
    • Plural vs singular naming is good but protocol type difference needs clarification
  3. Command Indexing Logic (runner2.rs:281-303):

    • The InsertCommandsInput now takes ack_last_command_idx parameter
    • Returns Option<i64> instead of i64
    • More complex logic that combines insert and ack operations
    • Potential Issue: The batch_start_index calculation depends on both insert and ack happening atomically
  4. Removed Activity (runner.rs:797-806):

    • The AckCommands activity exists in runner.rs but is absent in runner2.rs
    • Command acknowledgment is now combined with insertion in InsertCommands activity
    • This is more efficient but changes the activity signature

@claude

claude bot commented Dec 2, 2025

Performance Considerations

Improvements:

  • Batch signal processing reduces workflow loop iterations
  • Message coalescing reduces network calls
  • Single activity call for multiple messages vs. multiple calls in original

Considerations:

  • Large batch sizes (1024) could increase memory usage
  • Event filtering happens twice: once in collection (runner2.rs:113-117) and potentially in InsertEvents
  • The EVENT_ACK_BATCH_SIZE = 500 means events are acked less frequently, which could impact runner memory if it buffers unacked events

Potential Bugs

  1. Init Packet Handling (runner2.rs:106-109):

    • Only the first init packet in a batch is processed
    • Multiple inits shouldn't happen, but no error/warning is logged if they do
  2. Command Acknowledgment Race (runner2.rs:122):

    • Multiple AckCommands signals in a batch are correctly handled with max
    • However, if commands are added and acked in the same batch, the ordering needs verification

Security Concerns

No Critical Issues:

  • Proper input validation on event/command indices
  • Database transactions use appropriate isolation levels
  • Conflict resolution with ConflictRangeType::Read is preserved

Test Coverage

Missing Information:

  • No test files in the PR
  • No indication of how batch signal handling is tested
  • The PR title mentions fix but there's no linked issue or bug description

Migration Strategy

Questions:

  • Why keep both runner.rs and runner2.rs?
  • Is this meant to be a gradual rollout with ability to rollback?
  • When will runner.rs be removed?
  • Are there any feature flags controlling which version is used?

Recommendations

  1. Add Migration Documentation: Document why both workflows exist and the deprecation timeline for runner.rs

  2. Protocol Type Clarification (runner2.rs:205): Verify that ToClient vs ToRunner is intentional and add a comment explaining the protocol change if this is breaking

  3. Add Warning for Multiple Inits (runner2.rs:106): Log a warning if multiple init packets are received in a single batch

  4. Consider Extracting Batch Logic: The signal processing loop (runner2.rs:101-183) is dense. Consider extracting into helper functions

  5. Add Metrics: Consider adding metrics for average batch size, batch processing latency, and number of coalesced messages

Verdict

Approve with Recommendations

The rewrite achieves its goal of batch signal processing, which should improve performance under load. The code quality is good overall and follows repository conventions. The main concerns are:

  1. Clarify the migration strategy and timeline
  2. Verify protocol type changes are intentional
  3. Add defensive checks for edge cases
  4. Consider adding metrics to validate performance improvements

The changes are well-structured and the batch processing approach is sound. However, more context around the migration strategy would be helpful for reviewers and future maintainers.


Review generated by Claude Code

Member

@NathanFlurry NathanFlurry left a comment


runner wf could benefit from more comments explaining the parallel flow with the actor wf. hard to understand unless you have context that there's 2 flows for queued & non-queued.

might make more sense upstack, feel free to disregard pr if so

@MasterPtato MasterPtato dismissed NathanFlurry’s stale review December 3, 2025 00:17

Fixed in 12-02-fix_serverless_misc_pr_fixes

@graphite-app
Contributor

graphite-app bot commented Dec 3, 2025

Merge activity

  • Dec 3, 12:33 AM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Dec 3, 12:34 AM UTC: CI is running for this pull request on a draft pull request (#3577) due to your merge queue CI optimization settings.
  • Dec 3, 12:34 AM UTC: Merged by the Graphite merge queue via draft PR: #3577.

@graphite-app graphite-app bot closed this Dec 3, 2025
@graphite-app graphite-app bot deleted the 11-17-fix_pb_rewrite_runner_wf_to_handle_batch_signals branch December 3, 2025 00:34