From ce3d2299dd37dbda2236784c224180cf5ff716f8 Mon Sep 17 00:00:00 2001 From: Nijat K Date: Sun, 17 May 2026 05:33:58 -0400 Subject: [PATCH 1/5] Add Dep annotation to allow containers containing callable models for @Flow.model Signed-off-by: Nijat K --- ccflow/_flow_model_binding.py | 111 +++++++- .../examples/flow_model/flow_model_example.py | 26 +- .../flow_model_hydra_builder_demo.py | 20 +- ccflow/flow_model.py | 251 ++++++++++++++++++ ccflow/tests/test_flow_model.py | 206 ++++++++++++++ docs/wiki/Flow-Model.md | 54 +++- 6 files changed, 644 insertions(+), 24 deletions(-) diff --git a/ccflow/_flow_model_binding.py b/ccflow/_flow_model_binding.py index 5057ba3..c06e3e4 100644 --- a/ccflow/_flow_model_binding.py +++ b/ccflow/_flow_model_binding.py @@ -53,6 +53,11 @@ def __repr__(self) -> str: return "FromContext" +class _DepMarker: + def __repr__(self) -> str: + return "Dep" + + class FromContext: """Marker used in ``@Flow.model`` signatures for runtime/contextual inputs.""" @@ -70,11 +75,22 @@ def __class_getitem__(cls, item): return Annotated[item, _LazyMarker()] +class Dep: + """Marker used in ``@Flow.model`` signatures for explicit dependency leaves.""" + + def __new__(cls, *args, **kwargs): + raise TypeError("Dep is an annotation marker; use Dep[T] in @Flow.model signatures.") + + def __class_getitem__(cls, item): + return Annotated[item, _DepMarker()] + + @dataclass(frozen=True) class _ParsedAnnotation: base: Any is_lazy: bool is_from_context: bool + is_dep: bool optional_context: bool = False @@ -87,6 +103,7 @@ class _FlowModelParam: has_function_default: bool function_default: Any = _UNSET context_validation_annotation: Any = _UNSET + has_dep_slots: bool = False @property def validation_annotation(self) -> Any: @@ -170,6 +187,7 @@ class _SerializedFlowModelParam(NamedTuple): has_function_default: bool function_default: Any context_validation_annotation: _SerializedAnnotation + has_dep_slots: bool = False class _SerializedFlowModelConfig(NamedTuple): @@ -298,20 +316,24 @@ def _serialize_flow_model_param(param: _FlowModelParam) -> _SerializedFlowModelP has_function_default=param.has_function_default, function_default=param.function_default, context_validation_annotation=_serialize_annotation(param.context_validation_annotation), + has_dep_slots=param.has_dep_slots, ) def _restore_flow_model_param(payload: _SerializedFlowModelParam) -> _FlowModelParam: if not isinstance(payload, _SerializedFlowModelParam): raise TypeError(f"Unknown Flow.model parameter payload: {payload!r}") + annotation = _restore_annotation(payload.annotation) + context_validation_annotation = _restore_annotation(payload.context_validation_annotation) return _FlowModelParam( name=payload.name, - annotation=_restore_annotation(payload.annotation), + annotation=annotation, is_contextual=payload.is_contextual, is_lazy=payload.is_lazy, has_function_default=payload.has_function_default, function_default=payload.function_default, - context_validation_annotation=_restore_annotation(payload.context_validation_annotation), + context_validation_annotation=context_validation_annotation, + has_dep_slots=getattr(payload, "has_dep_slots", _annotation_contains_dep(annotation)), ) @@ -391,6 +413,7 @@ def _resolved_flow_signature( def _parse_annotation(annotation: Any) -> _ParsedAnnotation: is_lazy = False is_from_context = False + is_dep = False optional_context = False while get_origin(annotation) is Annotated: @@ -401,6 +424,8 @@ def _parse_annotation(annotation: Any) -> _ParsedAnnotation: is_lazy = True elif isinstance(metadata, _FromContextMarker): is_from_context = True + elif isinstance(metadata, _DepMarker): + is_dep = True # Detect markers nested inside a top-level Optional/Union, e.g. # ``Optional[FromContext[int]]`` == ``Union[Annotated[int, FromContext], None]``. @@ -432,8 +457,16 @@ def _parse_annotation(annotation: Any) -> _ParsedAnnotation: raise TypeError("FromContext is an annotation marker; use FromContext[T] in @Flow.model signatures.") if annotation is Lazy: raise TypeError("Lazy is an annotation marker; use Lazy[T] in @Flow.model signatures.") - - return _ParsedAnnotation(base=annotation, is_lazy=is_lazy, is_from_context=is_from_context, optional_context=optional_context) + if annotation is Dep: + raise TypeError("Dep is an annotation marker; use Dep[T] in @Flow.model signatures.") + + return _ParsedAnnotation( + base=annotation, + is_lazy=is_lazy, + is_from_context=is_from_context, + is_dep=is_dep, + optional_context=optional_context, + ) def _strip_annotated(annotation: Any) -> Any: @@ -442,6 +475,68 @@ def _strip_annotated(annotation: Any) -> Any: return annotation +def _pop_dep_marker(annotation: Any) -> Tuple[Any, bool]: + """Remove only the outer Dep marker while preserving other Annotated metadata.""" + + if get_origin(annotation) is not Annotated: + return annotation, False + + args = get_args(annotation) + metadata = tuple(item for item in args[1:] if not isinstance(item, _DepMarker)) + has_dep = len(metadata) != len(args[1:]) + base = args[0] + if not metadata: + return base, has_dep + # Python 3.10 cannot spell this as ``Annotated[base, *metadata]``. Keep + # non-Dep metadata, such as pydantic Field constraints, on the annotation + # used to validate literals and resolved dependency results. + return Annotated.__class_getitem__((base, *metadata)), has_dep + + +def _annotation_contains_dep(annotation: Any) -> bool: + annotation, has_dep = _pop_dep_marker(annotation) + if has_dep: + return True + return any(_annotation_contains_dep(arg) for arg in get_args(annotation)) + + +def _validate_dep_annotation(annotation: Any, *, in_dep: bool = False, dep_allowed: bool = False) -> None: + """Validate the deliberately small Dep marker language. + + Dep marks exact substitution slots. It is allowed inside container values, + but not inside another Dep and not in dict keys. + """ + + annotation, has_dep = _pop_dep_marker(annotation) + if has_dep: + if not dep_allowed: + raise TypeError("Dep[...] is only supported in regular parameter container values.") + if in_dep: + raise TypeError("Dep[...] cannot contain another Dep[...] marker.") + _validate_dep_annotation(annotation, in_dep=True, dep_allowed=False) + return + + origin = get_origin(annotation) + args = get_args(annotation) + if origin is list and len(args) == 1: + _validate_dep_annotation(args[0], in_dep=in_dep, dep_allowed=True) + return + if origin is tuple and args: + item_args = args[:1] if len(args) == 2 and args[1] is Ellipsis else args + for arg in item_args: + _validate_dep_annotation(arg, in_dep=in_dep, dep_allowed=True) + return + if origin is dict and len(args) == 2: + key_annotation, value_annotation = args + if _annotation_contains_dep(key_annotation): + raise TypeError("Dep[...] is not supported in dict keys.") + _validate_dep_annotation(value_annotation, in_dep=in_dep, dep_allowed=True) + return + + for arg in args: + _validate_dep_annotation(arg, in_dep=in_dep, dep_allowed=False) + + def _is_result_annotation(annotation: Any) -> bool: annotation = _strip_annotated(annotation) origin = get_origin(annotation) or annotation @@ -542,6 +637,13 @@ def _analyze_flow_function( parsed = _parse_annotation(param.annotation) if parsed.is_lazy and parsed.is_from_context: raise TypeError(f"Parameter '{param.name}' cannot combine Lazy[...] and FromContext[...].") + if (parsed.is_dep or _annotation_contains_dep(parsed.base)) and (parsed.is_lazy or parsed.is_from_context): + marker = "Lazy" if parsed.is_lazy else "FromContext" + raise TypeError(f"Parameter '{param.name}' cannot combine Dep[...] and {marker}[...].") + if parsed.is_dep: + raise TypeError("Dep[...] is only supported in regular parameter container values.") + _validate_dep_annotation(parsed.base) + has_dep_slots = _annotation_contains_dep(parsed.base) has_default = param.default is not inspect.Parameter.empty if parsed.is_lazy and has_default and not is_model_dependency(param.default): raise TypeError(f"Parameter '{param.name}' is marked Lazy[...] and must default to a CallableModel dependency.") @@ -568,6 +670,7 @@ def _analyze_flow_function( is_lazy=parsed.is_lazy, has_function_default=stored_has_default, function_default=stored_default, + has_dep_slots=has_dep_slots, ) ) diff --git a/ccflow/examples/flow_model/flow_model_example.py b/ccflow/examples/flow_model/flow_model_example.py index 3762853..21d45b0 100644 --- a/ccflow/examples/flow_model/flow_model_example.py +++ b/ccflow/examples/flow_model/flow_model_example.py @@ -6,7 +6,8 @@ 1. define stages as plain Python functions, 2. compose stages by passing upstream models as ordinary arguments, 3. rewrite contextual inputs on one dependency edge with `.flow.with_context(...)`, -4. execute the configured graph with `model.flow.compute(...)`. +4. use `Dep[...]` for model leaves inside regular container inputs, +5. execute the configured graph with `model.flow.compute(...)`. Run with: python ccflow/examples/flow_model/flow_model_example.py @@ -14,7 +15,7 @@ from datetime import date, timedelta -from ccflow import DateRangeContext, Flow, FromContext +from ccflow import DateRangeContext, Dep, Flow, FromContext def _format_input_names(inputs: dict[str, object]) -> str: @@ -23,10 +24,16 @@ def _format_input_names(inputs: dict[str, object]) -> str: def _format_bound_inputs(inputs: dict[str, object]) -> str: + def display_value(value: object) -> str: + if hasattr(value, "flow"): + return "model" + if isinstance(value, list): + return "[" + ", ".join(display_value(item) for item in value) + "]" + return repr(value) + parts = [] for name, value in inputs.items(): - display = "model" if hasattr(value, "flow") else repr(value) - parts.append(f"{name}={display}") + parts.append(f"{name}={display_value(value)}") return ", ".join(parts) or "(none)" @@ -45,13 +52,13 @@ def count_visitors( @Flow.model(context_type=DateRangeContext) def visitor_delta( - current: int, - previous: int, + counts: list[Dep[int]], label: str, start_date: FromContext[date], end_date: FromContext[date], ) -> dict[str, object]: """Return both visitor counts plus their difference.""" + current, previous = counts return { "label": label, "window": f"{start_date} -> {end_date}", @@ -75,8 +82,7 @@ def build_visitor_pipeline(location: str): current = count_visitors(location=location) previous = current.flow.with_context(shift_window(days=7)) return visitor_delta( - current=current, - previous=previous, + counts=[current, previous], label="previous_week", ) @@ -101,8 +107,8 @@ def main() -> None: print("\nPipeline:") print(" model: visitor_delta") pipeline_inspection = pipeline.flow.inspect() - current_inspection = pipeline.current.flow.inspect() - previous_inspection = pipeline.previous.flow.inspect() + current_inspection = pipeline.counts[0].flow.inspect() + previous_inspection = pipeline.counts[1].flow.inspect() print(f" bound inputs: {_format_bound_inputs(pipeline_inspection.bound_inputs)}") print(f" declared context inputs: {_format_input_names(pipeline_inspection.context_inputs)}") print(f" runtime inputs: {_format_input_names(pipeline_inspection.runtime_inputs)}") diff --git a/ccflow/examples/flow_model/flow_model_hydra_builder_demo.py b/ccflow/examples/flow_model/flow_model_hydra_builder_demo.py index fbecd7b..e5306e7 100644 --- a/ccflow/examples/flow_model/flow_model_hydra_builder_demo.py +++ b/ccflow/examples/flow_model/flow_model_hydra_builder_demo.py @@ -10,6 +10,7 @@ - keep runtime context (`start_date`, `end_date`) as runtime inputs, - use a plain Python builder function for graph construction, +- use `Dep[...]` when a regular container input holds upstream models, - let Hydra instantiate that builder and register the returned model. Run with: @@ -19,7 +20,7 @@ from datetime import date, timedelta from pathlib import Path -from ccflow import CallableModel, DateRangeContext, Flow, FromContext, ModelRegistry +from ccflow import CallableModel, DateRangeContext, Dep, Flow, FromContext, ModelRegistry CONFIG_PATH = Path(__file__).with_name("config") / "flow_model_hydra_builder_demo.yaml" @@ -30,10 +31,16 @@ def _format_input_names(inputs: dict[str, object]) -> str: def _format_bound_inputs(inputs: dict[str, object]) -> str: + def display_value(value: object) -> str: + if hasattr(value, "flow"): + return "model" + if isinstance(value, list): + return "[" + ", ".join(display_value(item) for item in value) + "]" + return repr(value) + parts = [] for name, value in inputs.items(): - display = "model" if hasattr(value, "flow") else repr(value) - parts.append(f"{name}={display}") + parts.append(f"{name}={display_value(value)}") return ", ".join(parts) or "(none)" @@ -56,13 +63,13 @@ def count_visitors(location: str, start_date: FromContext[date], end_date: FromC @Flow.model(context_type=DateRangeContext) def visitor_delta( - current: int, - previous: int, + counts: list[Dep[int]], label: str, start_date: FromContext[date], end_date: FromContext[date], ) -> dict[str, object]: """Return both visitor counts plus their difference.""" + current, previous = counts return { "label": label, "window": f"{start_date} -> {end_date}", @@ -85,8 +92,7 @@ def build_visitor_delta(current: CallableModel, *, label: str, days_back: int): """Hydra-friendly builder that returns a configured visitor-count model.""" previous = current.flow.with_context(shift_window(days=days_back)) return visitor_delta( - current=current, - previous=previous, + counts=[current, previous], label=label, ) diff --git a/ccflow/flow_model.py b/ccflow/flow_model.py index 0cec78e..8a65e73 100644 --- a/ccflow/flow_model.py +++ b/ccflow/flow_model.py @@ -8,6 +8,8 @@ context instead of model construction. * ``Lazy[T]`` marks a dependency that should be passed as a thunk and evaluated only if user code calls it. +* ``Dep[T]`` marks a nested regular-parameter slot that can be a literal ``T`` + or a ``CallableModel`` dependency returning ``T``. * ``model.flow.compute(...)`` and ``model.flow.with_context(...)`` provide the ergonomic execution and contextual binding API. @@ -88,6 +90,7 @@ from ._flow_model_binding import ( _UNION_ORIGINS, _UNSET, + Dep, FromContext, Lazy, _analyze_flow_context_transform, @@ -95,6 +98,7 @@ _callable_name, _FlowModelConfig, _FlowModelParam, + _pop_dep_marker, _resolved_flow_signature, _restore_flow_model_config, _serialize_flow_model_config, @@ -113,6 +117,7 @@ "BoundModel", "FlowInspection", "InputSpec", + "Dep", "FromContext", "Lazy", ) @@ -364,6 +369,11 @@ class _LiteralIdentity(NamedTuple): value: Any +class _DepMarkedIdentity(NamedTuple): + kind: Literal["dep_marked"] + value: Any + + class _UnresolvedLazyDependencyIdentity(NamedTuple): kind: Literal["unresolved_lazy_dependency"] model_type: str @@ -424,6 +434,31 @@ def _is_model_dependency(value: Any) -> bool: return isinstance(value, CallableModel) +def _strip_outer_non_dep_annotated(annotation: Any) -> Any: + """Strip outer Annotated layers only until an explicit Dep marker is found.""" + + while get_origin(annotation) is Annotated: + _, has_dep = _pop_dep_marker(annotation) + if has_dep: + return annotation + annotation = get_args(annotation)[0] + return annotation + + +def _annotation_origin_args(annotation: Any) -> Tuple[Any, Tuple[Any, ...]]: + # Container walking cares about list/tuple/dict origins, but unrelated + # Annotated metadata should not hide those origins. + annotation = _strip_outer_non_dep_annotated(annotation) + return get_origin(annotation), get_args(annotation) + + +def _path_name(name: str, path: Tuple[Any, ...]) -> str: + # Use a field-like path in nested validation errors, e.g. values[0] or + # rows['left'][2], instead of reporting every failure at the root field. + suffix = "".join(f"[{item!r}]" if not isinstance(item, int) else f"[{item}]" for item in path) + return f"{name}{suffix}" + + def _bound_field_names(model: Any) -> set[str]: fields_set = getattr(model, "model_fields_set", None) if fields_set is not None: @@ -1015,9 +1050,219 @@ def _resolve_regular_param_value(model: "_GeneratedFlowModelBase", param: _FlowM if callable(add_note): add_note(f"Error while evaluating dependency {parent}.{param.name} -> {child}.") raise + if param.has_dep_slots: + return _resolve_dep_marked_value(param.name, value, param.annotation, context) + return value + + +def _walk_dep_marked_value( + value: Any, + annotation: Any, + *, + on_dep_slot: Callable[[Any, Any, Tuple[Any, ...]], Any], + on_literal: Callable[[Any, Any, Tuple[Any, ...]], Any], + on_list: Callable[[Any, List[Any], Any, Tuple[Any, ...]], Any], + on_tuple: Callable[[Any, Tuple[Any, ...], Any, Tuple[Any, ...]], Any], + on_dict: Callable[[Any, List[Tuple[Any, Any]], Any, Any, Any, Tuple[Any, ...]], Any], + path: Tuple[Any, ...] = (), +) -> Any: + """Walk only the container grammar where Dep markers are valid.""" + + annotation = _strip_outer_non_dep_annotated(annotation) + dep_base, is_dep_slot = _pop_dep_marker(annotation) + if is_dep_slot: + return on_dep_slot(value, dep_base, path) + + origin, args = _annotation_origin_args(annotation) + if origin is list and isinstance(value, (list, tuple)) and args: + items = [ + _walk_dep_marked_value( + item, + args[0], + on_dep_slot=on_dep_slot, + on_literal=on_literal, + on_list=on_list, + on_tuple=on_tuple, + on_dict=on_dict, + path=path + (index,), + ) + for index, item in enumerate(value) + ] + return on_list(value, items, annotation, path) + + if origin is tuple and isinstance(value, (list, tuple)) and args: + if len(args) == 2 and args[1] is Ellipsis: + items = tuple( + _walk_dep_marked_value( + item, + args[0], + on_dep_slot=on_dep_slot, + on_literal=on_literal, + on_list=on_list, + on_tuple=on_tuple, + on_dict=on_dict, + path=path + (index,), + ) + for index, item in enumerate(value) + ) + return on_tuple(value, items, annotation, path) + if len(args) == len(value): + items = tuple( + _walk_dep_marked_value( + item, + item_annotation, + on_dep_slot=on_dep_slot, + on_literal=on_literal, + on_list=on_list, + on_tuple=on_tuple, + on_dict=on_dict, + path=path + (index,), + ) + for index, (item, item_annotation) in enumerate(zip(value, args)) + ) + return on_tuple(value, items, annotation, path) + return on_literal(value, annotation, path) + + if origin is dict and isinstance(value, Mapping) and len(args) == 2: + key_annotation, value_annotation = args + items = [ + ( + key, + _walk_dep_marked_value( + item, + value_annotation, + on_dep_slot=on_dep_slot, + on_literal=on_literal, + on_list=on_list, + on_tuple=on_tuple, + on_dict=on_dict, + path=path + (key,), + ), + ) + for key, item in value.items() + ] + return on_dict(value, items, key_annotation, value_annotation, annotation, path) + + return on_literal(value, annotation, path) + + +def _resolve_dep_slot_registry_ref(value: Any, annotation: Any) -> Any: + if not isinstance(value, str): + return value + candidate = _resolve_registry_candidate(value) + if candidate is not None and _registry_candidate_allowed(annotation, candidate): + return candidate return value +def _validate_dep_marked_value(name: str, value: Any, annotation: Any, source: str, path: Tuple[Any, ...] = ()) -> Any: + """Validate construction values that use explicit nested Dep markers.""" + + def validate_dep_slot(item: Any, dep_base: Any, item_path: Tuple[Any, ...]) -> Any: + if not _is_model_dependency(item): + item = _resolve_serialized_dependency_ref(item, include_target_alias=True) + if _is_model_dependency(item): + return item + try: + return _coerce_value(_path_name(name, item_path), item, dep_base, source) + except TypeError: + item_with_alias_dep = _resolve_dep_slot_registry_ref(item, dep_base) + if item_with_alias_dep is not item and _is_model_dependency(item_with_alias_dep): + return item_with_alias_dep + raise + + def validate_literal(item: Any, item_annotation: Any, item_path: Tuple[Any, ...]) -> Any: + return _coerce_value(_path_name(name, item_path), item, item_annotation, source) + + def validate_dict( + _value: Any, + items: List[Tuple[Any, Any]], + key_annotation: Any, + _value_annotation: Any, + _dict_annotation: Any, + dict_path: Tuple[Any, ...], + ) -> Any: + return {_coerce_value(_path_name(name, dict_path + (key, "key")), key, key_annotation, source): item for key, item in items} + + return _walk_dep_marked_value( + value, + annotation, + on_dep_slot=validate_dep_slot, + on_literal=validate_literal, + on_list=lambda _value, items, _annotation, _path: list(items), + on_tuple=lambda _value, items, _annotation, _path: tuple(items), + on_dict=validate_dict, + path=path, + ) + + +def _resolve_dep_marked_value(name: str, value: Any, annotation: Any, context: ContextBase, path: Tuple[Any, ...] = ()) -> Any: + """Resolve CallableModel leaves that appear at explicit Dep marker slots.""" + + def resolve_dep_slot(item: Any, dep_base: Any, item_path: Tuple[Any, ...]) -> Any: + if _is_model_dependency(item): + dependency_model, dependency_context = _resolved_dependency_invocation(item, context) + resolved = _unwrap_model_result(dependency_model(dependency_context)) + return _coerce_value(_path_name(name, item_path), resolved, dep_base, "Regular parameter") + return item + + return _walk_dep_marked_value( + value, + annotation, + on_dep_slot=resolve_dep_slot, + on_literal=lambda item, _annotation, _path: item, + on_list=lambda _value, items, _annotation, _path: list(items), + on_tuple=lambda _value, items, _annotation, _path: tuple(items), + on_dict=lambda _value, items, _key_annotation, _value_annotation, _dict_annotation, _path: dict(items), + path=path, + ) + + +def _dep_marked_dependency_entries(value: Any, annotation: Any, context: ContextBase) -> GraphDepList: + """Collect dependency graph edges from explicit Dep marker slots.""" + + def dep_slot_edges(item: Any, _dep_base: Any, _path: Tuple[Any, ...]) -> GraphDepList: + if not _is_model_dependency(item): + return [] + dependency_model, dependency_context = _resolved_dependency_invocation(item, context) + return [(dependency_model, [dependency_context])] + + def merge_items(items: Any) -> GraphDepList: + deps: GraphDepList = [] + for item in items: + deps.extend(item) + return deps + + return _walk_dep_marked_value( + value, + annotation, + on_dep_slot=dep_slot_edges, + on_literal=lambda _item, _annotation, _path: [], + on_list=lambda _value, items, _annotation, _path: merge_items(items), + on_tuple=lambda _value, items, _annotation, _path: merge_items(items), + on_dict=lambda _value, items, _key_annotation, _value_annotation, _dict_annotation, _path: merge_items(item for _, item in items), + ) + + +def _dep_marked_identity_value(value: Any, annotation: Any, context: ContextBase) -> Any: + """Return an identity payload with explicit Dep leaves replaced by dependencies.""" + + def dep_slot_identity(item: Any, _dep_base: Any, _path: Tuple[Any, ...]) -> Any: + if _is_model_dependency(item): + return _regular_dependency_identity(item, context) + return item + + return _walk_dep_marked_value( + value, + annotation, + on_dep_slot=dep_slot_identity, + on_literal=lambda item, _annotation, _path: item, + on_list=lambda _value, items, _annotation, _path: list(items), + on_tuple=lambda _value, items, _annotation, _path: tuple(items), + on_dict=lambda _value, items, _key_annotation, _value_annotation, _dict_annotation, _path: dict(items), + ) + + def _regular_dependency_identity(value: CallableModel, context: ContextBase) -> _DependencyIdentity: dependency_model, dependency_context = _resolved_dependency_invocation(value, context) return _DependencyIdentity( @@ -1041,6 +1286,8 @@ def _lazy_regular_dependency_identity(value: CallableModel, context: ContextBase def _regular_input_identity(param: _FlowModelParam, value: Any, context: ContextBase) -> _RegularInputIdentity: if _is_model_dependency(value): payload = _lazy_regular_dependency_identity(value, context) if param.is_lazy else _regular_dependency_identity(value, context) + elif param.has_dep_slots: + payload = _DepMarkedIdentity(kind="dep_marked", value=_dep_marked_identity_value(value, param.annotation, context)) else: payload = _LiteralIdentity(kind="literal", value=value) return _RegularInputIdentity(kind="regular_input", name=param.name, lazy=param.is_lazy, payload=payload) @@ -1455,6 +1702,8 @@ def _validate_bound_param_value( return value if _is_model_dependency(value): return value + if param.has_dep_slots: + return _validate_dep_marked_value(param.name, value, param.annotation, source) try: return _coerce_value(param.name, value, param.annotation, source) except TypeError: @@ -2855,6 +3104,8 @@ def __deps__(self, context): if _is_model_dependency(value): dependency_model, dependency_context = _resolved_dependency_invocation(value, context) deps.append((dependency_model, [dependency_context])) + elif param.has_dep_slots: + deps.extend(_dep_marked_dependency_entries(value, param.annotation, context)) return deps cast(Any, __deps__).__signature__ = inspect.Signature( diff --git a/ccflow/tests/test_flow_model.py b/ccflow/tests/test_flow_model.py index a7adf9a..df951f1 100644 --- a/ccflow/tests/test_flow_model.py +++ b/ccflow/tests/test_flow_model.py @@ -24,6 +24,7 @@ CallableModel, ContextBase, DateRangeContext, + Dep, EvaluatorBase, Flow, FlowContext, @@ -386,6 +387,172 @@ def total(values: list[int]) -> int: total(values=[source()]) +def test_regular_param_dep_marker_allows_marked_container_leaves(): + calls = {"source": 0, "row": 0, "total": 0} + + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + calls["source"] += 1 + return value + offset + + @Flow.model + def row_source(value: FromContext[int]) -> list[int]: + calls["row"] += 1 + return [value, value + 1] + + @Flow.model + def list_source(value: FromContext[int]) -> list[int]: + return [value, value * 2] + + @Flow.model + def total(values: list[Dep[int]], rows: list[Dep[list[int]]] = ()) -> int: + calls["total"] += 1 + return sum(values) + sum(sum(row) for row in rows) + + model = total(values=(source(offset=1), "2", 3), rows=([4, 5], row_source())) + deps = model.__deps__(FlowContext(value=10)) + + assert len(deps) == 2 + assert model.flow.compute(value=10).value == 46 + assert calls == {"source": 1, "row": 1, "total": 1} + + whole_param_model = total(values=list_source()) + assert whole_param_model.flow.compute(value=4).value == 12 + + serialized_model = total(values=[source(offset=5).model_dump(mode="python")]) + assert isinstance(serialized_model.values[0], CallableModel) + assert serialized_model.flow.compute(value=10).value == 15 + + registry = ModelRegistry.root().clear() + registry.add("source", source(offset=4)) + try: + registry_model = total(values=["source", "2"], rows=(row_source(),)) + assert registry_model.flow.compute(value=10).value == 37 + finally: + registry.clear() + + +def test_dep_marker_allows_tuple_and_dict_value_slots(): + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def total(pair: tuple[Dep[int], str], values: dict[str, Dep[int]], many: tuple[Dep[int], ...]) -> int: + return pair[0] + sum(values.values()) + sum(many) + + model = total( + pair=(source(offset=1), "ignored"), + values={"literal": "2", "model": source(offset=3)}, + many=(source(offset=5), "7"), + ) + + assert len(model.__deps__(FlowContext(value=10))) == 3 + assert model.flow.compute(value=10).value == 48 + + +def test_dep_marker_participates_in_graph_and_effective_cache_identity(): + calls = {"source": 0, "total": 0} + + @Flow.model + def source(value: FromContext[int]) -> int: + calls["source"] += 1 + return value * 10 + + @Flow.model + def total(values: list[Dep[int]], bonus: FromContext[int]) -> int: + calls["total"] += 1 + return sum(values) + bonus + + model = total(values=[source(), 2]) + graph = get_dependency_graph(model.__call__.get_evaluation_context(model, FlowContext(value=3, bonus=7, unused="one"))) + assert len(graph.ids) == 2 + + cache = MemoryCacheEvaluator() + with FlowOptionsOverride(options={"evaluator": cache, "cacheable": True}): + assert model.flow.compute(value=3, bonus=7, unused="one").value == 39 + assert model.flow.compute(value=3, bonus=7, unused="two").value == 39 + + assert calls == {"source": 1, "total": 1} + + +def test_dep_marker_preserves_other_annotated_metadata(): + @Flow.model + def source(value: FromContext[int]) -> int: + return value + + @Flow.model + def total(values: list[Dep[Annotated[int, Field(gt=0)]]]) -> int: + return sum(values) + + assert total(values=["1", source()]).flow.compute(value=2).value == 3 + + with pytest.raises(TypeError, match="Field 'values"): + total(values=[-1]) + + with pytest.raises(TypeError, match="Regular parameter"): + total(values=[source()]).flow.compute(value=-1) + + +def test_dep_marker_rejects_unmarked_or_dynamic_dependency_shapes(): + @Flow.model + def source(value: FromContext[int]) -> int: + return value + + @Flow.model + def row_source(value: FromContext[int]) -> list[int]: + return [value] + + @Flow.model + def row_total(rows: list[Dep[list[int]]]) -> int: + return sum(sum(row) for row in rows) + + with pytest.raises(TypeError, match="Field 'rows"): + row_total(rows=[[source()]]) + + with pytest.raises(TypeError, match="container values"): + + @Flow.model + def top_level(values: Dep[int]) -> int: + return values + + with pytest.raises(TypeError, match="cannot contain another Dep"): + + @Flow.model + def nested(values: list[Dep[list[Dep[int]]]]) -> int: + return sum(sum(value) for value in values) + + with pytest.raises(TypeError, match="dict keys"): + + @Flow.model + def dict_key(values: dict[Dep[str], int]) -> int: + return sum(values.values()) + + with pytest.raises(TypeError, match="container values"): + + @Flow.model + def set_values(values: set[Dep[int]]) -> int: + return sum(values) + + +def test_dep_marker_is_ordinary_annotation_for_handwritten_callable_model(): + @Flow.model + def source(value: FromContext[int]) -> int: + return value + + class Plain(CallableModel): + values: list[Dep[int]] + + @Flow.call + def __call__(self, context: FlowContext) -> GenericResult[int]: + del context + return GenericResult(value=sum(self.values)) + + assert Plain(values=["1", 2]).flow.compute().value == 3 + with pytest.raises(ValidationError): + Plain(values=[source()]) + + def test_regular_param_upstream_dependency_coerced(): """Upstream model returning str should be coerced to downstream int annotation.""" @@ -951,6 +1118,25 @@ def multiply(a: int, b: FromContext[int]) -> int: assert restored.flow.compute(b=7).value == 42 +def test_generated_model_dep_marker_pickle_roundtrip(): + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def total(values: list[Dep[int]]) -> int: + return sum(values) + + model = total(values=[source(offset=2), 3]) + context = FlowContext(value=10) + + for dumps, loads in ((pickle.dumps, pickle.loads), (rcpdumps, rcploads)): + restored = loads(dumps(model, protocol=5)) + + assert len(restored.__deps__(context)) == 1 + assert restored.flow.compute(context).value == 15 + + def test_generated_model_direct_call_plain_pickle_uses_serialized_factory(monkeypatch): module = ModuleType("ccflow_test_direct_model") @@ -1221,6 +1407,26 @@ def test_generated_model_dependency_input_json_roundtrip(): assert isinstance(restored.source, CallableModel) +def test_generated_model_dep_marker_json_roundtrip(): + from ccflow import BaseModel + + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def total(values: list[Dep[int]]) -> int: + return sum(values) + + model = total(values=[source(offset=1), 2]) + restored = BaseModel.model_validate_json(model.model_dump_json()) + + assert type(restored) is type(model) + assert isinstance(restored.values[0], CallableModel) + assert len(restored.__deps__(FlowContext(value=10))) == 1 + assert restored.flow.compute(value=10).value == 13 + + def test_generated_model_lazy_dependency_input_json_roundtrip(): @Flow.model def choose(source: Lazy[int], use_value: FromContext[bool]) -> int: diff --git a/docs/wiki/Flow-Model.md b/docs/wiki/Flow-Model.md index 4bed310..80854ae 100644 --- a/docs/wiki/Flow-Model.md +++ b/docs/wiki/Flow-Model.md @@ -114,9 +114,57 @@ model = add(a=load_value(offset=5)) assert model.flow.compute(value=7, b=12).value == 24 ``` -Only direct regular-parameter values are treated as upstream dependencies in -this first version. Containers such as `list`, `tuple`, and `dict` are ordinary -literal values; `@Flow.model` does not scan them for nested models. +Direct regular-parameter values are treated as upstream dependencies. Containers +are ordinary literal values unless a nested position is explicitly marked with +`Dep[T]`. + +### Explicit Container Dependencies + +`Dep[T]` marks an exact nested slot where a value may be either a literal `T` +or a `CallableModel` dependency whose unwrapped result validates as `T`. The +function body still receives the resolved underlying value. + +```python +from ccflow import Dep, Flow, FromContext + + +@Flow.model +def source(value: FromContext[int], offset: int) -> int: + return value + offset + + +@Flow.model +def total(values: list[Dep[int]]) -> int: + return sum(values) + + +model = total(values=[source(offset=1), 2, 3]) +assert model.flow.compute(value=10).value == 16 +``` + +The existing whole-parameter dependency rule still applies: + +```python +@Flow.model +def source_list(value: FromContext[int]) -> list[int]: + return [value, value * 2] + + +total(values=source_list()) # valid: source_list supplies the whole list +``` + +So `list[int]` accepts a literal list or a model returning `list[int]`, while +`list[Dep[int]]` additionally accepts model leaves inside the literal list. + +`Dep[...]` is intentionally narrow: + +- it is interpreted only for regular `@Flow.model` parameters, +- it is supported inside `list`, `tuple`, and `dict` values, +- top-level `Dep[...]` is rejected because direct whole-parameter dependencies + already cover that case, +- it is not supported in dict keys, +- nested `Dep[...]` markers are rejected, +- it does not add automatic behavior to handwritten `CallableModel` fields. ### Contextual Parameters From d1b028210388c14fe2098689b42aa9e2b5e28c87 Mon Sep 17 00:00:00 2001 From: Nijat K Date: Sun, 17 May 2026 18:45:41 -0400 Subject: [PATCH 2/5] Fix bugs, update examples and add example test for @Flow.model Signed-off-by: Nijat K --- ccflow/_flow_model_binding.py | 10 +- .../examples/flow_model/flow_model_example.py | 4 +- .../flow_model_hydra_builder_demo.py | 4 +- ccflow/flow_model.py | 65 +++- .../examples/test_flow_model_examples.py | 25 ++ ccflow/tests/test_flow_model.py | 287 ++++++++++++++++-- docs/wiki/Flow-Model.md | 20 ++ 7 files changed, 387 insertions(+), 28 deletions(-) create mode 100644 ccflow/tests/examples/test_flow_model_examples.py diff --git a/ccflow/_flow_model_binding.py b/ccflow/_flow_model_binding.py index c06e3e4..516becb 100644 --- a/ccflow/_flow_model_binding.py +++ b/ccflow/_flow_model_binding.py @@ -504,7 +504,8 @@ def _validate_dep_annotation(annotation: Any, *, in_dep: bool = False, dep_allow """Validate the deliberately small Dep marker language. Dep marks exact substitution slots. It is allowed inside container values, - but not inside another Dep and not in dict keys. + but not inside another Dep, not in dict keys, and not mixed with Lazy or + FromContext markers inside a Dep slot. """ annotation, has_dep = _pop_dep_marker(annotation) @@ -516,8 +517,15 @@ def _validate_dep_annotation(annotation: Any, *, in_dep: bool = False, dep_allow _validate_dep_annotation(annotation, in_dep=True, dep_allowed=False) return + if in_dep and get_origin(annotation) is Annotated: + metadata = get_args(annotation)[1:] + if any(isinstance(item, (_LazyMarker, _FromContextMarker)) for item in metadata): + raise TypeError("Dep[...] cannot contain Lazy[...] or FromContext[...] markers.") + origin = get_origin(annotation) args = get_args(annotation) + if origin in _UNION_ORIGINS and any(_annotation_contains_dep(arg) for arg in args): + raise TypeError("Dep[...] is not supported inside union annotations.") if origin is list and len(args) == 1: _validate_dep_annotation(args[0], in_dep=in_dep, dep_allowed=True) return diff --git a/ccflow/examples/flow_model/flow_model_example.py b/ccflow/examples/flow_model/flow_model_example.py index 21d45b0..9085ff8 100644 --- a/ccflow/examples/flow_model/flow_model_example.py +++ b/ccflow/examples/flow_model/flow_model_example.py @@ -10,7 +10,7 @@ 5. execute the configured graph with `model.flow.compute(...)`. Run with: - python ccflow/examples/flow_model/flow_model_example.py + python -m ccflow.examples.flow_model.flow_model_example """ from datetime import date, timedelta @@ -29,6 +29,8 @@ def display_value(value: object) -> str: return "model" if isinstance(value, list): return "[" + ", ".join(display_value(item) for item in value) + "]" + if isinstance(value, tuple): + return "(" + ", ".join(display_value(item) for item in value) + ")" return repr(value) parts = [] diff --git a/ccflow/examples/flow_model/flow_model_hydra_builder_demo.py b/ccflow/examples/flow_model/flow_model_hydra_builder_demo.py index e5306e7..df3a78e 100644 --- a/ccflow/examples/flow_model/flow_model_hydra_builder_demo.py +++ b/ccflow/examples/flow_model/flow_model_hydra_builder_demo.py @@ -14,7 +14,7 @@ - let Hydra instantiate that builder and register the returned model. Run with: - python ccflow/examples/flow_model/flow_model_hydra_builder_demo.py + python -m ccflow.examples.flow_model.flow_model_hydra_builder_demo """ from datetime import date, timedelta @@ -36,6 +36,8 @@ def display_value(value: object) -> str: return "model" if isinstance(value, list): return "[" + ", ".join(display_value(item) for item in value) + "]" + if isinstance(value, tuple): + return "(" + ", ".join(display_value(item) for item in value) + ")" return repr(value) parts = [] diff --git a/ccflow/flow_model.py b/ccflow/flow_model.py index 8a65e73..2da4bf6 100644 --- a/ccflow/flow_model.py +++ b/ccflow/flow_model.py @@ -657,6 +657,21 @@ def serialized_model_type(item: Dict[str, Any]) -> Optional[type]: return restored if _is_model_dependency(restored) else value +def _resolve_whole_param_serialized_dependency_ref(value: Any) -> Any: + """Restore serialized whole-parameter dependencies before container walking.""" + + restored = _resolve_serialized_dependency_ref(value) + if _is_model_dependency(restored): + return restored + # ``_target_`` is also Hydra syntax. Accept it before literal validation + # only for payloads that look like ccflow serialized models. + if type(value) is dict and "meta" in value: + restored = _resolve_serialized_dependency_ref(value, include_target_alias=True) + if _is_model_dependency(restored): + return restored + return value + + def _ensure_named_python_function(fn: _AnyCallable, *, decorator_name: str) -> None: if not inspect.isfunction(fn): raise TypeError(f"{decorator_name} only supports Python functions.") @@ -1155,21 +1170,42 @@ def _resolve_dep_slot_registry_ref(value: Any, annotation: Any) -> Any: return value +def _dep_slot_prefers_serialized_dependency(annotation: Any) -> bool: + """Return whether literal validation is too broad to preserve serialized deps.""" + + annotation = _strip_outer_non_dep_annotated(annotation) + if annotation in (Any, object, inspect.Parameter.empty): + return True + origin, args = _annotation_origin_args(annotation) + if origin in _UNION_ORIGINS: + return any(_dep_slot_prefers_serialized_dependency(arg) for arg in args) + return False + + def _validate_dep_marked_value(name: str, value: Any, annotation: Any, source: str, path: Tuple[Any, ...] = ()) -> Any: """Validate construction values that use explicit nested Dep markers.""" def validate_dep_slot(item: Any, dep_base: Any, item_path: Tuple[Any, ...]) -> Any: - if not _is_model_dependency(item): - item = _resolve_serialized_dependency_ref(item, include_target_alias=True) if _is_model_dependency(item): return item + tried_serialized_dep = False + if _dep_slot_prefers_serialized_dependency(dep_base): + tried_serialized_dep = True + item_with_serialized_dep = _resolve_serialized_dependency_ref(item, include_target_alias=True) + if _is_model_dependency(item_with_serialized_dep): + return item_with_serialized_dep try: return _coerce_value(_path_name(name, item_path), item, dep_base, source) - except TypeError: - item_with_alias_dep = _resolve_dep_slot_registry_ref(item, dep_base) - if item_with_alias_dep is not item and _is_model_dependency(item_with_alias_dep): - return item_with_alias_dep - raise + except TypeError as error: + literal_error = error + if not tried_serialized_dep: + item_with_serialized_dep = _resolve_serialized_dependency_ref(item, include_target_alias=True) + if _is_model_dependency(item_with_serialized_dep): + return item_with_serialized_dep + item_with_alias_dep = _resolve_dep_slot_registry_ref(item, dep_base) + if item_with_alias_dep is not item and _is_model_dependency(item_with_alias_dep): + return item_with_alias_dep + raise literal_error def validate_literal(item: Any, item_annotation: Any, item_path: Tuple[Any, ...]) -> Any: return _coerce_value(_path_name(name, item_path), item, item_annotation, source) @@ -1703,7 +1739,20 @@ def _validate_bound_param_value( if _is_model_dependency(value): return value if param.has_dep_slots: - return _validate_dep_marked_value(param.name, value, param.annotation, source) + value_with_serialized_dep = _resolve_whole_param_serialized_dependency_ref(value) + if value_with_serialized_dep is not value and _is_model_dependency(value_with_serialized_dep): + return value_with_serialized_dep + try: + return _validate_dep_marked_value(param.name, value, param.annotation, source) + except TypeError as error: + literal_error = error + value_with_alias_deps = _resolve_serialized_dependency_ref(value, include_target_alias=True) + if value_with_alias_deps is not value and _is_model_dependency(value_with_alias_deps): + return value_with_alias_deps + value_with_registry_dep = _resolve_bound_param_registry_ref(param, value) + if value_with_registry_dep is not value and _is_model_dependency(value_with_registry_dep): + return value_with_registry_dep + raise literal_error try: return _coerce_value(param.name, value, param.annotation, source) except TypeError: diff --git a/ccflow/tests/examples/test_flow_model_examples.py b/ccflow/tests/examples/test_flow_model_examples.py new file mode 100644 index 0000000..cce69cf --- /dev/null +++ b/ccflow/tests/examples/test_flow_model_examples.py @@ -0,0 +1,25 @@ +"""Smoke tests for packaged Flow.model examples.""" + +import subprocess +import sys + +import pytest + + +@pytest.mark.parametrize( + ("module_name", "expected_text"), + [ + ("ccflow.examples.flow_model.flow_model_example", "Flow.model Example"), + ("ccflow.examples.flow_model.flow_model_hydra_builder_demo", "Hydra + Flow.model Builder Demo"), + ], +) +def test_flow_model_examples_run_as_package_modules(module_name: str, expected_text: str): + result = subprocess.run( + [sys.executable, "-m", module_name], + check=True, + capture_output=True, + text=True, + ) + + assert expected_text in result.stdout + assert "counts=[model, model]" in result.stdout diff --git a/ccflow/tests/test_flow_model.py b/ccflow/tests/test_flow_model.py index df951f1..6f7c17a 100644 --- a/ccflow/tests/test_flow_model.py +++ b/ccflow/tests/test_flow_model.py @@ -388,46 +388,254 @@ def total(values: list[int]) -> int: def test_regular_param_dep_marker_allows_marked_container_leaves(): - calls = {"source": 0, "row": 0, "total": 0} + calls = {"source": 0, "total": 0} @Flow.model def source(value: FromContext[int], offset: int) -> int: calls["source"] += 1 return value + offset + @Flow.model + def total(values: list[Dep[int]]) -> int: + calls["total"] += 1 + return sum(values) + + model = total(values=[source(offset=1), "2", 3]) + deps = model.__deps__(FlowContext(value=10)) + + assert len(deps) == 1 + assert model.flow.compute(value=10).value == 16 + assert calls == {"source": 1, "total": 1} + + +def test_dep_marker_allows_nested_container_leaves_and_outer_coercion(): @Flow.model def row_source(value: FromContext[int]) -> list[int]: - calls["row"] += 1 return [value, value + 1] + @Flow.model + def total(rows: list[Dep[list[int]]]) -> int: + return sum(sum(row) for row in rows) + + model = total(rows=([4, 5], row_source())) + + assert len(model.__deps__(FlowContext(value=10))) == 1 + assert model.flow.compute(value=10).value == 30 + + +def test_dep_marker_preserves_whole_parameter_dependency_rule(): @Flow.model def list_source(value: FromContext[int]) -> list[int]: return [value, value * 2] @Flow.model - def total(values: list[Dep[int]], rows: list[Dep[list[int]]] = ()) -> int: - calls["total"] += 1 - return sum(values) + sum(sum(row) for row in rows) + def total(values: list[Dep[int]]) -> int: + return sum(values) - model = total(values=(source(offset=1), "2", 3), rows=([4, 5], row_source())) - deps = model.__deps__(FlowContext(value=10)) + model = total(values=list_source()) + + assert model.flow.compute(value=4).value == 12 + + +def test_dep_marker_preserves_serialized_whole_parameter_dependency_rule(): + @Flow.model + def list_source(value: FromContext[int]) -> list[int]: + return [value, value * 2] + + @Flow.model + def total(values: list[Dep[int]]) -> int: + return sum(values) + + for payload in ( + list_source().model_dump(mode="python"), + list_source().model_dump(mode="python", by_alias=True), + ): + model = total(values=payload) + + assert isinstance(model.values, CallableModel) + assert model.flow.compute(value=4).value == 12 + + restored = BaseModel.model_validate_json(total(values=list_source()).model_dump_json()) + + assert isinstance(restored.values, CallableModel) + assert restored.flow.compute(value=4).value == 12 + + +def test_dep_marker_preserves_serialized_dict_shaped_whole_parameter_dependency_rule(): + @Flow.model + def dict_source(value: FromContext[int]) -> dict[str, Any]: + return {"x": value} - assert len(deps) == 2 - assert model.flow.compute(value=10).value == 46 - assert calls == {"source": 1, "row": 1, "total": 1} + @Flow.model + def consume(values: dict[str, Dep[Any]]) -> int: + return values["x"] + + for payload in ( + dict_source().model_dump(mode="python"), + dict_source().model_dump(mode="python", by_alias=True), + ): + model = consume(values=payload) + + assert isinstance(model.values, CallableModel) + assert len(model.__deps__(FlowContext(value=3))) == 1 + assert model.flow.compute(value=3).value == 3 + + restored = BaseModel.model_validate_json(consume(values=dict_source()).model_dump_json()) + + assert isinstance(restored.values, CallableModel) + assert len(restored.__deps__(FlowContext(value=3))) == 1 + assert restored.flow.compute(value=3).value == 3 - whole_param_model = total(values=list_source()) - assert whole_param_model.flow.compute(value=4).value == 12 - serialized_model = total(values=[source(offset=5).model_dump(mode="python")]) - assert isinstance(serialized_model.values[0], CallableModel) - assert serialized_model.flow.compute(value=10).value == 15 +def test_dep_marker_preserves_registry_whole_parameter_dependency_rule(): + @Flow.model + def list_source(value: FromContext[int]) -> list[int]: + return [value, value * 2] + + @Flow.model + def total(values: list[Dep[int]]) -> int: + return sum(values) + + registry = ModelRegistry.root().clear() + registry.add("source", list_source()) + try: + model = total(values="source") + + assert isinstance(model.values, CallableModel) + assert model.flow.compute(value=4).value == 12 + finally: + registry.clear() + + +def test_dep_marker_accepts_serialized_leaf_dependency(): + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def total(values: list[Dep[int]]) -> int: + return sum(values) + + model = total(values=[source(offset=5).model_dump(mode="python")]) + + assert isinstance(model.values[0], CallableModel) + assert model.flow.compute(value=10).value == 15 + + +def test_dep_marker_allows_union_inside_marked_slot(): + @Flow.model + def source(value: FromContext[int | None], offset: int) -> int | None: + if value is None: + return None + return value + offset + + @Flow.model + def total(values: list[Dep[int | None]]) -> int: + return sum(value or 0 for value in values) + + model = total(values=[source(offset=5), None, 2]) + + assert isinstance(model.values[0], CallableModel) + assert model.flow.compute(value=10).value == 17 + assert model.flow.compute(value=None).value == 2 + + +def test_dep_marker_any_slot_keeps_non_restorable_serialized_looking_dicts(): + @Flow.model + def collect(values: list[Dep[Any]]) -> list[Any]: + return values + + payloads = [ + {"type_": "missing.module.Model", "value": 1}, + {"_target_": "missing.module.Model", "meta": {"name": ""}, "value": 2}, + ] + model = collect(values=payloads) + + assert model.values == payloads + assert model.flow.compute().value == payloads + + +def test_dep_marker_malformed_serialized_leaf_preserves_literal_type_error(): + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def total(values: list[Dep[int]]) -> int: + return sum(values) + + payload = source(offset=5).model_dump(mode="python") + payload["offset"] = "bad" + + with pytest.raises(TypeError, match="Field 'values\\[0\\]': expected int, got dict"): + total(values=[payload]) + + +@pytest.mark.parametrize("by_alias", [False, True]) +def test_dep_marker_restores_serialized_leaf_dependency_for_any_slot(by_alias): + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def collect(values: list[Dep[Any]]) -> list[Any]: + return values + + payload = source(offset=5).model_dump(mode="python", by_alias=by_alias) + model = collect(values=[payload]) + + assert isinstance(model.values[0], CallableModel) + assert len(model.__deps__(FlowContext(value=10))) == 1 + assert model.flow.compute(value=10).value == [15] + + +@pytest.mark.parametrize("by_alias", [False, True]) +def test_dep_marker_preserves_literal_dicts_with_serialized_dependency_markers(by_alias): + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def consume(items: list[Dep[dict[str, Any]]]) -> str: + return type(items[0]).__name__ + + payload = source(offset=5).model_dump(mode="python", by_alias=by_alias) + model = consume(items=[payload]) + + assert isinstance(model.items[0], dict) + assert model.flow.compute(value=10).value == "dict" + + +def test_dep_marker_accepts_registry_leaf_dependency(): + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def total(values: list[Dep[int]]) -> int: + return sum(values) registry = ModelRegistry.root().clear() registry.add("source", source(offset=4)) try: - registry_model = total(values=["source", "2"], rows=(row_source(),)) - assert registry_model.flow.compute(value=10).value == 37 + model = total(values=["source", "2"]) + assert model.flow.compute(value=10).value == 16 + finally: + registry.clear() + + +def test_dep_marker_rejects_unknown_or_incompatible_registry_leaf_dependency(): + @Flow.model + def total(values: list[Dep[int]]) -> int: + return sum(values) + + registry = ModelRegistry.root().clear() + registry.add("payload", ExternalCcflowPayload(x=1)) + try: + with pytest.raises(TypeError, match="Field 'values\\[0\\]': expected int, got str"): + total(values=["payload"]) + with pytest.raises(TypeError, match="Field 'values\\[0\\]': expected int, got str"): + total(values=["missing"]) finally: registry.clear() @@ -451,6 +659,21 @@ def total(pair: tuple[Dep[int], str], values: dict[str, Dep[int]], many: tuple[D assert model.flow.compute(value=10).value == 48 +def test_dep_marker_fixed_tuple_arity_mismatch_does_not_walk_slots(): + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def total(pair: tuple[Dep[int], str]) -> int: + return pair[0] + + with pytest.raises(TypeError, match="Field 'pair'"): + total(pair=(source(offset=1),)) + with pytest.raises(TypeError, match="Field 'pair'"): + total(pair=(source(offset=1), "ok", 3)) + + def test_dep_marker_participates_in_graph_and_effective_cache_identity(): calls = {"source": 0, "total": 0} @@ -534,6 +757,36 @@ def dict_key(values: dict[Dep[str], int]) -> int: def set_values(values: set[Dep[int]]) -> int: return sum(values) + with pytest.raises(TypeError, match="union"): + + @Flow.model + def optional_values(values: list[Dep[int]] | None) -> int: + return 0 if values is None else sum(values) + + with pytest.raises(TypeError, match="Lazy.*FromContext|FromContext.*Lazy"): + + @Flow.model + def dep_from_context(values: list[Dep[FromContext[int]]]) -> int: + return sum(values) + + with pytest.raises(TypeError, match="Lazy.*FromContext|FromContext.*Lazy"): + + @Flow.model + def dep_lazy(values: list[Dep[Lazy[int]]]) -> int: + return sum(value() for value in values) + + with pytest.raises(TypeError, match="cannot combine Dep.*FromContext"): + + @Flow.model + def outer_from_context(values: FromContext[list[Dep[int]]]) -> int: + return sum(values) + + with pytest.raises(TypeError, match="cannot combine Dep.*Lazy"): + + @Flow.model + def outer_lazy(values: Lazy[list[Dep[int]]]) -> int: + return sum(values()) + def test_dep_marker_is_ordinary_annotation_for_handwritten_callable_model(): @Flow.model diff --git a/docs/wiki/Flow-Model.md b/docs/wiki/Flow-Model.md index 80854ae..5250a19 100644 --- a/docs/wiki/Flow-Model.md +++ b/docs/wiki/Flow-Model.md @@ -156,12 +156,32 @@ total(values=source_list()) # valid: source_list supplies the whole list So `list[int]` accepts a literal list or a model returning `list[int]`, while `list[Dep[int]]` additionally accepts model leaves inside the literal list. +Union annotations are allowed inside the marked slot: + +```python +@Flow.model +def maybe_source(value: FromContext[int | None]) -> int | None: + return value + + +@Flow.model +def total(values: list[Dep[int | None]]) -> int: + return sum(value or 0 for value in values) + + +total(values=[maybe_source(), None, 2]) # valid: each list item is int | None +``` + `Dep[...]` is intentionally narrow: - it is interpreted only for regular `@Flow.model` parameters, - it is supported inside `list`, `tuple`, and `dict` values, - top-level `Dep[...]` is rejected because direct whole-parameter dependencies already cover that case, +- union annotations may appear inside the marked slot, such as + `list[Dep[int | None]]`, +- union annotations may not wrap a `Dep` marker, including optional container + forms like `list[Dep[int]] | None`, - it is not supported in dict keys, - nested `Dep[...]` markers are rejected, - it does not add automatic behavior to handwritten `CallableModel` fields. From 1d1a521363af1a325924d4f5c09df18f12cf437b Mon Sep 17 00:00:00 2001 From: Nijat K Date: Mon, 18 May 2026 07:39:41 -0400 Subject: [PATCH 3/5] Add 'inspect' to consolidate introspection for @Flow.model Signed-off-by: Nijat K --- ccflow/flow_model.py | 81 ++++++++++++++++++++++++++++----- ccflow/tests/test_flow_model.py | 56 ++++++++++++++++++++++- docs/wiki/Flow-Model.md | 11 +++-- 3 files changed, 131 insertions(+), 17 deletions(-) diff --git a/ccflow/flow_model.py b/ccflow/flow_model.py index 2da4bf6..e4ec93b 100644 --- a/ccflow/flow_model.py +++ b/ccflow/flow_model.py @@ -115,9 +115,9 @@ __all__ = ( "FlowAPI", "BoundModel", + "Dep", "FlowInspection", "InputSpec", - "Dep", "FromContext", "Lazy", ) @@ -1280,6 +1280,48 @@ def merge_items(items: Any) -> GraphDepList: ) +def _dep_marked_dependency_specs( + name: str, + value: Any, + annotation: Any, + context: Optional[ContextBase], + *, + trim_context: bool, +) -> Tuple[DependencySpec, ...]: + """Collect inspect-visible dependency edges from explicit Dep marker slots.""" + + def dep_slot_specs(item: Any, _dep_base: Any, item_path: Tuple[Any, ...]) -> Tuple[DependencySpec, ...]: + if not _is_model_dependency(item): + return () + dependency_model = item + dependency_context = None + if context is not None: + try: + dependency_model, dependency_context = _resolved_dependency_invocation(item, context) + except (TypeError, ValueError, ValidationError): + dependency_model = item + dependency_context = _partial_dependency_context_for_inspect(item, context) + else: + dependency_context = _trim_dependency_context_for_inspect(dependency_model, dependency_context, trim_context=trim_context) + return (DependencySpec(_path_name(name, item_path), dependency_model, dependency_context),) + + def merge_items(items: Any) -> Tuple[DependencySpec, ...]: + specs: List[DependencySpec] = [] + for item in items: + specs.extend(item) + return tuple(specs) + + return _walk_dep_marked_value( + value, + annotation, + on_dep_slot=dep_slot_specs, + on_literal=lambda _item, _annotation, _path: (), + on_list=lambda _value, items, _annotation, _path: merge_items(items), + on_tuple=lambda _value, items, _annotation, _path: merge_items(items), + on_dict=lambda _value, items, _key_annotation, _value_annotation, _dict_annotation, _path: merge_items(item for _, item in items), + ) + + def _dep_marked_identity_value(value: Any, annotation: Any, context: ContextBase) -> Any: """Return an identity payload with explicit Dep leaves replaced by dependencies.""" @@ -2343,6 +2385,26 @@ def _project_bound_dependency_context_for_inspect(model: "BoundModel", context: return FlowContext(**projected) +def _trim_dependency_context_for_inspect( + dependency_model: CallableModel, + dependency_context: Optional[ContextBase], + *, + trim_context: bool, +) -> Optional[ContextBase]: + if not trim_context or dependency_context is None: + return dependency_context + if isinstance(dependency_model, BoundModel): + return _project_bound_dependency_context_for_inspect(dependency_model, dependency_context) + contract = _model_context_contract(dependency_model) + if contract.input_types is None: + return dependency_context + values = _context_values(dependency_context) + return _runtime_context_for_model( + dependency_model, + {name: values[name] for name in contract.input_types if name in values}, + ) + + def _generated_context_argument_specs(generated: "_GeneratedFlowModelBase", input_types: Optional[Dict[str, Any]]) -> Dict[str, InputSpec]: config = type(generated).__flow_model_config__ explicit_fields = _bound_field_names(generated) @@ -2393,7 +2455,11 @@ def _direct_dependency_specs( config = type(generated).__flow_model_config__ for param in config.regular_params: value = getattr(generated, param.name, _UNSET_FLOW_INPUT) - if _is_unset_flow_input(value) or not _is_model_dependency(value): + if _is_unset_flow_input(value): + continue + if not _is_model_dependency(value): + if param.has_dep_slots: + specs.extend(_dep_marked_dependency_specs(param.name, value, param.annotation, context, trim_context=trim_context)) continue dependency_model = value dependency_context = None @@ -2404,16 +2470,7 @@ def _direct_dependency_specs( dependency_model = value dependency_context = _partial_dependency_context_for_inspect(value, context) else: - contract = _model_context_contract(dependency_model) - if trim_context and dependency_context is not None: - if isinstance(dependency_model, BoundModel): - dependency_context = _project_bound_dependency_context_for_inspect(dependency_model, dependency_context) - elif contract.input_types is not None: - values = _context_values(dependency_context) - dependency_context = _runtime_context_for_model( - dependency_model, - {name: values[name] for name in contract.input_types if name in values}, - ) + dependency_context = _trim_dependency_context_for_inspect(dependency_model, dependency_context, trim_context=trim_context) specs.append(DependencySpec(param.name, dependency_model, dependency_context, lazy=param.is_lazy)) return tuple(specs) diff --git a/ccflow/tests/test_flow_model.py b/ccflow/tests/test_flow_model.py index 6f7c17a..5cf1554 100644 --- a/ccflow/tests/test_flow_model.py +++ b/ccflow/tests/test_flow_model.py @@ -434,6 +434,8 @@ def total(values: list[Dep[int]]) -> int: model = total(values=list_source()) + assert tuple(dependency.path for dependency in model.flow.inspect(value=4).dependencies) == ("values",) + assert len(model.flow.inspect(value=4, dependencies="recursive").dependencies) == 1 assert model.flow.compute(value=4).value == 12 @@ -659,6 +661,58 @@ def total(pair: tuple[Dep[int], str], values: dict[str, Dep[int]], many: tuple[D assert model.flow.compute(value=10).value == 48 +def test_flow_inspect_reports_dep_marker_container_dependencies(): + @Flow.model + def source(value: FromContext[int], offset: int) -> int: + return value + offset + + @Flow.model + def total(pair: tuple[Dep[int], str], values: dict[str, Dep[int]], many: tuple[Dep[int], ...]) -> int: + return pair[0] + sum(values.values()) + sum(many) + + model = total( + pair=(source(offset=1), "ignored"), + values={"literal": "2", "model": source(offset=3)}, + many=(source(offset=5), "7"), + ) + + inspection = model.flow.inspect(value=10) + + assert tuple(dependency.path for dependency in inspection.dependencies) == ("pair[0]", "values['model']", "many[0]") + assert tuple(dependency.context for dependency in inspection.dependencies) == ( + FlowContext(value=10), + FlowContext(value=10), + FlowContext(value=10), + ) + assert inspection.inputs["pair"].value is model.pair + assert model.flow.inspect(dependencies="none").dependencies == () + + +def test_flow_inspect_recurses_through_dep_marker_dependencies(): + @Flow.model + def leaf(v: FromContext[int]) -> int: + return v + + @Flow.model + def middle(x: int) -> int: + return x + + @Flow.model + def total(values: list[Dep[int]]) -> int: + return sum(values) + + model = total(values=[middle(x=leaf()), 2]) + + missing = model.flow.inspect(dependencies="recursive") + assert tuple(dependency.path for dependency in missing.dependencies) == ("values[0]", "values[0].x") + assert missing.dependencies[1].model.flow.inspect().required_inputs == {"v": int} + + supplied = model.flow.inspect(v=3, dependencies="recursive") + assert tuple(dependency.context for dependency in supplied.dependencies) == (FlowContext(v=3), FlowContext(v=3)) + leaf_with_context = supplied.dependencies[1].model.flow.inspect(supplied.dependencies[1].context) + assert leaf_with_context.inputs["v"].value == 3 + + def test_dep_marker_fixed_tuple_arity_mismatch_does_not_walk_slots(): @Flow.model def source(value: FromContext[int], offset: int) -> int: @@ -2471,7 +2525,7 @@ def add(a: int, b: FromContext[int]) -> int: assert flow_model_module._is_unset_flow_input(missing_inspection.inputs["b"].value) -def test_flow_inspect_reports_direct_dependencies_and_unused_context(): +def test_flow_inspect_reports_direct_dependencies_and_ignores_unknown_context_fields(): @Flow.model def source(value: FromContext[int]) -> int: return value * 2 diff --git a/docs/wiki/Flow-Model.md b/docs/wiki/Flow-Model.md index 5250a19..c37e925 100644 --- a/docs/wiki/Flow-Model.md +++ b/docs/wiki/Flow-Model.md @@ -114,8 +114,9 @@ model = add(a=load_value(offset=5)) assert model.flow.compute(value=7, b=12).value == 24 ``` -Direct regular-parameter values are treated as upstream dependencies. Containers -are ordinary literal values unless a nested position is explicitly marked with +Direct `CallableModel` values bound to regular parameters are treated as +upstream dependencies. Other literal values are bound inputs. Containers are +ordinary literal values unless a nested position is explicitly marked with `Dep[T]`. ### Explicit Container Dependencies @@ -508,8 +509,10 @@ that value came from construction, a function default, runtime context, or Dependency information lives under `inspection.dependencies`. Each dependency edge reports the parameter path, target model, projected context values when -known, and whether the edge is lazy. To inspect a child, inspect that child -model directly: +known, and whether the edge is lazy. Direct whole-parameter dependencies use the +regular parameter name as the path; explicit container dependencies use nested +paths such as `values[0]` or `items['left']`. To inspect a child, inspect that +child model directly: ```python inspection = model.flow.inspect(value=3) From 771594f558d0091c9e76c1fd72ebfb060a4115ce Mon Sep 17 00:00:00 2001 From: Nijat K Date: Fri, 29 May 2026 20:02:30 -0400 Subject: [PATCH 4/5] Fix dep marker pickle roundtrip after rebase Signed-off-by: Nijat K --- ccflow/tests/test_flow_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ccflow/tests/test_flow_model.py b/ccflow/tests/test_flow_model.py index 5cf1554..7f5088d 100644 --- a/ccflow/tests/test_flow_model.py +++ b/ccflow/tests/test_flow_model.py @@ -1437,7 +1437,7 @@ def total(values: list[Dep[int]]) -> int: model = total(values=[source(offset=2), 3]) context = FlowContext(value=10) - for dumps, loads in ((pickle.dumps, pickle.loads), (rcpdumps, rcploads)): + for dumps, loads in ((pickle.dumps, pickle.loads), (cloudpickle.dumps, cloudpickle.loads)): restored = loads(dumps(model, protocol=5)) assert len(restored.__deps__(context)) == 1 From 2db2cd97a2499b6647877143a9c4dee373397145 Mon Sep 17 00:00:00 2001 From: Tim Paine <3105306+timkpaine@users.noreply.github.com> Date: Tue, 9 Jun 2026 19:28:48 -0400 Subject: [PATCH 5/5] Fix Dep marker Annotated construction on Python 3.14 `_pop_dep_marker` rebuilt the non-Dep Annotated metadata via `Annotated.__class_getitem__((base, *metadata))`. Python 3.14 removed that attribute (typing.Annotated.__getattr__ now raises AttributeError for `__class_getitem__`), which broke every 3.14 CI build with `AttributeError: __class_getitem__` in test_dep_marker_*. Build the tuple first and subscript: `Annotated[(base, *metadata)]`. This is equivalent, avoids the removed dunder, and stays portable (the original 3.11+-only concern was the star-in-subscript spelling, not a pre-built tuple). Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com> --- ccflow/_flow_model_binding.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ccflow/_flow_model_binding.py b/ccflow/_flow_model_binding.py index 516becb..b7db73d 100644 --- a/ccflow/_flow_model_binding.py +++ b/ccflow/_flow_model_binding.py @@ -487,10 +487,12 @@ def _pop_dep_marker(annotation: Any) -> Tuple[Any, bool]: base = args[0] if not metadata: return base, has_dep - # Python 3.10 cannot spell this as ``Annotated[base, *metadata]``. Keep - # non-Dep metadata, such as pydantic Field constraints, on the annotation - # used to validate literals and resolved dependency results. - return Annotated.__class_getitem__((base, *metadata)), has_dep + # Keep non-Dep metadata, such as pydantic Field constraints, on the annotation + # used to validate literals and resolved dependency results. Build the tuple + # first and subscript ``Annotated`` with it: ``Annotated[base, *metadata]`` is + # 3.11+-only syntax, and ``Annotated.__class_getitem__`` was removed in 3.14, + # but ``Annotated[(base, *metadata)]`` is portable across both. + return Annotated[(base, *metadata)], has_dep def _annotation_contains_dep(annotation: Any) -> bool: