46 changes: 46 additions & 0 deletions docs-gb/user-guide/deployment/kserve.md
@@ -130,3 +130,49 @@ it directly through `kubectl`, by running:
```bash
kubectl apply -f my-inferenceservice-manifest.yaml
```

## Advanced Configuration

### Model Repository Availability

When deploying with KServe, models may be mounted from external storage (e.g., S3, PVC, or OCI images).
In some scenarios, there may be a delay between when MLServer starts and when the model repository becomes available.
MLServer provides configuration options to handle such scenarios gracefully:

| Setting | Environment Variable | Default | Description |
|---------|---------------------|---------|-------------|
| `model_repository_retries` | `MLSERVER_MODEL_REPOSITORY_RETRIES` | `10` | Number of times to retry while waiting for the model repository to become available |
| `model_repository_wait_interval` | `MLSERVER_MODEL_REPOSITORY_WAIT_INTERVAL` | `1.0` | Wait interval (in seconds) between retries |

These settings can be configured via environment variables in your `InferenceService` manifest:

```yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: my-model
spec:
  predictor:
    sklearn:
      protocolVersion: v2
      storageUri: gs://seldon-models/sklearn/iris
      env:
        - name: MLSERVER_MODEL_REPOSITORY_RETRIES
          value: "20"
        - name: MLSERVER_MODEL_REPOSITORY_WAIT_INTERVAL
          value: "2.0"
```

Or via a `settings.json` file in your model repository:

```json
{
  "model_repository_retries": 20,
  "model_repository_wait_interval": 2.0
}
```

This is particularly useful when working with:
- **OCI model images** where the model sidecar may take time to mount files
- **Network storage** where connectivity or initialization delays may occur
- **Large models** where the download or extraction process takes time
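
When running MLServer outside of Kubernetes (for example, to try these values locally before deploying), the same environment variables apply. A minimal sketch, assuming the repository is mounted at `/mnt/models` (path hypothetical):

```bash
# Allow up to 20 retries, 2s apart (i.e. wait up to 40s in total) for the
# model repository to appear before starting without it.
MLSERVER_MODEL_REPOSITORY_RETRIES=20 \
MLSERVER_MODEL_REPOSITORY_WAIT_INTERVAL=2.0 \
mlserver start /mnt/models
```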
46 changes: 46 additions & 0 deletions docs/user-guide/deployment/kserve.md
@@ -147,3 +147,49 @@ it directly through `kubectl`, by running:
```bash
kubectl apply -f my-inferenceservice-manifest.yaml
```

## Advanced Configuration

### Model Repository Availability

When deploying with KServe, models may be mounted from external storage (e.g., S3, PVC, or OCI images).
In some scenarios, there may be a delay between when MLServer starts and when the model repository becomes available.
MLServer provides configuration options to handle such scenarios gracefully:

| Setting | Environment Variable | Default | Description |
|---------|---------------------|---------|-------------|
| `model_repository_retries` | `MLSERVER_MODEL_REPOSITORY_RETRIES` | `10` | Number of times to retry while waiting for the model repository to become available |
| `model_repository_wait_interval` | `MLSERVER_MODEL_REPOSITORY_WAIT_INTERVAL` | `1.0` | Wait interval (in seconds) between retries |

These settings can be configured via environment variables in your `InferenceService` manifest:

```yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: my-model
spec:
  predictor:
    sklearn:
      protocolVersion: v2
      storageUri: gs://seldon-models/sklearn/iris
      env:
        - name: MLSERVER_MODEL_REPOSITORY_RETRIES
          value: "20"
        - name: MLSERVER_MODEL_REPOSITORY_WAIT_INTERVAL
          value: "2.0"
```

Or via a `settings.json` file in your model repository:

```json
{
  "model_repository_retries": 20,
  "model_repository_wait_interval": 2.0
}
```

This is particularly useful when working with:
- **OCI model images** where the model sidecar may take time to mount files
- **Network storage** where connectivity or initialization delays may occur
- **Large models** where the download or extraction process takes time
42 changes: 41 additions & 1 deletion mlserver/model.py
@@ -1,3 +1,6 @@
import os
import logging

from typing import Any, Dict, Optional, List, AsyncIterator

from .codecs import (
@@ -25,6 +28,8 @@
    Parameters,
)

logger = logging.getLogger(__name__)


def _generate_metadata_index(
    metadata_tensors: Optional[List[MetadataTensor]],
@@ -52,6 +57,9 @@ def __init__(self, settings: ModelSettings):

        self._inputs_index = _generate_metadata_index(self._settings.inputs)
        self._outputs_index = _generate_metadata_index(self._settings.outputs)

        # Track transient model files for automatic cleanup on unload
        self._transient_model_files: List[str] = []

        self.ready = False

@@ -91,6 +99,21 @@ async def predict_stream(
        logic.**
        """
        yield await self.predict(await payloads.__anext__())

    def register_transient_file(self, file_path: str) -> None:
        """
        Register a transient model file for automatic cleanup when the model
        is unloaded.

        Transient files are created when model artifacts need to be copied
        from incompatible filesystem mounts (e.g. bind mounts, proc paths) to
        local storage for runtime compatibility. These files are removed
        automatically when the model is unloaded.

        Args:
            file_path: Path to the transient model file to track for cleanup
        """
        if file_path and file_path not in self._transient_model_files:
            self._transient_model_files.append(file_path)

    async def unload(self) -> bool:
        """
@@ -100,10 +123,27 @@ async def unload(self) -> bool:
        :doc:`parallel inference </user-guide/parallel-inference>`) is
        enabled).
        A return value of ``True`` will mean the model is now unloaded.

        This base implementation automatically cleans up any transient model
        files registered via ``register_transient_file()``.

        **This method can be overridden to implement your custom unload
        logic.**
        logic. If you override this method, call ``await super().unload()``
        to ensure transient files are cleaned up.**
        """
        # Clean up transient model files
        for transient_file in self._transient_model_files:
            try:
                if os.path.exists(transient_file):
                    os.remove(transient_file)
                    logger.debug(
                        f"Cleaned up transient model file: {transient_file}"
                    )
            except Exception as e:
                # Log but don't fail the unload if cleanup fails
                logger.warning(
                    f"Failed to clean up transient file {transient_file}: {e}"
                )

        self._transient_model_files.clear()
        return True

    @property
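
For reference, a custom runtime would typically pair `register_transient_file()` with an overridden `unload()`. A minimal sketch (the runtime class and loader function are hypothetical):

```python
from mlserver.model import MLModel
from mlserver.utils import get_model_uri, ensure_local_path


class MyRuntime(MLModel):  # hypothetical custom runtime
    async def load(self) -> bool:
        model_uri = await get_model_uri(self._settings)

        # Copy the artifact to local storage if needed and track the copy
        local_uri = ensure_local_path(model_uri)
        if local_uri != model_uri:
            self.register_transient_file(local_uri)

        self._model = my_load_fn(local_uri)  # hypothetical loader
        return True

    async def unload(self) -> bool:
        # Custom teardown would go here, then defer to the base class
        # so that any transient files are removed.
        return await super().unload()
```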
21 changes: 17 additions & 4 deletions mlserver/repository/factory.py
@@ -1,6 +1,7 @@
from .repository import ModelRepository, SchemalessModelRepository
from ..settings import Settings
from pydantic import ImportString
import inspect


class ModelRepositoryFactory:
@@ -12,9 +13,21 @@ def resolve_model_repository(settings: Settings) -> ModelRepository:
        if settings.model_repository_implementation:
            model_repository_implementation = settings.model_repository_implementation

        result = model_repository_implementation(
            root=settings.model_repository_root,
            **settings.model_repository_implementation_args,
        )
        # Check if the repository constructor accepts a 'settings' parameter
        sig = inspect.signature(model_repository_implementation.__init__)
        accepts_settings = "settings" in sig.parameters

        if accepts_settings:
            result = model_repository_implementation(
                root=settings.model_repository_root,
                settings=settings,
                **settings.model_repository_implementation_args,
            )
        else:
            # Backward compatibility: don't pass settings if not accepted
            result = model_repository_implementation(
                root=settings.model_repository_root,
                **settings.model_repository_implementation_args,
            )

        return result
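
Custom repository implementations that want access to the retry configuration only need to accept the extra keyword; a minimal sketch (the class name is hypothetical):

```python
from typing import Optional

from mlserver.repository.repository import SchemalessModelRepository
from mlserver.settings import Settings


class MyRepository(SchemalessModelRepository):  # hypothetical
    def __init__(self, root: str, settings: Optional[Settings] = None):
        # Declaring `settings` here is what makes the factory pass it in,
        # enabling model_repository_retries / model_repository_wait_interval.
        super().__init__(root=root, settings=settings)
```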
50 changes: 46 additions & 4 deletions mlserver/repository/repository.py
@@ -1,11 +1,12 @@
import abc
import os
import glob
import time

from pydantic import ValidationError
from typing import List
from typing import List, Optional

from ..settings import ModelParameters, ModelSettings
from ..settings import ModelParameters, ModelSettings, Settings
from ..errors import ModelNotFound
from ..logging import logger

@@ -30,17 +31,58 @@ class SchemalessModelRepository(ModelRepository):
    loaded onto the model registry.
    """

    def __init__(self, root: str):
    def __init__(self, root: str, settings: Optional[Settings] = None):
        self._root = root
        self._settings = settings

        # Get retry configuration from settings or use defaults
        if self._settings:
            self._retries = self._settings.model_repository_retries
            self._wait_interval = self._settings.model_repository_wait_interval
        else:
            # Fall back to env vars for backward compatibility
            self._retries = int(
                os.environ.get("MLSERVER_MODEL_REPOSITORY_RETRIES", "10")
            )
            self._wait_interval = float(
                os.environ.get("MLSERVER_MODEL_REPOSITORY_WAIT_INTERVAL", "1.0")
            )

    def _wait_for_repository(self, path: str) -> str:
        """Wait for the model repository path to become available."""
        abs_path = os.path.abspath(path)

        if os.path.exists(abs_path):
            return abs_path

        if self._retries <= 0:
            return abs_path

        logger.info(
            f"Waiting up to {self._retries * self._wait_interval}s "
            f"for model repository to become available..."
        )

        for _ in range(self._retries):
            time.sleep(self._wait_interval)

            if os.path.exists(abs_path):
                logger.info("Model repository is now available")
                time.sleep(0.5)  # Brief delay for files to stabilize
                return abs_path

        logger.warning(
            f"Model repository at {abs_path} still not available after waiting"
        )
        return abs_path

    async def list(self) -> List[ModelSettings]:
        all_model_settings = []

        # TODO: Use an async alternative for filesys ops
        if self._root:
            abs_root = os.path.abspath(self._root)
            # Wait for the model repository to become available
            abs_root = self._wait_for_repository(self._root)

            pattern = os.path.join(abs_root, "**", DEFAULT_MODEL_SETTINGS_FILENAME)
            matches = glob.glob(pattern, recursive=True)

            if len(matches) == 0:
                logger.warning(f"No model-settings.json found in {self._root}")

            for model_settings_path in matches:
                try:
6 changes: 6 additions & 0 deletions mlserver/settings.py
@@ -190,6 +190,12 @@ class Settings(BaseSettings):
    model_repository_root: str = "."
    """Root of the model repository, where we will search for models."""

    model_repository_retries: int = 10
    """Number of retries to wait for model repository to become available."""

    model_repository_wait_interval: float = 1.0
    """Wait interval (in seconds) between retries for model repository."""

    # Model Repository parameters are meant to be set directly by the MLServer runtime.
    model_repository_implementation_args: dict = {}
    """Extra parameters for model repository."""
55 changes: 55 additions & 0 deletions mlserver/utils.py
@@ -12,6 +12,8 @@
from typing import cast
import warnings
import urllib.parse
import shutil
import tempfile

from asyncio import Task
from typing import Any, Optional, TypeVar
@@ -223,6 +225,59 @@ async def get_model_uri(
    raise InvalidModelURI(settings.name, full_model_path)


def ensure_local_path(file_path: str) -> str:
    """
    Ensure a file path is on a local filesystem accessible by model runtimes.

    Some runtimes may have issues loading from certain filesystem types
    (e.g. special mounts, network filesystems). This function returns a
    path that is guaranteed to be on local storage.

    If the file is already on accessible storage, returns the original path.
    Otherwise, copies the file to temporary storage and returns the temp path.

    Note: the caller is responsible for cleaning up temporary files.

    Args:
        file_path: Original file path

    Returns:
        Path to the file on a local filesystem
    """
    # If the file doesn't exist, there is nothing to copy
    if not os.path.exists(file_path):
        return file_path

    # Check if the path contains indicators of special mounts
    realpath = os.path.realpath(file_path)
    normalized_input = os.path.abspath(file_path)

    # Detect problematic filesystem configurations:
    # 1. Path contains /proc/ (proc-based mounts)
    # 2. Path resolves to a different location (bind mounts, symlinks)
    # 3. Broken symlinks
    needs_copy = (
        "/proc/" in realpath
        or realpath != normalized_input
        or (os.path.islink(file_path) and not os.path.exists(realpath))
    )

    if not needs_copy:
        return file_path

    # Copy to a temporary location
    logger.info("Copying model file to local temporary storage for runtime compatibility")

    filename = os.path.basename(file_path)
    suffix = os.path.splitext(filename)[1]
    temp_fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix="mlserver_")
    os.close(temp_fd)

    shutil.copy2(file_path, temp_path)
    logger.debug(f"Model copied from {file_path} to {temp_path}")
    return temp_path


def to_absolute_path(model_settings: ModelSettings, uri: str) -> str:
    source = model_settings._source
    if source is None:
12 changes: 10 additions & 2 deletions runtimes/alibi-detect/mlserver_alibi_detect/runtime.py
@@ -15,7 +15,7 @@
from mlserver.settings import ModelSettings
from mlserver.model import MLModel
from mlserver.codecs import NumpyCodec, NumpyRequestCodec
from mlserver.utils import get_model_uri
from mlserver.utils import get_model_uri, ensure_local_path
from mlserver.errors import MLServerError, InferenceError
from mlserver.logging import logger

@@ -70,8 +70,16 @@ def __init__(self, settings: ModelSettings):

    async def load(self) -> bool:
        self._model_uri = await get_model_uri(self._settings)

        # Ensure the model is on a local filesystem for compatibility
        local_model_uri = ensure_local_path(self._model_uri)

        # Register the transient file for automatic cleanup on unload
        if local_model_uri != self._model_uri:
            self.register_transient_file(local_model_uri)

        try:
            self._model = load_detector(self._model_uri)
            self._model = load_detector(local_model_uri)
            mlserver.register("seldon_model_drift", "Drift metrics")

            # Check whether an online drift detector (i.e. has a save_state method)
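
Because the local copy is registered via `register_transient_file()`, the base `MLModel.unload()` shown earlier removes the temporary file automatically once the model is unloaded; the runtime itself needs no extra cleanup logic.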