diff --git a/examples/multi_modal/advanced/cost_optimized_processing.py b/examples/multi_modal/advanced/cost_optimized_processing.py index ae760e35..882e3271 100644 --- a/examples/multi_modal/advanced/cost_optimized_processing.py +++ b/examples/multi_modal/advanced/cost_optimized_processing.py @@ -5,10 +5,10 @@ import asyncio import base64 from pathlib import Path -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from multimind.router.multi_modal_router import MultiModalRouter -from multimind.models.advanced import CostOptimizedWrapper +# from multimind.models.advanced import CostOptimizedWrapper from multimind.types import UnifiedRequest, ModalityInput, ModalityOutput from multimind.metrics.cost_tracker import CostTracker from multimind.metrics.performance import PerformanceTracker @@ -100,6 +100,8 @@ def _get_cost_optimized_model(self, modality: str) -> str: # Get available models models = self.router.get_available_models(modality) + if not models: + return "default" # Get cost metrics cost_metrics = self.cost_tracker.get_modality_metrics(modality) @@ -122,7 +124,7 @@ def _get_cost_optimized_model(self, modality: str) -> str: best_score = score best_model = model - return best_model or models[0] + return best_model or (models[0] if models else "default") async def main(): diff --git a/multimind/__pycache__/__init__.cpython-313.pyc b/multimind/__pycache__/__init__.cpython-313.pyc index ee0afe1c..1e0cdcd0 100644 Binary files a/multimind/__pycache__/__init__.cpython-313.pyc and b/multimind/__pycache__/__init__.cpython-313.pyc differ diff --git a/multimind/api/unified_api.py b/multimind/api/unified_api.py index 6c6a1a55..dab2d180 100644 --- a/multimind/api/unified_api.py +++ b/multimind/api/unified_api.py @@ -3,37 +3,13 @@ """ from fastapi import FastAPI, HTTPException -from pydantic import BaseModel, Field -from typing import Dict, List, Any, Optional, Union +from typing import Dict, Any import asyncio from ..models.moe import MoEFactory +from ..types import UnifiedRequest, UnifiedResponse app = FastAPI(title="Unified Multi-Modal API") -class ModalityInput(BaseModel): - """Input for a specific modality.""" - content: Any - modality: str - -class UnifiedRequest(BaseModel): - """Unified request structure for multi-modal processing.""" - inputs: List[ModalityInput] - use_moe: bool = Field(default=True, description="Whether to use MoE processing") - constraints: Optional[Dict[str, Any]] = Field( - default=None, - description="Processing constraints (cost, latency, etc.)" - ) - workflow: Optional[str] = Field( - default=None, - description="Optional MCP workflow to use" - ) - -class UnifiedResponse(BaseModel): - """Unified response structure.""" - outputs: Dict[str, Any] - expert_weights: Optional[Dict[str, float]] = None - metrics: Dict[str, Any] - # Initialize components try: moe_factory = MoEFactory() diff --git a/multimind/metrics/__init__.py b/multimind/metrics/__init__.py new file mode 100644 index 00000000..de01c9fd --- /dev/null +++ b/multimind/metrics/__init__.py @@ -0,0 +1,12 @@ +""" +Lightweight metrics utilities used by examples and SDK components. +""" + +from .cost_tracker import CostTracker +from .performance import PerformanceTracker + +__all__ = [ + "CostTracker", + "PerformanceTracker", +] + diff --git a/multimind/metrics/cost_tracker.py b/multimind/metrics/cost_tracker.py new file mode 100644 index 00000000..c0c5b9dd --- /dev/null +++ b/multimind/metrics/cost_tracker.py @@ -0,0 +1,87 @@ +""" +Cost tracking helpers. + +The SDK has multiple cost/perf tracking implementations; this one is a small, +stable surface used by examples (e.g. multi-modal cost optimization). +""" + +from __future__ import annotations + +from typing import Any, Dict, DefaultDict +from collections import defaultdict + + +class CostTracker: + """ + Tracks costs per (modality, model_id). + + Expected example surface: + - get_modality_cost(modality, result) -> float + - get_modality_metrics(modality) -> dict[model_id] -> {"avg_cost", "total_cost", "count"} + """ + + def __init__(self) -> None: + # stats[modality][model_id] = {"total_cost": float, "count": int} + self._stats: DefaultDict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict) + + def _ensure(self, modality: str, model_id: str) -> Dict[str, Any]: + if model_id not in self._stats[modality]: + self._stats[modality][model_id] = {"total_cost": 0.0, "count": 0} + return self._stats[modality][model_id] + + def get_modality_cost(self, modality: str, result: Any) -> float: + """ + Extract + record cost for a modality result. + + Heuristics: + - If result is a dict, read result["cost"] (default 0.0) + - model id is read from result["model_id"] or result["model"] (default "unknown") + """ + + model_id = "unknown" + cost = 0.0 + + data: Any = result + # Support Pydantic models (v1/v2) and plain objects + if hasattr(result, "model_dump"): + data = result.model_dump() + elif hasattr(result, "dict"): + try: + data = result.dict() + except Exception: + data = result + + if isinstance(data, dict): + metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + model_id = str( + data.get("model_id") + or data.get("model") + or metadata.get("model_id") + or "unknown" + ) + try: + cost = float(data.get("cost") or metadata.get("cost") or 0.0) + except (TypeError, ValueError): + cost = 0.0 + + stat = self._ensure(modality, model_id) + stat["total_cost"] += cost + stat["count"] += 1 + + return cost + + def get_modality_metrics(self, modality: str) -> Dict[str, Dict[str, Any]]: + """Return metrics keyed by model id.""" + + metrics: Dict[str, Dict[str, Any]] = {} + for model_id, stat in self._stats.get(modality, {}).items(): + count = int(stat.get("count", 0)) or 0 + total_cost = float(stat.get("total_cost", 0.0)) if count else 0.0 + avg_cost = total_cost / count if count else 0.0 + metrics[model_id] = { + "avg_cost": avg_cost, + "total_cost": total_cost, + "count": count, + } + return metrics + diff --git a/multimind/metrics/performance.py b/multimind/metrics/performance.py new file mode 100644 index 00000000..46700f5b --- /dev/null +++ b/multimind/metrics/performance.py @@ -0,0 +1,79 @@ +""" +Performance tracking helpers. + +This module provides a small, stable API used by examples. +""" + +from __future__ import annotations + +import time +from typing import Any, Dict, DefaultDict +from collections import defaultdict + + +class PerformanceTracker: + """ + Tracks latency/success per (modality, model_id). + + Expected example surface: + - get_current_time() -> float + - track_latency(modality, latency) -> None + - track_error(modality, error) -> None + - get_modality_metrics(modality) -> dict[model_id] -> {"success_rate", "avg_latency"} + """ + + def __init__(self) -> None: + # stats[modality][model_id] = {"success": int, "fail": int, "lat_total": float, "lat_count": int} + self._stats: DefaultDict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict) + + def _ensure(self, modality: str, model_id: str) -> Dict[str, Any]: + if model_id not in self._stats[modality]: + self._stats[modality][model_id] = { + "success": 0, + "fail": 0, + "lat_total": 0.0, + "lat_count": 0, + } + return self._stats[modality][model_id] + + def get_current_time(self) -> float: + return time.time() + + def track_latency(self, modality: str, latency: float, model_id: str = "unknown", success: bool = True) -> None: + stat = self._ensure(modality, model_id) + if success: + stat["success"] += 1 + else: + stat["fail"] += 1 + try: + lat = float(latency) + except (TypeError, ValueError): + lat = 0.0 + stat["lat_total"] += lat + stat["lat_count"] += 1 + + def track_error(self, modality: str, error: str, model_id: str = "unknown") -> None: + stat = self._ensure(modality, model_id) + stat["fail"] += 1 + + def track_success(self, modality: str, model_id: str = "unknown") -> None: + """Convenience method for recording a success without latency.""" + stat = self._ensure(modality, model_id) + stat["success"] += 1 + + def get_modality_metrics(self, modality: str) -> Dict[str, Dict[str, Any]]: + metrics: Dict[str, Dict[str, Any]] = {} + for model_id, stat in self._stats.get(modality, {}).items(): + success = int(stat.get("success", 0)) + fail = int(stat.get("fail", 0)) + total = success + fail + lat_count = int(stat.get("lat_count", 0)) + lat_total = float(stat.get("lat_total", 0.0)) + metrics[model_id] = { + "success_rate": (success / total) if total else 0.0, + "avg_latency": (lat_total / lat_count) if lat_count else 0.0, + "success": success, + "fail": fail, + } + return metrics + diff --git a/multimind/router/multi_modal_router.py b/multimind/router/multi_modal_router.py index 5dabbdef..fed3e0b9 100644 --- a/multimind/router/multi_modal_router.py +++ b/multimind/router/multi_modal_router.py @@ -8,6 +8,7 @@ from .router import ModelRouter from .strategy import RoutingStrategy from ..api.mcp.registry import WorkflowRegistry +from ..types import ModalityInput, ModalityOutput class ModalityType: """Supported modality types.""" @@ -102,6 +103,65 @@ def register_modality_model( if modality not in self.modality_registry: self.modality_registry[modality] = {} self.modality_registry[modality][model_id] = model + + def get_available_models(self, modality: str) -> List[str]: + """Return available model IDs for a modality.""" + models = list(self.modality_registry.get(modality, {}).keys()) + # Examples expect at least one model to pick from; provide a safe default. + return models or ["default"] + + async def process_modality( + self, + input_data: ModalityInput, + model: Optional[str] = None, + **kwargs: Any + ) -> ModalityOutput: + """ + Process a single modality input. + + This is a compatibility method used by SDK examples. If a model_id is provided + and registered for that modality, we will call the model's `process()` method. + Otherwise we return a lightweight placeholder `ModalityOutput`. + """ + + modality = getattr(input_data, "modality", None) or "unknown" + content = getattr(input_data, "content", None) + + model_id = model or "default" + model_obj: Optional[BaseLLM] = None + if modality in self.modality_registry and model: + model_obj = self.modality_registry[modality].get(model) + + # If we have a real model, call it; otherwise return a placeholder output. + if model_obj is not None and hasattr(model_obj, "process"): + result = await model_obj.process(content, **kwargs) + + # Heuristic mapping to ModalityOutput + if isinstance(result, dict): + out_content = result.get("content") or result.get("output") or result.get("text") or result + confidence = float(result.get("confidence") or 0.0) if "confidence" in result else 0.0 + metadata = dict(result) + metadata.setdefault("model_id", model_id) + return ModalityOutput( + content=out_content, + modality=modality, + confidence=confidence, + metadata=metadata, + ) + + return ModalityOutput( + content=result, + modality=modality, + confidence=0.0, + metadata={"model_id": model_id}, + ) + + return ModalityOutput( + content=f"[placeholder] processed {modality}", + modality=modality, + confidence=0.0, + metadata={"model_id": model_id, "note": "No registered model for modality"}, + ) async def _analyze_modalities( self, diff --git a/multimind/types.py b/multimind/types.py new file mode 100644 index 00000000..83c9ccbe --- /dev/null +++ b/multimind/types.py @@ -0,0 +1,61 @@ +""" +Shared type definitions for MultiMind (single contract module). + +All layers should import these models instead of redefining them: +- API (FastAPI) +- Router +- MoE +- Workflows +- CLI / SDK examples +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + + +class ModalityInput(BaseModel): + """Input for a specific modality.""" + + content: Any + modality: str + + +class ModalityOutput(BaseModel): + """Output for a specific modality. + + Kept flexible (extra fields allowed) to support different backends. + """ + + content: Any = None + modality: Optional[str] = None + confidence: float = 0.0 + metadata: Dict[str, Any] = Field(default_factory=dict) + + model_config = {"extra": "allow"} + + +class UnifiedRequest(BaseModel): + """Unified request structure for multi-modal processing.""" + + inputs: List[ModalityInput] + use_moe: bool = Field(default=True, description="Whether to use MoE processing") + constraints: Optional[Dict[str, Any]] = Field( + default=None, + description="Processing constraints (cost, latency, etc.)", + ) + workflow: Optional[str] = Field( + default=None, + description="Optional MCP workflow to use", + ) + + +class UnifiedResponse(BaseModel): + """Unified response structure.""" + + outputs: Dict[str, Any] + expert_weights: Optional[Dict[str, float]] = None + metrics: Dict[str, Any] = Field(default_factory=dict) + diff --git a/tests/examples/multi_modal/test_cost_optimized_processing1.py b/tests/examples/multi_modal/test_cost_optimized_processing1.py new file mode 100644 index 00000000..84ab53f1 --- /dev/null +++ b/tests/examples/multi_modal/test_cost_optimized_processing1.py @@ -0,0 +1,92 @@ +""" +Lightweight tests for the cost-optimized multi-modal processing example. + +These tests are intentionally offline-safe: +- they do not require API keys +- they do not make network calls +""" + +import sys +from pathlib import Path +import pytest + +from multimind.router.multi_modal_router import MultiModalRouter +from multimind.types import UnifiedRequest, ModalityInput +from multimind.metrics.cost_tracker import CostTracker +from multimind.metrics.performance import PerformanceTracker + +# Import the example class under test +REPO_ROOT = Path(__file__).resolve().parents[3] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) +from examples.multi_modal.advanced.cost_optimized_processing import CostOptimizedMultiModalProcessor + + +@pytest.mark.asyncio +async def test_placeholder_flow_cost_is_zero(): + """ + With no models registered, the MultiModalRouter returns placeholder outputs. + CostTracker should therefore record 0 cost. + """ + router = MultiModalRouter() + cost_tracker = CostTracker() + performance_tracker = PerformanceTracker() + + processor = CostOptimizedMultiModalProcessor( + router=router, + cost_tracker=cost_tracker, + performance_tracker=performance_tracker, + budget=1.0, + ) + + request = UnifiedRequest( + inputs=[ + ModalityInput(modality="text", content="hello"), + ] + ) + + result = await processor.process_request(request, optimize_cost=True) + assert result["cost"] == 0.0 + assert "text" in result["results"] + +@pytest.mark.asyncio +async def test_placeholder_flow_image_audio_text_cost_is_zero(): + """ + With no models registered, image/audio/text modalities all use placeholder outputs. + This should still run end-to-end without network calls and record 0 cost. + """ + router = MultiModalRouter() + cost_tracker = CostTracker() + performance_tracker = PerformanceTracker() + + processor = CostOptimizedMultiModalProcessor( + router=router, + cost_tracker=cost_tracker, + performance_tracker=performance_tracker, + budget=1.0, + ) + + request = UnifiedRequest( + inputs=[ + ModalityInput(modality="image", content="fake_image_base64"), + ModalityInput(modality="audio", content="fake_audio_base64"), + ModalityInput(modality="text", content="hello"), + ] + ) + + result = await processor.process_request(request, optimize_cost=True) + assert result["cost"] == 0.0 + assert set(result["results"].keys()) == {"image", "audio", "text"} + + +@pytest.mark.asyncio +async def test_get_available_models_default_is_present(): + """ + The router should always return at least one model id so callers + don't crash with an empty list. + """ + router = MultiModalRouter() + models = router.get_available_models("text") + assert isinstance(models, list) + assert len(models) >= 1 +