From 73211e274f58e3557d192647366fb04fb18213c3 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 12 Jun 2026 08:00:12 -0700 Subject: [PATCH] feat(capman): Add DynamicConcurrentQueries allocation policy Add an allocation policy that lets every org run queries while the cluster has spare capacity, and sheds load from the heaviest orgs once it approaches its sustainable limit. The policy tracks concurrent queries in two Redis buckets: a global bucket (per storage, i.e. per ClickHouse cluster) and a per-org bucket. While the global concurrent count is at or below the global soft limit, all queries for all orgs are allowed. Above it, the effective per-org ceiling shrinks proportionally to global pressure effective_limit = max(1, floor(per_org_soft_limit * global_soft_limit / global_concurrent)) so the orgs furthest above their per-org soft limit are rejected first, with progressively more orgs shed as the cluster gets busier. Small orgs keep running (an org may always run at least one query), focusing the impact on the heaviest senders. Per-org soft limits can be overridden per organization. Wire the policy into the shared EAP routing strategy allocation-policy list so it runs in the CBRS path alongside the existing policies, dormant by default (is_active=0, is_enforced=0) to match the staged-rollout pattern. It composes with regular allocation policies via the existing combine logic (can_run = all, max_threads = min) and is auto-discovered by snuba-admin capacity management for both routing strategies and regular storages. Co-Authored-By: Claude Opus 4.8 (1M context) Agent transcript: https://claudescope.sentry.dev/share/j-IJ2ngiYfk7NnJwDS2CYpZ2HFZu6AmUg4c58q0Xbc8 --- .../dynamic_concurrent_queries.py | 250 ++++++++++++++++++ .../routing_strategies/storage_routing.py | 8 + .../test_dynamic_concurrent_queries_policy.py | 181 +++++++++++++ tests/web/rpc/v1/test_storage_routing.py | 12 + 4 files changed, 451 insertions(+) create mode 100644 snuba/query/allocation_policies/dynamic_concurrent_queries.py create mode 100644 tests/query/allocation_policies/test_dynamic_concurrent_queries_policy.py diff --git a/snuba/query/allocation_policies/dynamic_concurrent_queries.py b/snuba/query/allocation_policies/dynamic_concurrent_queries.py new file mode 100644 index 0000000000..9bc57a9a3d --- /dev/null +++ b/snuba/query/allocation_policies/dynamic_concurrent_queries.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +import logging + +from snuba.configs.configuration import Configuration +from snuba.query.allocation_policies import ( + CROSS_ORG_SUGGESTION, + MAX_THRESHOLD, + NO_SUGGESTION, + PASS_THROUGH_REFERRERS_SUGGESTION, + InvalidTenantsForAllocationPolicy, + QueryResultOrError, + QuotaAllowance, +) +from snuba.query.allocation_policies.concurrent_rate_limit import ( + _PASS_THROUGH_REFERRERS, + BaseConcurrentRateLimitAllocationPolicy, +) +from snuba.state.rate_limit import RateLimitParameters + +logger = logging.getLogger("snuba.query.allocation_policy_dynamic_concurrent_queries") + +QUOTA_UNIT = "concurrent_queries" +SUGGESTION = ( + "The cluster is close to its maximum sustainable concurrent query load and this " + "organization is sending more concurrent queries than its fair share. Reduce the " + "number of concurrent queries this organization is sending." +) + +# The global bucket is shared by every org querying a single storage (and therefore a +# single ClickHouse cluster). Because `component_name()` already prefixes redis keys with +# the storage key, this bucket name only needs to be unique within a storage. +GLOBAL_BUCKET = "__global__" + +DEFAULT_GLOBAL_SOFT_LIMIT = 200 +DEFAULT_PER_ORG_SOFT_LIMIT = 22 + + +class DynamicConcurrentQueries(BaseConcurrentRateLimitAllocationPolicy): + """Allows all queries for every org while there is spare capacity on the cluster, and + sheds load from the heaviest orgs once the cluster approaches its sustainable limit. + + Two soft limits drive the policy: + + * ``global_soft_limit`` - the number of concurrent queries (across every org hitting + this storage) that we consider "close to the maximum the cluster can sustain". While + the global concurrent count is at or below this value, every query for every org is + allowed through. + * ``per_org_soft_limit`` - the number of concurrent queries each org is softly + guaranteed. Orgs are only candidates for rejection once they exceed this. + + Once the global concurrent count climbs above ``global_soft_limit`` the policy starts + shedding load proportionally to how far the cluster is over the soft limit. The + effective per-org ceiling is:: + + effective_limit = max(1, floor(per_org_soft_limit * global_soft_limit / global_concurrent)) + + which equals ``per_org_soft_limit`` right at the soft limit and shrinks as global + pressure rises. Because an org is rejected when its own concurrent count exceeds this + shrinking ceiling, the orgs that are *most* above their soft limit are rejected first, + and progressively more orgs get shed as the cluster gets busier. Orgs running a small + number of queries keep running (an org may always run at least one query), which keeps + the impact focused on the heaviest abusers. + + Per-org soft limits can be overridden for individual orgs via + ``organization_soft_limit_override``. + """ + + @property + def rate_limit_name(self) -> str: + return "dynamic_concurrent_queries_policy" + + def _additional_config_definitions(self) -> list[Configuration]: + return super()._additional_config_definitions() + [ + Configuration( + name="global_soft_limit", + description=( + "Number of concurrent queries (across all orgs on this storage) that " + "is considered close to the maximum the cluster can sustain. While the " + "global concurrent count is at or below this value, all queries are " + "allowed. Above it, the policy starts rejecting queries from the orgs " + "that are furthest above their per-org soft limit." + ), + value_type=int, + default=DEFAULT_GLOBAL_SOFT_LIMIT, + ), + Configuration( + name="per_org_soft_limit", + description=( + "Number of concurrent queries each org is softly guaranteed. Orgs at " + "or below this are always allowed; orgs above it are the first to be " + "rejected once the cluster crosses the global soft limit." + ), + value_type=int, + default=DEFAULT_PER_ORG_SOFT_LIMIT, + ), + Configuration( + name="organization_soft_limit_override", + description="Override the per-org soft limit for a specific organization_id.", + value_type=int, + default=-1, + param_types={"organization_id": int}, + ), + ] + + def _get_org_id(self, tenant_ids: dict[str, str | int]) -> int: + org_id = tenant_ids.get("organization_id") + if org_id is None: + raise InvalidTenantsForAllocationPolicy.from_args( + tenant_ids, + self.__class__.__name__, + "tenant_ids must include organization_id", + ) + return int(org_id) + + def _get_per_org_soft_limit(self, org_id: int) -> int: + override = self.get_config_value( + "organization_soft_limit_override", params={"organization_id": org_id} + ) + if override != -1: + return int(override) + return int(self.get_config_value("per_org_soft_limit")) + + def _global_rate_limit_params(self) -> RateLimitParameters: + return RateLimitParameters( + self.rate_limit_name, + bucket=GLOBAL_BUCKET, + per_second_limit=None, + concurrent_limit=int(self.get_config_value("global_soft_limit")), + ) + + def _org_rate_limit_params(self, org_id: int, per_org_soft_limit: int) -> RateLimitParameters: + return RateLimitParameters( + self.rate_limit_name, + bucket=str(org_id), + per_second_limit=None, + concurrent_limit=per_org_soft_limit, + ) + + def _get_quota_allowance( + self, tenant_ids: dict[str, str | int], query_id: str + ) -> QuotaAllowance: + if tenant_ids.get("referrer", "no_referrer") in _PASS_THROUGH_REFERRERS: + return QuotaAllowance( + can_run=True, + max_threads=self.max_threads, + explanation={"reason": "pass_through"}, + is_throttled=False, + throttle_threshold=MAX_THRESHOLD, + rejection_threshold=MAX_THRESHOLD, + quota_used=0, + quota_unit=QUOTA_UNIT, + suggestion=PASS_THROUGH_REFERRERS_SUGGESTION, + ) + if self.is_cross_org_query(tenant_ids): + return QuotaAllowance( + can_run=True, + max_threads=self.max_threads, + explanation={"reason": "cross_org"}, + is_throttled=False, + throttle_threshold=MAX_THRESHOLD, + rejection_threshold=MAX_THRESHOLD, + quota_used=0, + quota_unit=QUOTA_UNIT, + suggestion=CROSS_ORG_SUGGESTION, + ) + + org_id = self._get_org_id(tenant_ids) + global_soft_limit = int(self.get_config_value("global_soft_limit")) + per_org_soft_limit = self._get_per_org_soft_limit(org_id) + + # Both calls register this query in their respective redis bucket and return the + # current concurrent count (including this query). + global_stats, _, _ = self._is_within_rate_limit(query_id, self._global_rate_limit_params()) + org_stats, _, _ = self._is_within_rate_limit( + query_id, self._org_rate_limit_params(org_id, per_org_soft_limit) + ) + + global_concurrent = global_stats.concurrent + org_concurrent = org_stats.concurrent + + if global_concurrent == -1 or org_concurrent == -1: + # the rate limiter errored, fail open + return QuotaAllowance( + can_run=True, + max_threads=self.max_threads, + explanation={"reason": "rate limiter errored, failing open"}, + is_throttled=False, + throttle_threshold=MAX_THRESHOLD, + rejection_threshold=MAX_THRESHOLD, + quota_used=0, + quota_unit=QUOTA_UNIT, + suggestion=NO_SUGGESTION, + ) + + if global_concurrent <= global_soft_limit: + # The cluster has spare capacity, allow everything. + can_run = True + effective_limit = per_org_soft_limit + why = "within global soft limit" + else: + # The cluster is over its global soft limit. Shrink the per-org ceiling + # proportionally to how far over we are, so the orgs furthest above their + # soft limit get shed first. An org may always run at least one query. + effective_limit = max(1, (per_org_soft_limit * global_soft_limit) // global_concurrent) + can_run = org_concurrent <= effective_limit + why = ( + f"global concurrent {global_concurrent} exceeds global soft limit " + f"{global_soft_limit}; org concurrent {org_concurrent} " + f"{'within' if can_run else 'exceeds'} effective limit {effective_limit}" + ) + + return QuotaAllowance( + can_run=can_run, + max_threads=self.max_threads if can_run else 0, + explanation={ + "reason": why, + "global_concurrent": global_concurrent, + "global_soft_limit": global_soft_limit, + "org_concurrent": org_concurrent, + "per_org_soft_limit": per_org_soft_limit, + "effective_org_limit": effective_limit, + }, + is_throttled=False, + throttle_threshold=effective_limit, + rejection_threshold=effective_limit, + quota_used=org_concurrent, + quota_unit=QUOTA_UNIT, + suggestion=NO_SUGGESTION if can_run else SUGGESTION, + ) + + def _update_quota_balance( + self, + tenant_ids: dict[str, str | int], + query_id: str, + result_or_error: QueryResultOrError, + ) -> None: + if self.is_cross_org_query(tenant_ids): + return + if tenant_ids.get("referrer", "no_referrer") in _PASS_THROUGH_REFERRERS: + return + org_id = self._get_org_id(tenant_ids) + per_org_soft_limit = self._get_per_org_soft_limit(org_id) + # Release the query from both buckets it was registered in. + self._end_query(query_id, self._global_rate_limit_params(), result_or_error) + self._end_query( + query_id, + self._org_rate_limit_params(org_id, per_org_soft_limit), + result_or_error, + ) diff --git a/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py b/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py index 648a8ab981..d5e4f14b1c 100644 --- a/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py +++ b/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py @@ -48,6 +48,9 @@ from snuba.query.allocation_policies.concurrent_rate_limit import ( ConcurrentRateLimitAllocationPolicy, ) +from snuba.query.allocation_policies.dynamic_concurrent_queries import ( + DynamicConcurrentQueries, +) from snuba.query.allocation_policies.per_referrer import ReferrerGuardRailPolicy from snuba.query.allocation_policies.utils import get_max_bytes_to_read from snuba.query.query_settings import HTTPQuerySettings @@ -377,6 +380,11 @@ def get_allocation_policies(self) -> list[AllocationPolicy]: required_tenant_types=["organization_id", "project_id", "referrer"], default_config_overrides={"is_active": 0, "is_enforced": 0}, ), + DynamicConcurrentQueries( + storage_key=EAP_RESOURCE_IDENTIFIER, + required_tenant_types=["organization_id"], + default_config_overrides={"is_active": 0, "is_enforced": 0}, + ), ] def get_delete_allocation_policies(self) -> list[AllocationPolicy]: diff --git a/tests/query/allocation_policies/test_dynamic_concurrent_queries_policy.py b/tests/query/allocation_policies/test_dynamic_concurrent_queries_policy.py new file mode 100644 index 0000000000..31421ecae1 --- /dev/null +++ b/tests/query/allocation_policies/test_dynamic_concurrent_queries_policy.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +import pytest + +from snuba.configs.configuration import ResourceIdentifier +from snuba.datasets.storages.storage_key import StorageKey +from snuba.query.allocation_policies import ( + AllocationPolicyViolations, + QueryResultOrError, +) +from snuba.query.allocation_policies.dynamic_concurrent_queries import ( + DynamicConcurrentQueries, +) +from snuba.web import QueryException, QueryResult + +_RESULT_SUCCESS = QueryResultOrError( + QueryResult( + result={"profile": {"bytes": 42069}}, + extra={"stats": {}, "sql": "", "experiments": {}}, + ), + error=None, +) + +_QUERY_EXCEPTION = QueryException() +_QUERY_EXCEPTION.__cause__ = AllocationPolicyViolations("some policy was violated") +_RESULT_FAIL = QueryResultOrError(None, error=_QUERY_EXCEPTION) + +GLOBAL_SOFT_LIMIT = 10 +PER_ORG_SOFT_LIMIT = 4 + + +@pytest.fixture(scope="function") +def policy() -> DynamicConcurrentQueries: + return DynamicConcurrentQueries( + storage_key=ResourceIdentifier(StorageKey("test")), + required_tenant_types=["organization_id"], + default_config_overrides={ + "global_soft_limit": GLOBAL_SOFT_LIMIT, + "per_org_soft_limit": PER_ORG_SOFT_LIMIT, + }, + ) + + +@pytest.mark.redis_db +def test_allows_all_orgs_under_global_soft_limit( + policy: DynamicConcurrentQueries, +) -> None: + # Spread queries across orgs while staying under the global soft limit. Even though a + # single org can go well above its per-org soft limit, everything is allowed because + # the cluster has spare capacity. + for i in range(GLOBAL_SOFT_LIMIT): + allowance = policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id=f"q{i}" + ) + assert allowance.can_run, f"query {i} should have been allowed" + + +@pytest.mark.redis_db +def test_sheds_heavy_org_over_global_soft_limit( + policy: DynamicConcurrentQueries, +) -> None: + # One heavy org pushes the cluster past the global soft limit on its own. + for i in range(GLOBAL_SOFT_LIMIT): + assert policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id=f"heavy{i}" + ).can_run + + # The cluster is now at the global soft limit; the next query crosses it. The heavy + # org is far above its (proportionally shrunk) effective limit, so it is rejected. + allowance = policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id="heavy_over" + ) + assert not allowance.can_run and allowance.max_threads == 0 + + +@pytest.mark.redis_db +def test_protects_small_org_when_cluster_busy( + policy: DynamicConcurrentQueries, +) -> None: + # A heavy org saturates the cluster past the global soft limit. + for i in range(GLOBAL_SOFT_LIMIT + 5): + policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id=f"heavy{i}") + + # A different org sending only a single query is still allowed even though the cluster + # is over its global soft limit, because an org may always run at least one query. + allowance = policy.get_quota_allowance(tenant_ids={"organization_id": 456}, query_id="small1") + assert allowance.can_run + + +@pytest.mark.redis_db +def test_finished_queries_free_capacity( + policy: DynamicConcurrentQueries, +) -> None: + for i in range(GLOBAL_SOFT_LIMIT): + policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id=f"q{i}") + + assert not policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id="over" + ).can_run + + # Finish several queries to drop the global concurrent count back under the soft limit. + for i in range(GLOBAL_SOFT_LIMIT): + policy.update_quota_balance( + tenant_ids={"organization_id": 123}, + query_id=f"q{i}", + result_or_error=_RESULT_SUCCESS, + ) + + assert policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id="after").can_run + + +@pytest.mark.redis_db +def test_organization_soft_limit_override( + policy: DynamicConcurrentQueries, +) -> None: + # Give org 123 a tiny soft limit and a low global soft limit so it gets shed quickly, + # while org 456 keeps the default per-org soft limit. + policy.set_config_value("global_soft_limit", 4) + policy.set_config_value("organization_soft_limit_override", 1, params={"organization_id": 123}) + + # Fill the cluster past the global soft limit using org 456. + for i in range(5): + policy.get_quota_allowance(tenant_ids={"organization_id": 456}, query_id=f"other{i}") + + # org 123 already has one query running (its overridden soft limit), so a second one + # is rejected once the cluster is over the global soft limit. + assert policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id="o123_1" + ).can_run + assert not policy.get_quota_allowance( + tenant_ids={"organization_id": 123}, query_id="o123_2" + ).can_run + + +@pytest.mark.redis_db +def test_rejected_query_not_counted_after_balance_update( + policy: DynamicConcurrentQueries, +) -> None: + for i in range(GLOBAL_SOFT_LIMIT): + policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id=f"q{i}") + rejected = policy.get_quota_allowance(tenant_ids={"organization_id": 123}, query_id="rejected") + assert not rejected.can_run + + # The rejected query is removed from both buckets so it does not leak concurrency. + policy.update_quota_balance( + tenant_ids={"organization_id": 123}, + query_id="rejected", + result_or_error=_RESULT_FAIL, + ) + + +def test_pass_through_referrer(policy: DynamicConcurrentQueries) -> None: + for i in range(GLOBAL_SOFT_LIMIT * 2): + assert policy.get_quota_allowance( + tenant_ids={"referrer": "subscriptions_executor", "organization_id": 1}, + query_id=f"q{i}", + ).can_run + + +@pytest.mark.redis_db +def test_cross_org(policy: DynamicConcurrentQueries) -> None: + tenant_ids: dict[str, str | int] = { + "referrer": "do_something", + "cross_org_query": 1, + } + assert policy.get_quota_allowance(tenant_ids=tenant_ids, query_id="a").can_run + # update must not raise for cross-org queries + policy.update_quota_balance(tenant_ids, "a", _RESULT_SUCCESS) + + +@pytest.mark.redis_db +def test_missing_organization_id_rejected( + policy: DynamicConcurrentQueries, +) -> None: + from unittest import mock + + with mock.patch("snuba.settings.RAISE_ON_ALLOCATION_POLICY_FAILURES", False): + allowance = policy.get_quota_allowance({"referrer": "abcd"}, "1234") + assert not allowance.can_run and allowance.max_threads == 0 + # does not raise + policy.update_quota_balance({"referrer": "abcd"}, "1234", _RESULT_SUCCESS) diff --git a/tests/web/rpc/v1/test_storage_routing.py b/tests/web/rpc/v1/test_storage_routing.py index 792bf70d7a..c94ee4c394 100644 --- a/tests/web/rpc/v1/test_storage_routing.py +++ b/tests/web/rpc/v1/test_storage_routing.py @@ -373,6 +373,18 @@ def _output_metrics(self, routing_context: RoutingContext) -> None: "suggestion": "no_suggestion", "max_bytes_to_read": 0, }, + "DynamicConcurrentQueries": { + "can_run": True, + "max_threads": 10, + "explanation": {"storage_key": "EAP"}, + "is_throttled": False, + "throttle_threshold": AnyInt(1000000000000), + "rejection_threshold": AnyInt(1000000000000), + "quota_used": 0, + "quota_unit": "no_units", + "suggestion": "no_suggestion", + "max_bytes_to_read": 0, + }, }, "clickhouse_settings": {"max_threads": 10}, "source_request_id": RANDOM_REQUEST_ID,