From a6a94f4071f52f10f5cef1dbac8e0c0619b69880 Mon Sep 17 00:00:00 2001 From: agenthaulk Date: Sat, 4 Apr 2026 22:07:55 -0700 Subject: [PATCH 1/2] refactor(services): convert AppMode if/elif to match/case in generate and app service (#30001) --- api/services/app_generate_service.py | 292 ++++++++++++++------------- 1 file changed, 153 insertions(+), 139 deletions(-) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 40013f2b669062..178bf9d14126dc 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -116,139 +116,143 @@ def generate( request_id = RateLimit.gen_request_key() try: request_id = rate_limit.enter(request_id) - if app_model.mode == AppMode.COMPLETION: - return rate_limit.generate( - CompletionAppGenerator.convert_to_event_stream( - CompletionAppGenerator().generate( - app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming - ), - ), - request_id=request_id, - ) - elif app_model.mode == AppMode.AGENT_CHAT or app_model.is_agent: - return rate_limit.generate( - AgentChatAppGenerator.convert_to_event_stream( - AgentChatAppGenerator().generate( - app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming + effective_mode = ( + AppMode.AGENT_CHAT if app_model.is_agent and app_model.mode != AppMode.AGENT_CHAT else app_model.mode + ) + match effective_mode: + case AppMode.COMPLETION: + return rate_limit.generate( + CompletionAppGenerator.convert_to_event_stream( + CompletionAppGenerator().generate( + app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming + ), ), - ), - request_id, - ) - elif app_model.mode == AppMode.CHAT: - return rate_limit.generate( - ChatAppGenerator.convert_to_event_stream( - ChatAppGenerator().generate( - app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming + request_id=request_id, + ) + case AppMode.AGENT_CHAT: + return rate_limit.generate( + AgentChatAppGenerator.convert_to_event_stream( + AgentChatAppGenerator().generate( + app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming + ), ), - ), - request_id=request_id, - ) - elif app_model.mode == AppMode.ADVANCED_CHAT: - workflow_id = args.get("workflow_id") - workflow = cls._get_workflow(app_model, invoke_from, workflow_id) - - if streaming: - # Streaming mode: subscribe to SSE and enqueue the execution on first subscriber - with rate_limit_context(rate_limit, request_id): - payload = AppExecutionParams.new( - app_model=app_model, - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - streaming=True, - call_depth=0, - ) - payload_json = payload.model_dump_json() - - def on_subscribe(): - workflow_based_app_execution_task.delay(payload_json) - - on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) - generator = AdvancedChatAppGenerator() + request_id, + ) + case AppMode.CHAT: return rate_limit.generate( - generator.convert_to_event_stream( - generator.retrieve_events( - AppMode.ADVANCED_CHAT, - payload.workflow_run_id, - on_subscribe=on_subscribe, + ChatAppGenerator.convert_to_event_stream( + ChatAppGenerator().generate( + app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming ), ), request_id=request_id, ) - else: - # Blocking mode: run synchronously and return JSON instead of SSE - # Keep behaviour consistent with WORKFLOW blocking branch. - advanced_generator = AdvancedChatAppGenerator() - return rate_limit.generate( - advanced_generator.convert_to_event_stream( - advanced_generator.generate( + case AppMode.ADVANCED_CHAT: + workflow_id = args.get("workflow_id") + workflow = cls._get_workflow(app_model, invoke_from, workflow_id) + + if streaming: + # Streaming mode: subscribe to SSE and enqueue the execution on first subscriber + with rate_limit_context(rate_limit, request_id): + payload = AppExecutionParams.new( app_model=app_model, workflow=workflow, user=user, args=args, invoke_from=invoke_from, + streaming=True, + call_depth=0, + ) + payload_json = payload.model_dump_json() + + def on_subscribe(): + workflow_based_app_execution_task.delay(payload_json) + + on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) + generator = AdvancedChatAppGenerator() + return rate_limit.generate( + generator.convert_to_event_stream( + generator.retrieve_events( + AppMode.ADVANCED_CHAT, + payload.workflow_run_id, + on_subscribe=on_subscribe, + ), + ), + request_id=request_id, + ) + else: + # Blocking mode: run synchronously and return JSON instead of SSE + # Keep behaviour consistent with WORKFLOW blocking branch. + advanced_generator = AdvancedChatAppGenerator() + return rate_limit.generate( + advanced_generator.convert_to_event_stream( + advanced_generator.generate( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + workflow_run_id=str(uuid.uuid4()), + streaming=False, + ) + ), + request_id=request_id, + ) + case AppMode.WORKFLOW: + workflow_id = args.get("workflow_id") + workflow = cls._get_workflow(app_model, invoke_from, workflow_id) + if streaming: + with rate_limit_context(rate_limit, request_id): + payload = AppExecutionParams.new( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + streaming=True, + call_depth=0, + root_node_id=root_node_id, workflow_run_id=str(uuid.uuid4()), - streaming=False, ) - ), - request_id=request_id, - ) - elif app_model.mode == AppMode.WORKFLOW: - workflow_id = args.get("workflow_id") - workflow = cls._get_workflow(app_model, invoke_from, workflow_id) - if streaming: - with rate_limit_context(rate_limit, request_id): - payload = AppExecutionParams.new( - app_model=app_model, - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - streaming=True, - call_depth=0, - root_node_id=root_node_id, - workflow_run_id=str(uuid.uuid4()), + payload_json = payload.model_dump_json() + + def on_subscribe(): + workflow_based_app_execution_task.delay(payload_json) + + on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) + return rate_limit.generate( + WorkflowAppGenerator.convert_to_event_stream( + MessageBasedAppGenerator.retrieve_events( + AppMode.WORKFLOW, + payload.workflow_run_id, + on_subscribe=on_subscribe, + ), + ), + request_id, ) - payload_json = payload.model_dump_json() - - def on_subscribe(): - workflow_based_app_execution_task.delay(payload_json) - on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) + pause_config = PauseStateLayerConfig( + session_factory=session_factory.get_session_maker(), + state_owner_user_id=workflow.created_by, + ) return rate_limit.generate( WorkflowAppGenerator.convert_to_event_stream( - MessageBasedAppGenerator.retrieve_events( - AppMode.WORKFLOW, - payload.workflow_run_id, - on_subscribe=on_subscribe, + WorkflowAppGenerator().generate( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + streaming=False, + root_node_id=root_node_id, + call_depth=0, + pause_state_config=pause_config, ), ), request_id, ) - - pause_config = PauseStateLayerConfig( - session_factory=session_factory.get_session_maker(), - state_owner_user_id=workflow.created_by, - ) - return rate_limit.generate( - WorkflowAppGenerator.convert_to_event_stream( - WorkflowAppGenerator().generate( - app_model=app_model, - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - streaming=False, - root_node_id=root_node_id, - call_depth=0, - pause_state_config=pause_config, - ), - ), - request_id, - ) - else: - raise ValueError(f"Invalid app mode {app_model.mode}") + case _: + raise ValueError(f"Invalid app mode {app_model.mode}") except Exception: quota_charge.refund() rate_limit.exit(request_id) @@ -280,43 +284,53 @@ def _get_max_active_requests(app: App) -> int: @classmethod def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True): - if app_model.mode == AppMode.ADVANCED_CHAT: - workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return AdvancedChatAppGenerator.convert_to_event_stream( - AdvancedChatAppGenerator().single_iteration_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + match app_model.mode: + case AppMode.COMPLETION | AppMode.CHAT | AppMode.AGENT_CHAT: + raise ValueError(f"Invalid app mode {app_model.mode}") + case AppMode.ADVANCED_CHAT: + workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) + return AdvancedChatAppGenerator.convert_to_event_stream( + AdvancedChatAppGenerator().single_iteration_generate( + app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + ) ) - ) - elif app_model.mode == AppMode.WORKFLOW: - workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return AdvancedChatAppGenerator.convert_to_event_stream( - WorkflowAppGenerator().single_iteration_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + case AppMode.WORKFLOW: + workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) + return AdvancedChatAppGenerator.convert_to_event_stream( + WorkflowAppGenerator().single_iteration_generate( + app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + ) ) - ) - else: - raise ValueError(f"Invalid app mode {app_model.mode}") + case AppMode.CHANNEL | AppMode.RAG_PIPELINE: + raise ValueError(f"Invalid app mode {app_model.mode}") + case _: + raise ValueError(f"Invalid app mode {app_model.mode}") @classmethod def generate_single_loop( cls, app_model: App, user: Account, node_id: str, args: LoopNodeRunPayload, streaming: bool = True ): - if app_model.mode == AppMode.ADVANCED_CHAT: - workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return AdvancedChatAppGenerator.convert_to_event_stream( - AdvancedChatAppGenerator().single_loop_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + match app_model.mode: + case AppMode.COMPLETION | AppMode.CHAT | AppMode.AGENT_CHAT: + raise ValueError(f"Invalid app mode {app_model.mode}") + case AppMode.ADVANCED_CHAT: + workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) + return AdvancedChatAppGenerator.convert_to_event_stream( + AdvancedChatAppGenerator().single_loop_generate( + app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + ) ) - ) - elif app_model.mode == AppMode.WORKFLOW: - workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return AdvancedChatAppGenerator.convert_to_event_stream( - WorkflowAppGenerator().single_loop_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + case AppMode.WORKFLOW: + workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) + return AdvancedChatAppGenerator.convert_to_event_stream( + WorkflowAppGenerator().single_loop_generate( + app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + ) ) - ) - else: - raise ValueError(f"Invalid app mode {app_model.mode}") + case AppMode.CHANNEL | AppMode.RAG_PIPELINE: + raise ValueError(f"Invalid app mode {app_model.mode}") + case _: + raise ValueError(f"Invalid app mode {app_model.mode}") @classmethod def generate_more_like_this( From b20664e0dd16eaeec1cb6d8a821f071ff7b585fd Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Sun, 5 Apr 2026 09:13:59 +0000 Subject: [PATCH 2/2] [autofix.ci] apply automated fixes --- api/services/app_generate_service.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 178bf9d14126dc..17ed98d301f194 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -291,14 +291,24 @@ def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) return AdvancedChatAppGenerator.convert_to_event_stream( AdvancedChatAppGenerator().single_iteration_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + app_model=app_model, + workflow=workflow, + node_id=node_id, + user=user, + args=args, + streaming=streaming, ) ) case AppMode.WORKFLOW: workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) return AdvancedChatAppGenerator.convert_to_event_stream( WorkflowAppGenerator().single_iteration_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + app_model=app_model, + workflow=workflow, + node_id=node_id, + user=user, + args=args, + streaming=streaming, ) ) case AppMode.CHANNEL | AppMode.RAG_PIPELINE: @@ -317,14 +327,24 @@ def generate_single_loop( workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) return AdvancedChatAppGenerator.convert_to_event_stream( AdvancedChatAppGenerator().single_loop_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + app_model=app_model, + workflow=workflow, + node_id=node_id, + user=user, + args=args, + streaming=streaming, ) ) case AppMode.WORKFLOW: workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) return AdvancedChatAppGenerator.convert_to_event_stream( WorkflowAppGenerator().single_loop_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + app_model=app_model, + workflow=workflow, + node_id=node_id, + user=user, + args=args, + streaming=streaming, ) ) case AppMode.CHANNEL | AppMode.RAG_PIPELINE: