Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sentry/workflow_engine/caches/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__all__ = ["CacheAccess"]
__all__ = ["CacheAccess", "CacheMapping"]

from .cache_access import CacheAccess
from .cache_access import CacheAccess, CacheMapping
110 changes: 109 additions & 1 deletion src/sentry/workflow_engine/caches/cache_access.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from abc import abstractmethod
from collections.abc import Callable, Collection, Mapping

from sentry.utils.cache import cache

# Global registry of cache namespaces to detect collisions
_registered_namespaces: set[str] = set()


class CacheAccess[T]:
"""
Expand All @@ -15,8 +19,112 @@ def key(self) -> str:
def get(self) -> T | None:
return cache.get(self.key())

def set(self, value: T, timeout: float | None) -> None:
def set(self, value: T, timeout: float | None = None) -> None:
cache.set(self.key(), value, timeout)

def delete(self) -> bool:
return cache.delete(self.key())


class _MappingAccessor[K, V](CacheAccess[V]):
"""CacheAccess wrapper for a CacheMapping entry."""

def __init__(self, mapping: "CacheMapping[K, V]", input: K):
self._mapping = mapping
self._input = input

def key(self) -> str:
return self._mapping.key(self._input)


class CacheMapping[K, V]:
"""
Defines a family of cache entries keyed by input type K.
K is typically int, str, or a NamedTuple thereof.

CacheMappings should be defined at module level and evaluated at import time.
Namespace collisions are detected at registration time to catch configuration
errors early.

Example with namespace (recommended):
# At module level:
_user_cache = CacheMapping[int, UserData](
lambda uid: str(uid),
namespace="user",
)
# Keys will be "user:{uid}"

Example without namespace:
_user_cache = CacheMapping[int, UserData](lambda uid: f"user:{uid}")
"""

def __init__(
self,
key_func: Callable[[K], str],
*,
namespace: str | None = None,
):
self._key_func = key_func
self._namespace = namespace
if namespace is not None:
if namespace in _registered_namespaces:
raise ValueError(f"Cache namespace '{namespace}' is already registered")
_registered_namespaces.add(namespace)

def key(self, input: K) -> str:
base_key = self._key_func(input)
if self._namespace is not None:
return f"{self._namespace}:{base_key}"
return base_key

def get(self, input: K) -> V | None:
return cache.get(self.key(input))

def set(self, input: K, value: V, timeout: float | None = None) -> None:
cache.set(self.key(input), value, timeout)

def delete(self, input: K) -> bool:
return cache.delete(self.key(input))

def get_many(self, inputs: Collection[K]) -> dict[K, V | None]:
"""
Fetch multiple cache values at once.

Returns a dict with an entry for every input key. Missing cache entries
have a value of None. This guarantees that `get_many([k])[k]` is always
safe (will not raise KeyError).
"""
if not inputs:
return {}
key_to_input = {self.key(inp): inp for inp in inputs}
values = cache.get_many(key_to_input.keys())
return {key_to_input[k]: values.get(k) for k in key_to_input}

def set_many(self, data: Mapping[K, V], timeout: float | None = None) -> list[K]:
"""
Set multiple cache values at once.

Returns a list of input keys that failed to set. An empty list indicates
all keys were set successfully.
"""
if not data:
return []
keyed_data = {self.key(inp): (inp, val) for inp, val in data.items()}
failed_keys = cache.set_many(
{k: val for k, (_, val) in keyed_data.items()},
timeout,
)
failed = set(failed_keys or [])
return [inp for k, (inp, _) in keyed_data.items() if k in failed]

def delete_many(self, inputs: Collection[K]) -> None:
"""
Delete multiple cache values at once.

This is a best-effort operation; partial failures are not reported.
"""
if inputs:
cache.delete_many([self.key(inp) for inp in inputs])

def accessor(self, input: K) -> CacheAccess[V]:
return _MappingAccessor(self, input)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for me, this is getting a little 😵 through the abstractions.

Is this just created to update legacy places that needed the cache access? Should we reuse the accessor to access the cache, for key safety?

My impression from the rest of this class is that it should be used to replace CacheAccess, since they're both CRUD operators on the cache with safety mechanisms around the key. However, this method muddies the waters for me a bit, so I'm not sure whether I'm meant to use one or the other — and then, if I have the CacheMapping, should I do cache_mapping.get or cache_mapping.accessor.get?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I debated removing this, but hadn't fully convinced myself that it wasn't worth keeping CacheAccess to have a slightly more ergonomic interface for managing single-key interactions, given that it's trivial to generate a CacheAccess from a CacheMapping.
The intended usage pattern there is: "you define the cache mapping with CacheMapping, but if you're working with a single key slot, you can generate a CacheAccess for that — no chance of key confusion, and very easy to mock".
I was also leaning toward making CacheMapping exclusively 'many'-based to encourage bulk actions where possible, but ended up discarding it.
I'll just get rid of CacheAccess and rename the file.

60 changes: 31 additions & 29 deletions src/sentry/workflow_engine/caches/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
from django.db.models import Q

from sentry.models.environment import Environment
from sentry.utils.cache import cache
from sentry.workflow_engine.caches import CacheAccess
from sentry.workflow_engine.caches import CacheMapping
from sentry.workflow_engine.models import Detector, DetectorWorkflow, Workflow
from sentry.workflow_engine.utils import scopedstats
from sentry.workflow_engine.utils.metrics import metrics_incr
Expand All @@ -19,6 +18,17 @@
WORKFLOW_CACHE_PREFIX = "workflows_by_detector_env"


class _WorkflowCacheKey(NamedTuple):
    # Identifies one cache slot: the workflows for a detector in an environment.
    detector_id: int
    # None selects the environment-independent ("global") workflow set.
    env_id: int | None


# Maps (detector_id, env_id) -> cached set of Workflows. Keys are namespaced
# under WORKFLOW_CACHE_PREFIX, e.g. "workflows_by_detector_env:123:456".
_workflow_cache = CacheMapping[_WorkflowCacheKey, set[Workflow]](
    lambda key: f"{key.detector_id}:{key.env_id}",
    namespace=WORKFLOW_CACHE_PREFIX,
)


class _CacheLookupResult(NamedTuple):
"""
Result of checking cache for detector workflows.
Expand Down Expand Up @@ -60,18 +70,6 @@ class _SplitWorkflowsByDetector(NamedTuple):
env_workflows: _WorkflowsByDetector # env_id=X workflows (may be empty)


class _WorkflowCacheAccess(CacheAccess[set[Workflow]]):
"""
To reduce look-ups, this uses id's instead of requiring the full model for types
"""

def __init__(self, detector_id: int, env_id: int | None):
self._key = f"{WORKFLOW_CACHE_PREFIX}:{detector_id}:{env_id}"

def key(self) -> str:
return self._key


def _invalidate_all_environments(detector_id: int) -> bool:
"""
Invalidate all cache entries for a detector across all environments.
Expand All @@ -84,11 +82,11 @@ def _invalidate_all_environments(detector_id: int) -> bool:
.values_list("workflow__environment_id", flat=True)
)

keys = {_WorkflowCacheAccess(detector_id, env_id).key() for env_id in environment_ids}
keys.add(_WorkflowCacheAccess(detector_id, None).key())
keys = {_WorkflowCacheKey(detector_id, env_id) for env_id in environment_ids}
keys.add(_WorkflowCacheKey(detector_id, None))

if keys:
cache.delete_many(keys)
_workflow_cache.delete_many(keys)
metrics_incr(f"{METRIC_PREFIX}.invalidated_all", value=len(keys))

return len(keys) > 0
Expand Down Expand Up @@ -117,7 +115,7 @@ def invalidate_processing_workflows(
return _invalidate_all_environments(detector_id)

metrics_incr(f"{METRIC_PREFIX}.invalidated")
return _WorkflowCacheAccess(detector_id, env_id).delete()
return _workflow_cache.delete(_WorkflowCacheKey(detector_id, env_id))


def _check_caches_for_detectors(
Expand All @@ -136,15 +134,13 @@ def _check_caches_for_detectors(
workflows: set[Workflow] = set()
missed_detector_ids: list[int] = []

for detector in detectors:
cache_access = _WorkflowCacheAccess(detector.id, env_id)
cached = cache_access.get()

keys = [_WorkflowCacheKey(d.id, env_id) for d in detectors]
for key, cached in _workflow_cache.get_many(keys).items():
if cached is not None:
workflows |= cached
metrics_incr(f"{METRIC_PREFIX}.hit")
else:
missed_detector_ids.append(detector.id)
missed_detector_ids.append(key.detector_id)
metrics_incr(f"{METRIC_PREFIX}.miss")

return _CacheLookupResult(workflows, missed_detector_ids)
Expand Down Expand Up @@ -207,14 +203,20 @@ def _populate_detector_caches(
split_workflows: SplitWorkflowsByDetector with global and env-specific workflows
env_id: Environment ID for the env-specific cache (None for global-only query)
"""
# Always store global workflows in env_id=None cache
for detector_id, workflows in split_workflows.global_workflows.mapping.items():
_WorkflowCacheAccess(detector_id, None).set(workflows, CACHE_TTL)
data: dict[_WorkflowCacheKey, set[Workflow]] = {
_WorkflowCacheKey(detector_id, None): workflows
for detector_id, workflows in split_workflows.global_workflows.mapping.items()
}

# Store env-specific workflows in env_id=X cache (only if env_id was specified)
if env_id is not None:
for detector_id, workflows in split_workflows.env_workflows.mapping.items():
_WorkflowCacheAccess(detector_id, env_id).set(workflows, CACHE_TTL)
data.update(
{
_WorkflowCacheKey(detector_id, env_id): workflows
for detector_id, workflows in split_workflows.env_workflows.mapping.items()
}
)

_workflow_cache.set_many(data, CACHE_TTL)


@scopedstats.timer()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Literal
from typing import Any, Literal, NamedTuple

from sentry import tagstore
from sentry.models.environment import Environment
Expand All @@ -12,38 +12,30 @@
from sentry.search.utils import get_latest_release
from sentry.services.eventstore.models import GroupEvent
from sentry.utils import metrics
from sentry.workflow_engine.caches import CacheAccess
from sentry.workflow_engine.caches import CacheAccess, CacheMapping
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.registry import condition_handler_registry
from sentry.workflow_engine.types import DataConditionHandler, WorkflowEventData


class _LatestReleaseCacheAccess(CacheAccess[Release | Literal[False]]):
"""
If we have a release for a project in an environment, we cache it.
If we don't, we cache False.
"""
class _LatestReleaseCacheKey(NamedTuple):
    # Key parts for the latest-release cache lookup.
    project_id: int
    # None when no environment was supplied by the caller.
    environment_id: int | None

def __init__(self, event: GroupEvent, environment: Environment | None):
self._key = latest_release_cache_key(
event.group.project_id, environment.id if environment else None
)

def key(self) -> str:
return self._key
class _LatestAdoptedReleaseCacheKey(NamedTuple):
    # Key parts for the latest-adopted-release cache lookup.
    project_id: int
    # Required here: adopted-release lookups are always environment-scoped.
    environment_id: int


class _LatestAdoptedReleaseCacheAccess(CacheAccess[Release | Literal[False]]):
"""
If we have a latest adopted release for a project in an environment, we cache it.
If we don't, we cache False.
"""

def __init__(self, event: GroupEvent, environment: Environment):
self._key = latest_adopted_release_cache_key(event.group.project_id, environment.id)

def key(self) -> str:
return self._key
# Cache mappings for latest release lookups.
# Values are Release objects, or False if no release exists (to cache negative lookups).
# Key strings are produced by the existing latest_release_cache_key /
# latest_adopted_release_cache_key helpers, so cached entries keep their old keys.
_latest_release_cache = CacheMapping[_LatestReleaseCacheKey, Release | Literal[False]](
    lambda key: latest_release_cache_key(key.project_id, key.environment_id)
)
_latest_adopted_release_cache = CacheMapping[
    _LatestAdoptedReleaseCacheKey, Release | Literal[False]
](lambda key: latest_adopted_release_cache_key(key.project_id, key.environment_id))


def get_latest_adopted_release_for_env(
Expand All @@ -52,11 +44,12 @@ def get_latest_adopted_release_for_env(
"""
Get the latest adopted release for a project in an environment.
"""
cache_key = _LatestAdoptedReleaseCacheKey(event.group.project_id, environment.id)
return _get_latest_release_for_env_impl(
environment,
event,
only_adopted=True,
cache_access=_LatestAdoptedReleaseCacheAccess(event, environment),
cache_access=_latest_adopted_release_cache.accessor(cache_key),
)


Expand All @@ -68,11 +61,14 @@ def get_latest_release_for_env(
Get the latest release for a project in an environment.
NOTE: This is independent of whether it has been adopted or not.
"""
cache_key = _LatestReleaseCacheKey(
event.group.project_id, environment.id if environment else None
)
return _get_latest_release_for_env_impl(
environment,
event,
only_adopted=False,
cache_access=_LatestReleaseCacheAccess(event, environment),
cache_access=_latest_release_cache.accessor(cache_key),
)


Expand Down
Loading
Loading