diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e663a9eac29..6453a663ef9 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -268,11 +268,17 @@ def list_feature_services( return self._registry.list_feature_services(self.project, tags=tags) def _list_all_feature_views( - self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None + self, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + skip_feature_view_validation: bool = False, ) -> List[BaseFeatureView]: feature_views = [] for fv in self.registry.list_all_feature_views( - self.project, allow_cache=allow_cache, tags=tags + self.project, + allow_cache=allow_cache, + tags=tags, + skip_udf=skip_feature_view_validation, ): if ( isinstance(fv, FeatureView) @@ -285,18 +291,27 @@ def _list_all_feature_views( return feature_views def list_all_feature_views( - self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None + self, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + skip_feature_view_validation: bool = False, ) -> List[BaseFeatureView]: """ Retrieves the list of feature views from the registry. Args: allow_cache: Whether to allow returning entities from a cached registry. + tags: Filter by tags. + skip_feature_view_validation: Whether to skip validation of feature views (e.g., UDF deserialization). Returns: A list of feature views. """ - return self._list_all_feature_views(allow_cache, tags=tags) + return self._list_all_feature_views( + allow_cache, + tags=tags, + skip_feature_view_validation=skip_feature_view_validation, + ) def list_feature_views( self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None @@ -1741,6 +1756,7 @@ def push( allow_registry_cache: bool = True, to: PushMode = PushMode.ONLINE, transform_on_write: bool = True, + skip_feature_view_validation: bool = False, ): """ Push features to a push source. This updates all the feature views that have the push source as stream source. @@ -1751,6 +1767,7 @@ def push( allow_registry_cache: Whether to allow cached versions of the registry. to: Whether to push to online or offline store. Defaults to online store only. transform_on_write: Whether to transform the data before pushing. + skip_feature_view_validation: Whether to skip validation of feature views (e.g., UDF deserialization). """ for fv in self._fvs_for_push_source_or_raise( push_source_name, allow_registry_cache @@ -1761,6 +1778,7 @@ def push( df, allow_registry_cache=allow_registry_cache, transform_on_write=transform_on_write, + skip_feature_view_validation=skip_feature_view_validation, ) if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE: self.write_to_offline_store( @@ -1773,6 +1791,7 @@ async def push_async( df: pd.DataFrame, allow_registry_cache: bool = True, to: PushMode = PushMode.ONLINE, + skip_feature_view_validation: bool = False, **kwargs, ): fvs = self._fvs_for_push_source_or_raise(push_source_name, allow_registry_cache) @@ -1781,7 +1800,10 @@ async def push_async( _ = await asyncio.gather( *[ self.write_to_online_store_async( - fv.name, df, allow_registry_cache=allow_registry_cache + fv.name, + df, + allow_registry_cache=allow_registry_cache, + skip_feature_view_validation=skip_feature_view_validation, ) for fv in fvs ] @@ -1959,10 +1981,14 @@ def _get_feature_view_and_df_for_online_write( inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, allow_registry_cache: bool = True, transform_on_write: bool = True, + skip_feature_view_validation: bool = False, ): feature_view_dict = { fv_proto.name: fv_proto - for fv_proto in self.list_all_feature_views(allow_registry_cache) + for fv_proto in self.list_all_feature_views( + allow_registry_cache, + skip_feature_view_validation=skip_feature_view_validation, + ) } try: feature_view = feature_view_dict[feature_view_name] @@ -1992,6 +2018,7 @@ def write_to_online_store( inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, allow_registry_cache: bool = True, transform_on_write: bool = True, + skip_feature_view_validation: bool = False, ): """ Persists a dataframe to the online store. @@ -2002,6 +2029,7 @@ def write_to_online_store( inputs: Optional the dictionary object to be written allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. transform_on_write (optional): Whether to transform the data before pushing. + skip_feature_view_validation (optional): Whether to skip validation of feature views (e.g., UDF deserialization). """ feature_view, df = self._get_feature_view_and_df_for_online_write( @@ -2010,6 +2038,7 @@ def write_to_online_store( inputs=inputs, allow_registry_cache=allow_registry_cache, transform_on_write=transform_on_write, + skip_feature_view_validation=skip_feature_view_validation, ) # Validate that the dataframe has meaningful feature data @@ -2037,6 +2066,7 @@ async def write_to_online_store_async( df: Optional[pd.DataFrame] = None, inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, allow_registry_cache: bool = True, + skip_feature_view_validation: bool = False, ): """ Persists a dataframe to the online store asynchronously. @@ -2046,6 +2076,7 @@ async def write_to_online_store_async( df: The dataframe to be persisted. inputs: Optional the dictionary object to be written allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. + skip_feature_view_validation (optional): Whether to skip validation of feature views (e.g., UDF deserialization). """ feature_view, df = self._get_feature_view_and_df_for_online_write( @@ -2053,6 +2084,7 @@ async def write_to_online_store_async( df=df, inputs=inputs, allow_registry_cache=allow_registry_cache, + skip_feature_view_validation=skip_feature_view_validation, ) # Validate that the dataframe has meaningful feature data diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index 26a5b7e1689..141ed36e18f 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -233,14 +233,33 @@ def list_feature_services( return feature_services -@registry_proto_cache_with_tags def list_all_feature_views( + registry_proto: RegistryProto, + project: str, + tags: Optional[dict[str, str]], + skip_udf: bool = False, +) -> List[BaseFeatureView]: + # Skip caching when skip_udf is True to avoid cache pollution + if skip_udf: + return ( + list_feature_views(registry_proto, project, tags) + + list_stream_feature_views(registry_proto, project, tags) + + list_on_demand_feature_views( + registry_proto, project, tags, skip_udf=skip_udf + ) + ) + else: + return _list_all_feature_views_cached(registry_proto, project, tags) + + +@registry_proto_cache_with_tags +def _list_all_feature_views_cached( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] ) -> List[BaseFeatureView]: return ( list_feature_views(registry_proto, project, tags) + list_stream_feature_views(registry_proto, project, tags) - + list_on_demand_feature_views(registry_proto, project, tags) + + list_on_demand_feature_views(registry_proto, project, tags, skip_udf=False) ) @@ -272,8 +291,37 @@ def list_stream_feature_views( return stream_feature_views -@registry_proto_cache_with_tags def list_on_demand_feature_views( + registry_proto: RegistryProto, + project: str, + tags: Optional[dict[str, str]], + skip_udf: bool = False, +) -> List[OnDemandFeatureView]: + # Skip caching when skip_udf is True to avoid cache pollution with dummy UDFs + if skip_udf: + on_demand_feature_views = [] + for on_demand_feature_view in registry_proto.on_demand_feature_views: + if on_demand_feature_view.spec.project == project and utils.has_all_tags( + on_demand_feature_view.spec.tags, tags + ): + # Only skip UDF deserialization for ODFVs that don't write to online store + # ODFVs with write_to_online_store=True need the actual UDF loaded + # because it will be executed during push operations + should_skip_udf = ( + skip_udf and not on_demand_feature_view.spec.write_to_online_store + ) + on_demand_feature_views.append( + OnDemandFeatureView.from_proto( + on_demand_feature_view, skip_udf=should_skip_udf + ) + ) + return on_demand_feature_views + else: + return _list_on_demand_feature_views_cached(registry_proto, project, tags) + + +@registry_proto_cache_with_tags +def _list_on_demand_feature_views_cached( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] ) -> List[OnDemandFeatureView]: on_demand_feature_views = [] @@ -282,7 +330,7 @@ def list_on_demand_feature_views( on_demand_feature_view.spec.tags, tags ): on_demand_feature_views.append( - OnDemandFeatureView.from_proto(on_demand_feature_view) + OnDemandFeatureView.from_proto(on_demand_feature_view, skip_udf=False) ) return on_demand_feature_views diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index ff9c1f405a1..60e51573f6d 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -640,12 +640,13 @@ def list_all_feature_views( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, + skip_udf: bool = False, ) -> List[BaseFeatureView]: registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) return proto_registry_utils.list_all_feature_views( - registry_proto, project, tags + registry_proto, project, tags, skip_udf=skip_udf ) def get_any_feature_view( diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 7ead26cb984..6d775e44136 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -531,7 +531,7 @@ def from_proto( # Parse transformation from proto transformation = cls._parse_transformation_from_proto( - on_demand_feature_view_proto + on_demand_feature_view_proto, skip_udf=skip_udf ) # Parse optional fields with defaults @@ -603,7 +603,7 @@ def _parse_sources_from_proto( @classmethod def _parse_transformation_from_proto( - cls, proto: OnDemandFeatureViewProto + cls, proto: OnDemandFeatureViewProto, skip_udf: bool = False ) -> Transformation: """Parse and convert the transformation from the protobuf representation.""" feature_transformation = proto.spec.feature_transformation @@ -616,14 +616,14 @@ def _parse_transformation_from_proto( # Check for non-empty UDF body if udf_proto.body_text: if mode == "pandas": - return PandasTransformation.from_proto(udf_proto) + return PandasTransformation.from_proto(udf_proto, skip_udf=skip_udf) elif mode == "python": - return PythonTransformation.from_proto(udf_proto) + return PythonTransformation.from_proto(udf_proto, skip_udf=skip_udf) else: raise ValueError(ODFVErrorMessages.unsupported_mode_for_udf(mode)) else: # Handle backward compatibility case with empty body_text - return cls._handle_backward_compatible_udf(proto) + return cls._handle_backward_compatible_udf(proto, skip_udf=skip_udf) elif transformation_type == "substrait_transformation": return SubstraitTransformation.from_proto( @@ -631,7 +631,7 @@ def _parse_transformation_from_proto( ) elif transformation_type is None: # Handle backward compatibility case where feature_transformation is cleared - return cls._handle_backward_compatible_udf(proto) + return cls._handle_backward_compatible_udf(proto, skip_udf=skip_udf) else: raise ValueError( ODFVErrorMessages.unsupported_transformation_type(transformation_type) @@ -639,7 +639,7 @@ def _parse_transformation_from_proto( @classmethod def _handle_backward_compatible_udf( - cls, proto: OnDemandFeatureViewProto + cls, proto: OnDemandFeatureViewProto, skip_udf: bool = False ) -> Transformation: """Handle backward compatibility for UDFs with empty body_text.""" if not hasattr(proto.spec, "user_defined_function"): @@ -653,6 +653,7 @@ def _handle_backward_compatible_udf( ) return PandasTransformation.from_proto( user_defined_function_proto=backwards_compatible_udf, + skip_udf=skip_udf, ) @classmethod diff --git a/sdk/python/feast/transformation/pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py index 6e073c30100..47b0503a2c7 100644 --- a/sdk/python/feast/transformation/pandas_transformation.py +++ b/sdk/python/feast/transformation/pandas_transformation.py @@ -145,7 +145,21 @@ def __eq__(self, other): return True @classmethod - def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): + def from_proto( + cls, + user_defined_function_proto: UserDefinedFunctionProto, + skip_udf: bool = False, + ): + if skip_udf: + # Return a dummy transformation when skipping UDF deserialization + # Identity function that preserves DataFrame structure + def identity_udf(df: pd.DataFrame) -> pd.DataFrame: + return df + + return PandasTransformation( + udf=identity_udf, + udf_string=user_defined_function_proto.body_text, + ) return PandasTransformation( udf=dill.loads(user_defined_function_proto.body), udf_string=user_defined_function_proto.body_text, diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index 68e9eee95f6..d94063b928f 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -163,7 +163,21 @@ def __reduce__(self): ) @classmethod - def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): + def from_proto( + cls, + user_defined_function_proto: UserDefinedFunctionProto, + skip_udf: bool = False, + ): + if skip_udf: + # Return a dummy transformation when skipping UDF deserialization + # Identity function that preserves dictionary structure + def identity_udf(features_dict: Dict[str, Any]) -> Dict[str, Any]: + return features_dict + + return PythonTransformation( + udf=identity_udf, + udf_string=user_defined_function_proto.body_text, + ) return PythonTransformation( udf=dill.loads(user_defined_function_proto.body), udf_string=user_defined_function_proto.body_text, diff --git a/sdk/python/tests/unit/test_skip_validation.py b/sdk/python/tests/unit/test_skip_validation.py index 8fb916e3776..ed749d897ea 100644 --- a/sdk/python/tests/unit/test_skip_validation.py +++ b/sdk/python/tests/unit/test_skip_validation.py @@ -7,11 +7,22 @@ - Cases where the type/validation system is being too restrictive Users should be encouraged to report issues on GitHub when they need to use this flag. + +Also tests skip_feature_view_validation parameter in push() and related methods to handle +On-Demand Feature Views with UDFs that reference modules not available in the +current environment. """ import inspect +import dill +import pandas as pd + from feast.feature_store import FeatureStore +from feast.on_demand_feature_view import PandasTransformation, PythonTransformation +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, +) def test_apply_has_skip_feature_view_validation_parameter(): @@ -48,9 +59,113 @@ def test_plan_has_skip_feature_view_validation_parameter(): assert param.annotation == bool -def test_skip_feature_view_validation_use_case_documentation(): +def test_push_has_skip_feature_view_validation_parameter(): + """Test that FeatureStore.push() method has skip_feature_view_validation parameter""" + # Get the signature of the push method + sig = inspect.signature(FeatureStore.push) + + # Check that skip_feature_view_validation parameter exists + assert "skip_feature_view_validation" in sig.parameters + + # Check that it has a default value of False + param = sig.parameters["skip_feature_view_validation"] + assert param.default is False + + # Check that it's a boolean type hint (if type hints are present) + if param.annotation != inspect.Parameter.empty: + assert param.annotation == bool + + +def test_push_async_has_skip_feature_view_validation_parameter(): + """Test that FeatureStore.push_async() method has skip_feature_view_validation parameter""" + # Get the signature of the push_async method + sig = inspect.signature(FeatureStore.push_async) + + # Check that skip_feature_view_validation parameter exists + assert "skip_feature_view_validation" in sig.parameters + + # Check that it has a default value of False + param = sig.parameters["skip_feature_view_validation"] + assert param.default is False + + # Check that it's a boolean type hint (if type hints are present) + if param.annotation != inspect.Parameter.empty: + assert param.annotation == bool + + +def test_pandas_transformation_from_proto_with_skip_udf(): + """Test that PandasTransformation.from_proto works with skip_udf=True.""" + + # Create a UDF that would reference a non-existent module + def udf_with_missing_module(df: pd.DataFrame) -> pd.DataFrame: + # This would normally fail if a module is missing during deserialization + import nonexistent_module # noqa: F401 + + return df + + # Serialize the UDF + serialized_udf = dill.dumps(udf_with_missing_module) + udf_string = "import nonexistent_module\ndef udf(df): return df" + + # Create proto + udf_proto = UserDefinedFunctionProto( + name="test_udf", + body=serialized_udf, + body_text=udf_string, + ) + + # Test that skip_udf=True doesn't try to deserialize the UDF + # This would normally fail with ModuleNotFoundError + transformation = PandasTransformation.from_proto(udf_proto, skip_udf=True) + + # Should get a dummy transformation with identity function + assert transformation is not None + assert transformation.udf_string == udf_string + + # The dummy UDF should be callable and act as identity + test_df = pd.DataFrame({"col1": [1, 2, 3]}) + result = transformation.udf(test_df) + assert result.equals(test_df) + + +def test_python_transformation_from_proto_with_skip_udf(): + """Test that PythonTransformation.from_proto works with skip_udf=True.""" + + # Create a UDF that would reference a non-existent module + def udf_with_missing_module(features_dict): + # This would normally fail if a module is missing during deserialization + import nonexistent_module # noqa: F401 + + return features_dict + + # Serialize the UDF + serialized_udf = dill.dumps(udf_with_missing_module) + udf_string = "import nonexistent_module\ndef udf(d): return d" + + # Create proto + udf_proto = UserDefinedFunctionProto( + name="test_udf", + body=serialized_udf, + body_text=udf_string, + ) + + # Test that skip_udf=True doesn't try to deserialize the UDF + # This would normally fail with ModuleNotFoundError + transformation = PythonTransformation.from_proto(udf_proto, skip_udf=True) + + # Should get a dummy transformation with identity function + assert transformation is not None + assert transformation.udf_string == udf_string + + # The dummy UDF should be callable and act as identity + test_dict = {"col1": 1} + result = transformation.udf(test_dict) + assert result == test_dict + + +def test_skip_feature_view_validation_in_apply_use_case_documentation(): """ - Documentation test: This test documents the key use case for skip_feature_view_validation. + Documentation test: This test documents the key use case for skip_feature_view_validation in apply(). The skip_feature_view_validation flag is particularly important for On-Demand Feature Views (ODFVs) that use feature transformations. During the apply() process, ODFVs call infer_features() @@ -69,3 +184,140 @@ def test_skip_feature_view_validation_use_case_documentation(): can improve the validation system. """ pass # This is a documentation test + + +def test_skip_feature_view_validation_in_push_use_case_documentation(): + """ + Documentation test: This test documents the key use case for skip_feature_view_validation in push(). + + The skip_feature_view_validation flag in push() addresses the ModuleNotFoundError issue when: + 1. An OnDemandFeatureView with a UDF is defined in an environment with specific modules + 2. The UDF references functions, classes, or constants from those modules (e.g., 'training') + 3. feast.apply() is run to save the definition to the remote registry + 4. store.push() is called from a different environment without those modules + + Without skip_feature_view_validation: + - push() calls list_all_feature_views() which deserializes ODFVs + - Deserialization uses dill.loads() which fails if referenced modules are missing + - Results in: ModuleNotFoundError: No module named 'training' + + With skip_feature_view_validation=True: + - push() calls list_all_feature_views(skip_feature_view_validation=True) + - ODFVs with write_to_online_store=False are loaded with dummy UDFs (identity functions) + - ODFVs with write_to_online_store=True are loaded normally (UDF is deserialized) + - No deserialization of the actual UDF happens for ODFVs that won't execute transformations + - push() can proceed successfully + + IMPORTANT: ODFVs with write_to_online_store=True will have their UDFs executed during + push operations, so their UDFs MUST be properly deserialized even when skip_feature_view_validation=True. + Only ODFVs that don't execute transformations during push can safely skip UDF loading. + + Example usage: + store.push("my_push_source", df, skip_feature_view_validation=True) + + This is particularly useful in production environments where: + - Data ingestion services don't need the training/modeling code + - The UDF logic isn't needed during push operations + - Different teams manage training vs. serving infrastructure + """ + pass # This is a documentation test + + +def test_skip_feature_view_validation_only_applies_to_non_writing_odfvs(): + """ + Test that skip_feature_view_validation only skips UDF loading for ODFVs that don't write to online store. + + ODFVs with write_to_online_store=True need their UDFs loaded because they will be executed + during push operations. Only ODFVs with write_to_online_store=False can safely skip UDF loading. + """ + from feast.infra.registry.proto_registry_utils import list_on_demand_feature_views + from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureView as OnDemandFeatureViewProto, + ) + from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureViewSpec, + ) + from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto + from feast.protos.feast.core.Transformation_pb2 import ( + FeatureTransformationV2, + ) + from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, + ) + + # Create a UDF that doesn't reference any modules (will work fine) + def simple_udf(df): + return df + + serialized_udf = dill.dumps(simple_udf) + udf_string = "def simple_udf(df): return df" + + udf_proto = UserDefinedFunctionProto( + name="test_udf", + body=serialized_udf, + body_text=udf_string, + ) + + feature_transformation = FeatureTransformationV2(user_defined_function=udf_proto) + + # Create two ODFVs: one with write_to_online_store=True, one with False + odfv_with_write_spec = OnDemandFeatureViewSpec( + name="odfv_with_write", + project="test_project", + mode="pandas", + feature_transformation=feature_transformation, + write_to_online_store=True, + ) + odfv_with_write_proto = OnDemandFeatureViewProto(spec=odfv_with_write_spec) + + odfv_without_write_spec = OnDemandFeatureViewSpec( + name="odfv_without_write", + project="test_project", + mode="pandas", + feature_transformation=feature_transformation, + write_to_online_store=False, + ) + odfv_without_write_proto = OnDemandFeatureViewProto(spec=odfv_without_write_spec) + + # Create a registry with both ODFVs + registry_proto = RegistryProto( + on_demand_feature_views=[odfv_with_write_proto, odfv_without_write_proto] + ) + + # Test with skip_udf=True + odfvs = list_on_demand_feature_views( + registry_proto, "test_project", None, skip_udf=True + ) + + # We should get exactly 2 ODFVs back + assert len(odfvs) == 2 + + # Find each ODFV + odfv_with_write = next(fv for fv in odfvs if fv.name == "odfv_with_write") + odfv_without_write = next(fv for fv in odfvs if fv.name == "odfv_without_write") + + # Verify write_to_online_store flags are correct + assert odfv_with_write.write_to_online_store is True + assert odfv_without_write.write_to_online_store is False + + # The key test: Check if the UDFs behave correctly + # The ODFV with write_to_online_store=True should have the REAL UDF + # The ODFV with write_to_online_store=False should have a DUMMY UDF (identity function) + + test_df = pd.DataFrame({"col1": [1, 2, 3]}) + + # Test the ODFV with write_to_online_store=False - should have dummy UDF + # The dummy UDF is an identity function, so output equals input + result_without_write = odfv_without_write.feature_transformation.udf(test_df) + assert result_without_write.equals(test_df), ( + "ODFV without write_to_online_store should have identity UDF" + ) + + # Test the ODFV with write_to_online_store=True - should have real UDF + # The real UDF is also an identity function in this test, but it's the ACTUAL deserialized UDF + # We can't easily distinguish between real and dummy identity functions in this test + # But the important thing is that it loaded without error + result_with_write = odfv_with_write.feature_transformation.udf(test_df) + assert result_with_write.equals(test_df), ( + "ODFV with write_to_online_store should have real UDF" + )