diff --git a/src/sentry/workflow_engine/caches/__init__.py b/src/sentry/workflow_engine/caches/__init__.py index d76909c5f344f7..ad0f0a3c174c51 100644 --- a/src/sentry/workflow_engine/caches/__init__.py +++ b/src/sentry/workflow_engine/caches/__init__.py @@ -1,3 +1,3 @@ -__all__ = ["CacheAccess"] +__all__ = ["CacheAccess", "CacheMapping"] -from .cache_access import CacheAccess +from .cache_access import CacheAccess, CacheMapping diff --git a/src/sentry/workflow_engine/caches/cache_access.py b/src/sentry/workflow_engine/caches/cache_access.py index 312d1889de9df5..9993f687896f1d 100644 --- a/src/sentry/workflow_engine/caches/cache_access.py +++ b/src/sentry/workflow_engine/caches/cache_access.py @@ -1,7 +1,11 @@ from abc import abstractmethod +from collections.abc import Callable, Collection, Mapping from sentry.utils.cache import cache +# Global registry of cache namespaces to detect collisions +_registered_namespaces: set[str] = set() + class CacheAccess[T]: """ @@ -15,8 +19,112 @@ def key(self) -> str: def get(self) -> T | None: return cache.get(self.key()) - def set(self, value: T, timeout: float | None) -> None: + def set(self, value: T, timeout: float | None = None) -> None: cache.set(self.key(), value, timeout) def delete(self) -> bool: return cache.delete(self.key()) + + +class _MappingAccessor[K, V](CacheAccess[V]): + """CacheAccess wrapper for a CacheMapping entry.""" + + def __init__(self, mapping: "CacheMapping[K, V]", input: K): + self._mapping = mapping + self._input = input + + def key(self) -> str: + return self._mapping.key(self._input) + + +class CacheMapping[K, V]: + """ + Defines a family of cache entries keyed by input type K. + K is typically int, str, or a NamedTuple thereof. + + CacheMappings should be defined at module level and evaluated at import time. + Namespace collisions are detected at registration time to catch configuration + errors early. + + Example with namespace (recommended): + # At module level: + _user_cache = CacheMapping[int, UserData]( + lambda uid: str(uid), + namespace="user", + ) + # Keys will be "user:{uid}" + + Example without namespace: + _user_cache = CacheMapping[int, UserData](lambda uid: f"user:{uid}") + """ + + def __init__( + self, + key_func: Callable[[K], str], + *, + namespace: str | None = None, + ): + self._key_func = key_func + self._namespace = namespace + if namespace is not None: + if namespace in _registered_namespaces: + raise ValueError(f"Cache namespace '{namespace}' is already registered") + _registered_namespaces.add(namespace) + + def key(self, input: K) -> str: + base_key = self._key_func(input) + if self._namespace is not None: + return f"{self._namespace}:{base_key}" + return base_key + + def get(self, input: K) -> V | None: + return cache.get(self.key(input)) + + def set(self, input: K, value: V, timeout: float | None = None) -> None: + cache.set(self.key(input), value, timeout) + + def delete(self, input: K) -> bool: + return cache.delete(self.key(input)) + + def get_many(self, inputs: Collection[K]) -> dict[K, V | None]: + """ + Fetch multiple cache values at once. + + Returns a dict with an entry for every input key. Missing cache entries + have a value of None. This guarantees that `get_many([k])[k]` is always + safe (will not raise KeyError). 
+ """ + if not inputs: + return {} + key_to_input = {self.key(inp): inp for inp in inputs} + values = cache.get_many(key_to_input.keys()) + return {key_to_input[k]: values.get(k) for k in key_to_input} + + def set_many(self, data: Mapping[K, V], timeout: float | None = None) -> list[K]: + """ + Set multiple cache values at once. + + Returns a list of input keys that failed to set. An empty list indicates + all keys were set successfully. + """ + if not data: + return [] + keyed_data = {self.key(inp): (inp, val) for inp, val in data.items()} + failed_keys = cache.set_many( + {k: val for k, (_, val) in keyed_data.items()}, + timeout, + ) + failed = set(failed_keys or []) + return [inp for k, (inp, _) in keyed_data.items() if k in failed] + + def delete_many(self, inputs: Collection[K]) -> None: + """ + Delete multiple cache values at once. + + This is a best-effort operation; partial failures are not reported. + """ + if inputs: + cache.delete_many([self.key(inp) for inp in inputs]) + + def accessor(self, input: K) -> CacheAccess[V]: + return _MappingAccessor(self, input) diff --git a/src/sentry/workflow_engine/caches/workflow.py b/src/sentry/workflow_engine/caches/workflow.py index a4fadf67fe8862..a9e21a931cc76a 100644 --- a/src/sentry/workflow_engine/caches/workflow.py +++ b/src/sentry/workflow_engine/caches/workflow.py @@ -4,8 +4,7 @@ from django.db.models import Q from sentry.models.environment import Environment -from sentry.utils.cache import cache -from sentry.workflow_engine.caches import CacheAccess +from sentry.workflow_engine.caches import CacheMapping from sentry.workflow_engine.models import Detector, DetectorWorkflow, Workflow from sentry.workflow_engine.utils import scopedstats from sentry.workflow_engine.utils.metrics import metrics_incr @@ -19,6 +18,17 @@ WORKFLOW_CACHE_PREFIX = "workflows_by_detector_env" +class _WorkflowCacheKey(NamedTuple): + detector_id: int + env_id: int | None + + +_workflow_cache = CacheMapping[_WorkflowCacheKey, set[Workflow]]( + lambda key: f"{key.detector_id}:{key.env_id}", + namespace=WORKFLOW_CACHE_PREFIX, +) + + class _CacheLookupResult(NamedTuple): """ Result of checking cache for detector workflows. @@ -60,18 +70,6 @@ class _SplitWorkflowsByDetector(NamedTuple): env_workflows: _WorkflowsByDetector # env_id=X workflows (may be empty) -class _WorkflowCacheAccess(CacheAccess[set[Workflow]]): - """ - To reduce look-ups, this uses id's instead of requiring the full model for types - """ - - def __init__(self, detector_id: int, env_id: int | None): - self._key = f"{WORKFLOW_CACHE_PREFIX}:{detector_id}:{env_id}" - - def key(self) -> str: - return self._key - - def _invalidate_all_environments(detector_id: int) -> bool: """ Invalidate all cache entries for a detector across all environments. 
@@ -84,11 +82,11 @@ def _invalidate_all_environments(detector_id: int) -> bool: .values_list("workflow__environment_id", flat=True) ) - keys = {_WorkflowCacheAccess(detector_id, env_id).key() for env_id in environment_ids} - keys.add(_WorkflowCacheAccess(detector_id, None).key()) + keys = {_WorkflowCacheKey(detector_id, env_id) for env_id in environment_ids} + keys.add(_WorkflowCacheKey(detector_id, None)) if keys: - cache.delete_many(keys) + _workflow_cache.delete_many(keys) metrics_incr(f"{METRIC_PREFIX}.invalidated_all", value=len(keys)) return len(keys) > 0 @@ -117,7 +115,7 @@ def invalidate_processing_workflows( return _invalidate_all_environments(detector_id) metrics_incr(f"{METRIC_PREFIX}.invalidated") - return _WorkflowCacheAccess(detector_id, env_id).delete() + return _workflow_cache.delete(_WorkflowCacheKey(detector_id, env_id)) def _check_caches_for_detectors( @@ -136,15 +134,13 @@ def _check_caches_for_detectors( workflows: set[Workflow] = set() missed_detector_ids: list[int] = [] - for detector in detectors: - cache_access = _WorkflowCacheAccess(detector.id, env_id) - cached = cache_access.get() - + keys = [_WorkflowCacheKey(d.id, env_id) for d in detectors] + for key, cached in _workflow_cache.get_many(keys).items(): if cached is not None: workflows |= cached metrics_incr(f"{METRIC_PREFIX}.hit") else: - missed_detector_ids.append(detector.id) + missed_detector_ids.append(key.detector_id) metrics_incr(f"{METRIC_PREFIX}.miss") return _CacheLookupResult(workflows, missed_detector_ids) @@ -207,14 +203,20 @@ def _populate_detector_caches( split_workflows: SplitWorkflowsByDetector with global and env-specific workflows env_id: Environment ID for the env-specific cache (None for global-only query) """ - # Always store global workflows in env_id=None cache - for detector_id, workflows in split_workflows.global_workflows.mapping.items(): - _WorkflowCacheAccess(detector_id, None).set(workflows, CACHE_TTL) + data: dict[_WorkflowCacheKey, set[Workflow]] = { + _WorkflowCacheKey(detector_id, None): workflows + for detector_id, workflows in split_workflows.global_workflows.mapping.items() + } - # Store env-specific workflows in env_id=X cache (only if env_id was specified) if env_id is not None: - for detector_id, workflows in split_workflows.env_workflows.mapping.items(): - _WorkflowCacheAccess(detector_id, env_id).set(workflows, CACHE_TTL) + data.update( + { + _WorkflowCacheKey(detector_id, env_id): workflows + for detector_id, workflows in split_workflows.env_workflows.mapping.items() + } + ) + + _workflow_cache.set_many(data, CACHE_TTL) @scopedstats.timer() diff --git a/src/sentry/workflow_engine/handlers/condition/latest_release_handler.py b/src/sentry/workflow_engine/handlers/condition/latest_release_handler.py index 3b088ec4e45d06..4ddbaf261ea4d8 100644 --- a/src/sentry/workflow_engine/handlers/condition/latest_release_handler.py +++ b/src/sentry/workflow_engine/handlers/condition/latest_release_handler.py @@ -1,4 +1,4 @@ -from typing import Any, Literal +from typing import Any, Literal, NamedTuple from sentry import tagstore from sentry.models.environment import Environment @@ -12,38 +12,30 @@ from sentry.search.utils import get_latest_release from sentry.services.eventstore.models import GroupEvent from sentry.utils import metrics -from sentry.workflow_engine.caches import CacheAccess +from sentry.workflow_engine.caches import CacheAccess, CacheMapping from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.registry import 
condition_handler_registry from sentry.workflow_engine.types import DataConditionHandler, WorkflowEventData -class _LatestReleaseCacheAccess(CacheAccess[Release | Literal[False]]): - """ - If we have a release for a project in an environment, we cache it. - If we don't, we cache False. - """ +class _LatestReleaseCacheKey(NamedTuple): + project_id: int + environment_id: int | None - def __init__(self, event: GroupEvent, environment: Environment | None): - self._key = latest_release_cache_key( - event.group.project_id, environment.id if environment else None - ) - def key(self) -> str: - return self._key +class _LatestAdoptedReleaseCacheKey(NamedTuple): + project_id: int + environment_id: int -class _LatestAdoptedReleaseCacheAccess(CacheAccess[Release | Literal[False]]): - """ - If we have a latest adopted release for a project in an environment, we cache it. - If we don't, we cache False. - """ - - def __init__(self, event: GroupEvent, environment: Environment): - self._key = latest_adopted_release_cache_key(event.group.project_id, environment.id) - - def key(self) -> str: - return self._key +# Cache mappings for latest release lookups. +# Values are Release objects, or False if no release exists (to cache negative lookups). +_latest_release_cache = CacheMapping[_LatestReleaseCacheKey, Release | Literal[False]]( + lambda key: latest_release_cache_key(key.project_id, key.environment_id) +) +_latest_adopted_release_cache = CacheMapping[ + _LatestAdoptedReleaseCacheKey, Release | Literal[False] +](lambda key: latest_adopted_release_cache_key(key.project_id, key.environment_id)) def get_latest_adopted_release_for_env( @@ -52,11 +44,12 @@ def get_latest_adopted_release_for_env( """ Get the latest adopted release for a project in an environment. """ + cache_key = _LatestAdoptedReleaseCacheKey(event.group.project_id, environment.id) return _get_latest_release_for_env_impl( environment, event, only_adopted=True, - cache_access=_LatestAdoptedReleaseCacheAccess(event, environment), + cache_access=_latest_adopted_release_cache.accessor(cache_key), ) @@ -68,11 +61,14 @@ def get_latest_release_for_env( Get the latest release for a project in an environment. NOTE: This is independent of whether it has been adopted or not. 
""" + cache_key = _LatestReleaseCacheKey( + event.group.project_id, environment.id if environment else None + ) return _get_latest_release_for_env_impl( environment, event, only_adopted=False, - cache_access=_LatestReleaseCacheAccess(event, environment), + cache_access=_latest_release_cache.accessor(cache_key), ) diff --git a/tests/sentry/workflow_engine/caches/test_cache_access.py b/tests/sentry/workflow_engine/caches/test_cache_access.py index 27c1d72057fdb5..f3a0496b0de372 100644 --- a/tests/sentry/workflow_engine/caches/test_cache_access.py +++ b/tests/sentry/workflow_engine/caches/test_cache_access.py @@ -1,5 +1,8 @@ +import pytest + from sentry.testutils.cases import TestCase -from sentry.workflow_engine.caches.cache_access import CacheAccess +from sentry.workflow_engine.caches import cache_access +from sentry.workflow_engine.caches.cache_access import CacheAccess, CacheMapping class _TestCacheAccess(CacheAccess[str]): @@ -48,3 +51,116 @@ def test_delete__removes_value(self) -> None: def test_delete__returns_false_when_key_not_exists(self) -> None: result = self.cache_access.delete() assert result is False + + +class TestCacheMapping(TestCase): + def setUp(self) -> None: + # Clear registered namespaces between tests + cache_access._registered_namespaces.clear() + self.mapping = CacheMapping[int, str](lambda x: str(x)) + + def test_key(self) -> None: + assert self.mapping.key(123) == "123" + + def test_get__returns_none_when_not_set(self) -> None: + assert self.mapping.get(1) is None + + def test_get__returns_value_when_set(self) -> None: + self.mapping.set(1, "test_value", 60) + assert self.mapping.get(1) == "test_value" + + def test_set__stores_value(self) -> None: + self.mapping.set(42, "stored_value", 60) + assert self.mapping.get(42) == "stored_value" + + def test_set__overwrites_existing_value(self) -> None: + self.mapping.set(1, "first_value", 60) + self.mapping.set(1, "second_value", 60) + assert self.mapping.get(1) == "second_value" + + def test_delete__removes_value(self) -> None: + self.mapping.set(1, "value_to_delete", 60) + assert self.mapping.get(1) == "value_to_delete" + + result = self.mapping.delete(1) + + assert result is True + assert self.mapping.get(1) is None + + def test_delete__returns_false_when_key_not_exists(self) -> None: + result = self.mapping.delete(999) + assert result is False + + def test_get_many__returns_empty_dict_for_empty_input(self) -> None: + assert self.mapping.get_many([]) == {} + + def test_get_many__returns_values(self) -> None: + for i in range(5): + if i % 2 == 0: + self.mapping.set(i, f"value_{i}") + + results = self.mapping.get_many([0, 1, 2, 3, 4]) + + assert results == { + 0: "value_0", + 1: None, + 2: "value_2", + 3: None, + 4: "value_4", + } + + def test_set_many__returns_empty_list_for_empty_input(self) -> None: + assert self.mapping.set_many({}) == [] + + def test_set_many__sets_values(self) -> None: + data = {i: f"value_{i}" for i in range(5)} + failed = self.mapping.set_many(data, 60) + + assert failed == [] + for i in range(5): + assert self.mapping.get(i) == f"value_{i}" + + def test_delete_many__deletes_values(self) -> None: + for i in range(4): + self.mapping.set(i, f"value_{i}", 60) + + self.mapping.delete_many([0, 1, 2, 3]) + + for i in range(4): + assert self.mapping.get(i) is None + + def test_delete_many__handles_empty_input(self) -> None: + self.mapping.delete_many([]) + + def test_accessor__returns_cache_access(self) -> None: + accessor = self.mapping.accessor(123) + assert accessor.key() == "123" + + def 
test_accessor__get_set_delete(self) -> None: + accessor = self.mapping.accessor(42) + + assert accessor.get() is None + accessor.set("test_value", 60) + assert accessor.get() == "test_value" + assert self.mapping.get(42) == "test_value" + + accessor.delete() + assert accessor.get() is None + + def test_namespace__prefixes_key(self) -> None: + mapping = CacheMapping[int, str](lambda x: str(x), namespace="test_ns") + assert mapping.key(123) == "test_ns:123" + + def test_namespace__collision_raises(self) -> None: + CacheMapping[int, str](lambda x: str(x), namespace="unique_ns") + with pytest.raises(ValueError, match="already registered"): + CacheMapping[int, str](lambda x: str(x), namespace="unique_ns") + + def test_namespace__accessor_uses_namespace(self) -> None: + mapping = CacheMapping[int, str](lambda x: str(x), namespace="accessor_ns") + accessor = mapping.accessor(42) + assert accessor.key() == "accessor_ns:42" + + def test_namespace__none_does_not_prefix(self) -> None: + mapping = CacheMapping[int, str](lambda x: f"raw:{x}") + assert mapping.key(123) == "raw:123" diff --git a/tests/sentry/workflow_engine/caches/test_workflow.py b/tests/sentry/workflow_engine/caches/test_workflow.py index 93ccea25d86e61..24c182b35ed9cf 100644 --- a/tests/sentry/workflow_engine/caches/test_workflow.py +++ b/tests/sentry/workflow_engine/caches/test_workflow.py @@ -5,7 +5,8 @@ _populate_detector_caches, _query_workflows_by_detector_ids, _SplitWorkflowsByDetector, - _WorkflowCacheAccess, + _workflow_cache, + _WorkflowCacheKey, _WorkflowsByDetector, get_workflows_by_detectors, invalidate_processing_workflows, @@ -20,11 +21,11 @@ def setUp(self) -> None: self.detector = self.create_detector() def test_processing_workflow_cache_key(self) -> None: - key = _WorkflowCacheAccess(self.detector.id, self.environment.id).key() + key = _workflow_cache.key(_WorkflowCacheKey(self.detector.id, self.environment.id)) assert key == f"workflows_by_detector_env:{self.detector.id}:{self.environment.id}" def test_processing_workflow_cache_key__no_env(self) -> None: - key = _WorkflowCacheAccess(self.detector.id, None).key() + key = _workflow_cache.key(_WorkflowCacheKey(self.detector.id, None)) assert key == f"workflows_by_detector_env:{self.detector.id}:None" @@ -45,16 +46,16 @@ def setUp(self) -> None: detector=self.detector, workflow=self.workflow_no_env ) - self.cache = _WorkflowCacheAccess(self.detector.id, self.environment.id) - self.cache_no_env = _WorkflowCacheAccess(self.detector.id, None) + self.cache_key = _WorkflowCacheKey(self.detector.id, self.environment.id) + self.cache_key_no_env = _WorkflowCacheKey(self.detector.id, None) # warm the cache for the invalidation tests self.cache_value = {self.workflow} - self.cache.set(self.cache_value, 60) - self.cache_no_env.set(self.cache_value, 60) + _workflow_cache.set(self.cache_key, self.cache_value, 60) + _workflow_cache.set(self.cache_key_no_env, self.cache_value, 60) - assert self.cache.get() == self.cache_value - assert self.cache_no_env.get() == self.cache_value + assert _workflow_cache.get(self.cache_key) == self.cache_value + assert _workflow_cache.get(self.cache_key_no_env) == self.cache_value def _env_and_workflow(self, detector: Detector | None = None) -> tuple[Environment, Workflow]: if detector is None: @@ -70,22 +71,22 @@ def test_cache_invalidate__by_detector_and_env(self) -> None: invalidate_processing_workflows(self.detector.id, self.environment.id) # Removes all items for the detector + env - assert self.cache.get() is None + assert 
_workflow_cache.get(self.cache_key) is None # Other value is still set - assert self.cache_no_env.get() == self.cache_value + assert _workflow_cache.get(self.cache_key_no_env) == self.cache_value def test_cache_invalidate__by_detector(self) -> None: env, workflow = self._env_and_workflow() - workflow_cache = _WorkflowCacheAccess(self.detector.id, env.id) - workflow_cache.set({workflow}, 60) + env_key = _WorkflowCacheKey(self.detector.id, env.id) + _workflow_cache.set(env_key, {workflow}, 60) invalidate_processing_workflows(self.detector.id) - assert self.cache.get() is None - assert self.cache_no_env.get() is None - assert workflow_cache.get() is None + assert _workflow_cache.get(self.cache_key) is None + assert _workflow_cache.get(self.cache_key_no_env) is None + assert _workflow_cache.get(env_key) is None class TestGetWorkflowsByDetectors(TestCase): @@ -122,15 +123,15 @@ def test_multiple_detectors_unions_results(self) -> None: def test_cache_populated_after_cold_lookup(self) -> None: # Cache should be cold initially - both global and env-specific env_id = self.environment.id - global_cache1 = _WorkflowCacheAccess(self.detector1.id, None) - global_cache2 = _WorkflowCacheAccess(self.detector2.id, None) - env_cache1 = _WorkflowCacheAccess(self.detector1.id, env_id) - env_cache2 = _WorkflowCacheAccess(self.detector2.id, env_id) + global_key1 = _WorkflowCacheKey(self.detector1.id, None) + global_key2 = _WorkflowCacheKey(self.detector2.id, None) + env_key1 = _WorkflowCacheKey(self.detector1.id, env_id) + env_key2 = _WorkflowCacheKey(self.detector2.id, env_id) - assert global_cache1.get() is None - assert global_cache2.get() is None - assert env_cache1.get() is None - assert env_cache2.get() is None + assert _workflow_cache.get(global_key1) is None + assert _workflow_cache.get(global_key2) is None + assert _workflow_cache.get(env_key1) is None + assert _workflow_cache.get(env_key2) is None # Call the function (cold cache) get_workflows_by_detectors([self.detector1, self.detector2], self.environment) @@ -138,23 +139,26 @@ def test_cache_populated_after_cold_lookup(self) -> None: # Now BOTH caches should be populated for each detector: # - Global cache (env_id=None) stores global workflows (empty in this case) # - Env cache stores env-specific workflows - assert global_cache1.get() == set() - assert global_cache2.get() == set() - assert env_cache1.get() == {self.workflow1, self.shared_workflow} - assert env_cache2.get() == {self.workflow2, self.shared_workflow} + assert _workflow_cache.get(global_key1) == set() + assert _workflow_cache.get(global_key2) == set() + assert _workflow_cache.get(env_key1) == {self.workflow1, self.shared_workflow} + assert _workflow_cache.get(env_key2) == {self.workflow2, self.shared_workflow} def test_hot_cache_no_db_query(self) -> None: env_id = self.environment.id # Pre-populate both global and env caches - global_cache1 = _WorkflowCacheAccess(self.detector1.id, None) - global_cache2 = _WorkflowCacheAccess(self.detector2.id, None) - env_cache1 = _WorkflowCacheAccess(self.detector1.id, env_id) - env_cache2 = _WorkflowCacheAccess(self.detector2.id, env_id) - - global_cache1.set(set(), 60) - global_cache2.set(set(), 60) - env_cache1.set({self.workflow1, self.shared_workflow}, 60) - env_cache2.set({self.workflow2, self.shared_workflow}, 60) + _workflow_cache.set(_WorkflowCacheKey(self.detector1.id, None), set(), 60) + _workflow_cache.set(_WorkflowCacheKey(self.detector2.id, None), set(), 60) + _workflow_cache.set( + _WorkflowCacheKey(self.detector1.id, env_id), + 
{self.workflow1, self.shared_workflow}, + 60, + ) + _workflow_cache.set( + _WorkflowCacheKey(self.detector2.id, env_id), + {self.workflow2, self.shared_workflow}, + 60, + ) # Call the function - should return cached values from both caches result = get_workflows_by_detectors([self.detector1, self.detector2], self.environment) @@ -162,24 +166,26 @@ def test_hot_cache_no_db_query(self) -> None: def test_partial_cache_hit(self) -> None: env_id = self.environment.id - global_cache1 = _WorkflowCacheAccess(self.detector1.id, None) - global_cache2 = _WorkflowCacheAccess(self.detector2.id, None) - env_cache1 = _WorkflowCacheAccess(self.detector1.id, env_id) - env_cache2 = _WorkflowCacheAccess(self.detector2.id, env_id) + global_key2 = _WorkflowCacheKey(self.detector2.id, None) + env_key2 = _WorkflowCacheKey(self.detector2.id, env_id) # Pre-populate only detector1's caches (both global and env) - global_cache1.set(set(), 60) - env_cache1.set({self.workflow1, self.shared_workflow}, 60) - assert global_cache2.get() is None - assert env_cache2.get() is None + _workflow_cache.set(_WorkflowCacheKey(self.detector1.id, None), set(), 60) + _workflow_cache.set( + _WorkflowCacheKey(self.detector1.id, env_id), + {self.workflow1, self.shared_workflow}, + 60, + ) + assert _workflow_cache.get(global_key2) is None + assert _workflow_cache.get(env_key2) is None # Call the function - detector1 hits cache, detector2 misses result = get_workflows_by_detectors([self.detector1, self.detector2], self.environment) assert result == {self.workflow1, self.workflow2, self.shared_workflow} # detector2's caches should now be populated - assert global_cache2.get() == set() - assert env_cache2.get() == {self.workflow2, self.shared_workflow} + assert _workflow_cache.get(global_key2) == set() + assert _workflow_cache.get(env_key2) == {self.workflow2, self.shared_workflow} def test_no_environment_filter(self) -> None: # Create workflow with no environment @@ -201,13 +207,13 @@ def test_environment_includes_none_environment_workflows(self) -> None: # Verify workflows are stored in SEPARATE cache entries env_id = self.environment.id - global_cache = _WorkflowCacheAccess(self.detector1.id, None) - env_cache = _WorkflowCacheAccess(self.detector1.id, env_id) + global_key = _WorkflowCacheKey(self.detector1.id, None) + env_key = _WorkflowCacheKey(self.detector1.id, env_id) # Global workflow should be in global cache only - assert global_cache.get() == {workflow_no_env} + assert _workflow_cache.get(global_key) == {workflow_no_env} # Env workflow should be in env cache only - assert env_cache.get() == {self.workflow1, self.shared_workflow} + assert _workflow_cache.get(env_key) == {self.workflow1, self.shared_workflow} def test_global_workflow_invalidation_doesnt_affect_env_cache(self) -> None: """ @@ -226,18 +232,18 @@ def test_global_workflow_invalidation_doesnt_affect_env_cache(self) -> None: assert self.workflow1 in result # Verify both caches are populated - global_cache = _WorkflowCacheAccess(self.detector1.id, None) - env_cache = _WorkflowCacheAccess(self.detector1.id, env_id) - assert global_cache.get() == {workflow_no_env} - assert env_cache.get() == {self.workflow1, self.shared_workflow} + global_key = _WorkflowCacheKey(self.detector1.id, None) + env_key = _WorkflowCacheKey(self.detector1.id, env_id) + assert _workflow_cache.get(global_key) == {workflow_no_env} + assert _workflow_cache.get(env_key) == {self.workflow1, self.shared_workflow} # Invalidate the global workflow's cache entry 
invalidate_processing_workflows(self.detector1.id, None) # Global cache should be invalidated - assert global_cache.get() is None + assert _workflow_cache.get(global_key) is None # Env cache should NOT be affected - assert env_cache.get() == {self.workflow1, self.shared_workflow} + assert _workflow_cache.get(env_key) == {self.workflow1, self.shared_workflow} def test_disabled_workflows_excluded(self) -> None: self.workflow1.enabled = False @@ -267,10 +273,8 @@ def test_all_cache_misses(self) -> None: def test_all_cache_hits(self) -> None: env_id = self.environment.id - cache1 = _WorkflowCacheAccess(self.detector1.id, env_id) - cache2 = _WorkflowCacheAccess(self.detector2.id, env_id) - cache1.set({self.workflow1}, 60) - cache2.set({self.workflow2}, 60) + _workflow_cache.set(_WorkflowCacheKey(self.detector1.id, env_id), {self.workflow1}, 60) + _workflow_cache.set(_WorkflowCacheKey(self.detector2.id, env_id), {self.workflow2}, 60) result = _check_caches_for_detectors([self.detector1, self.detector2], env_id) @@ -280,8 +284,7 @@ def test_all_cache_hits(self) -> None: def test_partial_cache_hit(self) -> None: env_id = self.environment.id - cache1 = _WorkflowCacheAccess(self.detector1.id, env_id) - cache1.set({self.workflow1}, 60) + _workflow_cache.set(_WorkflowCacheKey(self.detector1.id, env_id), {self.workflow1}, 60) result = _check_caches_for_detectors([self.detector1, self.detector2], env_id) @@ -409,16 +412,14 @@ def test_populates_both_global_and_env_caches(self) -> None: _populate_detector_caches(split_workflows, env_id) # Check global caches (env_id=None) - global_cache1 = _WorkflowCacheAccess(self.detector1.id, None) - global_cache2 = _WorkflowCacheAccess(self.detector2.id, None) - assert global_cache1.get() == {self.workflow_no_env} - assert global_cache2.get() == set() + assert _workflow_cache.get(_WorkflowCacheKey(self.detector1.id, None)) == { + self.workflow_no_env + } + assert _workflow_cache.get(_WorkflowCacheKey(self.detector2.id, None)) == set() # Check env-specific caches - env_cache1 = _WorkflowCacheAccess(self.detector1.id, env_id) - env_cache2 = _WorkflowCacheAccess(self.detector2.id, env_id) - assert env_cache1.get() == {self.workflow1} - assert env_cache2.get() == {self.workflow2} + assert _workflow_cache.get(_WorkflowCacheKey(self.detector1.id, env_id)) == {self.workflow1} + assert _workflow_cache.get(_WorkflowCacheKey(self.detector2.id, env_id)) == {self.workflow2} def test_populates_empty_sets(self) -> None: env_id = self.environment.id @@ -429,10 +430,8 @@ def test_populates_empty_sets(self) -> None: _populate_detector_caches(split_workflows, env_id) - global_cache = _WorkflowCacheAccess(self.detector1.id, None) - env_cache = _WorkflowCacheAccess(self.detector1.id, env_id) - assert global_cache.get() == set() - assert env_cache.get() == set() + assert _workflow_cache.get(_WorkflowCacheKey(self.detector1.id, None)) == set() + assert _workflow_cache.get(_WorkflowCacheKey(self.detector1.id, env_id)) == set() def test_only_populates_global_when_env_is_none(self) -> None: """When env_id is None, only global cache should be populated.""" @@ -444,9 +443,11 @@ def test_only_populates_global_when_env_is_none(self) -> None: _populate_detector_caches(split_workflows, None) # Global cache should be populated - global_cache = _WorkflowCacheAccess(self.detector1.id, None) - assert global_cache.get() == {self.workflow_no_env} + assert _workflow_cache.get(_WorkflowCacheKey(self.detector1.id, None)) == { + self.workflow_no_env + } # Env cache should NOT be populated (env_id=None means 
global-only query) - env_cache = _WorkflowCacheAccess(self.detector1.id, self.environment.id) - assert env_cache.get() is None + assert ( + _workflow_cache.get(_WorkflowCacheKey(self.detector1.id, self.environment.id)) is None + )
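
---

Usage sketch for reviewers (not part of the patch): a CacheMapping is declared once at
module level and call sites pass the typed key. The OrgInfo alias, _org_cache name, and
TTL below are illustrative assumptions, not names from this diff.

from sentry.workflow_engine.caches import CacheMapping

# Hypothetical value type; anything the configured cache backend can store works.
OrgInfo = dict[str, str]

# Declared at import time. The namespace is recorded in a process-global set, so a
# second CacheMapping reusing "org_info" raises ValueError as soon as it is imported.
_org_cache = CacheMapping[int, OrgInfo](lambda org_id: str(org_id), namespace="org_info")


def warm(org_id: int, info: OrgInfo) -> None:
    _org_cache.set(org_id, info, 300)  # stored under "org_info:{org_id}"


def lookup(org_id: int) -> OrgInfo | None:
    return _org_cache.get(org_id)  # None on a miss


def accessor_demo() -> None:
    # accessor() adapts one entry to the single-key CacheAccess interface, which is
    # how latest_release_handler.py keeps passing a CacheAccess into
    # _get_latest_release_for_env_impl unchanged.
    access = _org_cache.accessor(42)
    access.set({"name": "acme"}, 300)
    assert access.get() == _org_cache.get(42)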
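
The batch helpers have deliberately defensive contracts: get_many() returns an entry for
every requested key (misses map to None, so indexing the result never raises KeyError),
and set_many() reports failures as a list of input keys rather than raising. A minimal
caller sketch, reusing the illustrative _org_cache from the sketch above:

import logging
from collections.abc import Callable

logger = logging.getLogger(__name__)


def refresh(org_ids: list[int], fetch: Callable[[int], OrgInfo]) -> None:
    cached = _org_cache.get_many(org_ids)
    # Every requested id is present in the result; misses are None.
    missing = [org_id for org_id, info in cached.items() if info is None]
    failed = _org_cache.set_many({org_id: fetch(org_id) for org_id in missing}, 300)
    if failed:
        # Failures come back as input keys (ints here), not raw cache key strings.
        logger.warning("cache write failed for orgs: %s", failed)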
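
One property worth calling out: the workflow cache's key_func plus namespace reproduce
the legacy _WorkflowCacheAccess key format exactly (the updated
test_processing_workflow_cache_key tests assert this), so the refactor should not
invalidate entries already sitting in the deployed cache. Mirroring those tests:

from sentry.workflow_engine.caches.workflow import _workflow_cache, _WorkflowCacheKey

key = _WorkflowCacheKey(detector_id=1, env_id=None)
assert _workflow_cache.key(key) == "workflows_by_detector_env:1:None"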