Skip to content
11 changes: 10 additions & 1 deletion hamilton/function_modifiers/delayed.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,16 @@ def resolve(self, config: dict[str, Any], fn: Callable) -> NodeTransformLifecycl
for key in self._optional_config:
if key in config:
kwargs[key] = config[key]
return self.decorate_with(**kwargs)
decorator = self.decorate_with(**kwargs)

# NOTE: cases where `decorator` has no `validate` method should be caught by type checkers
# since `decorate_with` is typed as `Callable[..., NodeTransformLifecycle]`. The following
# check allows non-conforming functions to be used with `resolve` without immediately
# throwing an error, which may be undesirable.
if hasattr(decorator, "validate"):
decorator.validate(fn)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Since decorate_with is typed as Callable[..., NodeTransformLifecycle] and NodeTransformLifecycle always defines validate(), the hasattr check will always be True. You could simplify to just decorator.validate(fn). That said, the defensive check is reasonable as a guard against non-conforming lambdas, so this is just a nit — either way is fine.


return decorator


class resolve_from_config(resolve):
Expand Down
140 changes: 127 additions & 13 deletions tests/function_modifiers/test_delayed.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

from collections.abc import Callable

import pandas as pd
import pytest

from hamilton import settings
from hamilton.function_modifiers import (
ResolveAt,
base,
extract_columns,
extract_fields,
parameterize_sources,
resolve,
resolve_from_config,
)
Expand Down Expand Up @@ -68,12 +71,17 @@ def test_extract_and_validate_params_unhappy(fn: Callable):


def test_dynamic_resolves():
# Note: we use an empty DataFrame for validation only. This test would fail at runtime
# if we actually tried to execute the DAG because there are no columns "a" or "b" to extract.
def fn() -> pd.DataFrame:
return pd.DataFrame()

decorator = resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda cols_to_extract: extract_columns(*cols_to_extract),
)
decorator_resolved = decorator.resolve(
{"cols_to_extract": ["a", "b"], **CONFIG_WITH_POWER_MODE_ENABLED}, fn=test_dynamic_resolves
{"cols_to_extract": ["a", "b"], **CONFIG_WITH_POWER_MODE_ENABLED}, fn=fn
)
# This uses an internal component of extract_columns
# We may want to add a little more comprehensive testing
Expand All @@ -82,31 +90,32 @@ def test_dynamic_resolves():


def test_dynamic_resolve_with_configs():
def fn() -> pd.DataFrame:
return pd.DataFrame()

decorator = resolve_from_config(
decorate_with=lambda cols_to_extract: extract_columns(*cols_to_extract),
)
decorator_resolved = decorator.resolve(
{"cols_to_extract": ["a", "b"], **CONFIG_WITH_POWER_MODE_ENABLED}, fn=test_dynamic_resolves
{"cols_to_extract": ["a", "b"], **CONFIG_WITH_POWER_MODE_ENABLED},
fn=fn,
)
# This uses an internal component of extract_columns
# We may want to add a little more comprehensive testing
# But for now this will work
assert decorator_resolved.columns == ("a", "b")


def test_dynamic_fails_without_power_mode_fails():
def test_dynamic_resolve_without_power_mode_fails():
def fn() -> pd.DataFrame:
return pd.DataFrame()

decorator = resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda cols_to_extract: extract_columns(*cols_to_extract),
)
with pytest.raises(base.InvalidDecoratorException):
decorator_resolved = decorator.resolve(
CONFIG_WITH_POWER_MODE_DISABLED, fn=test_dynamic_fails_without_power_mode_fails
)
# This uses an internal component of extract_columns
# We may want to add a little more comprehensive testing
# But for now this will work
assert decorator_resolved.columns == ("a", "b")
decorator.resolve(CONFIG_WITH_POWER_MODE_DISABLED, fn=fn)


def test_config_derivation():
Expand All @@ -123,6 +132,9 @@ def test_config_derivation():


def test_delayed_with_optional():
def fn() -> pd.DataFrame:
return pd.DataFrame()

decorator = resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda cols_to_extract, some_cols_you_might_want_to_extract=["c"]: (
Expand All @@ -131,7 +143,7 @@ def test_delayed_with_optional():
)
resolved = decorator.resolve(
{"cols_to_extract": ["a", "b"], **CONFIG_WITH_POWER_MODE_ENABLED},
fn=test_delayed_with_optional,
fn=fn,
)
assert list(resolved.columns) == ["a", "b", "c"]
resolved = decorator.resolve(
Expand All @@ -140,12 +152,15 @@ def test_delayed_with_optional():
"some_cols_you_might_want_to_extract": ["d"],
**CONFIG_WITH_POWER_MODE_ENABLED,
},
fn=test_delayed_with_optional,
fn=fn,
)
assert list(resolved.columns) == ["a", "b", "d"]


def test_delayed_without_power_mode_fails():
def fn() -> pd.DataFrame:
return pd.DataFrame()

decorator = resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda cols_to_extract, some_cols_you_might_want_to_extract=["c"]: (
Expand All @@ -155,5 +170,104 @@ def test_delayed_without_power_mode_fails():
with pytest.raises(base.InvalidDecoratorException):
decorator.resolve(
{"cols_to_extract": ["a", "b"], **CONFIG_WITH_POWER_MODE_DISABLED},
fn=test_delayed_with_optional,
fn=fn,
)


def test_dynamic_resolve_with_extract_fields():
"""Test that @resolve with @extract_fields calls validate() correctly."""

def fn() -> dict[str, int]:
return {"a": 1, "b": 2}

decorator = resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda fields: extract_fields(fields),
)
decorator_resolved = decorator.resolve(
{"fields": {"a": int, "b": int}, **CONFIG_WITH_POWER_MODE_ENABLED},
fn=fn,
)
assert hasattr(decorator_resolved, "resolved_fields")
assert decorator_resolved.resolved_fields == {"a": int, "b": int}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good test! One small thought: it might be worth adding a negative test case that verifies resolve() properly propagates a validate() failure (e.g., passing a function with the wrong return type to extract_fields). This would confirm the error path works correctly through the dynamic resolution. Not blocking — the happy path coverage is sufficient for this fix.



def test_resolve_with_parameterize_sources():
"""Test that @resolve with @parameterize_sources calls validate() correctly."""

def fn(x: int, y: int) -> int:
return x + y

decorator = resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda: parameterize_sources(result_1={"x": "source_x", "y": "source_y"}),
)
decorator_resolved = decorator.resolve(
{**CONFIG_WITH_POWER_MODE_ENABLED},
fn=fn,
)
assert "result_1" in decorator_resolved.parameterization
mapping = decorator_resolved.parameterization["result_1"]
assert mapping["x"].source == "source_x"
assert mapping["y"].source == "source_y"


def test_resolve_from_config_with_extract_fields():
"""Test @resolve_from_config with @extract_fields calls validate() correctly."""

def fn() -> dict[str, int]:
return {"a": 1, "b": 2}

decorator = resolve_from_config(
decorate_with=lambda fields: extract_fields(fields),
)
decorator_resolved = decorator.resolve(
{"fields": {"a": int, "b": int}, **CONFIG_WITH_POWER_MODE_ENABLED},
fn=fn,
)
assert hasattr(decorator_resolved, "resolved_fields")
assert decorator_resolved.resolved_fields == {"a": int, "b": int}


def test_resolve_propagates_validate_failure():
"""Test that validate() failures are propagated through resolve."""

def fn() -> str:
return "not what you were expecting..."

decorator = resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda fields: extract_fields(fields),
)
with pytest.raises(base.InvalidDecoratorException):
decorator.resolve(
{"fields": {"a": int, "b": int}, **CONFIG_WITH_POWER_MODE_ENABLED},
fn=fn,
)


def test_resolve_with_arbitrary_decorator():
"""Test behavior when decorate_with returns something that is not a NodeTransformLifecycle."""

# NOTE: we want to ensure we don't interfere with other decorators on functions.

# A decorator that doesn't inherit from NodeTransformLifecycle (but still uses kwargs only)
class ArbitraryDecorator:
def __init__(self, a: int, b: int) -> None:
pass

def __call__(self, f: Callable) -> Callable:
return f

def fn() -> pd.DataFrame:
return pd.DataFrame()

decorator = resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda kwargs: ArbitraryDecorator(**kwargs),
)
decorator_resolved = decorator.resolve(
{"kwargs": {"a": 1, "b": 2}, **CONFIG_WITH_POWER_MODE_ENABLED},
fn=fn,
)
assert isinstance(decorator_resolved, ArbitraryDecorator)
Comment on lines +249 to +273
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that it's a good thing for this test to pass (which it currently does).

I guess the way I see it (with the caveat that I'm pretty unfamiliar with the Hamilton codebase) is

  • Pro: the way decorators are/aren't checked is the same whether or not they have been wrapped in @resolve
  • Con: there is an missed opportunity to check decorator validity earlier on, inside resolve.resolve(), which might be preferable than failing in a confusing way later on

Concretely, we could add the following in resolve.resolve()

if not isinstance(decorator, NodeTransformLifecycle):
    raise InvalidDecoratorException(
        f"decorate_with must return a NodeTransformLifecycle, got {type(decorator).__name__}"
    )

and remove the hasattr check.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so people should be free to add non-hamilton decorators.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so I think this is fine.