Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,7 @@
"filename": "sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py",
"hashed_secret": "95433727ea51026e1e0dc8deadaabd4a3baaaaf4",
"is_verified": false,
"line_number": 19
"line_number": 21
}
],
"sdk/python/tests/universal/feature_repos/universal/online_store/singlestore.py": [
Expand Down
2 changes: 1 addition & 1 deletion infra/feast-operator/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM registry.access.redhat.com/ubi9/go-toolset:1.24 AS builder
FROM registry.access.redhat.com/ubi9/go-toolset:1.22.9 AS builder
ARG TARGETOS
ARG TARGETARCH
ENV GOTOOLCHAIN=auto
Expand Down
6 changes: 3 additions & 3 deletions infra/scripts/compile-templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def find_repo(path):
# Template README.md
############################
roadmap_path = repo_root / "docs" / "roadmap.md"
with open(roadmap_path, "r") as f:
with open(roadmap_path, "r", encoding="utf-8") as f:
# skip first lines since it has the title
roadmap_contents_lines = f.readlines()[2:]

# Join back again
roadmap_contents = "".join(roadmap_contents_lines)

template_path = repo_root / "infra" / "templates" / "README.md.jinja2"
with open(template_path) as f:
with open(template_path, encoding="utf-8") as f:
template = Template(f.read())

# Compile template
Expand All @@ -49,5 +49,5 @@ def find_repo(path):
)

readme_path = repo_root / "README.md"
with open(readme_path, "w") as f:
with open(readme_path, "w", encoding="utf-8") as f:
f.write(readme_md)
2 changes: 1 addition & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class VersionedOnlineReadNotSupported(FeastError):
def __init__(self, store_name: str, version: int):
super().__init__(
f"Versioned feature reads (@v{version}) are not yet supported by {store_name}. "
f"Currently only SQLite, PostgreSQL, MySQL, FAISS, Redis, and DynamoDB support version-qualified feature references. "
f"Currently only SQLite, PostgreSQL, MySQL, FAISS, Redis, DynamoDB, and SingleStore support version-qualified feature references. "
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 VersionedOnlineReadNotSupported error message lists FAISS, Redis, DynamoDB as supported but they are no longer recognized

The error message at errors.py:145 was updated to include SingleStore but still lists "FAISS, Redis, DynamoDB" as supporting versioned reads. However, the new _check_versioned_read_support logic at online_store.py:280 only allows stores that set supports_versioned_online_reads = True (or are SqliteOnlineStore). FaissOnlineStore does not set this property, and RedisOnlineStore/DynamoDBOnlineStore don't either. So when users encounter this error on these stores, the message incorrectly tells them the store should work.

Suggested change
f"Currently only SQLite, PostgreSQL, MySQL, FAISS, Redis, DynamoDB, and SingleStore support version-qualified feature references. "
f"Currently only SQLite, PostgreSQL, MySQL, and SingleStore support version-qualified feature references. "
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

)


Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,9 @@ def teardown(self):

entities = self.list_entities()

self._get_provider().teardown_infra(self.project, tables, entities)
self._get_provider().teardown_infra(
self.project, tables, entities, registry=self.registry
)
self.registry.teardown()

def get_historical_features(
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/online_stores/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from feast.feature_view import DUMMY_ENTITY_NAME
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
Expand Down Expand Up @@ -306,6 +307,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry: Optional[BaseRegistry] = None,
):
# Because of historical reasons, Feast calls them tables. We use this alias for
# readability.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Delete tables from the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Delete tables from the database.
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
online_config = config.online_store
assert isinstance(online_config, DatastoreOnlineStoreConfig)
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Delete tables from the DynamoDB Online Store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
project = config.project
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
versioning = config.registry.enable_online_feature_view_versioning
for table in tables:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
online_store_config = config.online_store
if not isinstance(online_store_config, HazelcastOnlineStoreConfig):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Delete tables from the Hbase Online Store.
Expand Down
28 changes: 17 additions & 11 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import struct
from datetime import datetime, timezone
from typing import Any, List
from typing import Any, List, Optional

import mmh3

from feast.feature_view import FeatureView
from feast.importer import import_class
from feast.infra.key_encoding_utils import (
serialize_entity_key,
Expand Down Expand Up @@ -72,18 +73,23 @@ def _to_naive_utc(ts: datetime) -> datetime:
return ts.astimezone(tz=timezone.utc).replace(tzinfo=None)


def compute_versioned_name(table: Any, enable_versioning: bool = False) -> str:
"""Return the table name with a ``_v{N}`` suffix when versioning is enabled."""
def online_store_table_id(
project: str,
table: FeatureView,
enable_versioning: bool = False,
version: Optional[int] = None,
) -> str:
name = table.name
if enable_versioning:
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
name = f"{table.name}_v{version}"
return name
resolved_version = version
if resolved_version is None:
resolved_version = getattr(table.projection, "version_tag", None)
if resolved_version is None:
resolved_version = getattr(table, "current_version_number", None)
if resolved_version is not None and resolved_version > 0:
name = f"{table.name}_v{resolved_version}"
return f"{project}_{name}"


def compute_table_id(project: str, table: Any, enable_versioning: bool = False) -> str:
"""Build the online-store table name, appending a version suffix when versioning is enabled."""
return f"{project}_{compute_versioned_name(table, enable_versioning)}"
return online_store_table_id(project, table, enable_versioning)
Comment on lines +76 to +95
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Removed compute_versioned_name from helpers.py but redis.py and dynamodb.py still import it

The PR refactored helpers.py by replacing compute_versioned_name with online_store_table_id, but two online store modules still import the removed function. redis.py:39 imports compute_versioned_name and uses it at redis.py:62, and dynamodb.py:27 imports it and uses it at dynamodb.py:1158. Since the PR explicitly modified both of these files (to add the registry parameter to teardown), this is an incomplete refactoring. Any user of the Redis or DynamoDB online store will get an ImportError the moment the module is loaded.

Prompt for agents
The function compute_versioned_name was removed from helpers.py and replaced with online_store_table_id, but two callers were not migrated:

1. sdk/python/feast/infra/online_stores/redis.py line 39 imports compute_versioned_name from helpers. It is used at line 62 in _versioned_fv_name(). This function returns just the versioned name (without project prefix) for Redis hash key construction. You need to either: (a) re-export a backward-compatible compute_versioned_name wrapper in helpers.py that extracts the name portion from online_store_table_id, or (b) update redis.py to use online_store_table_id and strip the project prefix, or (c) add a simpler helper that returns just the versioned name without the project prefix. Also add supports_versioned_online_reads = True to RedisOnlineStore if Redis should continue supporting versioned reads.

2. sdk/python/feast/infra/online_stores/dynamodb.py line 27 imports compute_versioned_name from helpers. It is used at line 1158 in _get_table_name(). DynamoDB uses it to get the versioned name which is then inserted into a template. Same approach needed here. Also add supports_versioned_online_reads = True to DynamoDBOnlineStore if DynamoDB should continue supporting versioned reads.

Note that the old compute_versioned_name returned just the name (e.g. 'my_view_v2') without a project prefix, while online_store_table_id returns 'project_my_view_v2'. The callers in redis.py and dynamodb.py rely on the old behavior (no project prefix), so a simple rename won't work.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -294,34 +294,32 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Teardown all managed online stores for the given FeatureViews and Entities.
"""Teardown all managed online stores for the given FeatureViews and Entities."""

Args:
config: Feast RepoConfig.
tables: Sequence of FeatureViews to teardown.
entities: Sequence of Entities to teardown.
"""
# Use a set of (tribe, store_type, conf_id) to avoid duplicate teardowns for the same instance
tribes_seen = set()
online_stores_cfg = getattr(config.online_store, "online_stores", [])
tag_name = getattr(config.online_store, "routing_tag", "tribe")
self._initialize_online_stores(config)
tables_by_tribe: Dict[str, List[FeatureView]] = {}
for table in tables:
tribe = table.tags.get(tag_name)
tribe = self._get_routing_tag_value(table, config)
if not tribe:
continue
# Find all store configs matching this tribe (supporting multiple instances of the same type)
for store_cfg in online_stores_cfg:
store_type = store_cfg.type
# Use id(store_cfg.conf) to distinguish different configs of the same type
key = (tribe, store_type, id(store_cfg.conf))
if key in tribes_seen:
continue
tribes_seen.add(key)
# Only select the online store if tribe matches the type (or you can add a mapping in config for more flexibility)
if tribe.lower() == store_type.split(".")[-1].lower():
online_store = self._get_online_store(tribe, config)
if online_store:
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
online_store.teardown(config, tables, entities)
tag_name = getattr(config.online_store, "routing_tag", "tribe")
raise ValueError(
f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore."
)
tables_by_tribe.setdefault(tribe, []).append(table)

for tribe, tribe_tables in tables_by_tribe.items():
online_store = self._get_online_store(tribe, config)
if not online_store:
raise NotImplementedError(
f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration."
)

tribe_config = RepoConfig(**self._prepare_repo_conf(config, tribe))
online_store.teardown(
tribe_config,
tribe_tables,
entities,
registry=registry,
)
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
self.client = self._connect(config)
for table in tables:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Drop the backing collection and close the client.
Expand Down
Comment thread
antznette1 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.helpers import compute_table_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
Expand Down Expand Up @@ -43,6 +44,10 @@ class MySQLOnlineStore(OnlineStore):

_conn: Optional[Connection] = None

@property
def supports_versioned_online_reads(self) -> bool:
return True

def _get_conn(self, config: RepoConfig) -> Connection:
online_store_config = config.online_store
assert isinstance(online_store_config, MySQLOnlineStoreConfig)
Expand Down Expand Up @@ -78,7 +83,7 @@ def online_write_batch(
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=3,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
Expand All @@ -100,14 +105,14 @@ def online_write_batch(
if progress:
progress(1)
else:
batch_size = config.online_store.bacth_size
batch_size = config.online_store.batch_size
if not batch_size or batch_size < 2:
raise ValueError("Batch size must be at least 2")
insert_values = []
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=2,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
Expand Down Expand Up @@ -223,7 +228,7 @@ def online_read(
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=3,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()

cur.execute(
Expand Down Expand Up @@ -296,6 +301,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry: Optional[BaseRegistry] = None,
) -> None:
conn = self._get_conn(config)
cur = conn.cursor()
Expand Down
Loading
Loading