Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions examples/multi_modal/advanced/cost_optimized_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import asyncio
import base64
from pathlib import Path
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

from multimind.router.multi_modal_router import MultiModalRouter
from multimind.models.advanced import CostOptimizedWrapper
# from multimind.models.advanced import CostOptimizedWrapper
from multimind.types import UnifiedRequest, ModalityInput, ModalityOutput
from multimind.metrics.cost_tracker import CostTracker
from multimind.metrics.performance import PerformanceTracker
Expand Down Expand Up @@ -100,6 +100,8 @@ def _get_cost_optimized_model(self, modality: str) -> str:

# Get available models
models = self.router.get_available_models(modality)
if not models:
return "default"

# Get cost metrics
cost_metrics = self.cost_tracker.get_modality_metrics(modality)
Expand All @@ -122,7 +124,7 @@ def _get_cost_optimized_model(self, modality: str) -> str:
best_score = score
best_model = model

return best_model or models[0]
return best_model or (models[0] if models else "default")


async def main():
Expand Down
Binary file modified multimind/__pycache__/__init__.cpython-313.pyc
Binary file not shown.
28 changes: 2 additions & 26 deletions multimind/api/unified_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,13 @@
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional, Union
from typing import Dict, Any
import asyncio
from ..models.moe import MoEFactory
from ..types import UnifiedRequest, UnifiedResponse

app = FastAPI(title="Unified Multi-Modal API")

class ModalityInput(BaseModel):
"""Input for a specific modality."""
content: Any
modality: str

class UnifiedRequest(BaseModel):
"""Unified request structure for multi-modal processing."""
inputs: List[ModalityInput]
use_moe: bool = Field(default=True, description="Whether to use MoE processing")
constraints: Optional[Dict[str, Any]] = Field(
default=None,
description="Processing constraints (cost, latency, etc.)"
)
workflow: Optional[str] = Field(
default=None,
description="Optional MCP workflow to use"
)

class UnifiedResponse(BaseModel):
"""Unified response structure."""
outputs: Dict[str, Any]
expert_weights: Optional[Dict[str, float]] = None
metrics: Dict[str, Any]

# Initialize components
try:
moe_factory = MoEFactory()
Expand Down
12 changes: 12 additions & 0 deletions multimind/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""
Lightweight metrics utilities used by examples and SDK components.
"""

from .cost_tracker import CostTracker
from .performance import PerformanceTracker

__all__ = [
"CostTracker",
"PerformanceTracker",
]

87 changes: 87 additions & 0 deletions multimind/metrics/cost_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
Cost tracking helpers.

The SDK has multiple cost/perf tracking implementations; this one is a small,
stable surface used by examples (e.g. multi-modal cost optimization).
"""

from __future__ import annotations

from typing import Any, Dict, DefaultDict
from collections import defaultdict


class CostTracker:
    """Accumulate per-model cost statistics, bucketed by modality.

    Surface used by the examples:
        - get_modality_cost(modality, result) -> float
        - get_modality_metrics(modality) -> {model_id: {"avg_cost", "total_cost", "count"}}
    """

    def __init__(self) -> None:
        # _stats[modality][model_id] -> {"total_cost": float, "count": int}
        self._stats: DefaultDict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict)

    def _ensure(self, modality: str, model_id: str) -> Dict[str, Any]:
        """Return the mutable stats entry for (modality, model_id), creating it on first use."""
        bucket = self._stats[modality]
        entry = bucket.get(model_id)
        if entry is None:
            entry = {"total_cost": 0.0, "count": 0}
            bucket[model_id] = entry
        return entry

    def get_modality_cost(self, modality: str, result: Any) -> float:
        """Extract and record the cost carried by *result* for *modality*.

        Heuristics:
        - Pydantic models (v2 ``model_dump`` / v1 ``dict``) are converted to dicts first.
        - Dicts are inspected for "cost" and "model_id"/"model", also inside a
          nested "metadata" dict.
        - Anything else is recorded as cost 0.0 under model id "unknown".
        """
        payload: Any = result
        if hasattr(result, "model_dump"):  # pydantic v2
            payload = result.model_dump()
        elif hasattr(result, "dict"):  # pydantic v1 / dict-like objects
            try:
                payload = result.dict()
            except Exception:
                payload = result

        model_id = "unknown"
        cost = 0.0
        if isinstance(payload, dict):
            meta = payload.get("metadata")
            if not isinstance(meta, dict):
                meta = {}
            model_id = str(
                payload.get("model_id")
                or payload.get("model")
                or meta.get("model_id")
                or "unknown"
            )
            try:
                cost = float(payload.get("cost") or meta.get("cost") or 0.0)
            except (TypeError, ValueError):
                cost = 0.0

        entry = self._ensure(modality, model_id)
        entry["total_cost"] += cost
        entry["count"] += 1
        return cost

    def get_modality_metrics(self, modality: str) -> Dict[str, Dict[str, Any]]:
        """Return {model_id: {"avg_cost", "total_cost", "count"}} for *modality*."""
        summary: Dict[str, Dict[str, Any]] = {}
        for model_id, entry in self._stats.get(modality, {}).items():
            count = int(entry.get("count", 0))
            total = float(entry.get("total_cost", 0.0)) if count else 0.0
            summary[model_id] = {
                "avg_cost": total / count if count else 0.0,
                "total_cost": total,
                "count": count,
            }
        return summary

79 changes: 79 additions & 0 deletions multimind/metrics/performance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""
Performance tracking helpers.

This module provides a small, stable API used by examples.
"""

from __future__ import annotations

import time
from typing import Any, Dict, DefaultDict
from collections import defaultdict


class PerformanceTracker:
    """Accumulate latency and success/failure counters per (modality, model_id).

    Surface used by the examples:
        - get_current_time() -> float
        - track_latency(modality, latency, model_id="unknown", success=True)
        - track_error(modality, error, model_id="unknown")
        - get_modality_metrics(modality) -> {model_id: {"success_rate", "avg_latency", ...}}
    """

    def __init__(self) -> None:
        # _stats[modality][model_id] -> {"success": int, "fail": int,
        #                                "lat_total": float, "lat_count": int}
        self._stats: DefaultDict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict)

    def _ensure(self, modality: str, model_id: str) -> Dict[str, Any]:
        """Return the mutable stats entry for (modality, model_id), creating it on first use."""
        bucket = self._stats[modality]
        entry = bucket.get(model_id)
        if entry is None:
            entry = {"success": 0, "fail": 0, "lat_total": 0.0, "lat_count": 0}
            bucket[model_id] = entry
        return entry

    def get_current_time(self) -> float:
        """Wall-clock timestamp (seconds since the epoch)."""
        return time.time()

    def track_latency(self, modality: str, latency: float, model_id: str = "unknown", success: bool = True) -> None:
        """Record one call's outcome plus its latency; unparseable latencies count as 0.0."""
        entry = self._ensure(modality, model_id)
        entry["success" if success else "fail"] += 1
        try:
            value = float(latency)
        except (TypeError, ValueError):
            value = 0.0
        entry["lat_total"] += value
        entry["lat_count"] += 1

    def track_error(self, modality: str, error: str, model_id: str = "unknown") -> None:
        """Record a failure (the error text itself is not stored)."""
        self._ensure(modality, model_id)["fail"] += 1

    def track_success(self, modality: str, model_id: str = "unknown") -> None:
        """Convenience method for recording a success without a latency sample."""
        self._ensure(modality, model_id)["success"] += 1

    def get_modality_metrics(self, modality: str) -> Dict[str, Dict[str, Any]]:
        """Return {model_id: {"success_rate", "avg_latency", "success", "fail"}}."""
        summary: Dict[str, Dict[str, Any]] = {}
        for model_id, entry in self._stats.get(modality, {}).items():
            wins = int(entry.get("success", 0))
            losses = int(entry.get("fail", 0))
            samples = int(entry.get("lat_count", 0))
            lat_sum = float(entry.get("lat_total", 0.0))
            calls = wins + losses
            summary[model_id] = {
                "success_rate": wins / calls if calls else 0.0,
                "avg_latency": lat_sum / samples if samples else 0.0,
                "success": wins,
                "fail": losses,
            }
        return summary

60 changes: 60 additions & 0 deletions multimind/router/multi_modal_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .router import ModelRouter
from .strategy import RoutingStrategy
from ..api.mcp.registry import WorkflowRegistry
from ..types import ModalityInput, ModalityOutput

class ModalityType:
"""Supported modality types."""
Expand Down Expand Up @@ -102,6 +103,65 @@ def register_modality_model(
if modality not in self.modality_registry:
self.modality_registry[modality] = {}
self.modality_registry[modality][model_id] = model

def get_available_models(self, modality: str) -> List[str]:
    """Return the registered model IDs for *modality*.

    Never returns an empty list: the examples expect at least one model to
    pick from, so an unregistered modality yields the ["default"] sentinel.
    """
    registered = self.modality_registry.get(modality, {})
    if registered:
        return list(registered)
    return ["default"]

async def process_modality(
    self,
    input_data: ModalityInput,
    model: Optional[str] = None,
    **kwargs: Any
) -> ModalityOutput:
    """
    Process a single modality input.

    This is a compatibility method used by SDK examples. If a model id is
    provided and registered for that modality, the model's `process()` method
    is awaited and its result mapped onto a `ModalityOutput`. Otherwise a
    lightweight placeholder `ModalityOutput` is returned.

    Args:
        input_data: object carrying `.modality` and `.content` attributes.
        model: optional model id to look up in `self.modality_registry`.
        **kwargs: forwarded verbatim to the model's `process()` call.

    Returns:
        ModalityOutput with the produced content, confidence (0.0 when the
        backend supplies none) and a metadata dict that always contains
        "model_id".
    """
    modality = getattr(input_data, "modality", None) or "unknown"
    content = getattr(input_data, "content", None)

    model_id = model or "default"
    model_obj: Optional[BaseLLM] = None
    if modality in self.modality_registry and model:
        model_obj = self.modality_registry[modality].get(model)

    # If we have a real model, call it; otherwise return a placeholder output.
    if model_obj is not None and hasattr(model_obj, "process"):
        result = await model_obj.process(content, **kwargs)

        # Heuristic mapping to ModalityOutput.
        if isinstance(result, dict):
            # Key-presence check (not `or`-chaining) so falsy-but-present
            # values such as "" or 0 are kept instead of falling through
            # to the whole dict.
            out_content: Any = result
            for key in ("content", "output", "text"):
                if key in result:
                    out_content = result[key]
                    break

            # Guard the conversion: a non-numeric "confidence" value should
            # degrade to 0.0 rather than raise out of this helper.
            try:
                confidence = float(result.get("confidence") or 0.0)
            except (TypeError, ValueError):
                confidence = 0.0

            metadata = dict(result)
            metadata.setdefault("model_id", model_id)
            return ModalityOutput(
                content=out_content,
                modality=modality,
                confidence=confidence,
                metadata=metadata,
            )

        return ModalityOutput(
            content=result,
            modality=modality,
            confidence=0.0,
            metadata={"model_id": model_id},
        )

    return ModalityOutput(
        content=f"[placeholder] processed {modality}",
        modality=modality,
        confidence=0.0,
        metadata={"model_id": model_id, "note": "No registered model for modality"},
    )

async def _analyze_modalities(
self,
Expand Down
61 changes: 61 additions & 0 deletions multimind/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Shared type definitions for MultiMind (single contract module).

All layers should import these models instead of redefining them:
- API (FastAPI)
- Router
- MoE
- Workflows
- CLI / SDK examples
"""

from __future__ import annotations

from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


class ModalityInput(BaseModel):
    """Input for a specific modality."""

    # Raw payload for the modality; deliberately untyped (`Any`) because each
    # backend defines its own payload shape.
    content: Any
    # Modality name (free-form string; not validated against a closed set here).
    modality: str


class ModalityOutput(BaseModel):
    """Output for a specific modality.

    Kept flexible (extra fields allowed) to support different backends.
    """

    # Produced payload; None when the backend yields nothing.
    content: Any = None
    # Modality the output belongs to; optional because some producers omit it.
    modality: Optional[str] = None
    # Backend-reported confidence; defaults to 0.0 when not supplied.
    confidence: float = 0.0
    # Free-form per-result details (e.g. the router stores "model_id" here).
    metadata: Dict[str, Any] = Field(default_factory=dict)

    # Pydantic v2 config: accept unknown fields so backend-specific extras
    # survive round-tripping through this model.
    model_config = {"extra": "allow"}


class UnifiedRequest(BaseModel):
    """Unified request structure for multi-modal processing."""

    # One entry per modality to process; order is preserved.
    inputs: List[ModalityInput]
    use_moe: bool = Field(default=True, description="Whether to use MoE processing")
    constraints: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Processing constraints (cost, latency, etc.)",
    )
    workflow: Optional[str] = Field(
        default=None,
        description="Optional MCP workflow to use",
    )


class UnifiedResponse(BaseModel):
    """Unified response structure."""

    # Per-modality results keyed by modality/output name.
    outputs: Dict[str, Any]
    # Expert mixing weights when MoE processing was used; None otherwise.
    expert_weights: Optional[Dict[str, float]] = None
    # Cost/latency/etc. measurements; defaults to an empty dict.
    metrics: Dict[str, Any] = Field(default_factory=dict)

Loading
Loading