diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index 98e886175..12849f2a9 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -9,6 +9,7 @@ from ..platform.webhook_pusher import WebhookPusher from ..provider.session import sessionmgr as llm_session_mgr from ..provider.modelmgr import modelmgr as llm_model_mgr + from langbot.pkg.provider.tools import toolmgr as llm_tool_mgr from ..config import manager as config_mgr from ..command import cmdmgr @@ -30,6 +31,7 @@ from ..api.http.service import apikey as apikey_service from ..api.http.service import webhook as webhook_service from ..api.http.service import monitoring as monitoring_service + from ..discover import engine as discover_engine from ..storage import mgr as storagemgr from ..utils import logcache diff --git a/src/langbot/pkg/persistence/migrations/dbm023_model_fallback_config.py b/src/langbot/pkg/persistence/migrations/dbm023_model_fallback_config.py new file mode 100644 index 000000000..11ab3bb4e --- /dev/null +++ b/src/langbot/pkg/persistence/migrations/dbm023_model_fallback_config.py @@ -0,0 +1,102 @@ +from .. import migration + +import sqlalchemy +import json + + +@migration.migration_class(23) +class DBMigrateModelFallbackConfig(migration.DBMigration): + """Convert model field from plain UUID string to object with primary/fallbacks""" + + async def upgrade(self): + """Upgrade""" + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.text('SELECT uuid, config FROM legacy_pipelines') + ) + pipelines = result.fetchall() + + current_version = self.ap.ver_mgr.get_current_version() + + for pipeline_row in pipelines: + uuid = pipeline_row[0] + config = json.loads(pipeline_row[1]) if isinstance(pipeline_row[1], str) else pipeline_row[1] + + if 'ai' not in config or 'local-agent' not in config['ai']: + continue + + local_agent = config['ai']['local-agent'] + changed = False + + # Convert model from string to object + model_value = local_agent.get('model', '') + if isinstance(model_value, str): + local_agent['model'] = { + 'primary': model_value, + 'fallbacks': [], + } + changed = True + + # Remove leftover fallback-models field if present + if 'fallback-models' in local_agent: + del local_agent['fallback-models'] + changed = True + + if not changed: + continue + + # Update using raw SQL with compatibility for both SQLite and PostgreSQL + if self.ap.persistence_mgr.db.name == 'postgresql': + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text( + 'UPDATE legacy_pipelines SET config = :config::jsonb, for_version = :for_version WHERE uuid = :uuid' + ), + {'config': json.dumps(config), 'for_version': current_version, 'uuid': uuid}, + ) + else: + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text( + 'UPDATE legacy_pipelines SET config = :config, for_version = :for_version WHERE uuid = :uuid' + ), + {'config': json.dumps(config), 'for_version': current_version, 'uuid': uuid}, + ) + + async def downgrade(self): + """Downgrade""" + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.text('SELECT uuid, config FROM legacy_pipelines') + ) + pipelines = result.fetchall() + + current_version = self.ap.ver_mgr.get_current_version() + + for pipeline_row in pipelines: + uuid = pipeline_row[0] + config = json.loads(pipeline_row[1]) if isinstance(pipeline_row[1], str) else pipeline_row[1] + + if 'ai' not in config or 'local-agent' not in config['ai']: + continue + + local_agent = config['ai']['local-agent'] + + # Convert model from object back to string + model_value = local_agent.get('model', '') + if isinstance(model_value, dict): + local_agent['model'] = model_value.get('primary', '') + else: + continue + + # Update using raw SQL with compatibility for both SQLite and PostgreSQL + if self.ap.persistence_mgr.db.name == 'postgresql': + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text( + 'UPDATE legacy_pipelines SET config = :config::jsonb, for_version = :for_version WHERE uuid = :uuid' + ), + {'config': json.dumps(config), 'for_version': current_version, 'uuid': uuid}, + ) + else: + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text( + 'UPDATE legacy_pipelines SET config = :config, for_version = :for_version WHERE uuid = :uuid' + ), + {'config': json.dumps(config), 'for_version': current_version, 'uuid': uuid}, + ) diff --git a/src/langbot/pkg/pipeline/preproc/preproc.py b/src/langbot/pkg/pipeline/preproc/preproc.py index cd039d796..eacf47725 100644 --- a/src/langbot/pkg/pipeline/preproc/preproc.py +++ b/src/langbot/pkg/pipeline/preproc/preproc.py @@ -36,17 +36,36 @@ async def process( session = await self.ap.sess_mgr.get_session(query) # When not local-agent, llm_model is None - try: - llm_model = ( - await self.ap.model_mgr.get_model_by_uuid(query.pipeline_config['ai']['local-agent']['model']) - if selected_runner == 'local-agent' - else None - ) - except ValueError: - self.ap.logger.warning( - f'LLM model {query.pipeline_config["ai"]["local-agent"]["model"] + " "}not found or not configured' - ) - llm_model = None + llm_model = None + if selected_runner == 'local-agent': + # Read model config — new format is { primary: str, fallbacks: [str] }, + # but handle legacy plain string for backward compatibility + model_config = query.pipeline_config['ai']['local-agent'].get('model', {}) + if isinstance(model_config, str): + # Legacy format: plain UUID string + primary_uuid = model_config + fallback_uuids = [] + else: + primary_uuid = model_config.get('primary', '') + fallback_uuids = model_config.get('fallbacks', []) + + if primary_uuid: + try: + llm_model = await self.ap.model_mgr.get_model_by_uuid(primary_uuid) + except ValueError: + self.ap.logger.warning(f'LLM model {primary_uuid} not found or not configured') + + # Resolve fallback model UUIDs + if fallback_uuids: + valid_fallbacks = [] + for fb_uuid in fallback_uuids: + try: + await self.ap.model_mgr.get_model_by_uuid(fb_uuid) + valid_fallbacks.append(fb_uuid) + except ValueError: + self.ap.logger.warning(f'Fallback model {fb_uuid} not found, skipping') + if valid_fallbacks: + query.variables['_fallback_model_uuids'] = valid_fallbacks conversation = await self.ap.sess_mgr.get_conversation( query, @@ -61,20 +80,28 @@ async def process( query.prompt = conversation.prompt.copy() query.messages = conversation.messages.copy() - if selected_runner == 'local-agent' and llm_model: + if selected_runner == 'local-agent': query.use_funcs = [] - query.use_llm_model_uuid = llm_model.model_entity.uuid - - if llm_model.model_entity.abilities.__contains__('func_call'): - # Get bound plugins and MCP servers for filtering tools + if llm_model: + query.use_llm_model_uuid = llm_model.model_entity.uuid + + if llm_model.model_entity.abilities.__contains__('func_call'): + # Get bound plugins and MCP servers for filtering tools + bound_plugins = query.variables.get('_pipeline_bound_plugins', None) + bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None) + query.use_funcs = await self.ap.tool_mgr.get_all_tools(bound_plugins, bound_mcp_servers) + + self.ap.logger.debug(f'Bound plugins: {bound_plugins}') + self.ap.logger.debug(f'Bound MCP servers: {bound_mcp_servers}') + self.ap.logger.debug(f'Use funcs: {query.use_funcs}') + + # If primary model doesn't support func_call but fallback models exist, + # load tools anyway since fallback models may support them + if not query.use_funcs and query.variables.get('_fallback_model_uuids'): bound_plugins = query.variables.get('_pipeline_bound_plugins', None) bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None) query.use_funcs = await self.ap.tool_mgr.get_all_tools(bound_plugins, bound_mcp_servers) - self.ap.logger.debug(f'Bound plugins: {bound_plugins}') - self.ap.logger.debug(f'Bound MCP servers: {bound_mcp_servers}') - self.ap.logger.debug(f'Use funcs: {query.use_funcs}') - sender_name = '' if isinstance(query.message_event, platform_events.GroupMessage): diff --git a/src/langbot/pkg/provider/runners/localagent.py b/src/langbot/pkg/provider/runners/localagent.py index 52e78b9d4..7d45a1f2e 100644 --- a/src/langbot/pkg/provider/runners/localagent.py +++ b/src/langbot/pkg/provider/runners/localagent.py @@ -4,6 +4,7 @@ import copy import typing from .. import runner +from ..modelmgr import requester as modelmgr_requester import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query import langbot_plugin.api.entities.builtin.provider.message as provider_message import langbot_plugin.api.entities.builtin.rag.context as rag_context @@ -26,19 +27,109 @@ @runner.runner_class('local-agent') class LocalAgentRunner(runner.RequestRunner): - """本地Agent请求运行器""" - - class ToolCallTracker: - """工具调用追踪器""" - - def __init__(self): - self.active_calls: dict[str, dict] = {} - self.completed_calls: list[provider_message.ToolCall] = [] + """Local agent request runner""" + + async def _get_model_candidates( + self, + query: pipeline_query.Query, + ) -> list[modelmgr_requester.RuntimeLLMModel]: + """Build ordered list of models to try: primary model + fallback models.""" + candidates = [] + + # Primary model + if query.use_llm_model_uuid: + try: + primary = await self.ap.model_mgr.get_model_by_uuid(query.use_llm_model_uuid) + candidates.append(primary) + except ValueError: + self.ap.logger.warning(f'Primary model {query.use_llm_model_uuid} not found') + + # Fallback models + fallback_uuids = (query.variables or {}).get('_fallback_model_uuids', []) + for fb_uuid in fallback_uuids: + try: + fb_model = await self.ap.model_mgr.get_model_by_uuid(fb_uuid) + candidates.append(fb_model) + except ValueError: + self.ap.logger.warning(f'Fallback model {fb_uuid} not found, skipping') + + return candidates + + async def _invoke_with_fallback( + self, + query: pipeline_query.Query, + candidates: list[modelmgr_requester.RuntimeLLMModel], + messages: list, + funcs: list, + remove_think: bool, + ) -> tuple[provider_message.Message, modelmgr_requester.RuntimeLLMModel]: + """Try non-streaming invocation with sequential fallback. Returns (message, model_used).""" + last_error = None + for model in candidates: + try: + msg = await model.provider.invoke_llm( + query, + model, + messages, + funcs if model.model_entity.abilities.__contains__('func_call') else [], + extra_args=model.model_entity.extra_args, + remove_think=remove_think, + ) + return msg, model + except Exception as e: + last_error = e + self.ap.logger.warning(f'Model {model.model_entity.name} failed: {e}, trying next fallback...') + raise last_error or RuntimeError('No model candidates available') + + async def _invoke_stream_with_fallback( + self, + query: pipeline_query.Query, + candidates: list[modelmgr_requester.RuntimeLLMModel], + messages: list, + funcs: list, + remove_think: bool, + ) -> tuple[typing.AsyncGenerator, modelmgr_requester.RuntimeLLMModel]: + """Try streaming invocation with sequential fallback. Returns (stream_generator, model_used). + + Fallback is only possible before any chunks have been yielded to the client. + Once streaming starts, the model is committed. + """ + last_error = None + for model in candidates: + try: + stream = model.provider.invoke_llm_stream( + query, + model, + messages, + funcs if model.model_entity.abilities.__contains__('func_call') else [], + extra_args=model.model_entity.extra_args, + remove_think=remove_think, + ) + # Attempt to get the first chunk to verify the stream works + first_chunk = await stream.__anext__() + + async def _chain_stream(first, rest): + yield first + async for chunk in rest: + yield chunk + + return _chain_stream(first_chunk, stream), model + except StopAsyncIteration: + # Empty stream — treat as success (model returned nothing) + async def _empty_stream(): + return + yield # make it a generator + + return _empty_stream(), model + except Exception as e: + last_error = e + self.ap.logger.warning(f'Model {model.model_entity.name} stream failed: {e}, trying next fallback...') + raise last_error or RuntimeError('No model candidates available') async def run( self, query: pipeline_query.Query ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: - """运行请求""" + """Run request""" pending_tool_calls = [] # Get knowledge bases list (new field) @@ -119,51 +210,51 @@ async def run( remove_think = query.pipeline_config['output'].get('misc', '').get('remove-think') - use_llm_model = await self.ap.model_mgr.get_model_by_uuid(query.use_llm_model_uuid) + # Build ordered candidate list (primary + fallbacks) + candidates = await self._get_model_candidates(query) + if not candidates: + raise RuntimeError('No LLM model configured for local-agent runner') self.ap.logger.debug( - f'localagent req: query={query.query_id} req_messages={req_messages} use_llm_model={query.use_llm_model_uuid}' + f'localagent req: query={query.query_id} req_messages={req_messages} ' + f'candidates={[m.model_entity.name for m in candidates]}' ) if not is_stream: - # 非流式输出,直接请求 - - msg = await use_llm_model.provider.invoke_llm( + # Non-streaming: invoke with fallback + msg, use_llm_model = await self._invoke_with_fallback( query, - use_llm_model, + candidates, req_messages, query.use_funcs, - extra_args=use_llm_model.model_entity.extra_args, - remove_think=remove_think, + remove_think, ) yield msg final_msg = msg else: - # 流式输出,需要处理工具调用 + # Streaming: invoke with fallback tool_calls_map: dict[str, provider_message.ToolCall] = {} msg_idx = 0 - accumulated_content = '' # 从开始累积的所有内容 + accumulated_content = '' last_role = 'assistant' msg_sequence = 1 - async for msg in use_llm_model.provider.invoke_llm_stream( + + stream_src, use_llm_model = await self._invoke_stream_with_fallback( query, - use_llm_model, + candidates, req_messages, query.use_funcs, - extra_args=use_llm_model.model_entity.extra_args, - remove_think=remove_think, - ): + remove_think, + ) + async for msg in stream_src: msg_idx = msg_idx + 1 - # 记录角色 if msg.role: last_role = msg.role - # 累积内容 if msg.content: accumulated_content += msg.content - # 处理工具调用 if msg.tool_calls: for tool_call in msg.tool_calls: if tool_call.id not in tool_calls_map: @@ -175,21 +266,18 @@ async def run( ), ) if tool_call.function and tool_call.function.arguments: - # 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖 tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments - # continue - # 每8个chunk或最后一个chunk时,输出所有累积的内容 + if msg_idx % 8 == 0 or msg.is_final: msg_sequence += 1 yield provider_message.MessageChunk( role=last_role, - content=accumulated_content, # 输出所有累积内容 + content=accumulated_content, tool_calls=list(tool_calls_map.values()) if (tool_calls_map and msg.is_final) else None, is_final=msg.is_final, msg_sequence=msg_sequence, ) - # 创建最终消息用于后续处理 final_msg = provider_message.MessageChunk( role=last_role, content=accumulated_content, @@ -204,7 +292,8 @@ async def run( req_messages.append(final_msg) - # 持续请求,只要还有待处理的工具调用就继续处理调用 + # Once a model succeeds, commit to it for the tool call loop + # (no fallback mid-conversation — different models may interpret tool results differently) while pending_tool_calls: for tool_call in pending_tool_calls: try: @@ -245,7 +334,6 @@ async def run( req_messages.append(msg) except Exception as e: - # 工具调用出错,添加一个报错信息到 req_messages err_msg = provider_message.Message(role='tool', content=f'err: {e}', tool_call_id=tool_call.id) yield err_msg @@ -253,39 +341,38 @@ async def run( req_messages.append(err_msg) self.ap.logger.debug( - f'localagent req: query={query.query_id} req_messages={req_messages} use_llm_model={query.use_llm_model_uuid}' + f'localagent req: query={query.query_id} req_messages={req_messages} ' + f'use_llm_model={use_llm_model.model_entity.name}' ) if is_stream: tool_calls_map = {} msg_idx = 0 - accumulated_content = '' # 从开始累积的所有内容 + accumulated_content = '' last_role = 'assistant' msg_sequence = first_end_sequence - async for msg in use_llm_model.provider.invoke_llm_stream( + tool_stream_src = use_llm_model.provider.invoke_llm_stream( query, use_llm_model, req_messages, - query.use_funcs, + query.use_funcs if use_llm_model.model_entity.abilities.__contains__('func_call') else [], extra_args=use_llm_model.model_entity.extra_args, remove_think=remove_think, - ): + ) + async for msg in tool_stream_src: msg_idx += 1 - # 记录角色 if msg.role: last_role = msg.role - # 第一次请求工具调用时的内容 + # Prepend first-round content on first chunk of tool-call round if msg_idx == 1: accumulated_content = first_content if first_content is not None else accumulated_content - # 累积内容 if msg.content: accumulated_content += msg.content - # 处理工具调用 if msg.tool_calls: for tool_call in msg.tool_calls: if tool_call.id not in tool_calls_map: @@ -297,15 +384,13 @@ async def run( ), ) if tool_call.function and tool_call.function.arguments: - # 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖 tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments - # 每8个chunk或最后一个chunk时,输出所有累积的内容 if msg_idx % 8 == 0 or msg.is_final: msg_sequence += 1 yield provider_message.MessageChunk( role=last_role, - content=accumulated_content, # 输出所有累积内容 + content=accumulated_content, tool_calls=list(tool_calls_map.values()) if (tool_calls_map and msg.is_final) else None, is_final=msg.is_final, msg_sequence=msg_sequence, @@ -318,12 +403,12 @@ async def run( msg_sequence=msg_sequence, ) else: - # 处理完所有调用,再次请求 + # Non-streaming: use committed model directly (no fallback in tool loop) msg = await use_llm_model.provider.invoke_llm( query, use_llm_model, req_messages, - query.use_funcs, + query.use_funcs if use_llm_model.model_entity.abilities.__contains__('func_call') else [], extra_args=use_llm_model.model_entity.extra_args, remove_think=remove_think, ) diff --git a/src/langbot/pkg/utils/constants.py b/src/langbot/pkg/utils/constants.py index d87efec7d..c1b854f29 100644 --- a/src/langbot/pkg/utils/constants.py +++ b/src/langbot/pkg/utils/constants.py @@ -2,7 +2,7 @@ semantic_version = f'v{langbot.__version__}' -required_database_version = 22 +required_database_version = 23 """Tag the version of the database schema, used to check if the database needs to be migrated""" debug_mode = False diff --git a/src/langbot/templates/metadata/pipeline/ai.yaml b/src/langbot/templates/metadata/pipeline/ai.yaml index 7a13b2b14..46f5d4635 100644 --- a/src/langbot/templates/metadata/pipeline/ai.yaml +++ b/src/langbot/templates/metadata/pipeline/ai.yaml @@ -59,8 +59,11 @@ stages: label: en_US: Model zh_Hans: 模型 - type: llm-model-selector + type: model-fallback-selector required: true + default: + primary: '' + fallbacks: [] - name: max-round label: en_US: Max Round diff --git a/web/src/app/home/bots/components/bot-form/BotForm.tsx b/web/src/app/home/bots/components/bot-form/BotForm.tsx index d2ca22d79..1e2439575 100644 --- a/web/src/app/home/bots/components/bot-form/BotForm.tsx +++ b/web/src/app/home/bots/components/bot-form/BotForm.tsx @@ -124,12 +124,6 @@ export default function BotForm({ const currentAdapter = form.watch('adapter'); const currentAdapterConfig = form.watch('adapter_config'); - // Serialize adapter_config to a stable string so it can be used as a - // useEffect dependency without triggering on every render. form.watch() - // returns a new object reference each time, which would otherwise cause - // the filtering effect below to loop indefinitely. - const adapterConfigJson = JSON.stringify(currentAdapterConfig); - useEffect(() => { setBotFormValues(); }, []); @@ -153,7 +147,7 @@ export default function BotForm({ // For non-Lark adapters, show all fields setFilteredDynamicFormConfigList(dynamicFormConfigList); } - }, [currentAdapter, adapterConfigJson, dynamicFormConfigList]); + }, [currentAdapter, currentAdapterConfig, dynamicFormConfigList]); // 复制到剪贴板的辅助函数 - 使用页面上的真实input元素 const copyToClipboard = () => { diff --git a/web/src/app/home/components/dynamic-form/DynamicFormComponent.tsx b/web/src/app/home/components/dynamic-form/DynamicFormComponent.tsx index b40563ae8..7ebfcec12 100644 --- a/web/src/app/home/components/dynamic-form/DynamicFormComponent.tsx +++ b/web/src/app/home/components/dynamic-form/DynamicFormComponent.tsx @@ -11,7 +11,7 @@ import { FormMessage, } from '@/components/ui/form'; import DynamicFormItemComponent from '@/app/home/components/dynamic-form/DynamicFormItemComponent'; -import { useCallback, useEffect, useRef } from 'react'; +import { useEffect, useRef } from 'react'; import { extractI18nObject } from '@/i18n/I18nProvider'; import { useTranslation } from 'react-i18next'; @@ -73,6 +73,12 @@ export default function DynamicFormComponent({ case 'bot-selector': fieldSchema = z.string(); break; + case 'model-fallback-selector': + fieldSchema = z.object({ + primary: z.string(), + fallbacks: z.array(z.string()), + }); + break; case 'prompt-editor': fieldSchema = z.array( z.object({ @@ -160,39 +166,34 @@ export default function DynamicFormComponent({ const onSubmitRef = useRef(onSubmit); onSubmitRef.current = onSubmit; - // Track the last emitted values to avoid emitting identical snapshots, - // which would cause the parent to call setValue with an equivalent object, - // triggering a re-render loop. - const lastEmittedRef = useRef(''); - - const emitValues = useCallback(() => { + // 监听表单值变化 + useEffect(() => { + // Emit initial form values immediately so the parent always has a valid snapshot, + // even if the user saves without modifying any field. + // form.watch(callback) only fires on subsequent changes, not on mount. const formValues = form.getValues(); - const finalValues = itemConfigList.reduce( + const initialFinalValues = itemConfigList.reduce( (acc, item) => { acc[item.name] = formValues[item.name] ?? item.default; return acc; }, {} as Record, ); - const serialized = JSON.stringify(finalValues); - if (serialized !== lastEmittedRef.current) { - lastEmittedRef.current = serialized; - onSubmitRef.current?.(finalValues); - } - }, [form, itemConfigList]); - - // 监听表单值变化 - useEffect(() => { - // Emit initial form values immediately so the parent always has a valid snapshot, - // even if the user saves without modifying any field. - // form.watch(callback) only fires on subsequent changes, not on mount. - emitValues(); + onSubmitRef.current?.(initialFinalValues); const subscription = form.watch(() => { - emitValues(); + const formValues = form.getValues(); + const finalValues = itemConfigList.reduce( + (acc, item) => { + acc[item.name] = formValues[item.name] ?? item.default; + return acc; + }, + {} as Record, + ); + onSubmitRef.current?.(finalValues); }); return () => subscription.unsubscribe(); - }, [form, itemConfigList, emitValues]); + }, [form, itemConfigList]); return (
@@ -231,6 +232,7 @@ export default function DynamicFormComponent({ // All fields are disabled when editing (creation_settings are immutable) const isFieldDisabled = !!isEditing; + return ( { + if (config.type === DynamicFormItemType.MODEL_FALLBACK_SELECTOR) { + httpClient + .getProviderLLMModels() + .then((resp) => { + let models = resp.models; + if ( + systemInfo.disable_models_service || + userInfo?.account_type !== 'space' + ) { + models = models.filter( + (m) => m.provider?.requester !== 'space-chat-completions', + ); + } + setLlmModels(models); + }) + .catch((err) => { + toast.error('Failed to get LLM model list: ' + err.msg); + }); + } + }, [config.type]); + useEffect(() => { if ( config.type === DynamicFormItemType.KNOWLEDGE_BASE_SELECTOR || @@ -171,12 +193,7 @@ export default function DynamicFormItemComponent({ return