5 changes: 3 additions & 2 deletions wavefront/client/src/api/workflow-service.ts
@@ -57,12 +57,13 @@ export class WorkflowService {
   async runInference(
     id: string,
     inputs: string | unknown[],
-    variables: Record<string, unknown> = {}
+    variables: Record<string, unknown> = {},
+    outputJsonEnabled: boolean = false
   ): Promise<WorkflowInferenceResponse> {
     const requestBody: Record<string, unknown> = {
       inputs,
       variables,
-      output_json_enabled: false,
+      output_json_enabled: outputJsonEnabled,
     };

     const response: IApiResponse<WorkflowInferenceData> = await this.http.post(
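The widened signature stays backward compatible because the new parameter defaults to `false`. A standalone sketch of how the flag shapes the request body (`buildRequestBody` is a hypothetical helper mirroring the diff, not part of the codebase; the HTTP layer is omitted):

```typescript
// Hypothetical helper mirroring the request-body construction in the diff.
type InferenceRequestBody = {
  inputs: string | unknown[];
  variables: Record<string, unknown>;
  output_json_enabled: boolean;
};

function buildRequestBody(
  inputs: string | unknown[],
  variables: Record<string, unknown> = {},
  outputJsonEnabled: boolean = false
): InferenceRequestBody {
  // The camelCase flag maps onto the snake_case wire field.
  return { inputs, variables, output_json_enabled: outputJsonEnabled };
}

// Existing call sites that omit the trailing argument keep the old behaviour:
console.log(buildRequestBody('Hello').output_json_enabled); // false
// New callers opt in explicitly:
console.log(buildRequestBody('Hello', {}, true).output_json_enabled); // true
```

Appending an optional trailing parameter like this is what lets the change land without touching existing callers.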
29 changes: 11 additions & 18 deletions wavefront/client/src/components/ChatBot.tsx
@@ -1,4 +1,6 @@
 import { Button } from '@app/components/ui/button';
+import { Label } from '@app/components/ui/label';
+import { Switch } from '@app/components/ui/switch';
 import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@app/components/ui/select';
 import { Spinner } from '@app/components/ui/spinner';
 import { Textarea } from '@app/components/ui/textarea';
@@ -102,7 +104,7 @@ const ChatBot = ({
   isModelSwitchEnabled = true,
 }: ChatBotProps) => {
   const variablesModalRef = useRef<HTMLDivElement>(null);
-  const [showLogic, setShowLogic] = useState(false);
+  const [showLogic, setShowLogic] = useState(true);
   const [selectValue, setSelectValue] = useState<string>('');

   return (
@@ -111,23 +113,14 @@ const ChatBot = ({
       {listenEventsEnabled !== undefined && setListenEventsEnabled !== undefined && (
         <div>
           <div className="flex items-center justify-between">
-            <label className="block text-sm font-medium text-gray-700">Real-time Events</label>
-            <button
-              type="button"
-              onClick={() => setListenEventsEnabled(!listenEventsEnabled)}
-              className={`relative inline-flex h-6 w-11 shrink-0 cursor-pointer rounded-full border-2 border-transparent transition-colors duration-200 ease-in-out focus:ring-2 focus:ring-blue-500 focus:ring-offset-2 focus:outline-none ${
-                listenEventsEnabled ? 'bg-blue-600' : 'bg-gray-200'
-              }`}
-              role="switch"
-              aria-checked={listenEventsEnabled}
-              aria-label="Toggle real-time events"
-            >
-              <span
-                className={`pointer-events-none inline-block h-5 w-5 transform rounded-full bg-white shadow ring-0 transition duration-200 ease-in-out ${
-                  listenEventsEnabled ? 'translate-x-5' : 'translate-x-0'
-                }`}
-              />
-            </button>
+            <Label htmlFor="realtime-events-toggle" className="text-sm font-medium text-gray-700">
+              Real-time Events
+            </Label>
+            <Switch
+              id="realtime-events-toggle"
+              checked={listenEventsEnabled}
+              onCheckedChange={setListenEventsEnabled}
+            />
           </div>
           <p className="mt-1 text-xs text-gray-500">
             {listenEventsEnabled ? 'Stream real-time workflow execution events' : 'Standard inference response only'}
37 changes: 22 additions & 15 deletions wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx
@@ -1,6 +1,8 @@
 import floConsoleService from '@app/api';
 import ChatBot from '@app/components/ChatBot';
 import { Button } from '@app/components/ui/button';
+import { Label } from '@app/components/ui/label';
+import { Switch } from '@app/components/ui/switch';
 import { Dialog, DialogContent, DialogFooter, DialogHeader, DialogTitle } from '@app/components/ui/dialog';
 import { appEnv } from '@app/config/env';
 import { useNotifyStore } from '@app/store';
@@ -61,17 +63,17 @@ const WorkflowDetail: React.FC = () => {
}>
>([]);
const [uploadingDocument, setUploadingDocument] = useState(false);

// JSON output state
const [outputJsonEnabled, setOutputJsonEnabled] = useState(false);

// SSE state
const [listenEventsEnabled, setListenEventsEnabled] = useState<boolean>(false);
const [abortController, setAbortController] = useState<AbortController | null>(null);
const [streamingEvents, setStreamingEvents] = useState<WorkflowEvent[]>([]);
const [isStreaming, setIsStreaming] = useState<boolean>(false);

// Ref for auto-scrolling events container
const eventsContainerRef = useRef<HTMLDivElement>(null);
// Ref for current abort controller - used so unmount cleanup doesn't depend on state
const abortControllerRef = useRef<AbortController | null>(null);

const loadWorkflow = useCallback(async () => {
if (!id) return;
@@ -187,15 +189,13 @@ const WorkflowDetail: React.FC = () => {
     });
   }, []);

-  // Cleanup fetchEventSource on unmount and when switching between SSE/normal inference
+  // Cleanup SSE request only on unmount (avoid aborting when abortController state changes)
   useEffect(() => {
     return () => {
-      if (abortController) {
-        abortController.abort();
-        setAbortController(null);
-      }
+      abortControllerRef.current?.abort();
+      abortControllerRef.current = null;
     };
-  }, [abortController]);
+  }, []);
   const handleDocumentUpload = useCallback(
     (event: React.ChangeEvent<HTMLInputElement>) => {
       const files = event.target.files;
@@ -331,9 +331,9 @@ const WorkflowDetail: React.FC = () => {
     setRunningInference(true);

     // Abort any existing fetchEventSource
-    if (abortController) {
-      abortController.abort();
-      setAbortController(null);
+    if (abortControllerRef.current) {
+      abortControllerRef.current.abort();
+      abortControllerRef.current = null;
     }

     // Clear previous results
@@ -421,7 +421,7 @@ const WorkflowDetail: React.FC = () => {
       await handleSSEInference(inputs, variables);
     } else {
       // Handle normal inference
-      const result = await floConsoleService.workflowService.runInference(id, inputs, variables);
+      const result = await floConsoleService.workflowService.runInference(id, inputs, variables, outputJsonEnabled);
       const resultContent = result.data?.data?.data?.result;
       if (resultContent) {
         setChatHistory((prev) => [...prev, { role: 'assistant', content: resultContent }]);
@@ -458,13 +458,14 @@ const WorkflowDetail: React.FC = () => {
       inputs,
       variables,
       listen_events: true,
+      output_json_enabled: outputJsonEnabled,
     };

     const url = `${baseUrl}/v1/${appId}/floware/v2/workflows/${id}/inference`;

     // Create abort controller for cleanup
     const controller = new AbortController();
-    setAbortController(controller);
+    abortControllerRef.current = controller;
Comment on lines 467 to +468
⚠️ Potential issue | 🟠 Major


Add identity guard to prevent stale cleanup from clearing the active controller.

cleanup() unconditionally nulls abortControllerRef.current at line 310. If request A is aborted and request B stores a new controller before A's finally block runs, A's cleanup will clear B's ref, making the active stream unabortable and causing resource leaks.

Proposed fix
  const handleSSEInference = async (
    inputs: string | Array<{ role: 'user' | 'assistant'; content: ChatMessageContent }>,
    variables: Record<string, unknown>
  ) => {
    if (!id) return;
+   let controller: AbortController | null = null;

    try {
      setIsStreaming(true);
      setStreamingEvents([]); // Clear previous events immediately

-     const controller = new AbortController();
+     controller = new AbortController();
      abortControllerRef.current = controller;

    // ... rest of function ...

    function cleanup() {
      setRunningInference(false);
      setIsStreaming(false);
-     abortControllerRef.current = null;
+     if (abortControllerRef.current === controller) {
+       abortControllerRef.current = null;
+     }
    }
  };

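The guard the review proposes can be exercised outside React. A minimal sketch, with a module-level `activeController` standing in for `abortControllerRef.current` (all names here are illustrative, not from the codebase):

```typescript
// Shared slot standing in for abortControllerRef.current.
let activeController: AbortController | null = null;

// Each request captures its own controller; its cleanup clears the shared
// slot only if the slot still points at that exact controller instance.
function makeRequest(): { controller: AbortController; cleanup: () => void } {
  const controller = new AbortController();
  activeController = controller;
  return {
    controller,
    cleanup: () => {
      if (activeController === controller) {
        activeController = null;
      }
    },
  };
}

const reqA = makeRequest();
const reqB = makeRequest(); // B supersedes A before A's cleanup has run
reqA.cleanup();             // stale cleanup: the guard leaves B's controller alone
console.log(activeController === reqB.controller); // true
reqB.cleanup();             // only the owning request clears the slot
console.log(activeController); // null
```

Without the identity check, `reqA.cleanup()` would null the slot, and `reqB`'s still-active stream could no longer be aborted — the leak the review describes.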
// RAW FETCH with immediate ReadableStream processing
const response = await fetch(url, {
@@ -646,7 +647,7 @@ const WorkflowDetail: React.FC = () => {
     function cleanup() {
       setRunningInference(false);
       setIsStreaming(false);
-      setAbortController(null);
+      abortControllerRef.current = null;
     }
   };

@@ -679,6 +680,12 @@ const WorkflowDetail: React.FC = () => {
         </div>

         <div className="flex w-full flex-col gap-2">
+          <div className="flex items-center justify-between pb-2">
+            <Label htmlFor="output-json-toggle" className="text-sm text-gray-700">
+              JSON output
+            </Label>
+            <Switch id="output-json-toggle" checked={outputJsonEnabled} onCheckedChange={setOutputJsonEnabled} />
+          </div>
           <ChatBot
             chatHistory={chatHistory}
             runningInference={runningInference}
@@ -291,7 +291,7 @@ async def workflow_inference_v2(
     workflow_name = workflow_data['name']

     resolved_inputs = process_inference_inputs(request_body.inputs)
-    logger.info(f'Inputs to workflow: {resolved_inputs}')
+    logger.debug(f'Inputs to workflow: {resolved_inputs}')

     # Prepare event streaming if requested
     event_callback = None
@@ -291,7 +291,7 @@ async def execute_message_processor(
             ),
         )

-    required_inputs = inputs['required']
+    required_inputs = inputs.get('required') or []
⚠️ Potential issue | 🟠 Major

Validate input_schema.required instead of coercing it.

Line 294 now treats required: null, required: "", and other falsy malformed values as [], so bad processor YAML skips required-input validation and executes anyway. Reject non-list values explicitly here instead of silently normalizing them.

Proposed fix
-    required_inputs = inputs.get('required') or []
+    required_inputs = inputs.get('required')
+    if not isinstance(required_inputs, list):
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                "Invalid processor YAML: input_schema.required must be a list"
+            ),
+        )

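The distinction the comment draws — rejecting a malformed schema value rather than coercing it — can be sketched generically (TypeScript purely for illustration; the actual fix belongs in the Python controller above):

```typescript
// Coercion: any falsy malformed value (null, '', 0) silently becomes [],
// so required-input validation is skipped for a bad schema.
const coercedRequired = (raw: unknown): unknown[] =>
  (raw as unknown[] | null) || [];

// Validation: an absent key maps to [], but a present non-list value
// is rejected loudly instead of being normalized away.
function validatedRequired(raw: unknown): unknown[] {
  if (raw === undefined || raw === null) return [];
  if (!Array.isArray(raw)) {
    throw new Error('input_schema.required must be a list');
  }
  return raw;
}

console.log(coercedRequired(''));          // [] — malformed schema slips through
console.log(validatedRequired(['query'])); // [ 'query' ]
// validatedRequired('') throws rather than silently disabling validation
```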
     execution_inputs = {}
     for input in required_inputs:
         if input not in payload.input_data.keys():
@@ -301,7 +301,7 @@ async def execute_message_processor(
                     f'Input `{input}` is required but not provided'
                 ),
             )
-        execution_inputs[input] = payload.input_data[input]
+    execution_inputs = payload.input_data
⚠️ Potential issue | 🟠 Major

Don't pass undeclared inputs straight to Hermes.

Line 304 now forwards the entire payload.input_data dict. In wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py:180-186 and wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py:56-75, that dict is passed to Hermes unchanged, so extra client-supplied keys bypass the processor schema entirely. Please filter payload.input_data to schema-declared fields before execution.


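The filtering the comment asks for — intersecting the client payload with schema-declared fields — can be sketched like this (TypeScript purely for illustration; the real change belongs in the Python controller, and all names here are hypothetical):

```typescript
// Keep only the keys the processor's input schema declares; drop the rest.
function filterToDeclared(
  inputData: Record<string, unknown>,
  declaredInputs: string[]
): Record<string, unknown> {
  const declared = new Set(declaredInputs);
  return Object.fromEntries(
    Object.entries(inputData).filter(([key]) => declared.has(key))
  );
}

// An undeclared, client-injected key never reaches the executor:
const safe = filterToDeclared(
  { query: 'hello', debugOverride: true },
  ['query', 'context']
);
console.log(safe); // { query: 'hello' }
```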

     try:
         result = await processor_service.execute_message_processor(
4 changes: 4 additions & 0 deletions wavefront/server/uv.lock

Some generated files are not rendered by default.
