diff --git a/docs/Privacy.md b/docs/Privacy.md index 95aee00b0b..17127ba993 100644 --- a/docs/Privacy.md +++ b/docs/Privacy.md @@ -13,4 +13,4 @@ In addition, Olive may collect additional telemetry data such as: - Performance data - Exception information -Collection of this additional telemetry can be disabled by adding the `--disable_telemetry` flag to any Olive CLI command, or by setting the `OLIVE_DISABLE_TELEMETRY` environment variable to `1` before running. Telemetry is also automatically disabled when a CI/CD environment is detected (e.g., GitHub Actions, Azure Pipelines, Jenkins). If telemetry is enabled, but cannot be sent to Microsoft, it will be stored locally and sent when a connection is available. You can override the default cache location by setting the `OLIVE_TELEMETRY_CACHE_DIR` environment variable to a valid directory path. +Collection of this additional telemetry can be disabled by adding the `--disable_telemetry` flag to any Olive CLI command, or by setting the `OLIVE_DISABLE_TELEMETRY` environment variable to `1` before running. In CI/CD environments (e.g., GitHub Actions, Azure Pipelines, Jenkins), Olive suppresses the general heartbeat/action/error events and only emits the `OliveRecipe` event. The `OliveRecipe` event may include recipe metadata such as pass types, explicitly configured target settings, the host system type and any explicitly configured host accelerator settings, whether a custom package config was provided, a redacted snapshot of custom package-config overrides, and a redacted snapshot of explicitly supplied config overrides. Outside CI/CD environments, if telemetry is enabled but cannot be sent to Microsoft, it will be stored locally and sent when a connection is available. You can override the default cache location by setting the `OLIVE_TELEMETRY_CACHE_DIR` environment variable to a valid directory path. diff --git a/olive/cli/base.py b/olive/cli/base.py index 75fd2816c7..26bd042107 100644 --- a/olive/cli/base.py +++ b/olive/cli/base.py @@ -42,13 +42,24 @@ def _run_workflow(self): if self.args.dry_run: print("Dry run mode enabled. Configuration file is generated but no optimization is performed.") return None - workflow_output = olive_run(run_config) + workflow_output = olive_run(run_config, recipe_telemetry_metadata=self._get_recipe_telemetry_metadata()) if not workflow_output.has_output_model(): print("No output model produced. Please check the log for details.") else: print(f"Model is saved at {self.args.output_path}") return workflow_output + def _get_recipe_telemetry_metadata(self) -> dict[str, str]: + recipe_name = self.__class__.__name__ + if recipe_name.endswith("Command"): + recipe_name = recipe_name[: -len("Command")] + return { + "recipe_name": recipe_name, + "recipe_command": recipe_name, + "recipe_source": "generated_cli", + "recipe_format": "generated", + } + @staticmethod def _parse_extra_options(kv_items): from onnxruntime_genai import __version__ as OrtGenaiVersion diff --git a/olive/cli/run.py b/olive/cli/run.py index 6d2a831aef..4b0360fedf 100644 --- a/olive/cli/run.py +++ b/olive/cli/run.py @@ -49,11 +49,14 @@ def register_subcommand(parser: ArgumentParser): @action def run(self): + from pathlib import Path + from olive.common.config_utils import load_config_file from olive.workflows import run as olive_run # allow the run_config to be a dict already (for api use) - run_config = self.args.run_config + run_config_input = self.args.run_config + run_config = run_config_input if not isinstance(run_config, dict): run_config = load_config_file(run_config) if input_model_config := get_input_model_config(self.args, required=False): @@ -73,6 +76,15 @@ def run(self): list_required_packages=self.args.list_required_packages, tempdir=self.args.tempdir, package_config=self.args.package_config, + recipe_telemetry_metadata={ + "recipe_command": "WorkflowRun", + "recipe_source": "config_dict" if isinstance(run_config_input, dict) else "config_file", + "recipe_format": "dict" + if isinstance(run_config_input, dict) + else Path(run_config_input).suffix.lstrip(".").lower() or "unknown", + "execution_mode": "list_required_packages" if self.args.list_required_packages else "run", + "package_config_provided": bool(self.args.package_config), + }, ) if self.args.list_required_packages is True: diff --git a/olive/systems/docker/docker_system.py b/olive/systems/docker/docker_system.py index 8371cccd44..2a479ec690 100644 --- a/olive/systems/docker/docker_system.py +++ b/olive/systems/docker/docker_system.py @@ -232,6 +232,8 @@ def _prepare_run_params(self) -> dict: def _prepare_environment(self, base_env) -> dict: """Prepare environment variables for container.""" + from olive.telemetry.telemetry import is_ci_environment + # Convert list to dict if needed if isinstance(base_env, list): environment = {env.split("=")[0]: env.split("=")[1] for env in base_env} @@ -241,6 +243,8 @@ def _prepare_environment(self, base_env) -> dict: # Add default environment variables environment.setdefault("PYTHONPYCACHEPREFIX", "/tmp") environment["OLIVE_LOG_LEVEL"] = logging.getLevelName(logger.getEffectiveLevel()) + if is_ci_environment(): + environment["CI"] = "1" # Add HuggingFace token if needed if self.hf_token: diff --git a/olive/systems/docker/workflow_runner.py b/olive/systems/docker/workflow_runner.py index 5842d0bd49..be0d59d671 100644 --- a/olive/systems/docker/workflow_runner.py +++ b/olive/systems/docker/workflow_runner.py @@ -20,7 +20,7 @@ def runner_entry(config): config = json.load(f) logger.info("Running workflow with config: %s", config) - olive_run(config) + olive_run(config, emit_recipe_telemetry=False) if __name__ == "__main__": diff --git a/olive/telemetry/constants.py b/olive/telemetry/constants.py index ca9e150b1b..25a60e813e 100644 --- a/olive/telemetry/constants.py +++ b/olive/telemetry/constants.py @@ -3,6 +3,6 @@ # Licensed under the MIT License. # -------------------------------------------------------------------------- -"""OneCollector connection string.""" +"""Telemetry constants.""" -CONNECTION_STRING = "SW5zdHJ1bWVudGF0aW9uS2V5PTlkNWRkYWVjNjFlMjQ1NjdiNzg4YTIwYWVhMzI0NjMxLTcyMzdkN2M2LWVlNjEtNGNmZC1iYjdiLTU5MDNhOTcyYzJlNC03MDQ3" +CONNECTION_STRING = "SW5zdHJ1bWVudGF0aW9uS2V5PTYyMTUwOTExZGMwMDRmYzliYjY3YmE5NjA2NDI3ZTU2LWVjNjFmOWFmLTVkN2EtNGQxOS1hZjMxLWI5Y2Q2OWU5ODdmMS02OTE1" diff --git a/olive/telemetry/library/options.py b/olive/telemetry/library/options.py index dd934cad2d..31fd1ba195 100644 --- a/olive/telemetry/library/options.py +++ b/olive/telemetry/library/options.py @@ -62,6 +62,7 @@ class OneCollectorExporterOptions: """Configuration options for OneCollector exporter.""" connection_string: Optional[str] = None + service_name: Optional[str] = None transport_options: OneCollectorTransportOptions = field(default_factory=OneCollectorTransportOptions) # Internal fields populated during validation diff --git a/olive/telemetry/library/telemetry_logger.py b/olive/telemetry/library/telemetry_logger.py index 7eb236e759..626e1da872 100644 --- a/olive/telemetry/library/telemetry_logger.py +++ b/olive/telemetry/library/telemetry_logger.py @@ -6,6 +6,7 @@ """High-level telemetry logger facade for easy usage.""" import logging +import threading import uuid from typing import Any, Callable, Optional @@ -28,6 +29,7 @@ class TelemetryLogger: _instance: Optional["TelemetryLogger"] = None _default_logger: Optional["TelemetryLogger"] = None + _singleton_lock = threading.RLock() _logger: Optional[logging.Logger] = None _logger_exporter: Optional[OneCollectorLogExporter] = None _logger_provider: Optional[LoggerProvider] = None @@ -39,9 +41,10 @@ def __new__(cls, options: Optional[OneCollectorExporterOptions] = None): options: Exporter options (only used on first instantiation) """ - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialize(options) + with cls._singleton_lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialize(options) return cls._instance @@ -57,10 +60,13 @@ def _initialize(self, options: Optional[OneCollectorExporterOptions]) -> None: self._logger_exporter = OneCollectorLogExporter(options=options) # Create logger provider + service_name = ( + options.service_name if options and options.service_name else __name__.split(".", maxsplit=1)[0] + ) self._logger_provider = LoggerProvider( resource=Resource.create( { - "service.name": __name__.split(".", maxsplit=1)[0], + "service.name": service_name, "service.version": VERSION, "service.instance.id": str(uuid.uuid4()), # Unique instance ID; can double as session ID } @@ -141,21 +147,27 @@ def shutdown(self) -> None: self._logger_provider.shutdown() @classmethod - def get_default_logger(cls, connection_string: Optional[str] = None) -> "TelemetryLogger": + def get_default_logger( + cls, connection_string: Optional[str] = None, service_name: Optional[str] = None + ) -> "TelemetryLogger": """Get or create the default telemetry logger. Args: connection_string: OneCollector connection string (only used on first call) + service_name: Logical application/service name for emitted telemetry (only used on first call) Returns: TelemetryLogger instance """ - if cls._default_logger is None: - options = None - if connection_string: - options = OneCollectorExporterOptions(connection_string=connection_string) - cls._default_logger = cls(options=options) + with cls._singleton_lock: + if cls._default_logger is None: + options = None + if connection_string: + options = OneCollectorExporterOptions( + connection_string=connection_string, service_name=service_name + ) + cls._default_logger = cls(options=options) return cls._default_logger @@ -167,17 +179,20 @@ def shutdown_default_logger(cls) -> None: cls._default_logger = None -def get_telemetry_logger(connection_string: Optional[str] = None) -> TelemetryLogger: +def get_telemetry_logger( + connection_string: Optional[str] = None, service_name: Optional[str] = None +) -> TelemetryLogger: """Get or create the default telemetry logger. Args: connection_string: OneCollector connection string (only used on first call) + service_name: Logical application/service name for emitted telemetry (only used on first call) Returns: TelemetryLogger instance """ - return TelemetryLogger.get_default_logger(connection_string=connection_string) + return TelemetryLogger.get_default_logger(connection_string=connection_string, service_name=service_name) def log_event(event_name: str, attributes: Optional[dict[str, Any]] = None) -> None: diff --git a/olive/telemetry/telemetry.py b/olive/telemetry/telemetry.py index 0ddb690e2a..a37b29bf5e 100644 --- a/olive/telemetry/telemetry.py +++ b/olive/telemetry/telemetry.py @@ -19,8 +19,6 @@ from olive.telemetry.library.event_source import event_source from olive.telemetry.library.telemetry_logger import TelemetryLogger, get_telemetry_logger from olive.telemetry.utils import ( - _decode_cache_line, - _encode_cache_line, _exclusive_file_lock, get_telemetry_base_dir, ) @@ -30,6 +28,7 @@ # Default event names used by the high-level telemetry helpers. HEARTBEAT_EVENT_NAME = "OliveHeartbeat" +RECIPE_EVENT_NAME = "OliveRecipe" # CI/CD environment variables whose presence indicates an automated pipeline. _CI_ENV_VARS = ( @@ -43,6 +42,7 @@ ) ACTION_EVENT_NAME = "OliveAction" ERROR_EVENT_NAME = "OliveError" +APP_NAME = "Olive" ALLOWED_KEYS = { HEARTBEAT_EVENT_NAME: { @@ -72,6 +72,38 @@ "app_instance_id", "initTs", }, + RECIPE_EVENT_NAME: { + "recipe_name", + "recipe_hash", + "recipe_source", + "recipe_format", + "recipe_command", + "execution_mode", + "workflow_id", + "config_overrides", + "success", + "input_model_type", + "input_model_source", + "model_task", + "target_system_type", + "target_device", + "target_execution_provider", + "target_execution_providers", + "host_system_type", + "host_device", + "host_execution_provider", + "host_execution_providers", + "pass_types", + "pass_count", + "data_config_count", + "search_enabled", + "package_config_provided", + "package_config_overrides", + "is_ci", + "app_version", + "app_instance_id", + "initTs", + }, } CRITICAL_EVENTS = {HEARTBEAT_EVENT_NAME} @@ -80,6 +112,11 @@ CACHE_FILE_NAME = "olive.json" +def is_ci_environment() -> bool: + """Detect CI/CD environments by checking well-known environment variables.""" + return any(os.environ.get(var) for var in _CI_ENV_VARS) + + class TelemetryCacheHandler: """Handles caching of failed telemetry events for offline resilience. @@ -103,9 +140,10 @@ def __init__(self, telemetry: "Telemetry") -> None: # Single shared cache file for all processes self._cache_file_name = CACHE_FILE_NAME self._shutdown = False - # Protects all shared state to prevent race conditions - self._lock = threading.Lock() - self._callback_condition = threading.Condition() + # Single condition protects all shared state: _shutdown, _is_flushing, + # _callbacks_item_count, _events_logged. Using one lock eliminates + # lock ordering issues that arise with separate locks. + self._condition = threading.Condition() self._callbacks_item_count = 0 self._events_logged = 0 # Prevents concurrent flush operations @@ -118,7 +156,7 @@ def shutdown(self) -> None: offline resilience. If network is working, success callbacks already flushed. If network is down, flushing would fail anyway. """ - with self._lock: + with self._condition: self._shutdown = True def __del__(self): @@ -150,7 +188,7 @@ def on_payload_transmitted(self, args: "PayloadTransmittedCallbackArgs") -> None payload = None should_flush = False - with self._lock: + with self._condition: if self._shutdown: return @@ -159,9 +197,6 @@ def on_payload_transmitted(self, args: "PayloadTransmittedCallbackArgs") -> None # so it's unlikely that an event would suddenly fail and need to be cached # and we don't need to flush again. if self._is_flushing: - with self._callback_condition: - self._callbacks_item_count += args.item_count - self._callback_condition.notify_all() return if args.succeeded: @@ -182,26 +217,23 @@ def on_payload_transmitted(self, args: "PayloadTransmittedCallbackArgs") -> None # Fail silently - telemetry should never crash the application pass finally: - with self._callback_condition: + with self._condition: self._callbacks_item_count += args.item_count - self._callback_condition.notify_all() + self._condition.notify_all() def wait_for_callbacks(self, timeout_sec: float, during_flush: bool = False) -> bool: deadline = time.time() + timeout_sec - while True: - with self._callback_condition: - callbacks_item_count = self._callbacks_item_count - expected_items = self._events_logged - if (during_flush or not self.is_flushing) and callbacks_item_count >= expected_items: + with self._condition: + while True: + if (during_flush or not self._is_flushing) and self._callbacks_item_count >= self._events_logged: return True - remaining = deadline - time.time() - if remaining <= 0: - return False - with self._callback_condition: - self._callback_condition.wait(timeout=remaining) + remaining = deadline - time.time() + if remaining <= 0: + return False + self._condition.wait(timeout=remaining) def record_event_logged(self, count: int = 1) -> None: - with self._callback_condition: + with self._condition: self._events_logged += count def _schedule_flush(self) -> None: @@ -218,7 +250,7 @@ def _schedule_flush(self) -> None: - Daemon thread is acceptable (flush is best-effort) """ # Check before spawning thread to avoid unnecessary thread creation - with self._lock: + with self._condition: if self._shutdown or self._is_flushing: return self._is_flushing = True @@ -231,7 +263,7 @@ def flush_task(): pass finally: # Always clear flag, even on exception - with self._lock: + with self._condition: self._is_flushing = False thread = threading.Thread(target=flush_task, daemon=True) @@ -246,8 +278,11 @@ def cache_path(self) -> Optional[Path]: """ telemetry_cache_dir = None - if "OLIVE_TELEMETRY_CACHE_DIR" in os.environ: - telemetry_cache_dir = os.environ["OLIVE_TELEMETRY_CACHE_DIR"] + telemetry_cache_dir_override = os.environ.get("OLIVE_TELEMETRY_CACHE_DIR") + if telemetry_cache_dir_override: + telemetry_cache_dir_override = telemetry_cache_dir_override.strip() + if telemetry_cache_dir_override: + telemetry_cache_dir = Path(telemetry_cache_dir_override).expanduser() if not telemetry_cache_dir: telemetry_cache_dir = get_telemetry_base_dir() / "cache" return telemetry_cache_dir / self._cache_file_name @@ -295,12 +330,11 @@ def _write_payload_to_cache(self, payload: bytes) -> None: if not entries: return - # Append base64-encoded newline-delimited entries + # Append newline-delimited JSON entries # Use exclusive file lock for multi-process safety with _exclusive_file_lock(cache_path, mode="a") as cache_file: for entry in entries: - plain = json.dumps(entry, ensure_ascii=False, separators=(",", ":")) - cache_file.write(_encode_cache_line(plain) + "\n") + cache_file.write(json.dumps(entry, ensure_ascii=False, separators=(",", ":")) + "\n") return except OSError as exc: # Retry only on transient access errors (file locked by another process) @@ -322,62 +356,59 @@ def _flush_cache(self) -> None: self._flush_cache_file(cache_path) + def _restore_flush_file(self, flush_path: Optional[Path], cache_path: Path) -> None: + """Restore a claimed flush file back into the cache without overwriting new entries. + + Another process may create a fresh cache file while this process is flushing. + Appending the old flush contents preserves both sets of entries. + """ + if not flush_path or not flush_path.exists(): + return + + try: + cache_path.parent.mkdir(parents=True, exist_ok=True) + with ( + _exclusive_file_lock(cache_path, mode="a") as cache_file, + _exclusive_file_lock(flush_path, mode="r") as flush_file, + ): + for raw_line in flush_file: + line = raw_line.rstrip("\n") + if line: + cache_file.write(line + "\n") + flush_path.unlink(missing_ok=True) + except Exception: + # Best-effort cache restore must never interrupt telemetry flow. + # Leave the flush file in place so a later retry can attempt recovery again. + return + def _flush_cache_file(self, cache_path: Path) -> None: """Flush cached events back to telemetry service. - Approach: - 1. Atomically rename cache → .flush (claims ownership, prevents concurrent flushes) - 2. Read all events from .flush file - 3. Queue all events for sending via telemetry logger - 4. Force flush with 2-second timeout - 5. On success: delete .flush file - 6. On failure: restore .flush → cache for retry - - Multi-process coordination: - - `replace()` is atomic; only one process can successfully rename the cache file - - If another process already renamed it, we get FileNotFoundError and abort - - Stale .flush files from crashes are overwritten by the atomic rename - - Shutdown handling: - - If shutdown flag set during flush, restore cache before returning - - This preserves events even if callbacks don't fire during shutdown - - Callback behavior: - - Queued events trigger callbacks with success/failure - - Failed events are automatically re-cached via callbacks (unless shutting down) - - The _is_flushing flag prevents re-caching of replayed events during flush + Uses atomic rename to claim the cache file, preventing duplicate + sends when multiple processes flush concurrently. """ flush_path = None try: - # Check shutdown before starting (under lock to prevent race) - with self._lock: + with self._condition: if self._shutdown: return - if not cache_path.exists(): - return - - # Atomically rename to .flush file to claim ownership - # Overwrite any stale .flush file from crashed process (C# pattern) + # Atomically rename to claim ownership — only one process can succeed flush_path = cache_path.with_name(f"{cache_path.name}.flush") try: - # On Windows/POSIX, replace() overwrites existing files atomically cache_path.replace(flush_path) except FileNotFoundError: - # Cache already claimed by another flush or doesn't exist return - # Read all cached entries (base64-decoded) entries = _read_cache_entries(flush_path) - if not entries: - # Empty cache, just delete the flush file - flush_path.unlink(missing_ok=True) + if flush_path.stat().st_size == 0: + flush_path.unlink(missing_ok=True) + else: + self._restore_flush_file(flush_path, cache_path) return - # Replay all events through telemetry logger - # Note: _is_flushing flag (set by caller) prevents these callbacks from re-caching or triggering nested flushes - # (unlikely since we just successfully sent an event, indicating network is available) + # Replay cached events — _is_flushing flag prevents re-caching for entry in entries: try: event_name = entry["event_name"] @@ -387,50 +418,24 @@ def _flush_cache_file(self, cache_path: Path) -> None: attributes = json.loads(event_data) if not isinstance(attributes, dict): continue - # Preserve original timestamp attributes["initTs"] = entry.get("initTs", entry["ts"]) self._telemetry.log(event_name, attributes, None) except Exception: - # Skip malformed entries continue - # Check if shutdown happened during flush - with self._lock: - if self._shutdown: - # Restore cache to avoid data loss during shutdown - if flush_path and flush_path.exists(): - try: - cache_path.parent.mkdir(parents=True, exist_ok=True) - flush_path.replace(cache_path) - except Exception: - # Silently ignore errors during cleanup - pass - return - - # Wait for in-flight callbacks to complete before deciding success/failure flush_success = self.wait_for_callbacks(timeout_sec=5.0, during_flush=True) if flush_success: - # Success: delete the flush file (events were sent) - if flush_path: - flush_path.unlink(missing_ok=True) - elif flush_path and flush_path.exists(): - # Failure: restore cache for retry later - cache_path.parent.mkdir(parents=True, exist_ok=True) - flush_path.replace(cache_path) + flush_path.unlink(missing_ok=True) + else: + # Restore cache for next retry + self._restore_flush_file(flush_path, cache_path) except Exception: - # Best-effort restore on any exception to prevent data loss - if flush_path and flush_path.exists(): - try: - cache_path.parent.mkdir(parents=True, exist_ok=True) - flush_path.replace(cache_path) - except Exception: - # If restore fails, we lose the data (acceptable for telemetry) - pass - return + # Best-effort restore on failure + self._restore_flush_file(flush_path, cache_path) @property def is_flushing(self) -> bool: - with self._lock: + with self._condition: return self._is_flushing @@ -445,17 +450,12 @@ class Telemetry: _lock = threading.Lock() def __new__(cls): - """Create or return the singleton instance. - - Thread-safe singleton implementation using double-checked locking. - """ - if cls._instance is None: - with cls._lock: - # Double-check pattern to prevent race conditions - if cls._instance is None: - instance = super().__new__(cls) - instance._initialized = False - cls._instance = instance + """Create or return the singleton instance.""" + with cls._lock: + if cls._instance is None: + instance = super().__new__(cls) + instance._initialized = False + cls._instance = instance return cls._instance def __init__(self): @@ -466,18 +466,18 @@ def __init__(self): self._logger = None self._cache_handler = None + self._recipe_only_ci_telemetry = False try: self._logger = self._create_logger() event_source.disable() - self._cache_handler = TelemetryCacheHandler(self) - self._setup_payload_callbacks() - if self._is_ci_environment(): - self.disable_telemetry() - self._initialized = True - return - self._log_heartbeat() + is_ci = self._is_ci_environment() + self._recipe_only_ci_telemetry = is_ci + if not is_ci: + self._cache_handler = TelemetryCacheHandler(self) + self._setup_payload_callbacks() + self._log_heartbeat() if os.environ.get("OLIVE_DISABLE_TELEMETRY") == "1": self.disable_telemetry() self._initialized = True @@ -488,18 +488,18 @@ def __init__(self): @staticmethod def _is_ci_environment() -> bool: """Detect CI/CD environments by checking well-known environment variables.""" - return any(os.environ.get(var) for var in _CI_ENV_VARS) + return is_ci_environment() def _create_logger(self) -> Optional[TelemetryLogger]: try: - return get_telemetry_logger(base64.b64decode(CONNECTION_STRING).decode()) + return get_telemetry_logger(base64.b64decode(CONNECTION_STRING).decode(), service_name=APP_NAME) except Exception: return None def _setup_payload_callbacks(self) -> None: # Register callback for payload transmission events # No need to store unregister function - logger shutdown will clean up callbacks - if self._logger is None: + if self._logger is None or self._cache_handler is None: return self._logger.register_payload_transmitted_callback( self._cache_handler.on_payload_transmitted, @@ -545,6 +545,8 @@ def log( """ try: + if self._recipe_only_ci_telemetry and event_name != RECIPE_EVENT_NAME: + return attrs = _merge_metadata(attributes, metadata) if self._logger is None: return @@ -743,18 +745,10 @@ def _set_nested_value(data: dict[str, Any], key: str, value: Any) -> None: def _read_cache_entries(cache_path: Path) -> list[dict[str, Any]]: - """Read all entries from a cache file, decoding each line. - - Design decisions: - - Use file locking for multi-process safety - - Continue reading past malformed entries (partial data recovery) - - Return empty list on complete read failure (fail gracefully) - - Each line is base64-decoded before JSON parsing. + """Read all JSON-line entries from a cache file. - Assumptions: - - Cache file contains newline-delimited base64-encoded entries (one per line) - - Each line is independent (one malformed line doesn't affect others) - - Empty or whitespace-only lines are skipped + Each line is independent — malformed lines are skipped without + affecting other entries. Returns empty list on read failure. """ entries = [] try: @@ -764,13 +758,11 @@ def _read_cache_entries(cache_path: Path) -> list[dict[str, Any]]: if not line: continue try: - line = json.loads(_decode_cache_line(line)) - if isinstance(line, dict): - entries.append(line) + parsed = json.loads(line) + if isinstance(parsed, dict): + entries.append(parsed) except Exception: - # Malformed line, skip and continue continue except Exception: - # If file cannot be opened or read, return empty list return [] return entries diff --git a/olive/telemetry/telemetry_extensions.py b/olive/telemetry/telemetry_extensions.py index e5b13395d0..068aa9dd1b 100644 --- a/olive/telemetry/telemetry_extensions.py +++ b/olive/telemetry/telemetry_extensions.py @@ -6,11 +6,11 @@ import functools import inspect import time +import traceback from types import TracebackType from typing import Any, Callable, Optional, TypeVar -from olive.telemetry.telemetry import ACTION_EVENT_NAME, ERROR_EVENT_NAME, _get_logger -from olive.telemetry.utils import _format_exception_message +from olive.telemetry.telemetry import ACTION_EVENT_NAME, ERROR_EVENT_NAME, RECIPE_EVENT_NAME, _get_logger _TFunc = TypeVar("_TFunc", bound=Callable[..., Any]) @@ -45,6 +45,38 @@ def log_error( telemetry.log(ERROR_EVENT_NAME, attributes, metadata) +def log_recipe_result( + recipe_name: str, + success: bool, + metadata: Optional[dict[str, Any]] = None, +) -> None: + telemetry = _get_logger() + attributes = { + "recipe_name": recipe_name, + "success": success, + } + telemetry.log(RECIPE_EVENT_NAME, attributes, metadata) + + +def _format_exception_message(ex: BaseException, tb: Optional[TracebackType] = None) -> str: + """Format an exception and trim local paths for readability.""" + folder = "Olive" + file_line = 'File "' + formatted = traceback.format_exception(type(ex), ex, tb, limit=5) + lines = [] + for line in formatted: + line_trunc = line.strip() + if line_trunc.startswith(file_line) and folder in line_trunc: + idx = line_trunc.find(folder) + if idx != -1: + line_trunc = line_trunc[idx + len(folder) :] + elif line_trunc.startswith(file_line): + idx = line_trunc[len(file_line) :].find('"') + line_trunc = line_trunc[idx + len(file_line) :] + lines.append(line_trunc) + return "\n".join(lines) + + def _resolve_invoked_from(skip_frames: int = 0) -> str: """Resolve how Olive was invoked by examining the call stack. diff --git a/olive/telemetry/utils.py b/olive/telemetry/utils.py index 52a39acded..806f5f93da 100644 --- a/olive/telemetry/utils.py +++ b/olive/telemetry/utils.py @@ -2,17 +2,62 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. # -------------------------------------------------------------------------- -import base64 import functools import os import platform import tempfile -import traceback from pathlib import Path -from types import TracebackType -from typing import Optional +from typing import ClassVar + +if os.name == "nt": + import ctypes + import ctypes.wintypes as wintypes + + _LOCKFILE_EXCLUSIVE_LOCK = 0x00000002 + + class _Overlapped(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, object]]] = [ + ("Internal", ctypes.c_void_p), + ("InternalHigh", ctypes.c_void_p), + ("Offset", wintypes.DWORD), + ("OffsetHigh", wintypes.DWORD), + ("hEvent", wintypes.HANDLE), + ] + + _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) + _lock_file_ex = _kernel32.LockFileEx + _lock_file_ex.argtypes = [ + wintypes.HANDLE, + wintypes.DWORD, + wintypes.DWORD, + wintypes.DWORD, + wintypes.DWORD, + ctypes.POINTER(_Overlapped), + ] + _lock_file_ex.restype = wintypes.BOOL + _unlock_file_ex = _kernel32.UnlockFileEx + _unlock_file_ex.argtypes = [ + wintypes.HANDLE, + wintypes.DWORD, + wintypes.DWORD, + wintypes.DWORD, + ctypes.POINTER(_Overlapped), + ] + _unlock_file_ex.restype = wintypes.BOOL +else: + ctypes = None + wintypes = None + _lock_file_ex = None + _unlock_file_ex = None + _Overlapped = None ORT_SUPPORT_DIR = r"Microsoft/DeveloperTools/.onnxruntime" +_WINDOWS_FILE_LOCK_LENGTH = 0x7FFFFFFF + + +def _raise_windows_lock_error(message: str) -> None: + error_code = ctypes.get_last_error() if ctypes is not None else 0 + raise OSError(error_code, message) def _resolve_home_dir() -> Path: @@ -50,29 +95,10 @@ def get_telemetry_base_dir() -> Path: return Path(cache_dir).expanduser() / ORT_SUPPORT_DIR -def _format_exception_message(ex: BaseException, tb: Optional[TracebackType] = None) -> str: - """Format an exception and trim local paths for readability.""" - folder = "Olive" - file_line = 'File "' - formatted = traceback.format_exception(type(ex), ex, tb, limit=5) - lines = [] - for line in formatted: - line_trunc = line.strip() - if line_trunc.startswith(file_line) and folder in line_trunc: - idx = line_trunc.find(folder) - if idx != -1: - line_trunc = line_trunc[idx + len(folder) :] - elif line_trunc.startswith(file_line): - idx = line_trunc[len(file_line) :].find('"') - line_trunc = line_trunc[idx + len(file_line) :] - lines.append(line_trunc) - return "\n".join(lines) - - class _ExclusiveFileLock: """Cross-platform exclusive file lock context manager. - Uses fcntl on Unix/Linux/macOS, msvcrt on Windows. + Uses fcntl on Unix/Linux/macOS and LockFileEx on Windows. Prevents cache corruption when multiple processes access the same file. Design decisions: @@ -89,6 +115,7 @@ def __init__(self, file_path: Path, mode: str): self.file_path = file_path self.mode = mode self.file = None + self._windows_overlapped = None def __enter__(self): self.file = open(self.file_path, self.mode, encoding="utf-8") @@ -102,19 +129,44 @@ def __enter__(self): elif os.name == "nt": import msvcrt - # Lock 1 byte at position 0 - msvcrt.locking(self.file.fileno(), msvcrt.LK_LOCK, 1) + self._windows_overlapped = _Overlapped() + handle = msvcrt.get_osfhandle(self.file.fileno()) + if not _lock_file_ex( + handle, + _LOCKFILE_EXCLUSIVE_LOCK, + 0, + _WINDOWS_FILE_LOCK_LENGTH, + _WINDOWS_FILE_LOCK_LENGTH, + ctypes.byref(self._windows_overlapped), + ): + _raise_windows_lock_error("Failed to lock telemetry cache file") except Exception: self.file.close() self.file = None + self._windows_overlapped = None raise return self.file def __exit__(self, exc_type, exc_val, exc_tb): if self.file: - # Unlock happens automatically on close - self.file.close() + try: + if os.name == "nt" and self._windows_overlapped is not None: + import msvcrt + + handle = msvcrt.get_osfhandle(self.file.fileno()) + if not _unlock_file_ex( + handle, + 0, + _WINDOWS_FILE_LOCK_LENGTH, + _WINDOWS_FILE_LOCK_LENGTH, + ctypes.byref(self._windows_overlapped), + ): + _raise_windows_lock_error("Failed to unlock telemetry cache file") + finally: + self.file.close() + self.file = None + self._windows_overlapped = None def _exclusive_file_lock(file_path: Path, mode: str): @@ -125,21 +177,3 @@ def _exclusive_file_lock(file_path: Path, mode: str): :return: Context manager that returns an open file handle. """ return _ExclusiveFileLock(file_path, mode) - - -def _encode_cache_line(plaintext: str) -> str: - """Encode a single cache line using base64. - - :param plaintext: The plaintext string to encode. - :return: Base64-encoded string (safe for a single text line). - """ - return base64.b64encode(plaintext.encode("utf-8")).decode("ascii") - - -def _decode_cache_line(encoded: str) -> str: - """Decode a single base64-encoded cache line. - - :param encoded: The base64-encoded string. - :return: The decoded plaintext string. - """ - return base64.b64decode(encoded.encode("ascii")).decode("utf-8") diff --git a/olive/workflows/run/run.py b/olive/workflows/run/run.py index 89100e1c1c..dc338f69cc 100644 --- a/olive/workflows/run/run.py +++ b/olive/workflows/run/run.py @@ -2,23 +2,57 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. # -------------------------------------------------------------------------- +import functools +import json import logging from copy import deepcopy -from pathlib import Path -from typing import TYPE_CHECKING, Optional, Union +from os import PathLike +from pathlib import Path, PurePosixPath, PureWindowsPath +from typing import TYPE_CHECKING, Any, Optional, Union -from olive.common.utils import set_tempdir +from olive.common.config_utils import load_config_file +from olive.common.utils import hash_dict, set_tempdir from olive.hardware.constants import ExecutionProvider from olive.logging import set_default_logger_severity, set_ort_logger_severity, set_verbosity_info from olive.package_config import OlivePackageConfig from olive.systems.accelerator_creator import create_accelerator from olive.systems.common import SystemType +from olive.telemetry.telemetry import is_ci_environment +from olive.telemetry.telemetry_extensions import _format_exception_message, log_error, log_recipe_result from olive.workflows.run.config import RunConfig if TYPE_CHECKING: from olive.engine.config import RunPassConfig logger = logging.getLogger(__name__) +RECIPE_HASH_REDACTED_VALUE = "" +CONFIG_REFERENCE_REDACTED_VALUE = "" +CONFIG_CALLABLE_REDACTED_VALUE = "" +RECIPE_HASH_REDACTED_KEYS = { + "output_dir", + "cache_dir", + "tempdir", + "additional_files", + "dockerfile", + "build_context_path", + "python_environment_path", + "prepend_to_path", + "script_dir", + "model_script", + # package_config is tracked separately via package_config_provided and + # package_config_overrides, but excluded from recipe_hash because it is an + # environment/infrastructure path. + "package_config", + "work_dir", +} +CONFIG_SNAPSHOT_REDACTED_KEYS = RECIPE_HASH_REDACTED_KEYS | { + "model_path", + "_name_or_path", + "adapter_path", + "user_script", +} +CONFIG_REFERENCE_KEYS = {"host", "target", "evaluator"} +_NO_OVERRIDE = object() def get_required_packages(package_config: OlivePackageConfig, run_config: RunConfig) -> set[str]: @@ -152,30 +186,76 @@ def run( list_required_packages: bool = False, package_config: Optional[Union[str, Path, dict]] = None, tempdir: Optional[Union[str, Path]] = None, + recipe_telemetry_metadata: Optional[dict[str, Any]] = None, + emit_recipe_telemetry: bool = True, ): # set tempdir set_tempdir(tempdir) - if package_config is None: - package_config = OlivePackageConfig.get_default_config_path() - - package_config = OlivePackageConfig.parse_file_or_obj(package_config) - run_config: RunConfig = RunConfig.parse_file_or_obj(run_config) + try: + run_config_telemetry_input = _load_config_input_for_telemetry(run_config) + except Exception: + run_config_telemetry_input = None - if list_required_packages: - # set the log level to INFO for packages - set_verbosity_info() - required_packages = get_required_packages(package_config, run_config) - generate_files_from_packages(required_packages, "olive_requirements.txt") - return None + package_config_input = package_config + try: + package_config_telemetry_input = ( + _load_config_input_for_telemetry(package_config_input) if package_config_input is not None else None + ) + except Exception: + package_config_telemetry_input = None - if run_config.engine.host and run_config.engine.host.type == SystemType.Docker: - docker_system = run_config.engine.host.create_system() - return docker_system.run_workflow(run_config) + package_config_provided = package_config is not None + if package_config is None: + package_config = OlivePackageConfig.get_default_config_path() - # set log level for olive - set_default_logger_severity(run_config.engine.log_severity_level) - return run_engine(package_config, run_config) + parsed_run_config = None + success = False + exception = None + try: + package_config = OlivePackageConfig.parse_file_or_obj(package_config) + parsed_run_config = RunConfig.parse_file_or_obj(run_config) + + if list_required_packages: + # set the log level to INFO for packages + set_verbosity_info() + required_packages = get_required_packages(package_config, parsed_run_config) + generate_files_from_packages(required_packages, "olive_requirements.txt") + success = True + return None + + if parsed_run_config.engine.host and parsed_run_config.engine.host.type == SystemType.Docker: + docker_system = parsed_run_config.engine.host.create_system() + workflow_output = docker_system.run_workflow(deepcopy(parsed_run_config)) + success = True + return workflow_output + + # set log level for olive + set_default_logger_severity(parsed_run_config.engine.log_severity_level) + workflow_output = run_engine(package_config, parsed_run_config) + success = True + return workflow_output + except Exception as exc: + exception = exc + raise + finally: + if exception is not None: + log_error( + exception_type=type(exception).__name__, + exception_message=_format_exception_message(exception, exception.__traceback__), + ) + if emit_recipe_telemetry: + metadata = _build_recipe_result_metadata( + run_config, + run_config_telemetry_input, + parsed_run_config, + recipe_telemetry_metadata, + list_required_packages=list_required_packages, + package_config_input=package_config_telemetry_input, + package_config_provided=package_config_provided, + ) + recipe_name = metadata.pop("recipe_name") + log_recipe_result(recipe_name, success=success, metadata=metadata) def generate_files_from_packages(packages, file_name): @@ -199,3 +279,312 @@ def get_used_passes_configs(run_config: RunConfig) -> list["RunPassConfig"]: def get_run_on_target(package_config: OlivePackageConfig, pass_config: "RunPassConfig") -> bool: pass_module_config = package_config.get_pass_module_config(pass_config.type) return pass_module_config.run_on_target + + +def _build_recipe_result_metadata( + run_config_input: Union[str, Path, dict], + run_config_telemetry_input: Optional[Any], + run_config: Optional[RunConfig], + recipe_telemetry_metadata: Optional[dict[str, Any]], + *, + list_required_packages: bool, + package_config_input: Optional[Union[str, Path, dict]], + package_config_provided: bool, +) -> dict[str, Any]: + metadata = dict(recipe_telemetry_metadata or {}) + default_source, default_format = _classify_run_config_source(run_config_input) + metadata.setdefault("recipe_source", default_source) + metadata.setdefault("recipe_format", default_format) + metadata.setdefault("execution_mode", "list_required_packages" if list_required_packages else "run") + metadata.setdefault("package_config_provided", package_config_provided) + metadata.setdefault("config_overrides", _build_config_overrides(run_config_telemetry_input)) + if package_config_provided: + metadata.setdefault("package_config_overrides", _build_package_config_overrides(package_config_input)) + metadata["is_ci"] = is_ci_environment() + + if run_config is None: + metadata.setdefault("recipe_name", metadata.get("recipe_command") or "WorkflowRun") + return metadata + + run_config_json = run_config.to_json(make_absolute=False) + model_metadata = _extract_input_model_metadata(run_config_json["input_model"]) + target_metadata = _extract_target_metadata(run_config) + host_metadata = _extract_host_metadata(run_config) + pass_types = [pass_config.type for pass_config in get_used_passes_configs(run_config)] + + metadata.setdefault("recipe_name", metadata.get("recipe_command") or run_config.workflow_id) + metadata.setdefault("workflow_id", run_config.workflow_id) + metadata.setdefault("recipe_hash", _build_recipe_hash(run_config_json)) + metadata.setdefault("input_model_type", run_config.input_model.type) + metadata.setdefault("input_model_source", model_metadata["input_model_source"]) + metadata.setdefault("model_task", model_metadata["model_task"]) + _set_metadata_if_present(metadata, target_metadata) + _set_metadata_if_present(metadata, host_metadata) + metadata.setdefault("pass_types", ";".join(pass_types)) + metadata.setdefault("pass_count", len(pass_types)) + metadata.setdefault("data_config_count", len(run_config.data_configs)) + metadata.setdefault("search_enabled", bool(run_config.engine.search_strategy)) + return metadata + + +def _classify_run_config_source(run_config_input: Any) -> tuple[str, str]: + if isinstance(run_config_input, dict): + return "config_dict", "dict" + + if isinstance(run_config_input, (str, PathLike)): + suffix = Path(run_config_input).suffix.lstrip(".").lower() + return "config_file", suffix or "unknown" + + return "config_object", "object" + + +def _build_config_overrides(config_input: Any) -> Optional[str]: + try: + config_data = _load_config_input_for_telemetry(config_input) + if config_data is None: + return None + + snapshot = _sanitize_config_snapshot(config_data) + if snapshot in (None, {}, []): + return None + + return json.dumps(snapshot, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + except Exception: + return None + + +def _build_package_config_overrides(config_input: Any) -> Optional[str]: + try: + config_data = _load_config_input_for_telemetry(config_input) + if not isinstance(config_data, dict): + return None + + default_config = _load_default_package_config_for_telemetry() + baseline = ( + _normalize_package_config_snapshot(default_config) if isinstance(default_config, dict) else _NO_OVERRIDE + ) + overrides = _extract_config_overrides(_normalize_package_config_snapshot(config_data), baseline) + if overrides is _NO_OVERRIDE: + return None + + snapshot = _sanitize_config_snapshot(overrides) + if not isinstance(snapshot, dict): + return None + + return json.dumps(snapshot, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + except Exception: + return None + + +@functools.lru_cache +def _load_default_package_config_for_telemetry() -> Optional[dict[str, Any]]: + try: + default_config = load_config_file(OlivePackageConfig.get_default_config_path()) + except Exception: + return None + + return default_config if isinstance(default_config, dict) else None + + +def _normalize_package_config_snapshot(config_data: Any) -> Any: + if not isinstance(config_data, dict): + return config_data + + normalized = deepcopy(config_data) + passes = normalized.get("passes") + if isinstance(passes, dict): + normalized["passes"] = {str(pass_name).lower(): pass_config for pass_name, pass_config in passes.items()} + return normalized + + +def _extract_config_overrides(value: Any, baseline: Any = _NO_OVERRIDE) -> Any: + if baseline is _NO_OVERRIDE: + return deepcopy(value) + + if isinstance(value, dict) and isinstance(baseline, dict): + overrides = {} + for key, child_value in value.items(): + child_override = _extract_config_overrides(child_value, baseline.get(key, _NO_OVERRIDE)) + if child_override is not _NO_OVERRIDE: + overrides[key] = child_override + if overrides: + return overrides + return _NO_OVERRIDE if value == baseline else {} + + if isinstance(value, list): + if isinstance(baseline, list) and value == baseline: + return _NO_OVERRIDE + return deepcopy(value) + + if isinstance(value, tuple): + value_list = list(value) + baseline_list = list(baseline) if isinstance(baseline, tuple) else baseline + if isinstance(baseline_list, list) and value_list == baseline_list: + return _NO_OVERRIDE + return value_list + + return deepcopy(value) if value != baseline else _NO_OVERRIDE + + +def _load_config_input_for_telemetry(config_input: Any) -> Optional[Any]: + if config_input is None: + return None + if isinstance(config_input, dict): + return deepcopy(config_input) + if isinstance(config_input, (str, PathLike)): + return load_config_file(config_input) + + model_dump = getattr(config_input, "model_dump", None) + if callable(model_dump): + return model_dump(exclude_defaults=True, exclude_none=True, by_alias=True) + return None + + +def _sanitize_config_snapshot(value: Any, key: Optional[str] = None) -> Any: + if key in CONFIG_SNAPSHOT_REDACTED_KEYS or _is_path_like_key(key): + return RECIPE_HASH_REDACTED_VALUE + if key in CONFIG_REFERENCE_KEYS and isinstance(value, str): + return CONFIG_REFERENCE_REDACTED_VALUE + + if isinstance(value, dict): + if key == "systems": + return [_sanitize_config_snapshot(system, "system") for system in value.values()] + if key == "passes": + passes = [] + for pass_configs in value.values(): + if isinstance(pass_configs, list): + passes.extend(pass_configs) + else: + passes.append(pass_configs) + return [_sanitize_config_snapshot(pass_config, "pass") for pass_config in passes] + if key == "evaluators": + return [_sanitize_config_snapshot(evaluator, "evaluator_config") for evaluator in value.values()] + return { + child_key: _sanitize_config_snapshot(child_value, child_key) + for child_key, child_value in value.items() + if child_value is not None + } + if isinstance(value, list): + return [_sanitize_config_snapshot(item, key) for item in value] + if isinstance(value, tuple): + return [_sanitize_config_snapshot(item, key) for item in value] + if isinstance(value, Path): + return RECIPE_HASH_REDACTED_VALUE + if callable(value): + return CONFIG_CALLABLE_REDACTED_VALUE + if isinstance(value, (str, int, float, bool)) or value is None: + return value + if hasattr(value, "value") and isinstance(value.value, (str, int, float, bool)): + return value.value + return f"<{type(value).__name__}>" + + +def _is_path_like_key(key: Optional[str]) -> bool: + if key is None: + return False + return key in {"path", "paths", "dir", "dirs", "file", "files"} or key.endswith( + ("_path", "_paths", "_dir", "_dirs", "_file", "_files") + ) + + +def _extract_input_model_metadata(input_model_config: dict[str, Any]) -> dict[str, Optional[str]]: + model_config = input_model_config.get("config", {}) + model_attributes = model_config.get("model_attributes", {}) + model_task = model_attributes.get("hf_task") or model_config.get("task") + raw_identifier = model_attributes.get("_name_or_path") or model_config.get("model_path") + return { + "input_model_source": _classify_input_model_source(raw_identifier), + "model_task": str(model_task) if model_task is not None else None, + } + + +def _classify_input_model_source(model_identifier: Any) -> str: + if model_identifier is None: + return "unknown" + if isinstance(model_identifier, dict): + resource_type = model_identifier.get("type") + if resource_type == "azureml_registry_model": + return "azureml" + return "structured_resource" + + identifier = str(model_identifier) + if identifier.startswith("azureml://"): + return "azureml" + if identifier.startswith("https://huggingface.co/"): + return "huggingface_url" + if identifier.startswith(("http://", "https://")): + return "url" + + if _is_explicit_local_model_path(identifier): + suffix = PureWindowsPath(identifier).suffix or PurePosixPath(identifier).suffix + return "local_file" if suffix else "local_folder" + return "string_name" + + +def _is_explicit_local_model_path(identifier: str) -> bool: + return ( + identifier.startswith(("./", "../", ".\\", "..\\", "~/", "~\\", "/", "\\\\")) + or PureWindowsPath(identifier).is_absolute() + or PurePosixPath(identifier).is_absolute() + ) + + +def _extract_target_metadata(run_config: RunConfig) -> dict[str, Optional[str]]: + target_system = run_config.engine.target + return _extract_system_metadata(target_system, "target") + + +def _extract_host_metadata(run_config: RunConfig) -> dict[str, Optional[str]]: + host_system = run_config.engine.host + if host_system is None: + return { + "host_system_type": SystemType.Local.value, + } + return _extract_system_metadata(host_system, "host") + + +def _extract_system_metadata(system_config: Optional[Any], field_prefix: str) -> dict[str, Optional[str]]: + system_type = system_config.type.value if system_config is not None else None + device = None + execution_provider = None + execution_providers = None + + accelerators = system_config.config.accelerators if system_config and system_config.config else None + if accelerators: + accelerator = accelerators[0] + device = str(accelerator.device) if accelerator.device is not None else None + ep_values = accelerator.get_ep_strs() or [] + if ep_values: + execution_provider = ep_values[0] + execution_providers = ";".join(ep_values) + + return { + f"{field_prefix}_system_type": system_type, + f"{field_prefix}_device": device, + f"{field_prefix}_execution_provider": execution_provider, + f"{field_prefix}_execution_providers": execution_providers, + } + + +def _set_metadata_if_present(metadata: dict[str, Any], values: dict[str, Optional[str]]) -> None: + for key, value in values.items(): + if value is not None: + metadata.setdefault(key, value) + + +def _build_recipe_hash(run_config_json: dict[str, Any]) -> str: + sanitized = deepcopy(run_config_json) + _redact_recipe_hash_keys(sanitized) + return hash_dict(sanitized)[:16] + + +def _redact_recipe_hash_keys(value: Any, key: Optional[str] = None) -> Any: + if key in RECIPE_HASH_REDACTED_KEYS or _is_path_like_key(key): + return RECIPE_HASH_REDACTED_VALUE + if isinstance(value, dict): + for child_key in list(value): + value[child_key] = _redact_recipe_hash_keys(value[child_key], child_key) + elif isinstance(value, list): + for index, item in enumerate(value): + value[index] = _redact_recipe_hash_keys(item, key) + return value diff --git a/test/cli/test_cli.py b/test/cli/test_cli.py index a7cb39e244..80c8783ca8 100644 --- a/test/cli/test_cli.py +++ b/test/cli/test_cli.py @@ -107,7 +107,17 @@ def test_workflow_run_command(mock_run, tempdir, list_required_packages, tmp_pat # assert mock_run.assert_called_once_with( - {"key": "value"}, package_config=None, tempdir=tempdir, list_required_packages=list_required_packages + {"key": "value"}, + package_config=None, + tempdir=tempdir, + list_required_packages=list_required_packages, + recipe_telemetry_metadata={ + "recipe_command": "WorkflowRun", + "recipe_source": "config_file", + "recipe_format": "json", + "execution_mode": "list_required_packages" if list_required_packages else "run", + "package_config_provided": False, + }, ) @@ -147,6 +157,13 @@ def test_workflow_run_command_with_overrides(mock_run, tmp_path): list_required_packages=False, package_config=None, tempdir=None, + recipe_telemetry_metadata={ + "recipe_command": "WorkflowRun", + "recipe_source": "config_file", + "recipe_format": "json", + "execution_mode": "run", + "package_config_provided": False, + }, ) diff --git a/test/systems/docker/test_docker_system.py b/test/systems/docker/test_docker_system.py index 5430b68587..ef20a43d18 100644 --- a/test/systems/docker/test_docker_system.py +++ b/test/systems/docker/test_docker_system.py @@ -2,6 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. # -------------------------------------------------------------------------- +import json from unittest.mock import MagicMock, patch import pytest @@ -140,6 +141,37 @@ def test_run_workflow(self, mock_find_resources, mock_tempdir, mock_from_env, tm # Verify cleanup mock_container.remove.assert_called_once() + @patch("olive.systems.docker.docker_system.docker.from_env") + def test_prepare_environment_forwards_ci_to_workflow_container(self, mock_from_env, monkeypatch): + mock_docker_client = MagicMock() + mock_from_env.return_value = mock_docker_client + mock_docker_client.images.get.return_value = MagicMock() + monkeypatch.setenv("TF_BUILD", "True") + docker_config = self.get_default_docker_config() + docker_system = DockerSystem( + image_name=docker_config.image_name, + build_context_path=docker_config.build_context_path, + dockerfile=docker_config.dockerfile, + work_dir=docker_config.work_dir, + ) + + environment = docker_system._prepare_environment({}) + + assert environment["CI"] == "1" + + def test_workflow_runner_disables_inner_recipe_telemetry(self, tmp_path, monkeypatch): + from olive.systems.docker import workflow_runner + + monkeypatch.delenv("HF_TOKEN", raising=False) + config = {"input_model": {"type": "ONNXModel", "model_path": "model.onnx"}} + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps(config)) + + with patch.object(workflow_runner, "olive_run") as mock_olive_run: + workflow_runner.runner_entry(config_path) + + mock_olive_run.assert_called_once_with(config, emit_recipe_telemetry=False) + @patch("olive.systems.docker.docker_system.docker.from_env") @patch("olive.systems.docker.docker_system.tempfile.TemporaryDirectory") @patch("olive.systems.docker.docker_system.find_all_resources") diff --git a/test/test_telemetry.py b/test/test_telemetry.py new file mode 100644 index 0000000000..bb394d6cb6 --- /dev/null +++ b/test/test_telemetry.py @@ -0,0 +1,119 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- +# pylint: disable=protected-access +import os +import subprocess +import sys +import time +from pathlib import Path +from unittest.mock import Mock, patch + +import pytest + +from olive.telemetry.telemetry import ( + ACTION_EVENT_NAME, + CACHE_FILE_NAME, + RECIPE_EVENT_NAME, + Telemetry, + TelemetryCacheHandler, +) +from olive.telemetry.utils import _exclusive_file_lock + + +def test_cache_path_uses_env_override(tmp_path, monkeypatch): + cache_dir = tmp_path / "telemetry-cache" + monkeypatch.setenv("OLIVE_TELEMETRY_CACHE_DIR", str(cache_dir)) + + handler = TelemetryCacheHandler(Mock()) + + assert handler.cache_path == cache_dir / CACHE_FILE_NAME + assert isinstance(handler.cache_path, Path) + + +def test_cache_path_ignores_empty_env_override(tmp_path, monkeypatch): + monkeypatch.setenv("OLIVE_TELEMETRY_CACHE_DIR", " ") + + with patch("olive.telemetry.telemetry.get_telemetry_base_dir", return_value=tmp_path): + handler = TelemetryCacheHandler(Mock()) + assert handler.cache_path == tmp_path / "cache" / CACHE_FILE_NAME + + +def test_telemetry_only_logs_recipe_events_in_ci(monkeypatch): + monkeypatch.setenv("CI", "1") + Telemetry._instance = None + + mock_logger = Mock() + mock_logger.register_payload_transmitted_callback.return_value = lambda: None + + try: + with patch("olive.telemetry.telemetry.get_telemetry_logger", return_value=mock_logger): + telemetry = Telemetry() + telemetry.log(ACTION_EVENT_NAME, {"action_name": "WorkflowRun", "duration_ms": 1, "success": False}) + telemetry.log(RECIPE_EVENT_NAME, {"recipe_name": "WorkflowRun", "success": False}) + + assert mock_logger.log.call_count == 1 + assert mock_logger.log.call_args.args[0] == RECIPE_EVENT_NAME + assert telemetry._cache_handler is None + mock_logger.register_payload_transmitted_callback.assert_not_called() + finally: + Telemetry._instance = None + + +def test_flush_cache_preserves_nonempty_unreadable_file(tmp_path): + handler = TelemetryCacheHandler(Mock()) + cache_path = tmp_path / CACHE_FILE_NAME + flush_path = cache_path.with_name(f"{cache_path.name}.flush") + cache_path.write_text("not-json\n", encoding="utf-8") + + handler._flush_cache_file(cache_path) + + assert cache_path.exists() + assert cache_path.read_text(encoding="utf-8") == "not-json\n" + assert not flush_path.exists() + + +@pytest.mark.skipif(os.name != "nt", reason="Windows locking behavior is specific to Windows.") +def test_exclusive_file_lock_blocks_second_append_on_windows(tmp_path): + file_path = tmp_path / "olive.json" + child_code = """ +import sys +import time +from pathlib import Path +from olive.telemetry.utils import _exclusive_file_lock + +path = Path(sys.argv[1]) +path.write_text("payload", encoding="utf-8") +with _exclusive_file_lock(path, "a") as locked_file: + locked_file.write("child") + locked_file.flush() + print("locked", flush=True) + time.sleep(2) +""" + + with subprocess.Popen( + [sys.executable, "-c", child_code, str(file_path)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) as process: + assert process.stdout is not None + assert process.stdout.readline().strip() == "locked" + + start = time.perf_counter() + with _exclusive_file_lock(file_path, mode="a") as locked_file: + wait_time = time.perf_counter() - start + locked_file.write("parent") + + assert wait_time >= 1.0 + + try: + stdout, stderr = process.communicate(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + stdout, stderr = process.communicate() + pytest.fail(f"child lock process timed out: stdout={stdout!r} stderr={stderr!r}") + + assert process.returncode == 0, stderr + assert file_path.read_text(encoding="utf-8") == "payloadchildparent" diff --git a/test/workflows/test_workflow_run.py b/test/workflows/test_workflow_run.py index 82cc4980bf..a79d0c2517 100644 --- a/test/workflows/test_workflow_run.py +++ b/test/workflows/test_workflow_run.py @@ -1,11 +1,13 @@ +import json import sys from copy import deepcopy from pathlib import Path -from unittest.mock import patch +from unittest.mock import Mock, patch import pytest from olive.workflows import run as olive_run +from olive.workflows.run.run import _build_recipe_hash, _classify_input_model_source, _classify_run_config_source from test.utils import ( get_pytorch_model, get_pytorch_model_config, @@ -125,3 +127,274 @@ def test_run_packages(): # cleanup requirements_file_path.unlink() + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +@patch("olive.workflows.run.run.is_ci_environment", return_value=False) +def test_run_logs_recipe_result_success(_, mock_run_engine, mock_log_recipe_result): + config = { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + "load_kwargs": {"attn_implementation": "eager"}, + }, + "systems": { + "local_system": { + "type": "LocalSystem", + "accelerators": [{"device": "gpu", "execution_providers": ["CUDAExecutionProvider"]}], + } + }, + "engine": {"target": "local_system"}, + "passes": {"dynamic_quant": {"type": "OnnxDynamicQuantization"}}, + } + expected_output = object() + mock_run_engine.return_value = expected_output + + output = olive_run( + config, + recipe_telemetry_metadata={ + "recipe_name": "Quantize", + "recipe_command": "Quantize", + "recipe_source": "generated_cli", + "recipe_format": "generated", + }, + ) + + assert output is expected_output + mock_log_recipe_result.assert_called_once() + assert mock_log_recipe_result.call_args.args[0] == "Quantize" + assert mock_log_recipe_result.call_args.kwargs["success"] is True + + metadata = mock_log_recipe_result.call_args.kwargs["metadata"] + assert metadata["recipe_command"] == "Quantize" + assert metadata["recipe_source"] == "generated_cli" + assert metadata["recipe_format"] == "generated" + assert metadata["workflow_id"] == "default_workflow" + assert metadata["input_model_type"] == "hfmodel" + assert metadata["input_model_source"] == "string_name" + assert metadata["model_task"] == "text-generation" + assert metadata["target_system_type"] == "LocalSystem" + assert metadata["target_device"] == "gpu" + assert metadata["target_execution_provider"] == "CUDAExecutionProvider" + assert metadata["target_execution_providers"] == "CUDAExecutionProvider" + assert metadata["host_system_type"] == "LocalSystem" + assert "host_device" not in metadata + assert "host_execution_provider" not in metadata + assert "host_execution_providers" not in metadata + assert metadata["pass_types"] == "onnxdynamicquantization" + assert metadata["pass_count"] == 1 + assert metadata["data_config_count"] == 0 + assert metadata["search_enabled"] is False + assert metadata["package_config_provided"] is False + assert metadata["is_ci"] is False + assert metadata["recipe_hash"] + assert "input_model_name_hash" not in metadata + + config_overrides = json.loads(metadata["config_overrides"]) + assert config_overrides["input_model"]["model_path"] == "" + assert config_overrides["engine"]["target"] == "" + assert config_overrides["systems"][0]["type"] == "LocalSystem" + assert config_overrides["systems"][0]["accelerators"][0]["execution_providers"] == ["CUDAExecutionProvider"] + + +@patch("olive.workflows.run.run.log_error") +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +def test_run_logs_recipe_result_failure(mock_run_engine, mock_log_recipe_result, mock_log_error): + config = { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + "load_kwargs": {"attn_implementation": "eager"}, + }, + "passes": {"dynamic_quant": {"type": "OnnxDynamicQuantization"}}, + } + mock_run_engine.side_effect = ValueError("recipe failed") + + with pytest.raises(ValueError, match="recipe failed"): + olive_run( + config, + recipe_telemetry_metadata={ + "recipe_name": "Quantize", + "recipe_command": "Quantize", + "recipe_source": "generated_cli", + "recipe_format": "generated", + }, + ) + + mock_log_recipe_result.assert_called_once() + assert mock_log_recipe_result.call_args.args[0] == "Quantize" + assert mock_log_recipe_result.call_args.kwargs["success"] is False + assert "exception_type" not in mock_log_recipe_result.call_args.kwargs + mock_log_error.assert_called_once() + assert mock_log_error.call_args.kwargs["exception_type"] == "ValueError" + assert "recipe failed" in mock_log_error.call_args.kwargs["exception_message"] + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +def test_run_skips_recipe_result_when_recipe_telemetry_is_not_emitted(mock_run_engine, mock_log_recipe_result): + expected_output = object() + mock_run_engine.return_value = expected_output + + output = olive_run( + { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + } + }, + emit_recipe_telemetry=False, + ) + + assert output is expected_output + mock_log_recipe_result.assert_not_called() + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.systems.system_config.SystemConfig.create_system") +def test_run_logs_single_parent_recipe_result_for_docker_host(mock_create_system, mock_log_recipe_result): + expected_output = object() + docker_system = Mock() + + def run_workflow(container_run_config): + container_run_config.engine.host = container_run_config.engine.target + return expected_output + + docker_system.run_workflow.side_effect = run_workflow + mock_create_system.return_value = docker_system + config = { + "input_model": {"type": "ONNXModel", "model_path": "model.onnx"}, + "systems": { + "docker_system": { + "type": "Docker", + "config": { + "dockerfile": "Dockerfile", + "build_context_path": "build_context", + "image_name": "test-image:latest", + "work_dir": "/olive-ws", + }, + }, + "local_system": {"type": "LocalSystem"}, + }, + "engine": {"host": "docker_system", "target": "local_system"}, + } + + output = olive_run(config) + + assert output is expected_output + mock_log_recipe_result.assert_called_once() + metadata = mock_log_recipe_result.call_args.kwargs["metadata"] + assert metadata["host_system_type"] == "Docker" + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +def test_run_logs_recipe_host_metadata_without_explicit_target(mock_run_engine, mock_log_recipe_result): + config = { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + "load_kwargs": {"attn_implementation": "eager"}, + }, + "systems": { + "host_system": { + "type": "LocalSystem", + "accelerators": [{"device": "cpu", "execution_providers": ["CPUExecutionProvider"]}], + } + }, + "engine": {"host": "host_system"}, + } + mock_run_engine.return_value = object() + + olive_run( + config, + recipe_telemetry_metadata={ + "recipe_name": "Quantize", + "recipe_command": "Quantize", + "recipe_source": "generated_cli", + "recipe_format": "generated", + }, + ) + + metadata = mock_log_recipe_result.call_args.kwargs["metadata"] + assert "target_system_type" not in metadata + assert "target_device" not in metadata + assert "target_execution_provider" not in metadata + assert "target_execution_providers" not in metadata + assert metadata["host_system_type"] == "LocalSystem" + assert metadata["host_device"] == "cpu" + assert metadata["host_execution_provider"] == "CPUExecutionProvider" + assert metadata["host_execution_providers"] == "CPUExecutionProvider" + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +def test_run_logs_package_config_overrides_when_package_config_provided(mock_run_engine, mock_log_recipe_result): + config = { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + "load_kwargs": {"attn_implementation": "eager"}, + } + } + mock_run_engine.return_value = object() + + olive_run( + config, + package_config={ + "passes": { + "AddOliveMetadata": { + "module_path": "olive.passes.onnx.add_metadata.AddOliveMetadata", + "supported_providers": ["CPUExecutionProvider"], + } + }, + "extra_dependencies": {"custom_accelerator": ["custom-package"]}, + }, + recipe_telemetry_metadata={ + "recipe_name": "Quantize", + "recipe_command": "Quantize", + "recipe_source": "generated_cli", + "recipe_format": "generated", + }, + ) + + metadata = mock_log_recipe_result.call_args.kwargs["metadata"] + assert metadata["package_config_provided"] is True + package_config_overrides = json.loads(metadata["package_config_overrides"]) + assert package_config_overrides["passes"][0]["supported_providers"] == ["CPUExecutionProvider"] + assert "module_path" not in package_config_overrides["passes"][0] + assert package_config_overrides["extra_dependencies"]["custom_accelerator"] == ["custom-package"] + + +def test_classify_run_config_source_handles_non_pathlike_object(): + assert _classify_run_config_source(object()) == ("config_object", "object") + + +def test_classify_input_model_source_does_not_depend_on_local_filesystem(tmp_path, monkeypatch): + assert _classify_input_model_source("Qwen/Qwen2.5-0.5B-Instruct") == "string_name" + + monkeypatch.chdir(tmp_path) + (tmp_path / "bert-base-uncased").mkdir() + + assert _classify_input_model_source("bert-base-uncased") == "string_name" + assert _classify_input_model_source("./model.onnx") == "local_file" + + +def test_recipe_hash_does_not_depend_on_local_model_path_presence(tmp_path, monkeypatch): + config = { + "input_model": {"type": "HfModel", "config": {"model_path": "bert-base-uncased"}}, + "engine": {"output_dir": "output"}, + } + recipe_hash = _build_recipe_hash(config) + + monkeypatch.chdir(tmp_path) + (tmp_path / "bert-base-uncased").mkdir() + + assert _build_recipe_hash(config) == recipe_hash