Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/langbot/pkg/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ..platform.webhook_pusher import WebhookPusher
from ..provider.session import sessionmgr as llm_session_mgr
from ..provider.modelmgr import modelmgr as llm_model_mgr

from langbot.pkg.provider.tools import toolmgr as llm_tool_mgr
from ..config import manager as config_mgr
from ..command import cmdmgr
Expand All @@ -30,6 +31,7 @@
from ..api.http.service import apikey as apikey_service
from ..api.http.service import webhook as webhook_service
from ..api.http.service import monitoring as monitoring_service

from ..discover import engine as discover_engine
from ..storage import mgr as storagemgr
from ..utils import logcache
Expand Down
102 changes: 102 additions & 0 deletions src/langbot/pkg/persistence/migrations/dbm023_model_fallback_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from .. import migration

import sqlalchemy
import json


@migration.migration_class(23)
class DBMigrateModelFallbackConfig(migration.DBMigration):
"""Convert model field from plain UUID string to object with primary/fallbacks"""

async def upgrade(self):
"""Upgrade"""
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.text('SELECT uuid, config FROM legacy_pipelines')
)
pipelines = result.fetchall()

current_version = self.ap.ver_mgr.get_current_version()

for pipeline_row in pipelines:
uuid = pipeline_row[0]
config = json.loads(pipeline_row[1]) if isinstance(pipeline_row[1], str) else pipeline_row[1]

if 'ai' not in config or 'local-agent' not in config['ai']:
continue

local_agent = config['ai']['local-agent']
changed = False

# Convert model from string to object
model_value = local_agent.get('model', '')
if isinstance(model_value, str):
local_agent['model'] = {
'primary': model_value,
'fallbacks': [],
}
changed = True

# Remove leftover fallback-models field if present
if 'fallback-models' in local_agent:
del local_agent['fallback-models']
changed = True

if not changed:
continue

# Update using raw SQL with compatibility for both SQLite and PostgreSQL
if self.ap.persistence_mgr.db.name == 'postgresql':
await self.ap.persistence_mgr.execute_async(
sqlalchemy.text(
'UPDATE legacy_pipelines SET config = :config::jsonb, for_version = :for_version WHERE uuid = :uuid'
),
{'config': json.dumps(config), 'for_version': current_version, 'uuid': uuid},
)
else:
await self.ap.persistence_mgr.execute_async(
sqlalchemy.text(
'UPDATE legacy_pipelines SET config = :config, for_version = :for_version WHERE uuid = :uuid'
),
{'config': json.dumps(config), 'for_version': current_version, 'uuid': uuid},
)

async def downgrade(self):
"""Downgrade"""
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.text('SELECT uuid, config FROM legacy_pipelines')
)
pipelines = result.fetchall()

current_version = self.ap.ver_mgr.get_current_version()

for pipeline_row in pipelines:
uuid = pipeline_row[0]
config = json.loads(pipeline_row[1]) if isinstance(pipeline_row[1], str) else pipeline_row[1]

if 'ai' not in config or 'local-agent' not in config['ai']:
continue

local_agent = config['ai']['local-agent']

# Convert model from object back to string
model_value = local_agent.get('model', '')
if isinstance(model_value, dict):
local_agent['model'] = model_value.get('primary', '')
else:
continue

# Update using raw SQL with compatibility for both SQLite and PostgreSQL
if self.ap.persistence_mgr.db.name == 'postgresql':
await self.ap.persistence_mgr.execute_async(
sqlalchemy.text(
'UPDATE legacy_pipelines SET config = :config::jsonb, for_version = :for_version WHERE uuid = :uuid'
),
{'config': json.dumps(config), 'for_version': current_version, 'uuid': uuid},
)
else:
await self.ap.persistence_mgr.execute_async(
sqlalchemy.text(
'UPDATE legacy_pipelines SET config = :config, for_version = :for_version WHERE uuid = :uuid'
),
{'config': json.dumps(config), 'for_version': current_version, 'uuid': uuid},
)
67 changes: 47 additions & 20 deletions src/langbot/pkg/pipeline/preproc/preproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,36 @@ async def process(
session = await self.ap.sess_mgr.get_session(query)

# When not local-agent, llm_model is None
try:
llm_model = (
await self.ap.model_mgr.get_model_by_uuid(query.pipeline_config['ai']['local-agent']['model'])
if selected_runner == 'local-agent'
else None
)
except ValueError:
self.ap.logger.warning(
f'LLM model {query.pipeline_config["ai"]["local-agent"]["model"] + " "}not found or not configured'
)
llm_model = None
llm_model = None
if selected_runner == 'local-agent':
# Read model config — new format is { primary: str, fallbacks: [str] },
# but handle legacy plain string for backward compatibility
model_config = query.pipeline_config['ai']['local-agent'].get('model', {})
if isinstance(model_config, str):
# Legacy format: plain UUID string
primary_uuid = model_config
fallback_uuids = []
else:
primary_uuid = model_config.get('primary', '')
fallback_uuids = model_config.get('fallbacks', [])

if primary_uuid:
try:
llm_model = await self.ap.model_mgr.get_model_by_uuid(primary_uuid)
except ValueError:
self.ap.logger.warning(f'LLM model {primary_uuid} not found or not configured')

# Resolve fallback model UUIDs
if fallback_uuids:
valid_fallbacks = []
for fb_uuid in fallback_uuids:
try:
await self.ap.model_mgr.get_model_by_uuid(fb_uuid)
valid_fallbacks.append(fb_uuid)
except ValueError:
self.ap.logger.warning(f'Fallback model {fb_uuid} not found, skipping')
if valid_fallbacks:
query.variables['_fallback_model_uuids'] = valid_fallbacks

conversation = await self.ap.sess_mgr.get_conversation(
query,
Expand All @@ -61,20 +80,28 @@ async def process(
query.prompt = conversation.prompt.copy()
query.messages = conversation.messages.copy()

if selected_runner == 'local-agent' and llm_model:
if selected_runner == 'local-agent':
query.use_funcs = []
query.use_llm_model_uuid = llm_model.model_entity.uuid

if llm_model.model_entity.abilities.__contains__('func_call'):
# Get bound plugins and MCP servers for filtering tools
if llm_model:
query.use_llm_model_uuid = llm_model.model_entity.uuid

if llm_model.model_entity.abilities.__contains__('func_call'):
# Get bound plugins and MCP servers for filtering tools
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None)
query.use_funcs = await self.ap.tool_mgr.get_all_tools(bound_plugins, bound_mcp_servers)

self.ap.logger.debug(f'Bound plugins: {bound_plugins}')
self.ap.logger.debug(f'Bound MCP servers: {bound_mcp_servers}')
self.ap.logger.debug(f'Use funcs: {query.use_funcs}')

# If primary model doesn't support func_call but fallback models exist,
# load tools anyway since fallback models may support them
if not query.use_funcs and query.variables.get('_fallback_model_uuids'):
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None)
query.use_funcs = await self.ap.tool_mgr.get_all_tools(bound_plugins, bound_mcp_servers)

self.ap.logger.debug(f'Bound plugins: {bound_plugins}')
self.ap.logger.debug(f'Bound MCP servers: {bound_mcp_servers}')
self.ap.logger.debug(f'Use funcs: {query.use_funcs}')

sender_name = ''

if isinstance(query.message_event, platform_events.GroupMessage):
Expand Down
Loading
Loading