-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathloader.py
More file actions
290 lines (231 loc) · 10.2 KB
/
loader.py
File metadata and controls
290 lines (231 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
"""Plugin loader and method validation."""
from __future__ import annotations
import importlib
import warnings
from dataclasses import dataclass
from typing import Any
from openagents.config.schema import (
AgentDefinition,
ContextAssemblerRef,
DiagnosticsRef,
EventBusRef,
MemoryRef,
PatternRef,
PluginRef,
RuntimeRef,
SessionRef,
SkillsRef,
ToolExecutorRef,
ToolRef,
)
from openagents.errors.exceptions import PluginLoadError
from openagents.errors.suggestions import near_match
from openagents.interfaces.skills import SkillsPlugin
from openagents.plugins.registry import get_builtin_plugin_class, list_builtin_plugins
@dataclass
class LoadedAgentPlugins:
memory: Any
pattern: Any
tool_executor: Any | None
context_assembler: Any | None
tools: dict[str, Any]
@dataclass
class LoadedRuntimeComponents:
runtime: Any
session: Any
events: Any
skills: Any
diagnostics: Any = None
def _import_symbol(path: str) -> Any:
if "." not in path:
raise PluginLoadError(
f"Invalid impl path: '{path}'",
hint="impl path must be 'module.path:Symbol' or 'module.path.Symbol'",
)
module_name, attr_name = path.rsplit(".", 1)
try:
module = importlib.import_module(module_name)
except Exception as exc: # pragma: no cover - defensive
raise PluginLoadError(
f"Failed to import module '{module_name}'",
hint=f"check the spelling and that '{module_name}' is importable on this Python path",
) from exc
try:
return getattr(module, attr_name)
except AttributeError as exc:
raise PluginLoadError(
f"Module '{module_name}' has no symbol '{attr_name}'",
hint=f"check the spelling of '{attr_name}' in module '{module_name}'",
) from exc
def _instantiate(factory: Any, config: dict[str, Any]) -> Any:
if not callable(factory):
return factory
try:
return factory(config=config)
except TypeError as exc:
raise PluginLoadError(f"Could not instantiate plugin from {factory!r}: {exc}") from exc
def _validate_class_methods(factory: Any, *, required_methods: tuple[str, ...], where: str) -> None:
if not isinstance(factory, type):
return
for method_name in required_methods:
if not callable(getattr(factory, method_name, None)):
raise PluginLoadError(f"{where} '{factory.__name__}' must implement '{method_name}'")
def _load_plugin_impl(kind: str, ref: PluginRef, *, required_methods: tuple[str, ...] = ()) -> Any:
if ref.impl:
symbol = _import_symbol(ref.impl)
_validate_class_methods(symbol, required_methods=required_methods, where=f"{kind} plugin")
return _instantiate(symbol, ref.config)
if ref.type:
if kind == "context_assembler" and ref.type == "summarizing":
raise PluginLoadError(
"context_assembler type 'summarizing' was renamed to 'truncating' in 0.3.0 "
"because the old implementation only truncated without summarizing. "
"Rename to 'truncating', or set impl= to your own LLM-based summarizer."
)
plugin_cls = get_builtin_plugin_class(kind, ref.type)
if plugin_cls is None:
available = list_builtin_plugins(kind)
guess = near_match(ref.type, available)
if guess:
hint_text = f"Did you mean '{guess}'? Available {kind} plugins: {available}"
else:
hint_text = f"Available {kind} plugins: {available}"
raise PluginLoadError(
f"Unknown {kind} plugin type: '{ref.type}'",
hint=hint_text,
)
_validate_class_methods(plugin_cls, required_methods=required_methods, where=f"{kind} plugin")
return _instantiate(plugin_cls, ref.config)
raise PluginLoadError(
f"{kind} plugin must set one of 'type' or 'impl'",
hint="add 'type: \"<name>\"' for a builtin or 'impl: \"module.path:Class\"' for a custom plugin",
)
def load_plugin(
kind: str,
ref: PluginRef,
*,
required_methods: tuple[str, ...] = (),
) -> Any:
"""Load a child plugin from a PluginRef.
Public entry point used by combinator builtins that compose other
plugins (memory.chain, tool_executor.retry, events.file_logging) and
by external custom combinators.
"""
return _load_plugin_impl(kind, ref, required_methods=required_methods)
def _load_plugin(kind: str, ref: PluginRef, *, required_methods: tuple[str, ...] = ()) -> Any:
"""Deprecated alias for :func:`load_plugin`.
Kept for one release so external combinator plugins keep working.
Emits a DeprecationWarning at call time.
"""
warnings.warn(
"openagents.plugins.loader._load_plugin is deprecated; use openagents.plugins.loader.load_plugin",
DeprecationWarning,
stacklevel=2,
)
return _load_plugin_impl(kind, ref, required_methods=required_methods)
def load_memory_plugin(ref: MemoryRef) -> Any:
return _load_plugin_impl("memory", ref)
def load_pattern_plugin(ref: PatternRef) -> Any:
return _load_plugin_impl("pattern", ref, required_methods=("execute", "react"))
def load_tool_plugin(ref: ToolRef) -> Any:
return _load_plugin_impl("tool", ref, required_methods=("invoke",))
def load_tool_executor_plugin(ref: ToolExecutorRef | None) -> Any | None:
if ref is None:
return None
plugin = _load_plugin_impl("tool_executor", ref, required_methods=("execute", "execute_stream"))
if not callable(getattr(plugin, "execute", None)):
raise PluginLoadError(f"tool executor '{type(plugin).__name__}' must implement 'execute'")
if not callable(getattr(plugin, "execute_stream", None)):
raise PluginLoadError(f"tool executor '{type(plugin).__name__}' must implement 'execute_stream'")
return plugin
def load_context_assembler_plugin(ref: ContextAssemblerRef | None) -> Any | None:
if ref is None:
return None
plugin = _load_plugin_impl("context_assembler", ref, required_methods=("assemble", "finalize"))
if not callable(getattr(plugin, "assemble", None)):
raise PluginLoadError(f"context assembler '{type(plugin).__name__}' must implement 'assemble'")
if not callable(getattr(plugin, "finalize", None)):
raise PluginLoadError(f"context assembler '{type(plugin).__name__}' must implement 'finalize'")
return plugin
def load_agent_plugins(agent: AgentDefinition) -> LoadedAgentPlugins:
memory = load_memory_plugin(agent.memory)
pattern = load_pattern_plugin(agent.pattern)
tool_executor = load_tool_executor_plugin(agent.tool_executor)
context_assembler = load_context_assembler_plugin(agent.context_assembler)
tools: dict[str, Any] = {}
for tool_ref in agent.tools:
if not tool_ref.enabled:
continue
tools[tool_ref.id] = load_tool_plugin(tool_ref)
return LoadedAgentPlugins(
memory=memory,
pattern=pattern,
tool_executor=tool_executor,
context_assembler=context_assembler,
tools=tools,
)
def load_runtime_plugin(ref: RuntimeRef) -> Any:
"""Load a runtime plugin."""
return _load_plugin_impl("runtime", ref, required_methods=("run",))
def load_session_plugin(ref: SessionRef) -> Any:
"""Load a session manager plugin."""
return _load_plugin_impl("session", ref, required_methods=("session",))
def load_events_plugin(ref: EventBusRef) -> Any:
"""Load an event bus plugin."""
return _load_plugin_impl("events", ref, required_methods=("emit", "subscribe"))
def load_skills_plugin(ref: SkillsRef | None) -> SkillsPlugin:
from openagents.config.schema import SkillsRef as DefaultSkillsRef
actual = ref or DefaultSkillsRef(type="local")
plugin = _load_plugin_impl("skills", actual, required_methods=("prepare_session", "load_references", "run_skill"))
for method_name in ("prepare_session", "load_references", "run_skill"):
if not callable(getattr(plugin, method_name, None)):
raise PluginLoadError(f"skills component '{type(plugin).__name__}' must implement '{method_name}'")
return plugin
def load_diagnostics_plugin(ref: DiagnosticsRef | None) -> Any:
"""Load a diagnostics plugin. Returns NullDiagnosticsPlugin when ref is None."""
from openagents.plugins.builtin.diagnostics.null_plugin import NullDiagnosticsPlugin
if ref is None:
return NullDiagnosticsPlugin()
return _load_plugin_impl("diagnostics", ref)
def load_agent_router_plugin(config: Any) -> Any:
"""Create a DefaultAgentRouter from a MultiAgentConfig.
Returns None when the config is absent or disabled.
"""
if config is None or not getattr(config, "enabled", False):
return None
from openagents.plugins.builtin.agent_router.default import DefaultAgentRouter
return DefaultAgentRouter(config=config.model_dump())
def load_runtime_components(
runtime_ref: RuntimeRef,
session_ref: SessionRef,
events_ref: EventBusRef,
skills_ref: SkillsRef | None,
diagnostics_ref: DiagnosticsRef | None = None,
) -> LoadedRuntimeComponents:
"""Load all runtime components from config references.
``AppConfig`` guarantees non-None defaults for runtime/session/events
via pydantic field defaults, so callers always pass real refs for
those three. ``skills_ref`` remains optional because
:func:`load_skills_plugin` owns its own default (``type=local``).
``diagnostics_ref`` is optional; None yields a NullDiagnosticsPlugin.
"""
events = load_events_plugin(events_ref)
session = load_session_plugin(session_ref)
skills = load_skills_plugin(skills_ref)
if hasattr(skills, "_session_manager"):
skills._session_manager = session
diagnostics = load_diagnostics_plugin(diagnostics_ref)
runtime = load_runtime_plugin(runtime_ref)
if hasattr(runtime, "_event_bus"):
runtime._event_bus = events
if hasattr(runtime, "_session_manager"):
runtime._session_manager = session
if hasattr(runtime, "_diagnostics"):
runtime._diagnostics = diagnostics
return LoadedRuntimeComponents(
runtime=runtime,
session=session,
events=events,
skills=skills,
diagnostics=diagnostics,
)