diff --git a/ccflow/evaluators/__init__.py b/ccflow/evaluators/__init__.py index 7c4d23e..d9ad6ab 100644 --- a/ccflow/evaluators/__init__.py +++ b/ccflow/evaluators/__init__.py @@ -1,2 +1,3 @@ from .common import * +from .reporting import * from .retry import * diff --git a/ccflow/evaluators/common.py b/ccflow/evaluators/common.py index 2cab984..70c39ca 100644 --- a/ccflow/evaluators/common.py +++ b/ccflow/evaluators/common.py @@ -1,13 +1,8 @@ -import itertools import logging -import time -from contextlib import nullcontext -from datetime import timedelta -from pprint import pformat from types import MappingProxyType -from typing import Any, Callable, Dict, List, Optional, Set, Union +from typing import Callable, Dict, List, Optional, Set, Union -from pydantic import Field, PrivateAttr, field_validator +from pydantic import Field, PrivateAttr from typing_extensions import override from ..base import BaseModel, make_lazy_result @@ -19,11 +14,14 @@ ResultType, TransparentModelEvaluationContext, ) +from ..utils.reporting import FormatConfig, LoggingPolicy +from .reporting import ReportingEvaluator, _descriptor __all__ = [ "cache_key", "combine_evaluators", "FallbackEvaluator", + "FormatConfig", "LazyEvaluator", "LoggingEvaluator", "MemoryCacheEvaluator", @@ -124,106 +122,23 @@ def make_result(): return make_lazy_result(context.model.result_type, make_result) -class FormatConfig(BaseModel): - """Configuration for formatting the result of the evaluation. - - This is used by the LoggingEvaluator to control how the result is formatted. 
- """ - - arrow_as_polars: bool = Field( - False, - description="Whether to convert pyarrow tables to polars tables for formatting, as arrow formatting does not work well with large tables or provide control over options", - ) - pformat_config: Dict[str, Any] = Field({}, description="pformat config to use for formatting data") - polars_config: Dict[str, Any] = Field({}, description="polars config to use for formatting polars frames") - pandas_config: Dict[str, Any] = Field({}, description="pandas config to use for formatting pandas objects") - - -class LoggingEvaluator(EvaluatorBase): +class LoggingEvaluator(ReportingEvaluator, LoggingPolicy): """Evaluator that logs information about evaluating the callable. - It logs start and end times, the model name, and the context.""" - - log_level: int = Field(logging.DEBUG, description="The log level for start/end of evaluation") - verbose: bool = Field(True, description="Whether to output the model definition as part of logging") - log_result: bool = Field(False, description="Whether to log the result of the evaluation") - format_config: FormatConfig = Field(FormatConfig(), description="Configuration for formatting the result of the evaluation if log_result=True") - - def is_transparent(self, context: ModelEvaluationContext) -> bool: - return True - - @field_validator("log_level", mode="before") - @classmethod - def _validate_log_level(cls, v: Union[int, str]) -> int: - """Validate that the log level is a valid logging level.""" - if isinstance(v, str): - return getattr(logging, v.upper(), "") - return v + It logs start and end times, the model name, and the context. This is the *default* evaluator + when no other is configured. It is now a thin combination of :class:`ReportingEvaluator` (span / + contextvar correlation, optional structured events when a ``reporter`` is set) and + :class:`~ccflow.utils.reporting.LoggingPolicy` (the actual log output, preserved exactly), so it + also participates in the reporting span tree. 
+ """ @override def __call__(self, context: ModelEvaluationContext) -> ResultType: - model_name = context.model.meta.name or context.model.__class__.__name__ - log_level = context.options.get("log_level", self.log_level) - verbose = context.options.get("verbose", self.verbose) - log.log(log_level, "[%s]: Start evaluation of %s on %s.", model_name, context.fn, context.context) - if verbose: - log.log(log_level, "[%s]: %s", model_name, context.model) - start = time.time() - result = None - try: - result = context() - return result - finally: - end = time.time() - if self.log_result and result is not None: - log.log( - log_level, - self._format_result(result), - model_name, - context.fn, - context.context, - ) - log.log( - log_level, - "[%s]: End evaluation of %s on %s (time elapsed: %s).", - model_name, - context.fn, - context.context, - timedelta(seconds=end - start), - ) - - def _format_result(self, result: ResultType) -> str: - """Handle formatting of the result""" - # Add special formatting for eager table/data frame types embedded in the results - import pyarrow as pa - - result_dict = result.model_dump(by_alias=True) - for k, v in result_dict.items(): - try: - if self.format_config.arrow_as_polars and isinstance(v, pa.Table): - import polars as pl # Only import polars if needed - - result_dict[k] = pl.from_arrow(v) - except TypeError: - pass - - if self.format_config.polars_config: # Control formatting of polars tables if set - import polars as pl # Only import polars if needed - - polars_context = pl.Config(**self.format_config.polars_config) - else: - polars_context = nullcontext() - - if self.format_config.pandas_config: # Control formatting of pandas tables if set - import pandas as pd - - pandas_context = pd.option_context(*itertools.chain.from_iterable(self.format_config.pandas_config.items())) - else: - pandas_context = nullcontext() - - with polars_context, pandas_context: - msg_str = "[%s]: Result of %s on %s:\n" - return 
f"{msg_str}{pformat(result_dict, **self.format_config.pformat_config)}" + return self._run_with_reporting( + context, + extra={"model": context.model, "raw_context": context.context, "options": dict(context.options)}, + **_descriptor(context), + ) def cache_key(flow_obj: Union[ModelEvaluationContext, ContextBase, CallableModel]) -> bytes: @@ -304,7 +219,12 @@ def __deepcopy__(self, memo): class CallableModelGraph(BaseModel): - """Class to hold a "graph" """ + """Dependency graph of callable-model evaluation contexts. + + ``graph`` maps each node's cache key to the set of its dependency cache keys, ``ids`` maps cache + keys back to their :class:`ModelEvaluationContext`, and ``root_id`` is the cache key of the node + the graph was built from. + """ graph: Dict[bytes, Set[bytes]] ids: Dict[bytes, ModelEvaluationContext] @@ -357,7 +277,7 @@ def __call__(self, context: ModelEvaluationContext) -> ResultType: import graphlib # If we are evaluating deps, or if we have already started using the graph evaluator further up the call tree, - # no not apply it any further + # do not apply it any further if self._is_evaluating: return context() self._is_evaluating = True diff --git a/ccflow/evaluators/reporting.py b/ccflow/evaluators/reporting.py new file mode 100644 index 0000000..83a50e2 --- /dev/null +++ b/ccflow/evaluators/reporting.py @@ -0,0 +1,285 @@ +"""Reporting (telemetry / tracing / metrics / alerting) evaluators. + +These are the cross-cutting ("how to run") way to attach reporting to a callable graph: a +:class:`ReportingEvaluator` wraps the evaluation of every model in its scope and is configured at +runtime via ``FlowOptions`` / ``FlowOptionsOverride``. The reporting mechanics come from the shared +policies in :mod:`ccflow.utils.reporting`, mirroring how ``RetryEvaluator`` reuses ``RetryPolicy``. 
+ +A reporting evaluator is always *transparent*: a successful evaluation returns exactly the same value +as evaluating the wrapped context directly, so it does not affect cache keys or dependency-graph +deduplication. Reporting describes the *evaluation* (model identity, context, timing, graph topology, +failures) -- it never acts on the result payload, which is what distinguishes it from a publisher. + +The class taxonomy is ``<Backend><Signal>ReportingEvaluator`` where ``Signal`` is one of ``Tracing``, +``Metrics`` or ``Alerts`` and ``Backend`` is an optional vendor prefix. OpenTelemetry tracing/metrics are implemented; the remaining vendor +backends (Datadog, Opsgenie, JSM, NewRelic, ...) are placeholders that raise ``NotImplementedError``. +""" + +from collections import deque +from contextvars import ContextVar + +from pydantic import Field +from typing_extensions import override + +from ..callable import EvaluatorBase, ModelEvaluationContext, ResultType +from ..exttypes import PyObjectPath +from ..utils.reporting import ( + AlertsPolicy, + MetricsPolicy, + OpenTelemetryMetricsPolicy, + OpenTelemetryTracingPolicy, + ReportContext, + ReportingPolicy, + ReportPhase, + TracingPolicy, + _new_span_id, +) + +__all__ = [ + "ReportingEvaluator", + "DryRunEvaluator", + "TracingReportingEvaluator", + "MetricsReportingEvaluator", + "AlertsReportingEvaluator", + "OpenTelemetryTracingReportingEvaluator", + "OpenTelemetryMetricsReportingEvaluator", + "OpenTelemetryEvaluator", + "DatadogTracingReportingEvaluator", + "DatadogMetricsReportingEvaluator", + "DatadogAlertsReportingEvaluator", + "OpsgenieMetricsReportingEvaluator", + "OpsgenieAlertsReportingEvaluator", + "JSMAlertsReportingEvaluator", + "NewRelicTracingReportingEvaluator", + "NewRelicMetricsReportingEvaluator", + "NewRelicAlertsReportingEvaluator", +] + + +def _model_type(model) -> str: + """Best-effort fully-qualified type name for a model. + + Falls back to ``module.qualname`` when the type cannot be resolved as an importable + :class:`PyObjectPath` (e.g. 
dynamically-generated classes such as Ray-wrapped callables), so that + reporting never breaks evaluation. + """ + try: + return str(PyObjectPath.validate(type(model))) + except Exception: + cls = type(model) + return f"{cls.__module__}.{cls.__qualname__}" + + +def _descriptor(context: ModelEvaluationContext) -> dict: + """Build the reporting descriptor (model name/type/fn/context) for an evaluation context.""" + model = context.model + return { + "model_name": model.meta.name or model.__class__.__name__, + "model_type": _model_type(model), + "fn": context.fn, + "context": context.context, + } + + +# Re-entrancy guard for DryRunEvaluator, kept context-local (not on the instance) so a single +# evaluator instance shared across threads / concurrent evaluations cannot leak planning state from +# one evaluation into another (which would cause the second to run bodies instead of planning). +_DRY_RUN_PLANNING: ContextVar[bool] = ContextVar("ccflow_dry_run_planning", default=False) + + +class ReportingEvaluator(EvaluatorBase, ReportingPolicy): + """Evaluator that reports telemetry about the evaluation of the callable models in its scope. + + This is the base of the reporting evaluator family. It is *transparent* (the result is identical + to evaluating the wrapped context directly) and keeps no mutable per-call state on the instance -- + all per-call state lives in local variables / context vars -- so a single instance can be shared + safely across threads and combined with parallel evaluators (Ray / Celery). + + Configure a :class:`~ccflow.utils.reporting.Reporter` to receive the emitted + :class:`~ccflow.utils.reporting.ReportEvent` objects. To attach reporting to a single specific + model as part of the graph itself, use :class:`~ccflow.models.reporting.ReportingModel` instead. 
+ """ + + def is_transparent(self, context: ModelEvaluationContext) -> bool: + return True + + @override + def __call__(self, context: ModelEvaluationContext) -> ResultType: + return self._run_with_reporting(context, **_descriptor(context)) + + +class DryRunEvaluator(EvaluatorBase, ReportingPolicy): + """Evaluator that *plans* an evaluation by walking the dependency graph without running bodies. + + It discovers the dependency graph (which does evaluate ``__deps__`` -- the cheap dependency + declarations -- but never ``__call__``), emits a :attr:`~ccflow.utils.reporting.ReportPhase.QUEUED` + event followed by a :attr:`~ccflow.utils.reporting.ReportPhase.SKIPPED` event for every node in + breadth-first order with a parent/child span tree mirroring the graph, and returns a *synthetic* + result (constructed with ``model_construct`` so no fields are validated/populated). No model body + is executed, making it useful for previewing what a run would do and for driving a UI. + + .. warning:: + + With the default ``synthetic_result=True`` the returned object is built with + ``model_construct`` -- required fields may be **unset** and validators do **not** run. It is a + placeholder for previewing/UIs only and must not be fed into downstream computation. Because + this changes the return value, the evaluator is **not transparent** in that mode (see + :meth:`is_transparent`), so it participates in cache keys and will not contaminate the real + model/context cache entry. + """ + + synthetic_result: bool = Field( + True, + description="If True, return a synthetic (model_construct) result without running any body. If False, fall back to running the wrapped evaluation after planning.", + ) + + def is_transparent(self, context: ModelEvaluationContext) -> bool: + # Transparent only when we actually run the body and return the real value. 
When a synthetic + # result is returned the value differs from ``context()``, so the layer must NOT be stripped + # from cache keys (otherwise a synthetic result could be cached under the real key). + return not self.synthetic_result + + @override + def __call__(self, context: ModelEvaluationContext) -> ResultType: + # Dependency declarations (and any re-entry while already planning) must run directly rather + # than re-planning, otherwise graph discovery -- which evaluates ``__deps__`` through the + # active evaluator -- recurses. The guard is context-local so a shared instance is safe under + # concurrency. This keeps the documented ``FlowOptionsOverride`` path working. + if _DRY_RUN_PLANNING.get() or context.fn == "__deps__": + return context() + + from .common import _flatten_cache_key_context, cache_key, get_dependency_graph # local import to avoid an import cycle + + token = _DRY_RUN_PLANNING.set(True) + try: + graph = get_dependency_graph(context) + span_ids = {key: _new_span_id() for key in graph.graph} + parent_of: dict = {} + depth_of: dict = {graph.root_id: 0} + order: list = [] + seen: set = set() + queue = deque([graph.root_id]) + while queue: + key = queue.popleft() + if key in seen: + continue + seen.add(key) + order.append(key) + for child in graph.graph.get(key, ()): + parent_of.setdefault(child, key) + depth_of.setdefault(child, depth_of.get(key, 0) + 1) + queue.append(child) + + for key in order: + evaluation_context = graph.ids[key] + # Strip any transparent evaluator layers so we report on the underlying model/context. + flattened, fn, _ = _flatten_cache_key_context(evaluation_context) + model = flattened.model + # The raw graph key can include this (non-transparent) dry-run evaluator layer, so it + # is not a stable identity for the logical node. 
Compute the node_key from the + # flattened model/context/fn/options alone (mirroring ``cache_key()``) so it matches + # across invocation styles while still distinguishing differing non-evaluator options. + logical_key = cache_key(ModelEvaluationContext(model=model, context=flattened.context, fn=fn, options=flattened.options)) + ctx = ReportContext( + model_name=model.meta.name or model.__class__.__name__, + model_type=_model_type(model), + fn=fn, + context_repr=self._context_repr(flattened.context), + span_id=span_ids[key], + parent_span_id=span_ids.get(parent_of.get(key)), + depth=depth_of.get(key, 0), + extra={"node_key": logical_key.decode("utf-8"), "dry_run": True}, + ) + self._emit(self._event(ctx, ReportPhase.QUEUED, extra=ctx.extra)) + self._emit(self._event(ctx, ReportPhase.SKIPPED, extra=ctx.extra)) + + if self.synthetic_result: + return context.model.result_type.model_construct() + return context() + finally: + _DRY_RUN_PLANNING.reset(token) + + +class TracingReportingEvaluator(ReportingEvaluator, TracingPolicy): + """Reporting evaluator specialised for distributed *tracing* (spans).""" + + +class MetricsReportingEvaluator(ReportingEvaluator, MetricsPolicy): + """Reporting evaluator specialised for *metrics* (counters / latency histograms).""" + + +class AlertsReportingEvaluator(ReportingEvaluator, AlertsPolicy): + """Reporting evaluator specialised for *alerting*, with ``P1``-``P5`` priority tags.""" + + +# ***************************************************************************** +# OpenTelemetry (implemented; optional dependency, imported lazily) +# ***************************************************************************** + + +class OpenTelemetryTracingReportingEvaluator(TracingReportingEvaluator, OpenTelemetryTracingPolicy): + """Tracing evaluator backed by OpenTelemetry spans (requires ``opentelemetry-api``).""" + + +class OpenTelemetryMetricsReportingEvaluator(MetricsReportingEvaluator, OpenTelemetryMetricsPolicy): + """Metrics evaluator 
backed by OpenTelemetry counters/histograms (requires ``opentelemetry-api``).""" + + +#: Convenience alias for the most common OpenTelemetry use case (tracing). +OpenTelemetryEvaluator = OpenTelemetryTracingReportingEvaluator + + +# ***************************************************************************** +# Vendor placeholders (not yet implemented) +# +# These are intentionally defined so they can be referenced in config / type hints and tracked, but +# they raise NotImplementedError when used. Implementations should replace the placeholder body with +# a real backend policy (mirroring the OpenTelemetry classes above). +# ***************************************************************************** + + +class _NotImplementedReportingEvaluator(ReportingEvaluator): + """Base for reporting evaluators whose backend integration is not yet implemented.""" + + @override + def __call__(self, context: ModelEvaluationContext) -> ResultType: + raise NotImplementedError( + f"{type(self).__name__} is not yet implemented. " + "Track/implement this integration or use an implemented reporting evaluator (e.g. OpenTelemetry*)." 
+ ) + + +class DatadogTracingReportingEvaluator(_NotImplementedReportingEvaluator): + """Placeholder: tracing via Datadog APM (not yet implemented).""" + + +class DatadogMetricsReportingEvaluator(_NotImplementedReportingEvaluator): + """Placeholder: metrics via Datadog (not yet implemented).""" + + +class DatadogAlertsReportingEvaluator(_NotImplementedReportingEvaluator): + """Placeholder: alerting via Datadog monitors (not yet implemented).""" + + +class OpsgenieMetricsReportingEvaluator(_NotImplementedReportingEvaluator): + """Placeholder: metrics via Opsgenie (not yet implemented).""" + + +class OpsgenieAlertsReportingEvaluator(_NotImplementedReportingEvaluator): + """Placeholder: alerting via Opsgenie (not yet implemented).""" + + +class JSMAlertsReportingEvaluator(_NotImplementedReportingEvaluator): + """Placeholder: alerting via Jira Service Management (not yet implemented).""" + + +class NewRelicTracingReportingEvaluator(_NotImplementedReportingEvaluator): + """Placeholder: tracing via New Relic (not yet implemented).""" + + +class NewRelicMetricsReportingEvaluator(_NotImplementedReportingEvaluator): + """Placeholder: metrics via New Relic (not yet implemented).""" + + +class NewRelicAlertsReportingEvaluator(_NotImplementedReportingEvaluator): + """Placeholder: alerting via New Relic (not yet implemented).""" diff --git a/ccflow/models/__init__.py b/ccflow/models/__init__.py index 9921c29..fc98032 100644 --- a/ccflow/models/__init__.py +++ b/ccflow/models/__init__.py @@ -1,3 +1,4 @@ from .narwhals import * from .publisher import * +from .reporting import * from .retry import * diff --git a/ccflow/models/reporting.py b/ccflow/models/reporting.py new file mode 100644 index 0000000..b3c32bd --- /dev/null +++ b/ccflow/models/reporting.py @@ -0,0 +1,112 @@ +"""Reporting (telemetry / tracing / metrics / alerting) models. 
+ +These are the structural ("what the graph is") way to attach reporting to a *specific* model: a +:class:`ReportingModel` is a first-class :class:`~ccflow.callable.CallableModel` node, so a reporting +policy can be declared statically in config/registries and shows up explicitly in the graph and in +serialization. It wraps another ``CallableModel`` and reuses the shared policies from +:mod:`ccflow.utils.reporting`, exactly as :class:`~ccflow.models.retry.RetryModel` reuses +``RetryPolicy``. + +Use a reporting *evaluator* (:mod:`ccflow.evaluators.reporting`) for cross-cutting, runtime reporting +across a whole scope; use a reporting *model* when reporting on one model is a declarative, visible +part of the graph itself. +""" + +from typing import Generic + +from typing_extensions import override + +from ..callable import CallableModelType, ContextType, Flow, ResultType, WrapperModel +from ..evaluators.reporting import _model_type +from ..utils.reporting import ( + AlertsPolicy, + LoggingPolicy, + MetricsPolicy, + OpenTelemetryMetricsPolicy, + OpenTelemetryTracingPolicy, + ReportingPolicy, + TracingPolicy, +) + +__all__ = [ + "ReportingModel", + "LoggingModel", + "TracingReportingModel", + "MetricsReportingModel", + "AlertsReportingModel", + "OpenTelemetryTracingReportingModel", + "OpenTelemetryMetricsReportingModel", + "OpenTelemetryModel", +] + + +class ReportingModel(WrapperModel[CallableModelType], ReportingPolicy, Generic[CallableModelType]): + """A callable model that wraps another model and reports telemetry about its evaluation. + + This is the structural counterpart to :class:`~ccflow.evaluators.reporting.ReportingEvaluator`: + it is a first-class node (declarable in config/registries, visible in serialization and the + dependency graph) that inherits the wrapped model's ``context_type`` / ``result_type`` (from + ``WrapperModel``) and reuses the reporting mechanics from + :class:`~ccflow.utils.reporting.ReportingPolicy`. 
+ + Reporting is transparent: the wrapped model's result is returned unchanged. + """ + + @override + @Flow.call + def __call__(self, context: ContextType) -> ResultType: + name = self.model.meta.name or self.model.__class__.__name__ + return self._run_with_reporting( + lambda: self.model(context), + model_name=name, + model_type=_model_type(self.model), + fn="__call__", + context=context, + ) + + +class LoggingModel(ReportingModel[CallableModelType], LoggingPolicy, Generic[CallableModelType]): + """A callable model that wraps another model and logs its evaluation. + + This is the structural counterpart to ``LoggingEvaluator``: it produces the same start/end (and + optional result) log output via :class:`~ccflow.utils.reporting.LoggingPolicy`, but as an explicit + graph node rather than a cross-cutting evaluator. Logging is transparent: the wrapped model's + result is returned unchanged. + """ + + @override + @Flow.call + def __call__(self, context: ContextType) -> ResultType: + name = self.model.meta.name or self.model.__class__.__name__ + return self._run_with_reporting( + lambda: self.model(context), + model_name=name, + model_type=_model_type(self.model), + fn="__call__", + context=context, + extra={"model": self.model, "raw_context": context, "options": {}}, + ) + + +class TracingReportingModel(ReportingModel[CallableModelType], TracingPolicy, Generic[CallableModelType]): + """Reporting model specialised for distributed *tracing* (spans).""" + + +class MetricsReportingModel(ReportingModel[CallableModelType], MetricsPolicy, Generic[CallableModelType]): + """Reporting model specialised for *metrics* (counters / latency histograms).""" + + +class AlertsReportingModel(ReportingModel[CallableModelType], AlertsPolicy, Generic[CallableModelType]): + """Reporting model specialised for *alerting*, with ``P1``-``P5`` priority tags.""" + + +class OpenTelemetryTracingReportingModel(TracingReportingModel[CallableModelType], OpenTelemetryTracingPolicy, 
Generic[CallableModelType]): + """Tracing model backed by OpenTelemetry spans (requires ``opentelemetry-api``).""" + + +class OpenTelemetryMetricsReportingModel(MetricsReportingModel[CallableModelType], OpenTelemetryMetricsPolicy, Generic[CallableModelType]): + """Metrics model backed by OpenTelemetry counters/histograms (requires ``opentelemetry-api``).""" + + +#: Convenience alias for the most common OpenTelemetry use case (tracing). +OpenTelemetryModel = OpenTelemetryTracingReportingModel diff --git a/ccflow/tests/evaluators/test_reporting.py b/ccflow/tests/evaluators/test_reporting.py new file mode 100644 index 0000000..22cc9bf --- /dev/null +++ b/ccflow/tests/evaluators/test_reporting.py @@ -0,0 +1,311 @@ +import pickle +from datetime import date +from unittest import TestCase + +from ccflow import DateContext, FlowOptionsOverride, GenericResult, ModelEvaluationContext, TransparentModelEvaluationContext +from ccflow.evaluators import ( + AlertsReportingEvaluator, + DatadogAlertsReportingEvaluator, + DatadogMetricsReportingEvaluator, + DatadogTracingReportingEvaluator, + DryRunEvaluator, + JSMAlertsReportingEvaluator, + MemoryCacheEvaluator, + MetricsReportingEvaluator, + MultiEvaluator, + NewRelicAlertsReportingEvaluator, + NewRelicMetricsReportingEvaluator, + NewRelicTracingReportingEvaluator, + OpenTelemetryEvaluator, + OpenTelemetryMetricsReportingEvaluator, + OpenTelemetryTracingReportingEvaluator, + OpsgenieAlertsReportingEvaluator, + OpsgenieMetricsReportingEvaluator, + ReportingEvaluator, + TracingReportingEvaluator, +) +from ccflow.utils.reporting import AlertPriority, InMemoryReporter, ReportPhase + +from .util import MyFlakyCallable, MyResult, NodeModel + + +class TestReportingEvaluator(TestCase): + def setUp(self): + self.context = DateContext(date=date(2022, 1, 1)) + + def _eval_context(self, model): + return ModelEvaluationContext(model=model, context=self.context) + + def test_success_reports_and_is_transparent(self): + reporter = 
InMemoryReporter() + model = MyFlakyCallable(offset=1, fail_times=0) + evaluator = ReportingEvaluator(reporter=reporter) + out = evaluator(self._eval_context(model)) + self.assertEqual(out, MyResult(x=2)) + self.assertEqual([e.phase for e in reporter.events], [ReportPhase.START, ReportPhase.SUCCESS, ReportPhase.END]) + self.assertEqual(reporter.events[0].model_name, "MyFlakyCallable") + + def test_is_transparent(self): + model = MyFlakyCallable(fail_times=0) + evaluator = ReportingEvaluator() + context = self._eval_context(model) + self.assertTrue(evaluator.is_transparent(context)) + wrapped = evaluator.make_evaluation_context(context, options=context.options) + self.assertIsInstance(wrapped, TransparentModelEvaluationContext) + + def test_error_reports_and_reraises(self): + reporter = InMemoryReporter() + model = MyFlakyCallable(fail_times=5) + evaluator = ReportingEvaluator(reporter=reporter) + with self.assertRaises(ValueError): + evaluator(self._eval_context(model)) + self.assertEqual([e.phase for e in reporter.events], [ReportPhase.START, ReportPhase.ERROR, ReportPhase.END]) + + def test_flow_options_override_integration(self): + reporter = InMemoryReporter() + model = MyFlakyCallable(offset=1, fail_times=0) + evaluator = ReportingEvaluator(reporter=reporter) + with FlowOptionsOverride(options={"evaluator": evaluator}): + out = model(self.context) + self.assertEqual(out, MyResult(x=2)) + self.assertTrue(reporter.events) + + def test_alerts_priority(self): + reporter = InMemoryReporter() + model = MyFlakyCallable(fail_times=5) + evaluator = AlertsReportingEvaluator(reporter=reporter, priority=AlertPriority.P1) + with self.assertRaises(ValueError): + evaluator(self._eval_context(model)) + errors = [e for e in reporter.events if e.phase == ReportPhase.ERROR] + self.assertEqual(errors[0].priority, AlertPriority.P1) + + def test_metrics_evaluator(self): + reporter = InMemoryReporter() + model = MyFlakyCallable(offset=1, fail_times=0) + evaluator = 
MetricsReportingEvaluator(reporter=reporter) + evaluator(self._eval_context(model)) + self.assertTrue(any(e.extra.get("metric") for e in reporter.events)) + + def test_serializable(self): + evaluator = TracingReportingEvaluator(reporter=InMemoryReporter()) + restored = pickle.loads(pickle.dumps(evaluator)) + self.assertEqual(restored, evaluator) + dumped = evaluator.model_dump() + self.assertEqual(TracingReportingEvaluator.model_validate(dumped), evaluator) + + +class TestNotImplementedReportingEvaluators(TestCase): + def setUp(self): + self.context = ModelEvaluationContext(model=MyFlakyCallable(fail_times=0), context=DateContext(date=date(2022, 1, 1))) + + def test_placeholders_raise(self): + placeholders = [ + DatadogTracingReportingEvaluator, + DatadogMetricsReportingEvaluator, + DatadogAlertsReportingEvaluator, + OpsgenieMetricsReportingEvaluator, + OpsgenieAlertsReportingEvaluator, + JSMAlertsReportingEvaluator, + NewRelicTracingReportingEvaluator, + NewRelicMetricsReportingEvaluator, + NewRelicAlertsReportingEvaluator, + ] + for cls in placeholders: + with self.assertRaises(NotImplementedError): + cls()(self.context) + + +class TestOpenTelemetryEvaluator(TestCase): + def setUp(self): + self.context = ModelEvaluationContext(model=MyFlakyCallable(offset=1, fail_times=0), context=DateContext(date=date(2022, 1, 1))) + + def test_alias(self): + self.assertIs(OpenTelemetryEvaluator, OpenTelemetryTracingReportingEvaluator) + + def test_tracing_runs_with_otel_if_available(self): + try: + import opentelemetry # noqa: F401 + except ImportError: + self.skipTest("opentelemetry not installed") + out = OpenTelemetryTracingReportingEvaluator()(self.context) + self.assertEqual(out, MyResult(x=2)) + + def test_metrics_runs_with_otel_if_available(self): + try: + import opentelemetry # noqa: F401 + except ImportError: + self.skipTest("opentelemetry not installed") + out = OpenTelemetryMetricsReportingEvaluator()(self.context) + self.assertEqual(out, MyResult(x=2)) + + +class 
TestDryRunEvaluator(TestCase): + def setUp(self): + NodeModel._calls.clear() + NodeModel._deps_calls.clear() + self.context = DateContext(date=date(2022, 1, 1)) + leaf = NodeModel(meta={"name": "leaf"}) + self.root = NodeModel(meta={"name": "root"}, deps_model=[leaf]) + + def _eval_context(self, model): + return ModelEvaluationContext(model=model, context=self.context) + + def test_does_not_execute_bodies(self): + reporter = InMemoryReporter() + evaluator = DryRunEvaluator(reporter=reporter) + evaluator(self._eval_context(self.root)) + # No __call__ bodies ran (NodeModel records call bodies in _calls). + self.assertEqual(NodeModel._calls, []) + + def test_emits_queued_and_skipped_for_each_node(self): + reporter = InMemoryReporter() + evaluator = DryRunEvaluator(reporter=reporter) + evaluator(self._eval_context(self.root)) + phases = [e.phase for e in reporter.events] + self.assertEqual(phases.count(ReportPhase.QUEUED), 2) + self.assertEqual(phases.count(ReportPhase.SKIPPED), 2) + names = {e.model_name for e in reporter.events} + self.assertEqual(names, {"root", "leaf"}) + + def test_parent_child_span_tree(self): + reporter = InMemoryReporter() + evaluator = DryRunEvaluator(reporter=reporter) + evaluator(self._eval_context(self.root)) + queued = {e.model_name: e for e in reporter.events if e.phase == ReportPhase.QUEUED} + self.assertIsNone(queued["root"].parent_span_id) + self.assertEqual(queued["leaf"].parent_span_id, queued["root"].span_id) + self.assertEqual(queued["leaf"].depth, queued["root"].depth + 1) + + def test_returns_synthetic_result(self): + evaluator = DryRunEvaluator() + out = evaluator(self._eval_context(self.root)) + self.assertIsInstance(out, self.root.result_type) + + def test_synthetic_result_has_unset_fields(self): + # Guardrail: the synthetic result is built with model_construct, so required fields are unset + # and must not be relied upon downstream. 
+ out = DryRunEvaluator()(self._eval_context(self.root)) + self.assertNotIn("value", out.model_fields_set) + + def test_is_transparent(self): + # Synthetic result changes the return value -> NOT transparent (must affect cache keys). + self.assertFalse(DryRunEvaluator().is_transparent(self._eval_context(self.root))) + # When it falls back to running the body, it returns the real value -> transparent. + self.assertTrue(DryRunEvaluator(synthetic_result=False).is_transparent(self._eval_context(self.root))) + + def test_flow_options_override_does_not_recurse(self): + # Regression: the documented FlowOptionsOverride path used to recurse while discovering deps + # because __deps__ is evaluated through the active (dry-run) evaluator. + reporter = InMemoryReporter() + with FlowOptionsOverride(options={"evaluator": DryRunEvaluator(reporter=reporter)}): + out = self.root(self.context) + self.assertIsInstance(out, self.root.result_type) + self.assertEqual(NodeModel._calls, []) + names = {e.model_name for e in reporter.events if e.phase == ReportPhase.QUEUED} + self.assertEqual(names, {"root", "leaf"}) + + def test_composed_with_cache_does_not_contaminate(self): + # A dry run composed with caching must not store its synthetic result under the real key, so a + # subsequent real run still computes (and caches) the genuine result. 
+ cache = MemoryCacheEvaluator() + with FlowOptionsOverride(options={"evaluator": MultiEvaluator(evaluators=[DryRunEvaluator(), cache])}): + dry = self.root(self.context) + self.assertIsInstance(dry, self.root.result_type) + self.assertEqual(NodeModel._calls, []) + with FlowOptionsOverride(options={"evaluator": cache}): + real = self.root(self.context) + self.assertEqual(real, GenericResult(value=True)) + self.assertNotEqual(NodeModel._calls, []) + + def test_node_key_in_extra(self): + reporter = InMemoryReporter() + DryRunEvaluator(reporter=reporter)(self._eval_context(self.root)) + for event in reporter.events: + self.assertIn("node_key", event.extra) + + def test_node_key_stable_across_invocation_styles(self): + # The emitted node_key must strip the (non-transparent) DryRunEvaluator layer that appears in + # the FlowOptionsOverride dependency-graph keys, so a dependency has the same logical identity + # whether the dry run is invoked directly on the evaluator or through FlowOptionsOverride. + # (The dependency `leaf` is built by the framework identically in both styles; the root differs + # only because a bare direct MEC carries different FlowOptions than the framework-built one.) 
+ direct_reporter = InMemoryReporter() + DryRunEvaluator(reporter=direct_reporter)(self._eval_context(self.root)) + direct_leaf = next(e.extra["node_key"] for e in direct_reporter.events if e.phase == ReportPhase.QUEUED and e.model_name == "leaf") + + NodeModel._calls.clear() + NodeModel._deps_calls.clear() + override_reporter = InMemoryReporter() + with FlowOptionsOverride(options={"evaluator": DryRunEvaluator(reporter=override_reporter)}): + self.root(self.context) + override_leaf = next(e.extra["node_key"] for e in override_reporter.events if e.phase == ReportPhase.QUEUED and e.model_name == "leaf") + + self.assertEqual(direct_leaf, override_leaf) + + def test_node_key_strips_evaluator_layer(self): + # In the FlowOptionsOverride path the raw dependency-graph keys include the non-transparent + # DryRunEvaluator layer. The emitted node_key must equal the logical cache_key() (evaluator + # layer stripped), not that raw graph key. + from ccflow.evaluators.common import _flatten_cache_key_context, cache_key, get_dependency_graph + + reporter = InMemoryReporter() + with FlowOptionsOverride(options={"evaluator": DryRunEvaluator(reporter=reporter)}): + self.root(self.context) + emitted = {e.model_name: e.extra["node_key"] for e in reporter.events if e.phase == ReportPhase.QUEUED} + + # Rebuild the logical graph to compute the expected cache_key() for each node. 
+ NodeModel._calls.clear() + NodeModel._deps_calls.clear() + with FlowOptionsOverride(options={"evaluator": DryRunEvaluator()}): + inner = self.root.__call__.get_evaluation_context(self.root, self.context) + flattened, _, _ = _flatten_cache_key_context(inner) + graph = get_dependency_graph(flattened) + for key in graph.graph: + f, fn, _ = _flatten_cache_key_context(graph.ids[key]) + logical = cache_key(ModelEvaluationContext(model=f.model, context=f.context, fn=fn, options=f.options)).decode("utf-8") + self.assertEqual(emitted[f.model.meta.name], logical) + + def test_node_key_distinguishes_non_evaluator_options(self): + # node_key mirrors cache_key(), which includes non-evaluator FlowOptions, so two dry runs that + # differ only in such options must not collapse to the same logical node identity. + from ccflow.evaluators.common import cache_key + + default_reporter = InMemoryReporter() + DryRunEvaluator(reporter=default_reporter)(ModelEvaluationContext(model=self.root, context=self.context)) + default_key = next(e.extra["node_key"] for e in default_reporter.events if e.phase == ReportPhase.QUEUED and e.model_name == "root") + + other_reporter = InMemoryReporter() + other_context = ModelEvaluationContext(model=self.root, context=self.context, options={"validate_result": False}) + DryRunEvaluator(reporter=other_reporter)(other_context) + other_key = next(e.extra["node_key"] for e in other_reporter.events if e.phase == ReportPhase.QUEUED and e.model_name == "root") + + self.assertNotEqual(default_key, other_key) + # And the emitted key equals the real cache_key() of the logical node. + self.assertEqual(other_key, cache_key(other_context).decode("utf-8")) + + def test_concurrent_dry_runs_share_instance_without_running_bodies(self): + # The planning guard is context-local, so a single shared instance used by two concurrent + # evaluations must never let one run's planning state leak into the other (which would make + # it execute bodies instead of planning). 
class TestReportingModel(TestCase):
    """Tests for the reporting model wrappers (reporting, alerts, metrics, logging, tracing)."""

    def setUp(self):
        self.context = DateContext(date=date(2022, 1, 1))

    def test_success_reports_and_is_transparent(self):
        sink = InMemoryReporter()
        wrapped = ReportingModel(model=MyFlakyCallable(offset=1, fail_times=0), reporter=sink)
        result = wrapped(self.context)
        # The wrapper must hand back the inner model's value untouched.
        self.assertEqual(result, MyResult(x=2))
        observed = [event.phase for event in sink.events]
        self.assertEqual(observed, [ReportPhase.START, ReportPhase.SUCCESS, ReportPhase.END])
        first_event = sink.events[0]
        self.assertEqual(first_event.model_name, "MyFlakyCallable")

    def test_error_reports_and_reraises(self):
        sink = InMemoryReporter()
        wrapped = ReportingModel(model=MyFlakyCallable(fail_times=5), reporter=sink)
        with self.assertRaises(ValueError):
            wrapped(self.context)
        observed = [event.phase for event in sink.events]
        self.assertEqual(observed, [ReportPhase.START, ReportPhase.ERROR, ReportPhase.END])

    def test_preserves_context_and_result_type(self):
        wrapped_target = MyFlakyCallable()
        wrapper = ReportingModel(model=wrapped_target)
        self.assertEqual(wrapper.context_type, wrapped_target.context_type)
        self.assertEqual(wrapper.result_type, wrapped_target.result_type)

    def test_alerts_priority(self):
        sink = InMemoryReporter()
        failing = MyFlakyCallable(fail_times=5)
        alerting = AlertsReportingModel(model=failing, reporter=sink, priority=AlertPriority.P2)
        with self.assertRaises(ValueError):
            alerting(self.context)
        error_events = [event for event in sink.events if event.phase == ReportPhase.ERROR]
        first_error = error_events[0]
        self.assertEqual(first_error.priority, AlertPriority.P2)

    def test_metrics_model(self):
        sink = InMemoryReporter()
        measured = MetricsReportingModel(model=MyFlakyCallable(offset=1, fail_times=0), reporter=sink)
        measured(self.context)
        # At least one event has to carry a truthy "metric" entry.
        self.assertTrue(any(event.extra.get("metric") for event in sink.events))

    def test_logging_model_logs_and_is_transparent(self):
        target = MyFlakyCallable(offset=1, fail_times=0)
        logged = LoggingModel(model=target, log_level=logging.INFO, verbose=False)
        with self.assertLogs(level=logging.INFO) as captured:
            result = logged(self.context)
        self.assertEqual(result, MyResult(x=2))
        lines = [record.getMessage() for record in captured.records]
        self.assertTrue(any("Start evaluation" in line for line in lines))
        self.assertTrue(any("End evaluation" in line and "time elapsed" in line for line in lines))

    def test_serializable(self):
        original = TracingReportingModel(model=MyFlakyCallable(offset=1), reporter=InMemoryReporter())
        payload = pickle.dumps(original)
        round_tripped = pickle.loads(payload)
        self.assertEqual(round_tripped.model, original.model)

    def test_opentelemetry_alias(self):
        self.assertIs(OpenTelemetryModel, OpenTelemetryTracingReportingModel)

    def test_opentelemetry_runs_if_available(self):
        # Only meaningful when the optional dependency is importable; otherwise skip.
        try:
            import opentelemetry  # noqa: F401
        except ImportError:
            self.skipTest("opentelemetry not installed")
        traced = OpenTelemetryModel(model=MyFlakyCallable(offset=1, fail_times=0))
        result = traced(self.context)
        self.assertEqual(result, MyResult(x=2))
class TestReportingPolicy(TestCase):
    """Tests for ReportingPolicy._run_with_reporting: events, span nesting, context repr, resilience."""

    def _run(self, policy: ReportingPolicy, fn: Callable[[], str]) -> str:
        # Shared driver: every test that goes through here reports as model "M" on context "ctx".
        call_details = {"model_name": "M", "model_type": "pkg.M", "fn": "__call__", "context": "ctx"}
        return policy._run_with_reporting(fn, **call_details)

    def test_transparent_success(self):
        sink = InMemoryReporter()
        outcome = self._run(ReportingPolicy(reporter=sink), _ok)
        self.assertEqual(outcome, "ok")
        observed = [event.phase for event in sink.events]
        self.assertEqual(observed, [ReportPhase.START, ReportPhase.SUCCESS, ReportPhase.END])
        start_event = sink.events[0]
        self.assertEqual(start_event.model_name, "M")
        self.assertEqual(start_event.model_type, "pkg.M")

    def test_no_reporter_is_passthrough(self):
        outcome = self._run(ReportingPolicy(), _ok)
        self.assertEqual(outcome, "ok")

    def test_error_reports_and_reraises(self):
        sink = InMemoryReporter()
        with self.assertRaises(ValueError):
            self._run(ReportingPolicy(reporter=sink), _boom)
        observed = [event.phase for event in sink.events]
        self.assertEqual(observed, [ReportPhase.START, ReportPhase.ERROR, ReportPhase.END])
        error_event = sink.events[1]
        self.assertEqual(error_event.exception_type, "ValueError")
        self.assertEqual(error_event.exception_message, "boom")

    def test_span_correlation_nesting(self):
        sink = InMemoryReporter()
        policy = ReportingPolicy(reporter=sink)

        def outer() -> str:
            # While the outer evaluation runs a span is active; the nested call should adopt it
            # as its parent.
            self.assertIsNotNone(current_span_id())
            policy._run_with_reporting(_ok, model_name="inner", model_type="pkg.inner", fn="__call__", context="c")
            return "ok"

        self._run(policy, outer)
        start_events = [event for event in sink.events if event.phase == ReportPhase.START]
        parent = next(event for event in start_events if event.model_name == "M")
        child = next(event for event in start_events if event.model_name == "inner")
        self.assertIsNone(parent.parent_span_id)
        self.assertEqual(child.parent_span_id, parent.span_id)
        self.assertEqual(child.depth, parent.depth + 1)

    def test_span_reset_after_call(self):
        policy = ReportingPolicy(reporter=InMemoryReporter())
        self.assertIsNone(current_span_id())
        self._run(policy, _ok)
        # The span must not leak out of the reported call.
        self.assertIsNone(current_span_id())

    def test_current_span_depth_tracks_nesting(self):
        policy = ReportingPolicy(reporter=InMemoryReporter())
        self.assertIsNone(current_span_depth())
        depths = []

        def outer() -> str:
            depths.append(current_span_depth())

            def inner() -> str:
                depths.append(current_span_depth())
                return "ok"

            policy._run_with_reporting(inner, model_name="inner", model_type="pkg.inner", fn="__call__", context="c")
            return "ok"

        self._run(policy, outer)
        self.assertEqual(depths, [0, 1])
        self.assertIsNone(current_span_depth())

    def test_context_repr_truncation(self):
        sink = InMemoryReporter()
        policy = ReportingPolicy(reporter=sink, max_context_repr=10)
        oversized_context = "x" * 100
        policy._run_with_reporting(_ok, model_name="M", model_type="pkg.M", fn="__call__", context=oversized_context)
        recorded = sink.events[0].context_repr
        self.assertLessEqual(len(recorded), 10)

    def test_capture_context_repr_disabled(self):
        sink = InMemoryReporter()
        policy = ReportingPolicy(reporter=sink, capture_context_repr=False)
        policy._run_with_reporting(_ok, model_name="M", model_type="pkg.M", fn="__call__", context="secret")
        recorded = sink.events[0].context_repr
        self.assertEqual(recorded, "")

    def test_context_repr_zero_is_empty(self):
        sink = InMemoryReporter()
        policy = ReportingPolicy(reporter=sink, max_context_repr=0)
        policy._run_with_reporting(_ok, model_name="M", model_type="pkg.M", fn="__call__", context="secret")
        recorded = sink.events[0].context_repr
        self.assertEqual(recorded, "")

    def test_reporter_failure_does_not_break_evaluation(self):
        class BrokenReporter(NoOpReporter):
            def emit(self, event):
                raise RuntimeError("sink down")

        policy = ReportingPolicy(reporter=BrokenReporter())
        with self.assertLogs("ccflow.utils.reporting", level="ERROR"):
            # A failing sink may neither raise into the evaluation nor alter its result.
            outcome = self._run(policy, _ok)
            self.assertEqual(outcome, "ok")

    def test_serializable(self):
        policy = ReportingPolicy(reporter=LoggingReporter(log_level=20))
        # Pickle round trip.
        unpickled = pickle.loads(pickle.dumps(policy))
        self.assertEqual(unpickled, policy)
        # model_dump / model_validate round trip.
        as_dict = policy.model_dump()
        revalidated = ReportingPolicy.model_validate(as_dict)
        self.assertEqual(revalidated, policy)
class TestRunScope(TestCase):
    """Tests for run_scope / current_run_id and run-id tagging of emitted events."""

    def test_run_scope_sets_and_resets(self):
        self.assertIsNone(current_run_id())
        with run_scope("run-1") as bound_id:
            self.assertEqual(bound_id, "run-1")
            self.assertEqual(current_run_id(), "run-1")
        # Leaving the block restores the "no run" state.
        self.assertIsNone(current_run_id())

    def test_run_scope_generates_id(self):
        with run_scope() as generated_id:
            self.assertEqual(current_run_id(), generated_id)
            self.assertTrue(generated_id)

    def test_nested_run_scope_reuses_outer(self):
        # An inner scope opened without an explicit id adopts the enclosing run id.
        with run_scope("outer"), run_scope() as inner_id:
            self.assertEqual(inner_id, "outer")

    def test_events_tagged_with_run_id(self):
        sink = InMemoryReporter()
        policy = ReportingPolicy(reporter=sink)
        with run_scope("R"):
            policy._run_with_reporting(_ok, model_name="M", model_type="pkg.M", fn="__call__", context="c")
        self.assertTrue(all(event.run_id == "R" for event in sink.events))
class TestReportingStateStore(TestCase):
    """Tests for how ReportingStateStore folds a stream of ReportEvents into node / run state."""

    def test_folds_terminal_phase(self):
        node_ref = {"model_name": "m", "span_id": "a"}
        store = ReportingStateStore()
        store.apply(ReportEvent(phase=ReportPhase.START, **node_ref))
        store.apply(ReportEvent(phase=ReportPhase.SUCCESS, duration=1.5, **node_ref))
        folded = store.nodes["a"]
        self.assertEqual(folded.phase, ReportPhase.SUCCESS)
        self.assertEqual(folded.duration, 1.5)

    def test_terminal_not_overwritten_by_transient(self):
        node_ref = {"model_name": "m", "span_id": "a"}
        store = ReportingStateStore()
        store.apply(ReportEvent(phase=ReportPhase.ERROR, exception_type="ValueError", **node_ref))
        store.apply(ReportEvent(phase=ReportPhase.END, **node_ref))
        # END signals completion, not an outcome, so ERROR and its detail have to survive it.
        self.assertEqual(store.nodes["a"].phase, ReportPhase.ERROR)
        self.assertEqual(store.nodes["a"].exception_type, "ValueError")

    def test_success_then_end_keeps_success(self):
        node_ref = {"model_name": "m", "span_id": "a"}
        store = ReportingStateStore()
        stream = [
            ReportEvent(phase=ReportPhase.START, **node_ref),
            ReportEvent(phase=ReportPhase.SUCCESS, **node_ref),
            ReportEvent(phase=ReportPhase.END, duration=2.0, **node_ref),
        ]
        store.apply_all(stream)
        folded = store.nodes["a"]
        # Outcome stays SUCCESS, yet the duration that arrived on END is merged in.
        self.assertEqual(folded.phase, ReportPhase.SUCCESS)
        self.assertEqual(folded.duration, 2.0)

    def test_retry_stream_error_then_retry_then_success(self):
        node_ref = {"model_name": "m", "span_id": "a"}
        store = ReportingStateStore()
        stream = [
            ReportEvent(phase=ReportPhase.ERROR, attempt=1, exception_type="ValueError", **node_ref),
            ReportEvent(phase=ReportPhase.RETRY, attempt=1, **node_ref),
            ReportEvent(phase=ReportPhase.SUCCESS, attempt=2, **node_ref),
        ]
        store.apply_all(stream)
        folded = store.nodes["a"]
        # An intermediate ERROR gives way to RETRY and then SUCCESS rather than sticking.
        self.assertEqual(folded.phase, ReportPhase.SUCCESS)
        self.assertEqual(folded.attempt, 2)

    def test_late_start_does_not_clobber_outcome(self):
        node_ref = {"model_name": "m", "span_id": "a"}
        store = ReportingStateStore()
        store.apply(ReportEvent(phase=ReportPhase.SUCCESS, **node_ref))
        store.apply(ReportEvent(phase=ReportPhase.START, **node_ref))
        self.assertEqual(store.nodes["a"].phase, ReportPhase.SUCCESS)

    def test_parent_child_tree(self):
        store = ReportingStateStore()
        store.apply(ReportEvent(phase=ReportPhase.START, model_name="root", span_id="r"))
        store.apply(ReportEvent(phase=ReportPhase.START, model_name="child", span_id="c", parent_span_id="r"))
        root_ids = [node.span_id for node in store.roots()]
        self.assertEqual(root_ids, ["r"])
        child_ids = [node.span_id for node in store.children("r")]
        self.assertEqual(child_ids, ["c"])

    def test_run_phases_recorded(self):
        run_ref = {"model_name": "", "span_id": "x", "run_id": "R"}
        store = ReportingStateStore()
        store.apply(ReportEvent(phase=ReportPhase.RUN_STARTED, **run_ref))
        store.apply(ReportEvent(phase=ReportPhase.RUN_FINISHED, **run_ref))
        self.assertEqual(store.runs["R"], ReportPhase.RUN_FINISHED)
        # Run-scoped events must leave the per-node state empty.
        self.assertEqual(store.nodes, {})

    def test_apply_all_node_state_type(self):
        store = ReportingStateStore()
        queued = ReportEvent(phase=ReportPhase.QUEUED, model_name="m", span_id="a")
        store.apply_all([queued])
        self.assertIsInstance(store.nodes["a"], NodeState)
class TestRetryReporting(TestCase):
    """Tests for the lifecycle events RetryPolicy emits when it is given a reporter."""

    def _run(self, policy: RetryPolicy, attempt_fn: Callable[[], str]) -> str:
        # Shared driver: retry events produced through here are named "test".
        return policy._run_with_retry(attempt_fn, name="test", detail="__call__ on ctx")

    def test_no_reporter_no_events(self):
        # Reporting is opt-in: without a reporter the retry simply runs and returns.
        flaky_once = _flaky(1, ValueError("boom"))
        outcome = self._run(RetryPolicy(max_attempts=3), flaky_once)
        self.assertEqual(outcome, "ok")  # does not raise

    def test_success_first_try_reports_success(self):
        from ccflow.utils.reporting import InMemoryReporter, ReportPhase

        sink = InMemoryReporter()
        self._run(RetryPolicy(max_attempts=3, reporter=sink), _flaky(0, ValueError("boom")))
        self.assertEqual([event.phase for event in sink.events], [ReportPhase.SUCCESS])
        only_event = sink.events[0]
        self.assertEqual(only_event.attempt, 1)
        self.assertEqual(only_event.max_attempts, 3)

    def test_retry_then_succeed_lifecycle(self):
        from ccflow.utils.reporting import InMemoryReporter, ReportPhase

        sink = InMemoryReporter()
        self._run(RetryPolicy(max_attempts=3, reporter=sink), _flaky(2, ValueError("boom")))
        # Each of the two failures yields ERROR followed by RETRY; a SUCCESS closes the stream.
        failed_attempt = [ReportPhase.ERROR, ReportPhase.RETRY]
        expected = failed_attempt + failed_attempt + [ReportPhase.SUCCESS]
        self.assertEqual([event.phase for event in sink.events], expected)

    def test_events_tagged_with_run_id(self):
        from ccflow.utils.reporting import InMemoryReporter, run_scope

        sink = InMemoryReporter()
        policy = RetryPolicy(max_attempts=3, reporter=sink)
        with run_scope("R"):
            self._run(policy, _flaky(2, ValueError("boom")))
        self.assertTrue(sink.events)
        self.assertTrue(all(event.run_id == "R" for event in sink.events))

    def test_events_nested_under_reporting_span_have_parent_and_depth(self):
        from ccflow.utils.reporting import InMemoryReporter, ReportingPolicy

        sink = InMemoryReporter()
        retry_policy = RetryPolicy(max_attempts=3, reporter=sink)
        span_policy = ReportingPolicy(reporter=sink)

        def attempt() -> str:
            return self._run(retry_policy, _flaky(2, ValueError("boom")))

        # The retry runs inside an active reporting span, so its events belong one level below it.
        span_policy._run_with_reporting(attempt, model_name="outer", model_type="pkg.outer", fn="__call__", context="c")
        span_start = next(event for event in sink.events if event.model_name == "outer" and event.phase.name == "START")
        from_retry = [event for event in sink.events if event.model_name == "test"]
        self.assertTrue(from_retry)
        for retry_event in from_retry:
            self.assertEqual(retry_event.parent_span_id, span_start.span_id)
            self.assertEqual(retry_event.depth, span_start.depth + 1)

    def test_reporter_failure_does_not_break_retry(self):
        from ccflow.utils.reporting import NoOpReporter

        class BrokenReporter(NoOpReporter):
            def emit(self, event):
                raise RuntimeError("sink down")

        policy = RetryPolicy(max_attempts=3, reporter=BrokenReporter())
        with self.assertLogs("ccflow.utils.retry", level="ERROR"):
            outcome = self._run(policy, _flaky(1, ValueError("boom")))
            self.assertEqual(outcome, "ok")

    def test_give_up_on_exhaustion(self):
        from ccflow.utils.reporting import InMemoryReporter, ReportPhase

        sink = InMemoryReporter()
        policy = RetryPolicy(max_attempts=2, reporter=sink)
        with self.assertRaises(ValueError):
            self._run(policy, _flaky(5, ValueError("boom")))
        observed = [event.phase for event in sink.events]
        self.assertEqual(observed[-1], ReportPhase.GIVE_UP)
        final_event = sink.events[-1]
        self.assertEqual(final_event.extra["reason"], "attempts exhausted")

    def test_give_up_on_non_retryable(self):
        from ccflow.utils.reporting import InMemoryReporter, ReportPhase

        sink = InMemoryReporter()
        policy = RetryPolicy(max_attempts=3, retry_exceptions=[ValueError], reporter=sink)
        with self.assertRaises(KeyError):
            self._run(policy, _flaky(5, KeyError("boom")))
        observed = [event.phase for event in sink.events]
        self.assertEqual(observed, [ReportPhase.ERROR, ReportPhase.GIVE_UP])
        self.assertEqual(sink.events[-1].extra["reason"], "non-retryable")

    def test_give_up_on_budget(self):
        from ccflow.utils.reporting import InMemoryReporter, ReportPhase

        sink = InMemoryReporter()
        policy = RetryPolicy(max_attempts=10, wait_initial=5.0, wait_multiplier=1.0, max_delay=1.0, reporter=sink)
        # Sleeping is patched out so the test does not actually wait.
        with patch("ccflow.utils.retry.time.sleep"), self.assertRaises(ValueError):
            self._run(policy, _flaky(5, ValueError("boom")))
        final_event = sink.events[-1]
        self.assertEqual(final_event.phase, ReportPhase.GIVE_UP)
        self.assertEqual(final_event.extra["reason"], "max_delay budget exceeded")
+* :class:`ReportingPolicy` and its signal-specific subclasses (:class:`TracingPolicy`, + :class:`MetricsPolicy`, :class:`AlertsPolicy`) -- the orchestration mixins that wrap a single + evaluation in a span and emit events at each lifecycle phase. +""" + +import itertools +import logging +import secrets +import threading +import time +from collections import deque +from contextlib import contextmanager, nullcontext +from contextvars import ContextVar +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from enum import Enum +from pprint import pformat +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, List, Optional, Tuple + +from pydantic import ConfigDict, Field, PrivateAttr, field_validator + +from ..base import BaseModel, ResultType + +if TYPE_CHECKING: + from opentelemetry.trace import Span + +__all__ = [ + "ReportPhase", + "AlertPriority", + "ReportEvent", + "ReportContext", + "Reporter", + "NoOpReporter", + "InMemoryReporter", + "LoggingReporter", + "CompositeReporter", + "UIReporter", + "ReportingStateStore", + "NodeState", + "current_span_id", + "current_span_depth", + "current_run_id", + "run_scope", + "FormatConfig", + "ReportingPolicy", + "LoggingPolicy", + "TracingPolicy", + "MetricsPolicy", + "AlertsPolicy", + "OpenTelemetryTracingPolicy", + "OpenTelemetryMetricsPolicy", +] + +log = logging.getLogger(__name__) + + +class ReportPhase(str, Enum): + """Lifecycle phase of a reported evaluation. + + The first group are *node* phases (a single model evaluation) and are emitted by the reporting + policies/evaluators/models during normal flows. The second group are *run* / *graph* scoped phases + used by the UI/observability layer to frame an overall run and its discovered graph. + + .. note:: + + ``RUN_STARTED`` / ``RUN_FINISHED`` / ``GRAPH_DISCOVERED`` are **reserved** for an explicit + run/graph observer layer and are *not* emitted by the current evaluator/model paths. 
# *****************************************************************************
# Span correlation
#
# The *active* span and the *active* run are held in two module-level
# ContextVars. A nested evaluation reads the active span to discover its parent,
# which is how spans link into a tree. ContextVars are per-thread / per-async
# context, so concurrent evaluations do not see each other's values.
# *****************************************************************************

_CURRENT_SPAN: ContextVar[Optional[Tuple[str, int]]] = ContextVar("ccflow_report_span", default=None)
_CURRENT_RUN: ContextVar[Optional[str]] = ContextVar("ccflow_report_run", default=None)


def current_span_id() -> Optional[str]:
    """Return the id of the active reporting span, or ``None`` when no span is active."""
    active = _CURRENT_SPAN.get()
    if not active:
        return None
    return active[0]


def current_span_depth() -> Optional[int]:
    """Return the depth of the active reporting span, or ``None`` when no span is active.

    Callers that emit events outside the normal span flow can use
    ``current_span_depth() + 1`` to nest one level below the active span.
    """
    active = _CURRENT_SPAN.get()
    if not active:
        return None
    return active[1]


def current_run_id() -> Optional[str]:
    """Return the id of the active run, or ``None`` when no run scope is bound.

    The run id is the grouping key that ties together all events emitted while
    one :func:`run_scope` block is in flight.
    """
    return _CURRENT_RUN.get()


def _new_span_id() -> str:
    """Generate a fresh random identifier (16 hexadecimal characters)."""
    return secrets.token_hex(8)


@contextmanager
def run_scope(run_id: Optional[str] = None) -> Iterator[str]:
    """Bind a run id for the duration of the ``with`` block and yield it.

    Args:
        run_id: Explicit id to bind. When omitted, the id of the enclosing run
            scope is reused; if there is no enclosing scope a new id is generated.

    Yields:
        The run id that is active inside the block.

    The previous binding is restored on exit, including when the block raises.
    """
    effective = run_id
    if effective is None:
        inherited = _CURRENT_RUN.get()
        effective = _new_span_id() if inherited is None else inherited
    token = _CURRENT_RUN.set(effective)
    try:
        yield effective
    finally:
        _CURRENT_RUN.reset(token)
+ """ + + model_name: str + model_type: str + fn: str + context_repr: str + span_id: str + parent_span_id: Optional[str] + depth: int + start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + _start_perf: float = field(default_factory=time.perf_counter) + extra: Dict[str, Any] = field(default_factory=dict) + + def elapsed(self) -> float: + """Seconds elapsed since the context was created.""" + return time.perf_counter() - self._start_perf + + +class ReportEvent(BaseModel): + """A structured, serializable record of a single lifecycle event for an evaluation.""" + + phase: ReportPhase = Field(description="The lifecycle phase this event represents.") + model_name: str = Field(description="The name of the model being evaluated (meta.name or class name).") + model_type: str = Field("", description="The fully-qualified type of the model being evaluated.") + fn: str = Field("__call__", description="The function being evaluated.") + context_repr: str = Field("", description="A bounded repr of the context the model is evaluated on.") + span_id: str = Field(description="Unique id of this evaluation span.") + parent_span_id: Optional[str] = Field(None, description="Span id of the enclosing evaluation, if any.") + run_id: Optional[str] = Field(None, description="Id of the run this event belongs to, if a run scope is active.") + depth: int = Field(0, ge=0, description="Nesting depth of this span in the evaluation tree.") + timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="When the event was emitted.") + duration: Optional[float] = Field(None, ge=0.0, description="Elapsed seconds for the evaluation (set on terminal phases).") + attempt: Optional[int] = Field(None, ge=1, description="Attempt number, for retry lifecycle events.") + max_attempts: Optional[int] = Field(None, ge=1, description="Maximum attempts, for retry lifecycle events.") + exception_type: Optional[str] = Field(None, description="Type name of 
class InMemoryReporter(Reporter):
    """Reporter that keeps every emitted event in an in-process list.

    Intended for tests and introspection: the recorded events are enough to
    rebuild the span tree of an evaluation.
    """

    # Held as a private attribute so the buffer is not part of the reporter's
    # own tokenization / serialization.
    _events: List[ReportEvent] = PrivateAttr(default_factory=list)

    def emit(self, event: ReportEvent) -> None:
        """Record ``event`` at the end of the buffer."""
        self._events.append(event)

    @property
    def events(self) -> List[ReportEvent]:
        """A snapshot copy of the recorded events, oldest first."""
        return self._events[:]

    def clear(self) -> None:
        """Drop every recorded event."""
        del self._events[:]

    def __deepcopy__(self, memo):
        # Deep copies made by the framework (e.g. inside the ModelEvaluationContext)
        # must keep writing to this same buffer, so the reporter hands back itself,
        # mirroring MemoryCacheEvaluator.
        return self
class ReportingStateStore:
    """Event-sourced fold of :class:`ReportEvent` objects into per-node UI state.

    Replaying the same events in the same order (for instance the output of
    :meth:`UIReporter.drain`) rebuilds the state of every node, keyed by
    ``span_id``. The store has no transport of its own.

    Folding rules for a node's phase:

    * Before an outcome (SUCCESS / ERROR / GIVE_UP / SKIPPED) is recorded, the
      incoming phase always wins.
    * After an outcome is recorded, only another outcome may replace it, with one
      exception: ``ERROR -> RETRY`` is allowed so a retry span shows the live
      retry instead of the failed attempt.
    * ``END`` is a completion marker, not an outcome, so it never replaces a
      recorded outcome; its metadata (e.g. duration) is still merged.
    """

    # Terminal *outcomes* of a node. END is deliberately absent (completion, not outcome).
    _OUTCOME = frozenset(
        (
            ReportPhase.SUCCESS,
            ReportPhase.ERROR,
            ReportPhase.GIVE_UP,
            ReportPhase.SKIPPED,
        )
    )

    def __init__(self) -> None:
        # span_id -> folded node state
        self.nodes: Dict[str, NodeState] = {}
        # run_id -> last RUN_STARTED / RUN_FINISHED phase seen for that run
        self.runs: Dict[str, ReportPhase] = {}

    def _next_phase(self, current: ReportPhase, incoming: ReportPhase) -> ReportPhase:
        """Return the phase a node should show after ``incoming`` is folded onto ``current``."""
        # No outcome recorded yet, or one outcome superseding another: take the new phase.
        if current not in self._OUTCOME or incoming in self._OUTCOME:
            return incoming
        # An outcome is recorded and the incoming phase is transient (this includes END).
        # The only transient phase allowed through is RETRY following ERROR.
        retrying = current == ReportPhase.ERROR and incoming == ReportPhase.RETRY
        return incoming if retrying else current

    def apply(self, event: ReportEvent) -> None:
        """Fold one event into the store."""
        phase = event.phase
        if phase in (ReportPhase.RUN_STARTED, ReportPhase.RUN_FINISHED):
            # Run-scoped events update the run table only; they never create nodes.
            if event.run_id is not None:
                self.runs[event.run_id] = phase
            return

        known = self.nodes.get(event.span_id)
        if known is None:
            # First event for this span: seed the node from the event verbatim.
            seed_fields = (
                "span_id",
                "model_name",
                "model_type",
                "fn",
                "context_repr",
                "parent_span_id",
                "run_id",
                "depth",
                "phase",
                "attempt",
                "max_attempts",
                "duration",
                "exception_type",
                "exception_message",
                "priority",
            )
            self.nodes[event.span_id] = NodeState(**{name: getattr(event, name) for name in seed_fields})
            return

        # Fold the phase (outcome-preserving) and merge whatever metadata the event
        # carries, so e.g. the duration on a trailing END is kept even though the
        # outcome phase is not replaced.
        known.phase = self._next_phase(known.phase, phase)
        for attr in ("attempt", "max_attempts", "duration", "priority"):
            value = getattr(event, attr)
            if value is not None:
                setattr(known, attr, value)
        if event.exception_type is not None:
            # Type and message travel together.
            known.exception_type = event.exception_type
            known.exception_message = event.exception_message

    def apply_all(self, events: List[ReportEvent]) -> None:
        """Fold ``events`` into the store one at a time, preserving their order."""
        for item in events:
            self.apply(item)

    def roots(self) -> List[NodeState]:
        """Return the nodes with no parent, or whose parent is unknown to this store."""
        top_level: List[NodeState] = []
        for node in self.nodes.values():
            parent = node.parent_span_id
            if parent is None or parent not in self.nodes:
                top_level.append(node)
        return top_level

    def children(self, span_id: str) -> List[NodeState]:
        """Return the nodes whose ``parent_span_id`` equals ``span_id``."""
        matches: List[NodeState] = []
        for node in self.nodes.values():
            if node.parent_span_id == span_id:
                matches.append(node)
        return matches
def _context_repr(self, context: Any) -> str:
    """Return a bounded ``repr`` of ``context`` for inclusion in events.

    Args:
        context: The context the model is evaluated on.

    Returns:
        ``""`` when capturing is disabled (``capture_context_repr`` is false or
        ``max_context_repr`` is not positive). Otherwise ``repr(context)``, cut to
        at most ``max_context_repr`` characters with a trailing ellipsis when it
        had to be shortened.

    A context whose ``__repr__`` raises yields a placeholder instead of
    propagating: this runs before the wrapped evaluation starts, so an error
    here would otherwise fail the user's computation purely because of telemetry.
    """
    limit = self.max_context_repr
    if not self.capture_context_repr or limit <= 0:
        return ""
    try:
        text = repr(context)
    except Exception:
        # Reporting must stay transparent; never let a broken __repr__ escape.
        text = f"<unrepresentable {type(context).__name__}>"
    if len(text) > limit:
        # Reserve one character for the ellipsis so the result is exactly `limit` long.
        text = text[: limit - 1] + "\u2026"
    return text
def _run_with_reporting(
    self,
    attempt_fn: Callable[[], ResultType],
    *,
    model_name: str,
    model_type: str,
    fn: str,
    context: Any,
    extra: Optional[Dict[str, Any]] = None,
) -> ResultType:
    """Evaluate ``attempt_fn`` exactly once inside a reporting span and return its result as-is.

    Hook order: ``_on_start``, then either ``_on_success`` (normal return) or
    ``_on_error`` (any ``BaseException``, which is re-raised), and finally
    ``_on_end`` in both cases. While the call is in flight this span is published
    as the current span so nested evaluations attach to it as children; the
    previous current span is restored afterwards.

    Args:
        attempt_fn: Zero-argument callable performing the evaluation.
        model_name: Short name of the model (meta.name or class name).
        model_type: Fully-qualified type name of the model.
        fn: Name of the function being evaluated (e.g. ``"__call__"``).
        context: Context the model is evaluated on (used for a bounded repr).
        extra: Optional per-call metadata copied onto the :class:`ReportContext`
            so that hooks can read it.

    Returns:
        Whatever ``attempt_fn`` returned.

    Raises:
        BaseException: Whatever ``attempt_fn`` raised, unchanged.
    """
    report_ctx = self._make_context(model_name=model_name, model_type=model_type, fn=fn, context=context)
    if extra:
        report_ctx.extra.update(extra)

    # Publish this span as "current" for the duration of the call.
    span_token = _CURRENT_SPAN.set((report_ctx.span_id, report_ctx.depth))
    try:
        with self._span(report_ctx) as span:
            self._on_start(report_ctx, span)
            try:
                try:
                    outcome = attempt_fn()
                except BaseException as failure:
                    self._on_error(report_ctx, span, failure)
                    raise
                # Reached only when attempt_fn returned normally.
                self._on_success(report_ctx, span, outcome)
                return outcome
            finally:
                # Runs on success and on failure, after the outcome hook.
                self._on_end(report_ctx, span)
    finally:
        _CURRENT_SPAN.reset(span_token)
@field_validator("log_level", mode="before")
@classmethod
def _validate_log_level(cls, v: Any) -> int:
    """Translate a textual log level into its numeric value.

    Args:
        v: The raw ``log_level`` input. Non-string values are returned untouched
            and left to the field's own integer validation.

    Returns:
        The numeric level for a level name (case-insensitive, surrounding
        whitespace ignored, e.g. ``"debug"`` -> ``logging.DEBUG``), or the integer
        value of an all-digit string.

    Raises:
        ValueError: If the string is neither a level name exposed by
            :mod:`logging` nor a number. Because this is a field validator, the
            caller sees it as a validation error for the field.
    """
    if not isinstance(v, str):
        return v
    name = v.strip().upper()
    if name.isdigit():
        return int(name)
    level = getattr(logging, name, None)
    # Accept only real integer level constants; other module attributes that
    # happen to match the name (e.g. BASIC_FORMAT, a string) are not levels.
    if isinstance(level, int) and not isinstance(level, bool):
        return level
    raise ValueError(f"Invalid log level {v!r}: expected an integer or a logging level name such as 'DEBUG' or 'INFO'.")
def _format_result(self, result: "ResultType") -> str:
    """Build the ``%``-style log format string that carries the formatted result.

    The returned string starts with ``"[%s]: Result of %s on %s:\\n"`` (the three
    placeholders are filled by the caller with model name, function and context)
    followed by the pretty-printed ``result.model_dump(by_alias=True)``.

    Any ``%`` in the pretty-printed text is doubled. The text ends up inside a
    logging format string, so a literal percent sign in the data (``"50%"``, a
    column called ``"pct%"``...) would otherwise be read as a conversion
    specifier and the record would fail to format.

    Args:
        result: The evaluation result; only its ``model_dump`` method is used.

    Returns:
        A format string expecting exactly three positional ``%s`` arguments.
    """
    config = self.format_config
    result_dict = result.model_dump(by_alias=True)

    if config.arrow_as_polars:
        # pyarrow is only needed to recognise arrow tables, so it is imported
        # only when the conversion was requested.
        import pyarrow as pa

        for key, value in result_dict.items():
            try:
                if isinstance(value, pa.Table):
                    import polars as pl  # Only import polars if needed

                    result_dict[key] = pl.from_arrow(value)
            except TypeError:
                # Leave values that cannot be converted as they are.
                pass

    if config.polars_config:  # Control formatting of polars tables if set
        import polars as pl  # Only import polars if needed

        polars_context = pl.Config(**config.polars_config)
    else:
        polars_context = nullcontext()

    if config.pandas_config:  # Control formatting of pandas tables if set
        import pandas as pd

        # option_context takes a flat (name, value, name, value, ...) argument list.
        pandas_context = pd.option_context(*itertools.chain.from_iterable(config.pandas_config.items()))
    else:
        pandas_context = nullcontext()

    # Formatting has to happen while the display options are active.
    with polars_context, pandas_context:
        body = pformat(result_dict, **config.pformat_config)
    return "[%s]: Result of %s on %s:\n" + body.replace("%", "%%")
class MetricsPolicy(ReportingPolicy):
    """Reporting policy specialised for *metrics* (counters / latency histograms).

    The generic implementation describes each measurement in the ``extra`` of the
    emitted event: a ``<prefix>.success`` or ``<prefix>.error`` count of 1 together
    with the latency, and a ``<prefix>.latency`` value on END. Concrete backends
    override the hooks to push to a metrics backend.

    Each hook reads the elapsed time once, so the ``duration`` of an event and
    the latency stored in its ``extra`` are always the same number.
    """

    metric_prefix: str = Field("ccflow.model", description="Prefix applied to emitted metric names.")

    def _on_success(self, ctx: ReportContext, span: "Optional[Span]", result: ResultType) -> None:
        """Emit a SUCCESS event carrying a success count and the latency."""
        elapsed = ctx.elapsed()
        self._emit(
            self._event(
                ctx,
                ReportPhase.SUCCESS,
                duration=elapsed,
                extra={"metric": f"{self.metric_prefix}.success", "value": 1, "latency_seconds": elapsed},
            )
        )

    def _on_error(self, ctx: ReportContext, span: "Optional[Span]", exc: BaseException) -> None:
        """Emit an ERROR event carrying an error count, the latency and the exception details."""
        elapsed = ctx.elapsed()
        self._emit(
            self._event(
                ctx,
                ReportPhase.ERROR,
                duration=elapsed,
                exception_type=type(exc).__name__,
                exception_message=str(exc),
                extra={"metric": f"{self.metric_prefix}.error", "value": 1, "latency_seconds": elapsed},
            )
        )

    def _on_end(self, ctx: ReportContext, span: "Optional[Span]") -> None:
        """Emit an END event whose metric value is the total latency."""
        elapsed = ctx.elapsed()
        self._emit(
            self._event(
                ctx,
                ReportPhase.END,
                duration=elapsed,
                extra={"metric": f"{self.metric_prefix}.latency", "value": elapsed},
            )
        )
def _require_opentelemetry():
    """Return the ``opentelemetry.trace`` module, importing it on demand.

    Raises:
        ImportError: With installation instructions, when the optional
            ``opentelemetry-api`` package is not available. The original import
            failure is chained as the cause.
    """
    try:
        import opentelemetry.trace as otel_trace
    except ImportError as missing:  # pragma: no cover - exercised only without the optional dep
        message = "OpenTelemetry reporting requires the 'opentelemetry-api' package. Install it with `pip install opentelemetry-api`."
        raise ImportError(message) from missing
    else:
        return otel_trace
class OpenTelemetryTracingPolicy(TracingPolicy):
    """Tracing policy backed by OpenTelemetry spans.

    Opens a real OpenTelemetry span around each evaluation, records the exception
    of a failed evaluation, and sets the span status. ``opentelemetry-api`` is an
    optional dependency, imported lazily on first use.

    Exception recording and status are handled by :meth:`_on_error` /
    :meth:`_on_success` alone. The span is therefore opened with OpenTelemetry's
    own automatic exception handling switched off; leaving it on would record
    every failure a second time when the exception leaves the span's ``with``
    block, and would replace the status description set in :meth:`_on_error`.
    """

    tracer_name: str = Field("ccflow", description="Name passed to ``opentelemetry.trace.get_tracer``.")
    span_name_prefix: str = Field("ccflow", description="Prefix for generated span names (``{span_name_prefix}.{model_name}``).")

    # Lazily-created tracer, cached per policy instance.
    _tracer: Any = PrivateAttr(None)

    def _get_tracer(self):
        """Return the cached tracer, creating it from ``tracer_name`` on first use."""
        if self._tracer is None:
            trace = _require_opentelemetry()
            self._tracer = trace.get_tracer(self.tracer_name)
        return self._tracer

    @contextmanager
    def _span(self, ctx: ReportContext) -> Iterator["Optional[Span]"]:
        """Open an OpenTelemetry span named ``{span_name_prefix}.{model_name}`` and yield it."""
        tracer = self._get_tracer()
        with tracer.start_as_current_span(
            f"{self.span_name_prefix}.{ctx.model_name}",
            # _on_error already records the exception and sets the status.
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            span.set_attribute("ccflow.model_name", ctx.model_name)
            span.set_attribute("ccflow.model_type", ctx.model_type)
            span.set_attribute("ccflow.fn", ctx.fn)
            span.set_attribute("ccflow.depth", ctx.depth)
            yield span

    def _on_success(self, ctx: ReportContext, span: "Optional[Span]", result: ResultType) -> None:
        """Mark the span OK, then emit the regular SUCCESS event."""
        if span is not None:
            trace = _require_opentelemetry()
            span.set_status(trace.Status(trace.StatusCode.OK))
        super()._on_success(ctx, span, result)

    def _on_error(self, ctx: ReportContext, span: "Optional[Span]", exc: BaseException) -> None:
        """Record ``exc`` on the span, mark it ERROR, then emit the regular ERROR event."""
        if span is not None:
            trace = _require_opentelemetry()
            span.record_exception(exc)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(exc)))
        super()._on_error(ctx, span, exc)
def _ensure_instruments(self):
    """Create the meter, the success/error counters and the latency histogram on first use.

    ``_meter`` doubles as the "already initialised" marker, so it is assigned
    only after every instrument has been created and stored. If creation fails
    part-way, ``_meter`` stays ``None`` and the next call retries, instead of
    leaving the policy marked as initialised with ``None`` instruments. A second
    thread that sees ``_meter`` set can likewise rely on the instruments being
    in place.
    """
    if self._meter is not None:
        return
    metrics = _require_opentelemetry_metrics()
    meter = metrics.get_meter(self.meter_name)
    self._success_counter = meter.create_counter(f"{self.metric_prefix}.success")
    self._error_counter = meter.create_counter(f"{self.metric_prefix}.error")
    self._latency = meter.create_histogram(f"{self.metric_prefix}.latency", unit="s")
    # Publish the marker last.
    self._meter = meter
+ ReportingPolicy._on_end(self, ctx, span) diff --git a/ccflow/utils/retry.py b/ccflow/utils/retry.py index 6b6200c..9728f6f 100644 --- a/ccflow/utils/retry.py +++ b/ccflow/utils/retry.py @@ -1,5 +1,6 @@ import logging import random +import secrets import time from typing import Callable, List, Optional, Union @@ -7,6 +8,7 @@ from ..base import BaseModel, ResultType from ..exttypes import PyObjectPath +from .reporting import Reporter, ReportEvent, ReportPhase, current_run_id, current_span_depth, current_span_id __all__ = [ "RetryError", @@ -71,6 +73,10 @@ class RetryPolicy(BaseModel): default=True, description="If True, re-raise the last underlying exception on failure. If False, raise a RetryError that wraps it." ) log_level: int = Field(default=logging.WARNING, description="Log level used to report each retry attempt.") + reporter: Optional[Reporter] = Field( + default=None, + description="Optional reporting sink that receives retry lifecycle events (failure, retry, success, give-up). 
def _emit_retry(
    self,
    phase: ReportPhase,
    *,
    name: str,
    detail: str,
    span_id: Optional[str],
    attempt: int,
    exc: Optional[BaseException] = None,
    delay: Optional[float] = None,
    reason: Optional[str] = None,
) -> None:
    """Send one retry-lifecycle event to the configured reporter.

    Does nothing when no reporter is configured or no ``span_id`` was allocated.
    The event is attached as a child of the currently active reporting span (or
    placed at depth 0 when there is none). Emission is best-effort: any error
    raised while building or emitting the event is logged and swallowed so it
    cannot alter retry behaviour.

    Args:
        phase: Lifecycle phase to report.
        name: Reported as the event's model name.
        detail: Reported as the event's context repr.
        span_id: Id shared by all events of this retry loop.
        attempt: 1-based attempt number.
        exc: Exception associated with the event, if any.
        delay: Value stored under ``"delay"`` in the event's ``extra``, if given.
        reason: Value stored under ``"reason"`` in the event's ``extra``, if given.
    """
    sink = self.reporter
    if sink is None or span_id is None:
        return

    # Only the optional values that were actually supplied are attached.
    extra: dict = {key: value for key, value in (("delay", delay), ("reason", reason)) if value is not None}

    parent_span_id = current_span_id()
    parent_depth = current_span_depth()
    depth = 0 if parent_depth is None else parent_depth + 1

    try:
        event = ReportEvent(
            phase=phase,
            model_name=name,
            fn="__call__",
            context_repr=detail,
            span_id=span_id,
            parent_span_id=parent_span_id,
            run_id=current_run_id(),
            depth=depth,
            attempt=attempt,
            max_attempts=self.max_attempts,
            exception_type=None if exc is None else type(exc).__name__,
            exception_message=None if exc is None else str(exc),
            extra=extra,
        )
        sink.emit(event)
    except Exception:
        # Retry reporting is best-effort: a broken sink must never change retry behaviour.
        log.exception("Reporter %r failed to emit %s retry event; continuing.", type(self.reporter).__name__, phase.value)
@@ -120,11 +171,14 @@ def _run_with_retry(self, attempt_fn: Callable[[], ResultType], name: str, detai total_wait = 0.0 last_exception: Optional[Exception] = None budget_exceeded = False + span_id = secrets.token_hex(8) if self.reporter is not None else None for attempt in range(1, self.max_attempts + 1): try: - return attempt_fn() + result = attempt_fn() except Exception as exc: + self._emit_retry(ReportPhase.ERROR, name=name, detail=detail, span_id=span_id, attempt=attempt, exc=exc) if not self._should_retry(exc): + self._emit_retry(ReportPhase.GIVE_UP, name=name, detail=detail, span_id=span_id, attempt=attempt, exc=exc, reason="non-retryable") raise last_exception = exc if attempt >= self.max_attempts: @@ -133,6 +187,7 @@ def _run_with_retry(self, attempt_fn: Callable[[], ResultType], name: str, detai if self.max_delay is not None and total_wait + delay > self.max_delay: budget_exceeded = True break + self._emit_retry(ReportPhase.RETRY, name=name, detail=detail, span_id=span_id, attempt=attempt, exc=exc, delay=delay) log.log( self.log_level, "[%s]: Attempt %d/%d (%s) failed with %s: %s. Retrying in %.3fs.", @@ -147,12 +202,18 @@ def _run_with_retry(self, attempt_fn: Callable[[], ResultType], name: str, detai if delay > 0: time.sleep(delay) total_wait += delay + else: + self._emit_retry(ReportPhase.SUCCESS, name=name, detail=detail, span_id=span_id, attempt=attempt) + return result if budget_exceeded: message = f"Retry stopped after {attempt} attempt(s) for {name}: max_delay budget exceeded." + reason = "max_delay budget exceeded" else: message = f"Retry attempts exhausted after {attempt}/{self.max_attempts} attempt(s) for {name}." + reason = "attempts exhausted" assert last_exception is not None # The loop only breaks/exits after at least one caught exception. 
+ self._emit_retry(ReportPhase.GIVE_UP, name=name, detail=detail, span_id=span_id, attempt=attempt, exc=last_exception, reason=reason) if self.reraise: raise last_exception raise RetryError(message, attempts=attempt, last_exception=last_exception) from last_exception diff --git a/docs/wiki/Key-Features.md b/docs/wiki/Key-Features.md index c73aa9c..ce6978c 100644 --- a/docs/wiki/Key-Features.md +++ b/docs/wiki/Key-Features.md @@ -96,6 +96,8 @@ The following table summarizes the "evaluator" models. | `MultiEvaluator` | `ccflow.evaluators` | An evaluator that combines multiple evaluators. | | `GraphEvaluator` | `ccflow.evaluators` | Evaluator that evaluates the dependency graph of callable models in topologically sorted order. | | `RetryEvaluator` | `ccflow.evaluators` | Evaluator that retries the evaluation of a callable model on failure, with exponential backoff and jitter. | +| `ReportingEvaluator` | `ccflow.evaluators` | Transparent evaluator that reports telemetry (tracing / metrics / alerting) about each evaluation in its scope. | +| `DryRunEvaluator` | `ccflow.evaluators` | Evaluator that walks the dependency graph and reports a plan *without running any model bodies*. | | `ChunkedDateRangeEvaluator` | *Coming Soon!* | | | `ChunkedDateRangeResultsAggregator` | *Coming Soon!* | | | `RayChunkedDateRangeEvaluator` | *Coming Soon!* | | @@ -197,6 +199,102 @@ result = flaky(my_context) # same context/result types as fetch_from_api Use `RetryEvaluator` for runtime, cross-cutting retries (including across parallel evaluators), and `RetryModel` when the retry policy is a declarative, visible part of the model graph. +### Reporting, tracing & dry-run + +Reporting is *telemetry about the evaluation itself* — which model ran, on what context, how long it +took, how it relates to other evaluations (parent/child spans), and whether it failed. 
It is +strictly **transparent**: it never changes the value returned by the wrapped evaluation, so caching +and dependency graphs are unaffected. This is what distinguishes reporting from *publishing*: + +| Concern | Acts on | Changes the result? | Example | +| :------------- | :-------------------------- | :------------------ | :------------------------------------- | +| **Reporting** | the *evaluation* (metadata) | No (transparent) | spans, latency metrics, failure alerts | +| **Publishing** | the *result payload* | No, but consumes it | write a DataFrame to disk / a database | + +Like retries, reporting comes in both the cross-cutting (`ReportingEvaluator`) and structural +(`ReportingModel`) forms, both built on a shared `ReportingPolicy`. Events are delivered to a +pluggable `Reporter` sink. Signal-specific subclasses specialise the policy: + +- **Tracing** (`TracingReportingEvaluator` / `TracingReportingModel`) — spans with parent/child + correlation. `OpenTelemetryTracingReportingEvaluator` (aliased `OpenTelemetryEvaluator`) opens + real OpenTelemetry spans (install the optional extra: `pip install ccflow[otel]`). +- **Metrics** (`MetricsReportingEvaluator` / `MetricsReportingModel`) — success/error counters and a + latency histogram. `OpenTelemetryMetricsReportingEvaluator` pushes to OpenTelemetry instruments. +- **Alerting** (`AlertsReportingEvaluator` / `AlertsReportingModel`) — prioritised alerts with + `P1`–`P5` tags (`AlertPriority`), emitted on failure (and optionally on recovery). 
+ +```python +from ccflow import FlowOptionsOverride +from ccflow.evaluators import TracingReportingEvaluator +from ccflow.utils.reporting import InMemoryReporter, ReportPhase + +reporter = InMemoryReporter() +tracing = TracingReportingEvaluator(reporter=reporter) + +with FlowOptionsOverride(options={"evaluator": tracing}): + result = my_model(my_context) # result is unchanged + +# reporter.events is a list of structured ReportEvent objects forming a span tree +phases = [e.phase for e in reporter.events] # e.g. [START, SUCCESS, END, ...] +``` + +Available reporters (`ccflow.utils.reporting`): `NoOpReporter`, `InMemoryReporter` (testing / +introspection), `LoggingReporter` (writes events to a logger), `CompositeReporter` (fan-out), and +`UIReporter` (a thread-safe, bounded buffer drained by an observability UI). `ReportingStateStore` +folds a stream of events into per-node state (keyed by `span_id`) so a UI can reconstruct the live +span tree, and `run_scope(...)` tags all events emitted within a run with a shared `run_id`. + +> [!NOTE] +> +> Current evaluators and models emit **node** lifecycle events (`START`, `SUCCESS`, `ERROR`, `RETRY`, +> `GIVE_UP`, `END`, plus `QUEUED` / `SKIPPED` from `DryRunEvaluator`). The run/graph envelope phases +> `RUN_STARTED`, `RUN_FINISHED` and `GRAPH_DISCOVERED` are **reserved** for a future explicit +> observer layer — `ReportingStateStore` already folds them, but no evaluator emits them today, so +> `run_scope(...)` gives you a shared `run_id` on node events rather than a complete run envelope. 
+ +Each `ReportEvent` carries a structured `extra` dict whose keys depend on the signal/source: + +| Source | `extra` keys | Meaning | +| :------------------------------ | :-------------------- | :------------------------------------------------- | +| `DryRunEvaluator` | `node_key`, `dry_run` | Logical node identity (equals `cache_key()`); flag | +| `MetricsReportingEvaluator` | `metric`, `value` | Metric name (counter/histogram) and its value | +| Retry lifecycle (`RetryPolicy`) | `delay`, `reason` | Backoff before next attempt; give-up reason | + +> [!NOTE] +> +> `LoggingEvaluator` — the default evaluator — is itself a reporting evaluator built on a +> `LoggingPolicy`, so it participates in the same span tree. Configure a `reporter` on it to receive +> structured events in addition to its log lines. A structural `LoggingModel` is also available. + +Placeholders for additional backends (`Datadog*`, `Opsgenie*`, `JSMAlerts*`, `NewRelic*`) are defined +so they can be referenced in config and tracked, but raise `NotImplementedError` until implemented. + +#### Previewing a run with `DryRunEvaluator` + +`DryRunEvaluator` *plans* an evaluation: it walks the dependency graph (which evaluates the cheap +`__deps__` declarations but never a model body), emits `QUEUED` then `SKIPPED` events for every node +with a parent/child span tree mirroring the graph, and returns a synthetic result without running +anything. This is useful for previewing what a run would do and for driving a UI. 
+ +```python +from ccflow.evaluators import DryRunEvaluator +from ccflow.utils.reporting import InMemoryReporter + +reporter = InMemoryReporter() +with FlowOptionsOverride(options={"evaluator": DryRunEvaluator(reporter=reporter)}): + pipeline(my_context) # no model bodies run + +planned = [(e.model_name, e.phase) for e in reporter.events] +``` + +> [!WARNING] +> +> With the default `synthetic_result=True`, the value returned by a dry run is built with +> `model_construct`: required fields may be **unset** and validators do **not** run. Treat it purely +> as a planning placeholder for previews/UIs — do **not** feed it into downstream computation. Because +> the return value differs from a real run, `DryRunEvaluator` is **not** transparent in this mode, so +> it participates in cache keys and will not contaminate a cached real result. + ## Results A Result is an object that holds the results from a callable model. It provides the equivalent of a strongly typed dictionary where the keys and schema are known upfront. diff --git a/pyproject.toml b/pyproject.toml index ba3f708..3bec3fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ full = [ "cexprtk", "duckdb", "IPython", + "opentelemetry-api", "panel", "panel_material_ui", "plotly", @@ -66,6 +67,9 @@ full = [ "smart_open", "xarray", ] +otel = [ + "opentelemetry-api", +] develop = [ "build", "bump-my-version", @@ -94,6 +98,8 @@ develop = [ "scipy", "smart_open", "xarray", + # Reporting deps + "opentelemetry-api", # Test deps "beautifulsoup4", "httpx", @@ -106,6 +112,7 @@ develop = [ test = [ "beautifulsoup4", "httpx", + "opentelemetry-api", "pytest", "pytest-asyncio", "pytest-cov",