Skip to content

Commit de0868e

Browse files
authored
Merge pull request #76 from multimindlab/cost_optimized_processing_error_solved
Add core types, metrics utilities, and missing method in multimodal r…
2 parents 359686a + 28c9509 commit de0868e

9 files changed

Lines changed: 398 additions & 29 deletions

File tree

examples/multi_modal/advanced/cost_optimized_processing.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
import asyncio
66
import base64
77
from pathlib import Path
8-
from typing import Dict, List, Optional
8+
from typing import Any, Dict, List, Optional
99

1010
from multimind.router.multi_modal_router import MultiModalRouter
11-
from multimind.models.advanced import CostOptimizedWrapper
11+
# from multimind.models.advanced import CostOptimizedWrapper
1212
from multimind.types import UnifiedRequest, ModalityInput, ModalityOutput
1313
from multimind.metrics.cost_tracker import CostTracker
1414
from multimind.metrics.performance import PerformanceTracker
@@ -100,6 +100,8 @@ def _get_cost_optimized_model(self, modality: str) -> str:
100100

101101
# Get available models
102102
models = self.router.get_available_models(modality)
103+
if not models:
104+
return "default"
103105

104106
# Get cost metrics
105107
cost_metrics = self.cost_tracker.get_modality_metrics(modality)
@@ -122,7 +124,7 @@ def _get_cost_optimized_model(self, modality: str) -> str:
122124
best_score = score
123125
best_model = model
124126

125-
return best_model or models[0]
127+
return best_model or (models[0] if models else "default")
126128

127129

128130
async def main():
171 Bytes
Binary file not shown.

multimind/api/unified_api.py

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,13 @@
33
"""
44

55
from fastapi import FastAPI, HTTPException
6-
from pydantic import BaseModel, Field
7-
from typing import Dict, List, Any, Optional, Union
6+
from typing import Dict, Any
87
import asyncio
98
from ..models.moe import MoEFactory
9+
from ..types import UnifiedRequest, UnifiedResponse
1010

1111
app = FastAPI(title="Unified Multi-Modal API")
1212

13-
class ModalityInput(BaseModel):
14-
"""Input for a specific modality."""
15-
content: Any
16-
modality: str
17-
18-
class UnifiedRequest(BaseModel):
19-
"""Unified request structure for multi-modal processing."""
20-
inputs: List[ModalityInput]
21-
use_moe: bool = Field(default=True, description="Whether to use MoE processing")
22-
constraints: Optional[Dict[str, Any]] = Field(
23-
default=None,
24-
description="Processing constraints (cost, latency, etc.)"
25-
)
26-
workflow: Optional[str] = Field(
27-
default=None,
28-
description="Optional MCP workflow to use"
29-
)
30-
31-
class UnifiedResponse(BaseModel):
32-
"""Unified response structure."""
33-
outputs: Dict[str, Any]
34-
expert_weights: Optional[Dict[str, float]] = None
35-
metrics: Dict[str, Any]
36-
3713
# Initialize components
3814
try:
3915
moe_factory = MoEFactory()

multimind/metrics/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
"""
2+
Lightweight metrics utilities used by examples and SDK components.
3+
"""
4+
5+
from .cost_tracker import CostTracker
6+
from .performance import PerformanceTracker
7+
8+
__all__ = [
9+
"CostTracker",
10+
"PerformanceTracker",
11+
]
12+

multimind/metrics/cost_tracker.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""
2+
Cost tracking helpers.
3+
4+
The SDK has multiple cost/perf tracking implementations; this one is a small,
5+
stable surface used by examples (e.g. multi-modal cost optimization).
6+
"""
7+
8+
from __future__ import annotations
9+
10+
from typing import Any, Dict, DefaultDict
11+
from collections import defaultdict
12+
13+
14+
class CostTracker:
    """
    Accumulate request costs per (modality, model_id).

    Public surface used by the examples:
    - get_modality_cost(modality, result) -> float
      Extracts the cost from ``result``, records it, and returns it.
    - get_modality_metrics(modality) -> dict[model_id] -> {"avg_cost", "total_cost", "count"}
    """

    def __init__(self) -> None:
        # stats[modality][model_id] = {"total_cost": float, "count": int}
        self._stats: DefaultDict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict)

    def _ensure(self, modality: str, model_id: str) -> Dict[str, Any]:
        """Return the mutable stat record for (modality, model_id), creating it if absent."""
        return self._stats[modality].setdefault(
            model_id, {"total_cost": 0.0, "count": 0}
        )

    @staticmethod
    def _as_mapping(result: Any) -> Dict[str, Any]:
        """Best-effort conversion of ``result`` into a plain dict.

        Accepts dicts, objects exposing ``model_dump()`` (Pydantic v2) or
        ``dict()`` (Pydantic v1), and plain objects with an instance
        ``__dict__``. Anything else yields an empty dict.
        """
        if isinstance(result, dict):
            return result

        for dumper_name in ("model_dump", "dict"):
            dumper = getattr(result, dumper_name, None)
            if not callable(dumper):
                continue
            try:
                dumped = dumper()
            except Exception:
                # Cost tracking is best-effort: a failing serializer must not
                # break the request being tracked, so try the next strategy.
                continue
            if isinstance(dumped, dict):
                return dumped

        try:
            # Plain objects: read their instance attributes.
            return dict(vars(result))
        except TypeError:
            # No __dict__ (None, numbers, slotted objects, ...).
            return {}

    def get_modality_cost(self, modality: str, result: Any) -> float:
        """
        Extract the cost from ``result``, record it, and return it.

        Heuristics:
        - cost is read from ``result["cost"]``, then ``result["metadata"]["cost"]``
          (default 0.0); unparsable or non-finite values are recorded as 0.0
        - model id is read from ``model_id`` / ``model`` at the top level, then
          from the same keys inside ``metadata`` (default "unknown")

        Every call increments the request count for the resolved
        (modality, model_id) pair, even when no cost could be extracted.
        """
        import math

        data = self._as_mapping(result)
        raw_metadata = data.get("metadata")
        metadata = raw_metadata if isinstance(raw_metadata, dict) else {}

        model_id = str(
            data.get("model_id")
            or data.get("model")
            or metadata.get("model_id")
            or metadata.get("model")
            or "unknown"
        )

        try:
            cost = float(data.get("cost") or metadata.get("cost") or 0.0)
        except (TypeError, ValueError):
            cost = 0.0
        if not math.isfinite(cost):
            # A single NaN/inf would otherwise corrupt total_cost for good.
            cost = 0.0

        stat = self._ensure(modality, model_id)
        stat["total_cost"] += cost
        stat["count"] += 1

        return cost

    def get_modality_metrics(self, modality: str) -> Dict[str, Dict[str, Any]]:
        """Return ``{"avg_cost", "total_cost", "count"}`` keyed by model id.

        Returns an empty dict for a modality that has no recorded results.
        """
        metrics: Dict[str, Dict[str, Any]] = {}
        # .get() avoids creating an empty entry in the defaultdict on read.
        for model_id, stat in self._stats.get(modality, {}).items():
            count = int(stat.get("count", 0))
            total_cost = float(stat.get("total_cost", 0.0)) if count else 0.0
            metrics[model_id] = {
                "avg_cost": total_cost / count if count else 0.0,
                "total_cost": total_cost,
                "count": count,
            }
        return metrics
87+

multimind/metrics/performance.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"""
2+
Performance tracking helpers.
3+
4+
This module provides a small, stable API used by examples.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import time
10+
from typing import Any, Dict, DefaultDict
11+
from collections import defaultdict
12+
13+
14+
class PerformanceTracker:
    """
    Track latency and success/failure counts per (modality, model_id).

    Public surface used by the examples:
    - get_current_time() -> float
    - track_latency(modality, latency) -> None
    - track_error(modality, error) -> None
    - get_modality_metrics(modality) -> dict[model_id] -> {"success_rate", "avg_latency"}
    """

    def __init__(self) -> None:
        # stats[modality][model_id] = {"success": int, "fail": int, "lat_total": float, "lat_count": int}
        self._stats: DefaultDict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict)

    def _ensure(self, modality: str, model_id: str) -> Dict[str, Any]:
        """Return the mutable stat record for (modality, model_id), creating it if absent."""
        return self._stats[modality].setdefault(
            model_id,
            {
                "success": 0,
                "fail": 0,
                "lat_total": 0.0,
                "lat_count": 0,
            },
        )

    def get_current_time(self) -> float:
        """Return the current wall-clock time in seconds (``time.time()``)."""
        return time.time()

    def track_latency(self, modality: str, latency: float, model_id: str = "unknown", success: bool = True) -> None:
        """Record one request outcome together with its latency.

        The success/fail counter is always incremented. The latency sample is
        only folded into the average when it parses as a finite, non-negative
        number; an invalid sample is skipped so that it can neither poison the
        running total (NaN/inf) nor drag the average towards zero.
        """
        import math

        stat = self._ensure(modality, model_id)
        stat["success" if success else "fail"] += 1

        try:
            lat = float(latency)
        except (TypeError, ValueError):
            return
        if not math.isfinite(lat) or lat < 0:
            return

        stat["lat_total"] += lat
        stat["lat_count"] += 1

    def track_error(self, modality: str, error: str, model_id: str = "unknown") -> None:
        """Record a failure for (modality, model_id).

        ``error`` is accepted as part of the call surface but is not stored.
        """
        stat = self._ensure(modality, model_id)
        stat["fail"] += 1

    def track_success(self, modality: str, model_id: str = "unknown") -> None:
        """Convenience method for recording a success without latency."""
        stat = self._ensure(modality, model_id)
        stat["success"] += 1

    def get_modality_metrics(self, modality: str) -> Dict[str, Dict[str, Any]]:
        """Return per-model ``success_rate``, ``avg_latency``, ``success`` and ``fail``.

        Rates and averages are 0.0 when there is nothing to divide by. Returns
        an empty dict for a modality that has no recorded activity.
        """
        metrics: Dict[str, Dict[str, Any]] = {}
        # .get() avoids creating an empty entry in the defaultdict on read.
        for model_id, stat in self._stats.get(modality, {}).items():
            success = int(stat.get("success", 0))
            fail = int(stat.get("fail", 0))
            total = success + fail
            lat_count = int(stat.get("lat_count", 0))
            lat_total = float(stat.get("lat_total", 0.0))
            metrics[model_id] = {
                "success_rate": (success / total) if total else 0.0,
                "avg_latency": (lat_total / lat_count) if lat_count else 0.0,
                "success": success,
                "fail": fail,
            }
        return metrics
79+

multimind/router/multi_modal_router.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from .router import ModelRouter
99
from .strategy import RoutingStrategy
1010
from ..api.mcp.registry import WorkflowRegistry
11+
from ..types import ModalityInput, ModalityOutput
1112

1213
class ModalityType:
1314
"""Supported modality types."""
@@ -102,6 +103,65 @@ def register_modality_model(
102103
if modality not in self.modality_registry:
103104
self.modality_registry[modality] = {}
104105
self.modality_registry[modality][model_id] = model
106+
107+
def get_available_models(self, modality: str) -> List[str]:
    """List the model IDs registered for ``modality``.

    When nothing is registered for the modality, the single-element list
    ``["default"]`` is returned so callers always have a model to pick.
    """
    registered = self.modality_registry.get(modality, {})
    model_ids = list(registered)
    if not model_ids:
        # Examples expect at least one candidate; hand back a safe default.
        return ["default"]
    return model_ids
112+
113+
async def process_modality(
    self,
    input_data: ModalityInput,
    model: Optional[str] = None,
    **kwargs: Any
) -> ModalityOutput:
    """
    Process a single modality input.

    This is a compatibility method used by SDK examples. If ``model`` is given
    and registered for the input's modality, the model's ``process()`` is
    called with the input content and ``**kwargs``; both coroutine and plain
    synchronous ``process()`` implementations are accepted. Otherwise a
    lightweight placeholder ``ModalityOutput`` is returned.

    Args:
        input_data: Object providing ``modality`` and ``content`` attributes.
        model: ID of the registered model to use; ``None`` selects no model.
        **kwargs: Forwarded unchanged to the model's ``process()``.

    Returns:
        A ``ModalityOutput`` whose ``metadata["model_id"]`` names the model
        (``"default"`` when ``model`` was not given).
    """
    import inspect

    modality = getattr(input_data, "modality", None) or "unknown"
    content = getattr(input_data, "content", None)

    model_id = model or "default"
    model_obj: Optional[BaseLLM] = None
    if model:
        model_obj = self.modality_registry.get(modality, {}).get(model)

    process = getattr(model_obj, "process", None)
    if not callable(process):
        # No usable model registered: return a placeholder output.
        return ModalityOutput(
            content=f"[placeholder] processed {modality}",
            modality=modality,
            confidence=0.0,
            metadata={"model_id": model_id, "note": "No registered model for modality"},
        )

    result = process(content, **kwargs)
    if inspect.isawaitable(result):
        # Only await when process() actually returned an awaitable, so that
        # synchronous implementations work as well.
        result = await result

    if not isinstance(result, dict):
        return ModalityOutput(
            content=result,
            modality=modality,
            confidence=0.0,
            metadata={"model_id": model_id},
        )

    # Heuristic mapping of a dict result to ModalityOutput: prefer the first
    # truthy value among the well-known keys; if all present values are falsy
    # (e.g. an empty string), keep the first one instead of substituting the
    # whole dict for a legitimately empty output.
    present = [
        result[key]
        for key in ("content", "output", "text")
        if result.get(key) is not None
    ]
    truthy = [value for value in present if value]
    if truthy:
        out_content = truthy[0]
    elif present:
        out_content = present[0]
    else:
        out_content = result

    try:
        confidence = float(result.get("confidence") or 0.0)
    except (TypeError, ValueError):
        # A malformed confidence must not fail an otherwise good result.
        confidence = 0.0

    metadata = dict(result)
    metadata.setdefault("model_id", model_id)
    return ModalityOutput(
        content=out_content,
        modality=modality,
        confidence=confidence,
        metadata=metadata,
    )
105165

106166
async def _analyze_modalities(
107167
self,

multimind/types.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""
2+
Shared type definitions for MultiMind (single contract module).
3+
4+
All layers should import these models instead of redefining them:
5+
- API (FastAPI)
6+
- Router
7+
- MoE
8+
- Workflows
9+
- CLI / SDK examples
10+
"""
11+
12+
from __future__ import annotations
13+
14+
from typing import Any, Dict, List, Optional
15+
16+
from pydantic import BaseModel, Field
17+
18+
19+
class ModalityInput(BaseModel):
    """A single input item tagged with the modality it belongs to."""

    # Payload for this modality; typed as Any, so no shape is enforced here.
    content: Any
    # Name of the modality this payload belongs to.
    modality: str
24+
25+
26+
class ModalityOutput(BaseModel):
    """Result produced for a single modality.

    Every field has a default and extra fields are allowed, so different
    backends can attach their own data.
    """

    # Produced payload; typed as Any.
    content: Any = None
    # Modality the payload belongs to, when known.
    modality: Optional[str] = None
    # Confidence reported for the output; 0.0 when none is supplied.
    confidence: float = 0.0
    # Free-form per-result details; a fresh dict is created per instance.
    metadata: Dict[str, Any] = Field(default_factory=dict)

    # Accept and keep fields that are not declared above.
    model_config = {"extra": "allow"}
38+
39+
40+
class UnifiedRequest(BaseModel):
    """Request envelope for multi-modal processing."""

    # The modality inputs to process.
    inputs: List[ModalityInput]
    # MoE processing is on unless the caller opts out.
    use_moe: bool = Field(True, description="Whether to use MoE processing")
    # Optional processing constraints supplied by the caller.
    constraints: Optional[Dict[str, Any]] = Field(
        None, description="Processing constraints (cost, latency, etc.)"
    )
    # Optional name of an MCP workflow to use.
    workflow: Optional[str] = Field(
        None, description="Optional MCP workflow to use"
    )
53+
54+
55+
class UnifiedResponse(BaseModel):
    """Response envelope returned for a unified request."""

    # Processing outputs; required.
    outputs: Dict[str, Any]
    # Per-expert weights, when available.
    expert_weights: Optional[Dict[str, float]] = None
    # Metrics for the request; a fresh empty dict per instance by default.
    metrics: Dict[str, Any] = Field(default_factory=dict)
61+

0 commit comments

Comments
 (0)