-
Notifications
You must be signed in to change notification settings - Fork 1.3k
fix: Use logging instead of print during materialization #5805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
bdd6245
bba71ac
dca6f93
23de772
91b88b2
ab9c6fc
030a51e
f64034c
66d4052
4cc99aa
ea75d6e
eb874e3
be90ca3
9c38991
5d63092
176ed0d
680725a
46b0967
36c63cb
7629b5c
49c84dd
f752488
e565f1d
73581ea
9516758
7f0b5b0
ac88775
2869e6c
f677c49
9be77c3
8b62ad3
ffeea3e
945d515
4b51239
ee11cc7
e9518f4
99dd48c
43078d3
31c51e6
3f3e636
9d28508
1612cc9
f393e3b
4c8b45f
aba90f4
f2610c5
aa245a8
d3ddce7
5203fc7
78786e9
9f6bd24
1eb20c1
46f869b
18ec522
ff01d4f
1569d61
7083435
641a11d
99a94c0
cd9eb41
7dc3b69
ce6398b
21ea2a9
fa4a8d1
9293b5b
7a8b62d
436404f
1dec68c
8ad3ea2
5d00b56
16cbacf
80b1f7d
53e0269
a861a3c
15987cb
31b3922
5070dfa
543ff29
a946fd6
7caf966
d205a80
558e28e
741274c
7e8d6ac
98a00b2
8e6327c
c2fbc54
5b0df6c
fa9e647
30b406c
2a0d873
6635515
928431b
61e9345
3bf66c6
40323bc
aab9d78
d9ce1d5
c274918
69b4f69
46f3ac0
7f05abe
de8a1d0
d936ff2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,15 +12,14 @@ | |
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| import asyncio | ||
| import copy | ||
| import itertools | ||
| import logging | ||
| import os | ||
| import time | ||
| import warnings | ||
| from datetime import datetime, timedelta | ||
| from pathlib import Path | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
| Callable, | ||
| Dict, | ||
|
|
@@ -29,17 +28,16 @@ | |
| Mapping, | ||
| Optional, | ||
| Sequence, | ||
| TYPE_CHECKING, | ||
| Tuple, | ||
| Union, | ||
| cast, | ||
| ) | ||
|
|
||
| if TYPE_CHECKING: | ||
| from feast.diff.apply_progress import ApplyProgressContext | ||
|
|
||
| import pandas as pd | ||
| import pyarrow as pa | ||
| from colorama import Fore, Style | ||
| from fastapi.concurrency import run_in_threadpool | ||
| from google.protobuf.timestamp_pb2 import Timestamp | ||
| from tqdm import tqdm | ||
|
|
@@ -85,6 +83,9 @@ | |
| from feast.online_response import OnlineResponse | ||
| from feast.permissions.permission import Permission | ||
| from feast.project import Project | ||
|
|
||
| if TYPE_CHECKING: | ||
| from feast.diff.apply_progress import ApplyProgressContext | ||
| from feast.protos.feast.serving.ServingService_pb2 import ( | ||
| FieldStatus, | ||
| GetOnlineFeaturesResponse, | ||
|
|
@@ -125,6 +126,7 @@ def _get_track_materialization(): | |
|
|
||
|
|
||
| warnings.simplefilter("once", DeprecationWarning) | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class FeatureStore: | ||
|
|
@@ -831,7 +833,6 @@ def plan( | |
| self, | ||
| desired_repo_contents: RepoContents, | ||
| skip_feature_view_validation: bool = False, | ||
| progress_ctx: Optional["ApplyProgressContext"] = None, | ||
| ) -> Tuple[RegistryDiff, InfraDiff, Infra]: | ||
| """Dry-run registering objects to metadata store. | ||
|
|
||
|
|
@@ -841,8 +842,6 @@ def plan( | |
|
|
||
| Args: | ||
| desired_repo_contents: The desired repo state. | ||
| skip_feature_view_validation: If True, skip validation of feature views. This can be useful when the validation | ||
| system is being overly strict. Use with caution and report any issues on GitHub. Default is False. | ||
|
|
||
| Raises: | ||
| ValueError: The 'objects' parameter could not be parsed properly. | ||
|
|
@@ -897,9 +896,6 @@ def plan( | |
| # the desired repo state. | ||
| registry_diff = diff_between(self.registry, self.project, desired_repo_contents) | ||
|
|
||
| if progress_ctx: | ||
| progress_ctx.update_phase_progress("Computing infrastructure diff") | ||
|
|
||
| # Compute the desired difference between the current infra, as stored in the registry, | ||
| # and the desired infra. | ||
| self.registry.refresh(project=self.project) | ||
|
|
@@ -927,7 +923,6 @@ def _apply_diffs( | |
| registry_diff: The diff between the current registry and the desired registry. | ||
| infra_diff: The diff between the current infra and the desired infra. | ||
| new_infra: The desired infra. | ||
| progress_ctx: Optional progress context for tracking apply progress. | ||
| """ | ||
| try: | ||
| # Infrastructure phase | ||
|
|
@@ -951,13 +946,7 @@ def _apply_diffs( | |
|
|
||
| self.registry.update_infra(new_infra, self.project, commit=True) | ||
|
|
||
| if progress_ctx: | ||
| progress_ctx.update_phase_progress("Registry update complete") | ||
| progress_ctx.complete_phase() | ||
| finally: | ||
| # Always cleanup progress bars | ||
| if progress_ctx: | ||
| progress_ctx.cleanup() | ||
| self._registry.update_infra(new_infra, self.project, commit=True) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

🔴 Botched refactoring leaves a dangling `try:` in `_apply_diffs`. The refactoring removed the `finally` block (and the progress-context cleanup it contained) but left the `try:` in place, producing a syntax error. Even if the syntax error were naively fixed (e.g., by removing the `try:`), a stray duplicate registry update would remain.

Intended vs actual code structure

The old code was:

```python
try:
    ...
    self.registry.update_infra(new_infra, self.project, commit=True)
    if progress_ctx:
        progress_ctx.update_phase_progress(...)
        progress_ctx.complete_phase()
finally:
    if progress_ctx:
        progress_ctx.cleanup()
```

The new code ended up as:

```python
try:
    ...
    self.registry.update_infra(new_infra, self.project, commit=True)
    self._registry.update_infra(new_infra, self.project, commit=True)  # stray duplicate
```

The fix should remove both the dangling `try:` and the stray duplicate `self._registry.update_infra(...)` line.

Prompt for agents

Was this helpful? React with 👍 or 👎 to provide feedback.
Comment on lines 947 to +949
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 Progress bar cleanup removed from The old code had a Prompt for agentsWas this helpful? React with 👍 or 👎 to provide feedback. |
||
|
|
||
| # Emit OpenLineage events for applied objects | ||
| self._emit_openlineage_apply_diffs(registry_diff) | ||
|
|
@@ -1002,18 +991,12 @@ def apply( | |
| an online store), it will commit the updated registry. All operations are idempotent, meaning they can safely | ||
| be rerun. | ||
|
|
||
| Note: The apply method does NOT delete objects that are removed from the provided list. To delete objects | ||
| from the registry, use explicit delete methods like delete_feature_view(), delete_feature_service(), or | ||
| pass objects to the objects_to_delete parameter with partial=False. | ||
|
|
||
| Args: | ||
| objects: A single object, or a list of objects that should be registered with the Feature Store. | ||
| objects_to_delete: A list of objects to be deleted from the registry and removed from the | ||
| provider's infrastructure. This deletion will only be performed if partial is set to False. | ||
| partial: If True, apply will only handle the specified objects; if False, apply will also delete | ||
| all the objects in objects_to_delete, and tear down any associated cloud resources. | ||
| skip_feature_view_validation: If True, skip validation of feature views. This can be useful when the validation | ||
| system is being overly strict. Use with caution and report any issues on GitHub. Default is False. | ||
|
|
||
| Raises: | ||
| ValueError: The 'objects' parameter could not be parsed properly. | ||
|
|
@@ -1364,17 +1347,6 @@ def get_historical_features( | |
| # TODO(achal): _group_feature_refs returns the on demand feature views, but it's not passed into the provider. | ||
| # This is a weird interface quirk - we should revisit the `get_historical_features` to | ||
| # pass in the on demand feature views as well. | ||
|
|
||
| # Deliberately disable writing to online store for ODFVs during historical retrieval | ||
| # since it's not applicable in this context. | ||
| # This does not change the output, since it forces to recompute ODFVs on historical retrieval | ||
| # but that is fine, since ODFVs precompute does not to work for historical retrieval (as per docs), only for online retrieval | ||
| # Copy to avoid side effects outside of this method | ||
| all_on_demand_feature_views = copy.deepcopy(all_on_demand_feature_views) | ||
|
|
||
| for odfv in all_on_demand_feature_views: | ||
| odfv.write_to_online_store = False | ||
|
|
||
| fvs, odfvs = utils._group_feature_refs( | ||
| _feature_refs, | ||
| all_feature_views, | ||
|
|
@@ -1384,7 +1356,7 @@ def get_historical_features( | |
| on_demand_feature_views = list(view for view, _ in odfvs) | ||
|
|
||
| # Check that the right request data is present in the entity_df | ||
| if type(entity_df) == pd.DataFrame: | ||
| if isinstance(entity_df, pd.DataFrame): | ||
| if self.config.coerce_tz_aware: | ||
| entity_df = utils.make_df_tzaware(cast(pd.DataFrame, entity_df)) | ||
| for odfv in on_demand_feature_views: | ||
|
|
@@ -1526,8 +1498,9 @@ def _materialize_odfv( | |
| ): | ||
| """Helper to materialize a single OnDemandFeatureView.""" | ||
| if not feature_view.source_feature_view_projections: | ||
| print( | ||
| f"[WARNING] ODFV {feature_view.name} materialization: No source feature views found." | ||
| logger.warning( | ||
| "ODFV %s materialization: No source feature views found.", | ||
| feature_view.name, | ||
| ) | ||
| return | ||
| start_date = utils.make_tzaware(start_date) | ||
|
|
@@ -1554,20 +1527,24 @@ def _materialize_odfv( | |
| all_join_keys = {key for key in all_join_keys if key} | ||
|
|
||
| if not all_join_keys: | ||
| print( | ||
| f"[WARNING] ODFV {feature_view.name} materialization: No join keys found in source views. Cannot create entity_df. Skipping." | ||
| logger.warning( | ||
| "ODFV %s materialization: No join keys found in source views. Cannot create entity_df. Skipping.", | ||
| feature_view.name, | ||
| ) | ||
| return | ||
|
|
||
| if len(entity_timestamp_col_names) > 1: | ||
| print( | ||
| f"[WARNING] ODFV {feature_view.name} materialization: Found multiple timestamp columns in sources ({entity_timestamp_col_names}). This is not supported. Skipping." | ||
| logger.warning( | ||
| "ODFV %s materialization: Found multiple timestamp columns in sources (%s). This is not supported. Skipping.", | ||
| feature_view.name, | ||
| entity_timestamp_col_names, | ||
| ) | ||
| return | ||
|
|
||
| if not entity_timestamp_col_names: | ||
| print( | ||
| f"[WARNING] ODFV {feature_view.name} materialization: No batch sources with timestamp columns found for sources. Skipping." | ||
| logger.warning( | ||
| "ODFV %s materialization: No batch sources with timestamp columns found for sources. Skipping.", | ||
| feature_view.name, | ||
| ) | ||
| return | ||
|
|
||
|
|
@@ -1596,8 +1573,9 @@ def _materialize_odfv( | |
| all_source_dfs.append(df) | ||
|
|
||
| if not all_source_dfs: | ||
| print( | ||
| f"No source data found for ODFV {feature_view.name} in the given time range. Skipping materialization." | ||
| logger.info( | ||
| "No source data found for ODFV %s in the given time range. Skipping materialization.", | ||
| feature_view.name, | ||
| ) | ||
| return | ||
|
|
||
|
|
@@ -1660,10 +1638,8 @@ def materialize_incremental( | |
| >>> from datetime import datetime, timedelta | ||
| >>> fs = FeatureStore(repo_path="project/feature_repo") | ||
| >>> fs.materialize_incremental(end_date=_utc_now() - timedelta(minutes=5)) | ||
| Materializing... | ||
| <BLANKLINE> | ||
| ... | ||
| """ | ||
| _print_materializing_banner() | ||
| feature_views_to_materialize = self._get_feature_views_to_materialize( | ||
| feature_views | ||
| ) | ||
|
|
@@ -1812,10 +1788,8 @@ def materialize( | |
| >>> fs.materialize( | ||
| ... start_date=_utc_now() - timedelta(hours=3), end_date=_utc_now() - timedelta(minutes=10) | ||
| ... ) | ||
| Materializing... | ||
| <BLANKLINE> | ||
| ... | ||
| """ | ||
| _print_materializing_banner() | ||
| if utils.make_tzaware(start_date) > utils.make_tzaware(end_date): | ||
| raise ValueError( | ||
| f"The given start_date {start_date} is greater than the given end_date {end_date}." | ||
|
|
@@ -3174,7 +3148,7 @@ def validate_logged_features( | |
|
|
||
| return exc | ||
| else: | ||
| print(f"{t.shape[0]} rows were validated.") | ||
| logger.info("%s rows were validated.", t.shape[0]) | ||
|
|
||
| if cache_profile: | ||
| self.apply(reference) | ||
|
|
@@ -3305,20 +3279,27 @@ def _print_materialization_log( | |
| start_date, end_date, num_feature_views: int, online_store: str | ||
| ): | ||
| if start_date: | ||
| print( | ||
| f"Materializing {Style.BRIGHT + Fore.GREEN}{num_feature_views}{Style.RESET_ALL} feature views" | ||
| f" from {Style.BRIGHT + Fore.GREEN}{utils.make_tzaware(start_date.replace(microsecond=0))}{Style.RESET_ALL}" | ||
| f" to {Style.BRIGHT + Fore.GREEN}{utils.make_tzaware(end_date.replace(microsecond=0))}{Style.RESET_ALL}" | ||
| f" into the {Style.BRIGHT + Fore.GREEN}{online_store}{Style.RESET_ALL} online store.\n" | ||
| logger.info( | ||
| "Materializing %s feature views from %s to %s into the %s online store.", | ||
| num_feature_views, | ||
| utils.make_tzaware(start_date.replace(microsecond=0)), | ||
| utils.make_tzaware(end_date.replace(microsecond=0)), | ||
| online_store, | ||
| ) | ||
| else: | ||
| print( | ||
| f"Materializing {Style.BRIGHT + Fore.GREEN}{num_feature_views}{Style.RESET_ALL} feature views" | ||
| f" to {Style.BRIGHT + Fore.GREEN}{utils.make_tzaware(end_date.replace(microsecond=0))}{Style.RESET_ALL}" | ||
| f" into the {Style.BRIGHT + Fore.GREEN}{online_store}{Style.RESET_ALL} online store.\n" | ||
| logger.info( | ||
| "Materializing %s feature views to %s into the %s online store.", | ||
| num_feature_views, | ||
| utils.make_tzaware(end_date.replace(microsecond=0)), | ||
| online_store, | ||
| ) | ||
|
|
||
|
|
||
| def _print_materializing_banner() -> None: | ||
| logger.info("Materializing...") | ||
| logger.info("") | ||
|
|
||
|
|
||
| def _validate_feature_views(feature_views: List[BaseFeatureView]): | ||
| """Verify feature views have case-insensitively unique names across all types. | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 Removed `colorama` import breaks `materialize` and `materialize_incremental` with NameError

The import `from colorama import Fore, Style` was removed (old line 42), but `Style.BRIGHT`, `Fore.GREEN`, and `Style.RESET_ALL` are still used in multiple `print()` statements within `materialize_incremental()` (lines 1678, 1701, 1707-1709) and `materialize()` (lines 1819, 1830). When either materialization method is called and reaches a feature view that triggers these print statements, a `NameError: name 'Style' is not defined` will be raised at runtime, crashing materialization. (Refers to line 1678)
Prompt for agents
Was this helpful? React with 👍 or 👎 to provide feedback.