fix: enable json output for workflows and update logs#238

Open
vishnurk6247 wants to merge 1 commit into develop from fix/workflow-inference

Conversation

vishnurk6247 (Member) commented Mar 6, 2026

Summary by CodeRabbit

Release Notes

  • New Features

    • Added configurable JSON output option for workflow inferences with a new toggle switch to enable JSON-formatted output
  • Improvements

    • Refined real-time event toggle controls with enhanced UI components
    • Improved error handling for SSE workflow execution with better feedback on authentication, permissions, and server errors

coderabbitai bot commented Mar 6, 2026

📝 Walkthrough

The changes introduce a JSON output feature for workflow inferences controlled by a UI toggle, refactor AbortController management to prevent state-related re-triggering, upgrade UI components in ChatBot, and simplify input extraction in the message processor.

Changes

  • JSON Output Feature — wavefront/client/src/api/workflow-service.ts, wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx: Added an outputJsonEnabled parameter to the runInference method and propagated it through both the standard and SSE inference paths; updated the UI with a Label + Switch control to toggle the feature.
  • UI Component Migration — wavefront/client/src/components/ChatBot.tsx: Replaced the custom button-based toggle with standardized Label + Switch components for the real-time events control; changed the default showLogic state to true.
  • AbortController Refactoring — wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx: Introduced abortControllerRef to manage SSE and inference cleanup without triggering effects on every state change; ensures proper cleanup on unmount and before new inferences.
  • Server-side Workflow Processing — wavefront/server/modules/agents_module/.../workflow_controller.py: Demoted input logging from info to debug level in workflow_inference_v2.
  • Input Extraction Simplification — wavefront/server/modules/plugins_module/.../message_processor_controller.py: Changed required_inputs extraction to use the safe .get() method instead of direct key access; simplified execution_inputs construction to directly reference payload data.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 A toggle for JSON output so fine,
Components switched with a Switch divine,
AbortController won't cause a fuss,
The workflow inference flows smooth for us! ✨

🚥 Pre-merge checks | ✅ 3 passed
  • Description Check — ✅ Passed: Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: The title accurately captures the main changes: enabling JSON output for workflows via the new outputJsonEnabled parameter and updating logging levels in the workflow controller.
  • Docstring Coverage — ✅ Passed: Docstring coverage is 100.00%, which meets the required threshold of 80.00%.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py (1)

294-294: Appropriate log level reduction, but inconsistent with v1 endpoint.

Changing to debug level for workflow inputs is sensible—inputs may contain sensitive user data that shouldn't appear in production info logs. However, the v1 endpoint at line 85 still logs inputs at info level:

logger.info(f'Inputs to workflow: {resolved_inputs}')  # Line 85

Consider applying the same change to workflow_inference (v1) for consistency and to avoid potentially logging sensitive data in both endpoints.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py`
at line 294, The v1 endpoint still logs resolved_inputs at info level; update
the logging in the workflow_inference function to use logger.debug like the v2
endpoint (where logger.debug(f'Inputs to workflow: {resolved_inputs}') is used)
so both endpoints consistently avoid logging potentially sensitive input data at
info level—locate the logger.info call that prints resolved_inputs in
workflow_inference and change it to logger.debug.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9a64f073-b28c-465e-85b1-636016a4ee40

📥 Commits

Reviewing files that changed from the base of the PR and between 7e37975 and fd4504f.

⛔ Files ignored due to path filters (1)
  • wavefront/server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • wavefront/client/src/api/workflow-service.ts
  • wavefront/client/src/components/ChatBot.tsx
  • wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx
  • wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py
  • wavefront/server/modules/plugins_module/plugins_module/controllers/message_processor_controller.py

Comment on lines 467 to +468
  const controller = new AbortController();
- setAbortController(controller);
+ abortControllerRef.current = controller;

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n -C4 'abortControllerRef\.current|new AbortController\(|function cleanup' \
  'wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx'

Repository: rootflo/wavefront

Length of output: 1392


🏁 Script executed:

#!/bin/bash
# Get the full handleSSEInference function to understand the async flow
sed -n '341,660p' 'wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx' | cat -n

Repository: rootflo/wavefront

Length of output: 12844


🏁 Script executed:

#!/bin/bash
# Get context around line 334-336 to see if same pattern applies
sed -n '330,340p' 'wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx' | cat -n

Repository: rootflo/wavefront

Length of output: 399


Add identity guard to prevent stale cleanup from clearing the active controller.

cleanup() unconditionally nulls abortControllerRef.current at line 310. If request A is aborted and request B stores a new controller before A's finally block runs, A's cleanup will clear B's ref, making the active stream unabortable and causing resource leaks.

Proposed fix
  const handleSSEInference = async (
    inputs: string | Array<{ role: 'user' | 'assistant'; content: ChatMessageContent }>,
    variables: Record<string, unknown>
  ) => {
    if (!id) return;
+   let controller: AbortController | null = null;

    try {
      setIsStreaming(true);
      setStreamingEvents([]); // Clear previous events immediately

-     const controller = new AbortController();
+     controller = new AbortController();
      abortControllerRef.current = controller;

    // ... rest of function ...

    function cleanup() {
      setRunningInference(false);
      setIsStreaming(false);
-     abortControllerRef.current = null;
+     if (abortControllerRef.current === controller) {
+       abortControllerRef.current = null;
+     }
    }
  };
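The race and the guard can also be sketched outside React. This standalone Python analogue (all names invented for illustration) models the shared ref as a dict and shows the identity check letting request A's stale cleanup run without clearing request B's controller:

```python
class FakeController:
    """Stands in for AbortController; only object identity matters here."""

def make_request(ref):
    """Simulate starting a request: store a fresh controller in the shared ref."""
    controller = FakeController()
    ref["current"] = controller  # abortControllerRef.current = controller

    def cleanup():
        # Identity guard: only clear the ref if it still points at *this*
        # request's controller, so a stale cleanup can't clear a newer one.
        if ref["current"] is controller:
            ref["current"] = None

    return controller, cleanup

ref = {"current": None}
_, cleanup_a = make_request(ref)             # request A starts
controller_b, cleanup_b = make_request(ref)  # request B supersedes A
cleanup_a()                                  # A's stale finally-block runs late
assert ref["current"] is controller_b        # B's controller survives the guard
cleanup_b()
assert ref["current"] is None                # B's own cleanup clears the ref
```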
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/client/src/pages/apps/`[appId]/workflows/[id].tsx around lines 467
- 468, The cleanup currently unconditionally sets abortControllerRef.current =
null and can clear a newer controller from a concurrent request; change cleanup
(the finally block that runs after the fetch/stream) to only clear the ref when
the stored controller === the local controller instance created by const
controller = new AbortController() (the one assigned to
abortControllerRef.current), i.e. add an identity guard so cleanup does: if
(abortControllerRef.current === controller) abortControllerRef.current = null;
This ensures request A cannot clear request B's active controller.

- required_inputs = inputs['required']
+ required_inputs = inputs.get('required') or []
⚠️ Potential issue | 🟠 Major

Validate input_schema.required instead of coercing it.

Line 294 now treats required: null, required: "", and other falsy malformed values as [], so bad processor YAML skips required-input validation and executes anyway. Reject non-list values explicitly here instead of silently normalizing them.

Proposed fix
-    required_inputs = inputs.get('required') or []
+    required_inputs = inputs.get('required')
+    if not isinstance(required_inputs, list):
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                "Invalid processor YAML: input_schema.required must be a list"
+            ),
+        )
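To illustrate why `or []` is risky, a self-contained sketch (the `parse_required` helper is hypothetical, not project code) contrasting the coercion with explicit validation:

```python
def parse_required(inputs: dict) -> list:
    """Return input_schema.required, treating only an absent/None value as empty."""
    required = inputs.get("required")
    if required is None:
        return []
    if not isinstance(required, list):
        raise ValueError("input_schema.required must be a list")
    return required

# The `or []` coercion silently swallows malformed YAML values:
assert ({"required": ""}.get("required") or []) == []    # "" becomes []
assert ({"required": None}.get("required") or []) == []  # null becomes []

# Explicit validation keeps the good cases and rejects the bad one:
assert parse_required({}) == []
assert parse_required({"required": ["text"]}) == ["text"]
try:
    parse_required({"required": ""})
except ValueError:
    pass  # malformed 'required' is rejected instead of normalized away
else:
    raise AssertionError("malformed 'required' was not rejected")
```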
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/message_processor_controller.py`
at line 294, The current assignment required_inputs = inputs.get('required') or
[] silently coerces falsy/malformed values to an empty list and skips
validation; instead, explicitly validate that inputs.get('required') (the
input_schema.required value) is either None or a list and raise/return a
validation error when it is present but not a list. Update the logic around
required_inputs in message_processor_controller (the code that reads
inputs.get('required')) to: fetch the raw value, if it's None treat as [], if
it's not a list raise a clear error (or mark the processor YAML invalid) so
non-list values like null or empty string are rejected rather than normalized
away.

- execution_inputs[input] = payload.input_data[input]
+ execution_inputs = payload.input_data
⚠️ Potential issue | 🟠 Major

Don't pass undeclared inputs straight to Hermes.

Line 304 now forwards the entire payload.input_data dict. In wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py:180-186 and wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py:56-75, that dict is passed to Hermes unchanged, so extra client-supplied keys bypass the processor schema entirely. Please filter payload.input_data to schema-declared fields before execution.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/plugins_module/plugins_module/controllers/message_processor_controller.py`
at line 304, The controller currently assigns execution_inputs =
payload.input_data and forwards the entire dict to Hermes; change this to only
include keys declared by the processor's input schema (e.g., intersect
payload.input_data with the processor schema fields) before handing off to
Hermes. Locate the assignment in message_processor_controller.py
(execution_inputs = payload.input_data) and replace it with a filtered map
constructed from the processor's declared inputs (use the processor/schema
metadata available in the controller or call the existing sanitizer/validator
helper in message_processor_service such as the functions used around
process_message/execute_* that currently accept payloads), ensuring the same
sanitized object is what message_processor_service (the functions referenced in
message_processor_service.py) receives. Ensure unknown client-supplied keys are
dropped, and preserve validation logic by reusing the service's schema
validation routine rather than duplicating validation code.
