-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore.py
More file actions
183 lines (158 loc) · 6.43 KB
/
core.py
File metadata and controls
183 lines (158 loc) · 6.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""Shared core — tool schemas, expert definitions, and LLM client."""
import inspect
import json
import typing
from json import JSONDecodeError
import requests
from endpoint_policy import validate_endpoint_url
from experts.math.tools.algebra import math_tool, OPERATIONS as MATH_OPS, DOMAINS
from experts.math.tools.calculus import calculus_tool, OPERATIONS as CALC_OPS
from experts.math.tools.matrix import matrix_tool, OPERATIONS as MAT_OPS
from experts.math.tools.statistics import statistics_tool, OPERATIONS as STAT_OPS
from experts.code.tools.code import code_tool, OPERATIONS as CODE_OPS
DEFAULT_MODEL = "qwen3:32b"
DEFAULT_URL = "http://localhost:11434/api/chat"
MAX_ROUNDS = 5
_PY_TO_JSON = {str: "string", int: "integer", float: "number", bool: "boolean"}
def _json_type(annotation):
"""Map a Python type annotation to a JSON Schema type."""
origin = typing.get_origin(annotation)
if origin is list:
args = typing.get_args(annotation)
items = {"type": _json_type(args[0])} if args else {}
return "array", items
return _PY_TO_JSON.get(annotation, "string"), None
TOOL_ENUMS = {
"math_tool": {"operation": sorted(MATH_OPS), "domain": sorted(DOMAINS.keys())},
"calculus_tool": {"operation": sorted(CALC_OPS)},
"matrix_tool": {"operation": sorted(MAT_OPS)},
"statistics_tool": {"operation": sorted(STAT_OPS)},
"code_tool": {"operation": sorted(CODE_OPS)},
}
def build_schema(fn, enums=None):
"""Build an OpenAI-format tool schema from a function's signature + docstring."""
sig = inspect.signature(fn)
props = {}
required = []
for name, param in sig.parameters.items():
ann = param.annotation if param.annotation != inspect.Parameter.empty else str
json_type, items = _json_type(ann)
prop = {"type": json_type}
if items:
prop["items"] = items
if enums and name in enums:
prop["enum"] = enums[name]
if param.default != inspect.Parameter.empty:
prop["default"] = param.default
else:
required.append(name)
props[name] = prop
doc = inspect.getdoc(fn) or fn.__name__
desc = doc.split("\n\n", 1)[1].strip() if "\n\n" in doc else doc.split("\n")[0]
return {
"type": "function",
"function": {
"name": fn.__name__,
"description": desc,
"parameters": {
"type": "object",
"properties": props,
"required": required,
},
},
}
_MATH_TOOLS = [math_tool, calculus_tool, matrix_tool, statistics_tool]
_CODE_TOOLS = [code_tool]
EXPERTS = {
"Mathematician": {
"system": (
"You are a highly capable mathematician. "
"You have access to specialized function tools. You MUST use the proper function calling API format to invoke them when you use tools. Be concise, no redundancy.\n"
"**TRUST TOOL RESULTS BLINDLY. DO NOT RECOMPUTE OR QUESTION THEM.**"
),
"tools": [build_schema(fn, TOOL_ENUMS.get(fn.__name__)) for fn in _MATH_TOOLS],
"dispatch": {fn.__name__: fn for fn in _MATH_TOOLS},
},
"Coder": {
"system": (
"You are a precise coding assistant. "
"You have access to code_tool — a sandboxed Python execution environment. "
"ALWAYS use code_tool — NEVER guess output or run code in your head.\n"
"Use code_tool with operation='check' to verify syntax before running.\n"
"Use code_tool with operation='run' to execute and get actual output.\n"
"Use code_tool with operation='ast_inspect' to analyze code structure.\n"
"**TRUST TOOL RESULTS BLINDLY. DO NOT RECOMPUTE OR QUESTION THEM.**"
),
"tools": [build_schema(fn, TOOL_ENUMS.get(fn.__name__)) for fn in _CODE_TOOLS],
"dispatch": {fn.__name__: fn for fn in _CODE_TOOLS},
},
}
def llm_request(messages, tools, model, url, stream=False, think=True):
"""Send a chat request to Ollama. Returns parsed JSON or raw response for streaming."""
try:
url = validate_endpoint_url(url)
except ValueError as e:
raise ConnectionError(str(e))
payload = {"model": model, "messages": messages, "stream": stream, "think": think}
if tools:
payload["tools"] = tools
try:
resp = requests.post(url, json=payload, stream=stream, timeout=600)
resp.raise_for_status()
if not stream:
return resp.json()
return resp
except requests.exceptions.HTTPError as e:
body = e.response.text[:500]
raise ConnectionError(f"HTTP {e.response.status_code} from {url}\n{body}")
except requests.exceptions.ConnectionError as e:
raise ConnectionError(
f"Cannot connect to {url}. Is your model server running?\nDetail: {e}"
)
except requests.exceptions.Timeout:
raise ConnectionError(f"Request to {url} timed out (600s).")
def iter_stream(resp):
"""Yield (token, thinking_token, tool_calls, done, msg) from a streaming Ollama response."""
try:
for raw_line in resp.iter_lines():
if not raw_line:
continue
line = raw_line.decode("utf-8", errors="replace").strip()
try:
chunk = json.loads(line)
except JSONDecodeError:
continue
msg = chunk.get("message", {})
yield (
msg.get("content", ""),
msg.get("thinking", ""),
msg.get("tool_calls", []),
chunk.get("done", False),
msg,
)
if chunk.get("done"):
break
finally:
resp.close()
def stream_to_msg(resp):
"""Consume a streaming Ollama response and return the final message dict."""
content = ""
tool_calls = []
try:
for raw_line in resp.iter_lines():
if not raw_line:
continue
line = raw_line.decode("utf-8", errors="replace").strip()
try:
chunk = json.loads(line)
except JSONDecodeError:
continue
msg = chunk.get("message", {})
content += msg.get("content", "")
if msg.get("tool_calls"):
tool_calls.extend(msg["tool_calls"])
if chunk.get("done"):
break
finally:
resp.close()
return {"role": "assistant", "content": content, "tool_calls": tool_calls}