diff --git a/snuba/query/allocation_policies/dynamic_concurrent_queries.py b/snuba/query/allocation_policies/dynamic_concurrent_queries.py new file mode 100644 index 0000000000..9bc57a9a3d --- /dev/null +++ b/snuba/query/allocation_policies/dynamic_concurrent_queries.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +import logging + +from snuba.configs.configuration import Configuration +from snuba.query.allocation_policies import ( + CROSS_ORG_SUGGESTION, + MAX_THRESHOLD, + NO_SUGGESTION, + PASS_THROUGH_REFERRERS_SUGGESTION, + InvalidTenantsForAllocationPolicy, + QueryResultOrError, + QuotaAllowance, +) +from snuba.query.allocation_policies.concurrent_rate_limit import ( + _PASS_THROUGH_REFERRERS, + BaseConcurrentRateLimitAllocationPolicy, +) +from snuba.state.rate_limit import RateLimitParameters + +logger = logging.getLogger("snuba.query.allocation_policy_dynamic_concurrent_queries") + +QUOTA_UNIT = "concurrent_queries" +SUGGESTION = ( + "The cluster is close to its maximum sustainable concurrent query load and this " + "organization is sending more concurrent queries than its fair share. Reduce the " + "number of concurrent queries this organization is sending." +) + +# The global bucket is shared by every org querying a single storage (and therefore a +# single ClickHouse cluster). Because `component_name()` already prefixes redis keys with +# the storage key, this bucket name only needs to be unique within a storage. +GLOBAL_BUCKET = "__global__" + +DEFAULT_GLOBAL_SOFT_LIMIT = 200 +DEFAULT_PER_ORG_SOFT_LIMIT = 22 + + +class DynamicConcurrentQueries(BaseConcurrentRateLimitAllocationPolicy): + """Allows all queries for every org while there is spare capacity on the cluster, and + sheds load from the heaviest orgs once the cluster approaches its sustainable limit. + + Two soft limits drive the policy: + + * ``global_soft_limit`` - the number of concurrent queries (across every org hitting + this storage) that we consider "close to the maximum the cluster can sustain". While + the global concurrent count is at or below this value, every query for every org is + allowed through. + * ``per_org_soft_limit`` - the number of concurrent queries each org is softly + guaranteed. Orgs are only candidates for rejection once they exceed this. + + Once the global concurrent count climbs above ``global_soft_limit`` the policy starts + shedding load proportionally to how far the cluster is over the soft limit. The + effective per-org ceiling is:: + + effective_limit = max(1, floor(per_org_soft_limit * global_soft_limit / global_concurrent)) + + which equals ``per_org_soft_limit`` right at the soft limit and shrinks as global + pressure rises. Because an org is rejected when its own concurrent count exceeds this + shrinking ceiling, the orgs that are *most* above their soft limit are rejected first, + and progressively more orgs get shed as the cluster gets busier. Orgs running a small + number of queries keep running (an org may always run at least one query), which keeps + the impact focused on the heaviest abusers. + + Per-org soft limits can be overridden for individual orgs via + ``organization_soft_limit_override``. + """ + + @property + def rate_limit_name(self) -> str: + return "dynamic_concurrent_queries_policy" + + def _additional_config_definitions(self) -> list[Configuration]: + return super()._additional_config_definitions() + [ + Configuration( + name="global_soft_limit", + description=( + "Number of concurrent queries (across all orgs on this storage) that " + "is considered close to the maximum the cluster can sustain. While the " + "global concurrent count is at or below this value, all queries are " + "allowed. Above it, the policy starts rejecting queries from the orgs " + "that are furthest above their per-org soft limit." + ), + value_type=int, + default=DEFAULT_GLOBAL_SOFT_LIMIT, + ), + Configuration( + name="per_org_soft_limit", + description=( + "Number of concurrent queries each org is softly guaranteed. Orgs at " + "or below this are always allowed; orgs above it are the first to be " + "rejected once the cluster crosses the global soft limit." + ), + value_type=int, + default=DEFAULT_PER_ORG_SOFT_LIMIT, + ), + Configuration( + name="organization_soft_limit_override", + description="Override the per-org soft limit for a specific organization_id.", + value_type=int, + default=-1, + param_types={"organization_id": int}, + ), + ] + + def _get_org_id(self, tenant_ids: dict[str, str | int]) -> int: + org_id = tenant_ids.get("organization_id") + if org_id is None: + raise InvalidTenantsForAllocationPolicy.from_args( + tenant_ids, + self.__class__.__name__, + "tenant_ids must include organization_id", + ) + return int(org_id) + + def _get_per_org_soft_limit(self, org_id: int) -> int: + override = self.get_config_value( + "organization_soft_limit_override", params={"organization_id": org_id} + ) + if override != -1: + return int(override) + return int(self.get_config_value("per_org_soft_limit")) + + def _global_rate_limit_params(self) -> RateLimitParameters: + return RateLimitParameters( + self.rate_limit_name, + bucket=GLOBAL_BUCKET, + per_second_limit=None, + concurrent_limit=int(self.get_config_value("global_soft_limit")), + ) + + def _org_rate_limit_params(self, org_id: int, per_org_soft_limit: int) -> RateLimitParameters: + return RateLimitParameters( + self.rate_limit_name, + bucket=str(org_id), + per_second_limit=None, + concurrent_limit=per_org_soft_limit, + ) + + def _get_quota_allowance( + self, tenant_ids: dict[str, str | int], query_id: str + ) -> QuotaAllowance: + if tenant_ids.get("referrer", "no_referrer") in _PASS_THROUGH_REFERRERS: + return QuotaAllowance( + can_run=True, + max_threads=self.max_threads, + explanation={"reason": "pass_through"}, + is_throttled=False, + throttle_threshold=MAX_THRESHOLD, + rejection_threshold=MAX_THRESHOLD, + quota_used=0, + quota_unit=QUOTA_UNIT, + suggestion=PASS_THROUGH_REFERRERS_SUGGESTION, + ) + if self.is_cross_org_query(tenant_ids): + return QuotaAllowance( + can_run=True, + max_threads=self.max_threads, + explanation={"reason": "cross_org"}, + is_throttled=False, + throttle_threshold=MAX_THRESHOLD, + rejection_threshold=MAX_THRESHOLD, + quota_used=0, + quota_unit=QUOTA_UNIT, + suggestion=CROSS_ORG_SUGGESTION, + ) + + org_id = self._get_org_id(tenant_ids) + global_soft_limit = int(self.get_config_value("global_soft_limit")) + per_org_soft_limit = self._get_per_org_soft_limit(org_id) + + # Both calls register this query in their respective redis bucket and return the + # current concurrent count (including this query). + global_stats, _, _ = self._is_within_rate_limit(query_id, self._global_rate_limit_params()) + org_stats, _, _ = self._is_within_rate_limit( + query_id, self._org_rate_limit_params(org_id, per_org_soft_limit) + ) + + global_concurrent = global_stats.concurrent + org_concurrent = org_stats.concurrent + + if global_concurrent == -1 or org_concurrent == -1: + # the rate limiter errored, fail open + return QuotaAllowance( + can_run=True, + max_threads=self.max_threads, + explanation={"reason": "rate limiter errored, failing open"}, + is_throttled=False, + throttle_threshold=MAX_THRESHOLD, + rejection_threshold=MAX_THRESHOLD, + quota_used=0, + quota_unit=QUOTA_UNIT, + suggestion=NO_SUGGESTION, + ) + + if global_concurrent <= global_soft_limit: + # The cluster has spare capacity, allow everything. + can_run = True + effective_limit = per_org_soft_limit + why = "within global soft limit" + else: + # The cluster is over its global soft limit. Shrink the per-org ceiling + # proportionally to how far over we are, so the orgs furthest above their + # soft limit get shed first. An org may always run at least one query. + effective_limit = max(1, (per_org_soft_limit * global_soft_limit) // global_concurrent) + can_run = org_concurrent <= effective_limit + why = ( + f"global concurrent {global_concurrent} exceeds global soft limit " + f"{global_soft_limit}; org concurrent {org_concurrent} " + f"{'within' if can_run else 'exceeds'} effective limit {effective_limit}" + ) + + return QuotaAllowance( + can_run=can_run, + max_threads=self.max_threads if can_run else 0, + explanation={ + "reason": why, + "global_concurrent": global_concurrent, + "global_soft_limit": global_soft_limit, + "org_concurrent": org_concurrent, + "per_org_soft_limit": per_org_soft_limit, + "effective_org_limit": effective_limit, + }, + is_throttled=False, + throttle_threshold=effective_limit, + rejection_threshold=effective_limit, + quota_used=org_concurrent, + quota_unit=QUOTA_UNIT, + suggestion=NO_SUGGESTION if can_run else SUGGESTION, + ) + + def _update_quota_balance( + self, + tenant_ids: dict[str, str | int], + query_id: str, + result_or_error: QueryResultOrError, + ) -> None: + if self.is_cross_org_query(tenant_ids): + return + if tenant_ids.get("referrer", "no_referrer") in _PASS_THROUGH_REFERRERS: + return + org_id = self._get_org_id(tenant_ids) + per_org_soft_limit = self._get_per_org_soft_limit(org_id) + # Release the query from both buckets it was registered in. + self._end_query(query_id, self._global_rate_limit_params(), result_or_error) + self._end_query( + query_id, + self._org_rate_limit_params(org_id, per_org_soft_limit), + result_or_error, + ) diff --git a/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py b/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py index 648a8ab981..d5e4f14b1c 100644 --- a/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py +++ b/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py @@ -48,6 +48,9 @@ from snuba.query.allocation_policies.concurrent_rate_limit import ( ConcurrentRateLimitAllocationPolicy, ) +from snuba.query.allocation_policies.dynamic_concurrent_queries import ( + DynamicConcurrentQueries, +) from snuba.query.allocation_policies.per_referrer import ReferrerGuardRailPolicy from snuba.query.allocation_policies.utils import get_max_bytes_to_read from snuba.query.query_settings import HTTPQuerySettings @@ -377,6 +380,11 @@ def get_allocation_policies(self) -> list[AllocationPolicy]: required_tenant_types=["organization_id", "project_id", "referrer"], default_config_overrides={"is_active": 0, "is_enforced": 0}, ), + DynamicConcurrentQueries( + storage_key=EAP_RESOURCE_IDENTIFIER, + required_tenant_types=["organization_id"], + default_config_overrides={"is_active": 0, "is_enforced": 0}, + ), ] def get_delete_allocation_policies(self) -> list[AllocationPolicy]: diff --git a/tests/query/allocation_policies/test_dynamic_concurrent_queries_policy.py b/tests/query/allocation_policies/test_dynamic_concurrent_queries_policy.py new file mode 100644 index 0000000000..31421ecae1 --- /dev/null +++ b/tests/query/allocation_policies/test_dynamic_concurrent_queries_policy.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +import pytest + +from snuba.configs.configuration import ResourceIdentifier +from snuba.datasets.storages.storage_key import StorageKey +from snuba.query.allocation_policies import ( + AllocationPolicyViolations, + QueryResultOrError, +) +from snuba.query.allocation_policies.dynamic_concurrent_queries import ( + DynamicConcurrentQueries, +) +from snuba.web import QueryException, QueryResult + +_RESULT_SUCCESS = QueryResultOrError( + QueryResult( + result={"profile": {"bytes": 42069}}, + extra={"stats": {}, "sql": "", "experiments": {}}, + ), + error=None, +) + +_QUERY_EXCEPTION = QueryException() +_QUERY_EXCEPTION.__cause__ = AllocationPolicyViolations("some policy was violated") +_RESULT_FAIL = QueryResultOrError(None, error=_QUERY_EXCEPTION) + +GLOBAL_SOFT_LIMIT = 10 +PER_ORG_SOFT_LIMIT = 4 + + +@pytest.fixture(scope="function") +def policy() -> DynamicConcurrentQueries: + return DynamicConcurrentQueries( + storage_key=ResourceIdentifier(StorageKey("test")), + required_tenant_types=["organization_id"], + default_config_overrides={ + "global_soft_limit": GLOBAL_SOFT_LIMIT, + "per_org_soft_limit": PER_ORG_SOFT_LIMIT, + }, + ) + + +@pytest.mark.redis_db +def test_allows_all_orgs_under_global_soft_limit( + policy: DynamicConcurrentQueries, +) -> None: + # Spread queries across orgs while staying under the global soft limit. Even though a + # single org can go well above its per-org soft limit, everything is allowed because + # the cluster has spare capacity. + for i in range(GLOBAL_SOFT_LIMIT): + allowance = policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id=f"q{i}" + ) + assert allowance.can_run, f"query {i} should have been allowed" + + +@pytest.mark.redis_db +def test_sheds_heavy_org_over_global_soft_limit( + policy: DynamicConcurrentQueries, +) -> None: + # One heavy org pushes the cluster past the global soft limit on its own. + for i in range(GLOBAL_SOFT_LIMIT): + assert policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id=f"heavy{i}" + ).can_run + + # The cluster is now at the global soft limit; the next query crosses it. The heavy + # org is far above its (proportionally shrunk) effective limit, so it is rejected. + allowance = policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id="heavy_over" + ) + assert not allowance.can_run and allowance.max_threads == 0 + + +@pytest.mark.redis_db +def test_protects_small_org_when_cluster_busy( + policy: DynamicConcurrentQueries, +) -> None: + # A heavy org saturates the cluster past the global soft limit. + for i in range(GLOBAL_SOFT_LIMIT + 5): + policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id=f"heavy{i}") + + # A different org sending only a single query is still allowed even though the cluster + # is over its global soft limit, because an org may always run at least one query. + allowance = policy.get_quota_allowance(tenant_ids={"organization_id": 456}, query_id="small1") + assert allowance.can_run + + +@pytest.mark.redis_db +def test_finished_queries_free_capacity( + policy: DynamicConcurrentQueries, +) -> None: + for i in range(GLOBAL_SOFT_LIMIT): + policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id=f"q{i}") + + assert not policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id="over" + ).can_run + + # Finish several queries to drop the global concurrent count back under the soft limit. + for i in range(GLOBAL_SOFT_LIMIT): + policy.update_quota_balance( + tenant_ids={"organization_id": 123}, + query_id=f"q{i}", + result_or_error=_RESULT_SUCCESS, + ) + + assert policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id="after").can_run + + +@pytest.mark.redis_db +def test_organization_soft_limit_override( + policy: DynamicConcurrentQueries, +) -> None: + # Give org 123 a tiny soft limit and a low global soft limit so it gets shed quickly, + # while org 456 keeps the default per-org soft limit. + policy.set_config_value("global_soft_limit", 4) + policy.set_config_value("organization_soft_limit_override", 1, params={"organization_id": 123}) + + # Fill the cluster past the global soft limit using org 456. + for i in range(5): + policy.get_quota_allowance(tenant_ids={"organization_id": 456}, query_id=f"other{i}") + + # org 123 already has one query running (its overridden soft limit), so a second one + # is rejected once the cluster is over the global soft limit. + assert policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id="o123_1" + ).can_run + assert not policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id="o123_2" + ).can_run + + +@pytest.mark.redis_db +def test_rejected_query_not_counted_after_balance_update( + policy: DynamicConcurrentQueries, +) -> None: + for i in range(GLOBAL_SOFT_LIMIT): + policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id=f"q{i}") + rejected = policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id="rejected") + assert not rejected.can_run + + # The rejected query is removed from both buckets so it does not leak concurrency. + policy.update_quota_balance( + tenant_ids={"organization_id": 123}, + query_id="rejected", + result_or_error=_RESULT_FAIL, + ) + + +def test_pass_through_referrer(policy: DynamicConcurrentQueries) -> None: + for i in range(GLOBAL_SOFT_LIMIT * 2): + assert policy.get_quota_allowance( + tenant_ids={"referrer": "subscriptions_executor", "organization_id": 1}, + query_id=f"q{i}", + ).can_run + + +@pytest.mark.redis_db +def test_cross_org(policy: DynamicConcurrentQueries) -> None: + tenant_ids: dict[str, str | int] = { + "referrer": "do_something", + "cross_org_query": 1, + } + assert policy.get_quota_allowance(tenant_ids=tenant_ids, query_id="a").can_run + # update must not raise for cross-org queries + policy.update_quota_balance(tenant_ids, "a", _RESULT_SUCCESS) + + +@pytest.mark.redis_db +def test_missing_organization_id_rejected( + policy: DynamicConcurrentQueries, +) -> None: + from unittest import mock + + with mock.patch("snuba.settings.RAISE_ON_ALLOCATION_POLICY_FAILURES", False): + allowance = policy.get_quota_allowance({"referrer": "abcd"}, "1234") + assert not allowance.can_run and allowance.max_threads == 0 + # does not raise + policy.update_quota_balance({"referrer": "abcd"}, "1234", _RESULT_SUCCESS) diff --git a/tests/web/rpc/v1/test_storage_routing.py b/tests/web/rpc/v1/test_storage_routing.py index 792bf70d7a..c94ee4c394 100644 --- a/tests/web/rpc/v1/test_storage_routing.py +++ b/tests/web/rpc/v1/test_storage_routing.py @@ -373,6 +373,18 @@ def _output_metrics(self, routing_context: RoutingContext) -> None: "suggestion": "no_suggestion", "max_bytes_to_read": 0, }, + "DynamicConcurrentQueries": { + "can_run": True, + "max_threads": 10, + "explanation": {"storage_key": "EAP"}, + "is_throttled": False, + "throttle_threshold": AnyInt(1000000000000), + "rejection_threshold": AnyInt(1000000000000), + "quota_used": 0, + "quota_unit": "no_units", + "suggestion": "no_suggestion", + "max_bytes_to_read": 0, + }, }, "clickhouse_settings": {"max_threads": 10}, "source_request_id": RANDOM_REQUEST_ID,