From 7a563b7a9153c2b911db631e8c5146347b144ae5 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 29 May 2026 16:29:16 -0700 Subject: [PATCH 1/6] feat(eap-items): Sample downsample tiers by trace and make tiers subsets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The downsample materialized views previously sampled per `item_id` and perturbed the hash per tier (`cityHash64(item_id + sampling_weight)`), so the 1/8, 1/64, and 1/512 tiers were independent samples and items in the same trace could land in different tiers. Switch the MVs to `cityHash64(reinterpretAsUInt128(trace_id)) % w = 0`. Hashing `trace_id` (without perturbation) keeps every item in a trace together, and because 8 | 64 | 512 the tiers nest: tier 512 ⊆ tier 64 ⊆ tier 8. Co-Authored-By: Claude Opus 4.7 (1M context) Agent transcript: https://claudescope.sentry.dev/share/J1l8N_MgYvD7E5OR27eSBvBE-k-Q7G1nqkfGulwerG8 --- .../0055_sample_downsample_tiers_by_trace.py | 184 ++++++++++++++++++ tests/migrations/test_eap_subset_sampling.py | 50 +++++ 2 files changed, 234 insertions(+) create mode 100644 snuba/snuba_migrations/events_analytics_platform/0055_sample_downsample_tiers_by_trace.py create mode 100644 tests/migrations/test_eap_subset_sampling.py diff --git a/snuba/snuba_migrations/events_analytics_platform/0055_sample_downsample_tiers_by_trace.py b/snuba/snuba_migrations/events_analytics_platform/0055_sample_downsample_tiers_by_trace.py new file mode 100644 index 00000000000..10837f27c71 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0055_sample_downsample_tiers_by_trace.py @@ -0,0 +1,184 @@ +from typing import List, Sequence + +from snuba.clickhouse.columns import Column, UInt +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget, SqlOperation +from snuba.utils.schemas import UUID, Bool, DateTime, Float, Int, Map, String + +num_attr_buckets = 40 +storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM +sampling_weights = [8, 8**2, 8**3] +old_version = 4 +new_version = old_version + 1 + +columns: List[Column[Modifiers]] = [ + Column("organization_id", UInt(64)), + Column("project_id", UInt(64)), + Column("item_type", UInt(8)), + Column("timestamp", DateTime(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))), + Column("trace_id", UUID()), + Column("item_id", UInt(128)), + Column("sampling_weight", UInt(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))), + Column("sampling_factor", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))), + Column( + "retention_days", + UInt(16, modifiers=Modifiers(codecs=["T64", "ZSTD(1)"])), + ), + Column( + "attributes_bool", + Map( + String(), + Bool(), + ), + ), + Column( + "attributes_int", + Map( + String(), + Int(64), + ), + ), +] + + +columns.extend( + [ + Column( + f"attributes_string_{i}", + Map( + String(), + String(), + modifiers=Modifiers( + codecs=["ZSTD(1)"], + ), + ), + ) + for i in range(num_attr_buckets) + ] +) + +columns.extend( + [ + Column( + f"attributes_float_{i}", + Map( + String(), + Float(64), + modifiers=Modifiers( + codecs=["ZSTD(1)"], + ), + ), + ) + for i in range(num_attr_buckets) + ] +) + + +def should_include_column(name: str) -> bool: + return name not in { + "sampling_weight", + "sampling_factor", + "retention_days", + "client_sample_rate", + "server_sample_rate", + } + + +# Per-item sampling on `item_id`. Each tier picks independently so the tiers +# are not subsets of each other, and items belonging to the same trace can +# end up in different tiers. +def generate_old_materialized_view_expression(sampling_weight: int) -> str: + column_names_str = ", ".join([c.name for c in columns if should_include_column(c.name)]) + return " ".join( + [ + "SELECT", + f"{column_names_str},", + "downsampled_retention_days AS retention_days,", + f"sampling_weight * {sampling_weight} AS sampling_weight,", + f"sampling_factor / {sampling_weight} AS sampling_factor,", + f"client_sample_rate / {sampling_weight} AS client_sample_rate,", + f"server_sample_rate / {sampling_weight} AS server_sample_rate", + "FROM eap_items_1_local", + f"WHERE (cityHash64(item_id + {sampling_weight}) % {sampling_weight}) = 0", + ] + ) + + +# Trace-based sampling that keeps each tier a strict subset of the tier +# above. Hashing `trace_id` (and not perturbing the hash per tier) makes +# every item in a trace land in the same set of tiers, and because the +# sampling weights are 8 / 64 / 512 (each divides the next), an item that +# satisfies `H % 512 == 0` also satisfies `H % 64 == 0` and `H % 8 == 0`. +def generate_new_materialized_view_expression(sampling_weight: int) -> str: + column_names_str = ", ".join([c.name for c in columns if should_include_column(c.name)]) + return " ".join( + [ + "SELECT", + f"{column_names_str},", + "downsampled_retention_days AS retention_days,", + f"sampling_weight * {sampling_weight} AS sampling_weight,", + f"sampling_factor / {sampling_weight} AS sampling_factor,", + f"client_sample_rate / {sampling_weight} AS client_sample_rate,", + f"server_sample_rate / {sampling_weight} AS server_sample_rate", + "FROM eap_items_1_local", + f"WHERE (cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}) = 0", + ] + ) + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> Sequence[SqlOperation]: + ops: List[SqlOperation] = [] + + for sampling_weight in sampling_weights: + local_table_name = f"eap_items_1_downsample_{sampling_weight}_local" + + ops.extend( + [ + operations.CreateMaterializedView( + storage_set=storage_set_key, + view_name=f"eap_items_1_downsample_{sampling_weight}_mv_{new_version}", + columns=columns, + destination_table_name=local_table_name, + target=OperationTarget.LOCAL, + query=generate_new_materialized_view_expression(sampling_weight), + ), + operations.DropTable( + storage_set=storage_set_key, + table_name=f"eap_items_1_downsample_{sampling_weight}_mv_{old_version}", + target=OperationTarget.LOCAL, + ), + ] + ) + + return ops + + def backwards_ops(self) -> Sequence[SqlOperation]: + ops: List[SqlOperation] = [] + + for sampling_weight in sampling_weights: + local_table_name = f"eap_items_1_downsample_{sampling_weight}_local" + + ops.extend( + [ + operations.CreateMaterializedView( + storage_set=storage_set_key, + view_name=f"eap_items_1_downsample_{sampling_weight}_mv_{old_version}", + columns=columns, + destination_table_name=local_table_name, + target=OperationTarget.LOCAL, + query=generate_old_materialized_view_expression(sampling_weight), + ), + operations.DropTable( + storage_set=storage_set_key, + table_name=f"eap_items_1_downsample_{sampling_weight}_mv_{new_version}", + target=OperationTarget.LOCAL, + ), + ] + ) + + return ops diff --git a/tests/migrations/test_eap_subset_sampling.py b/tests/migrations/test_eap_subset_sampling.py new file mode 100644 index 00000000000..cc7c7a74db8 --- /dev/null +++ b/tests/migrations/test_eap_subset_sampling.py @@ -0,0 +1,50 @@ +"""Unit tests for the EAP downsample materialized view introduced in +migration 0055_sample_downsample_tiers_by_trace. + +These check: +- the new MV samples on `trace_id` (so every item in a trace lands in the + same set of tiers), and +- because the sampling weights are 8 / 64 / 512 (each divides the next), + any hash value that satisfies `H % 512 == 0` also satisfies + `H % 64 == 0` and `H % 8 == 0` — i.e. tier 512 ⊆ tier 64 ⊆ tier 8. +""" + +from importlib import import_module + +_migration = import_module( + "snuba.snuba_migrations.events_analytics_platform.0055_sample_downsample_tiers_by_trace" +) + + +def test_new_mv_samples_on_trace_id() -> None: + for sampling_weight in _migration.sampling_weights: + sql = _migration.generate_new_materialized_view_expression(sampling_weight) + assert f"cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}" in sql, ( + f"sampling weight {sampling_weight} should hash trace_id, got: {sql}" + ) + # The previous per-item-id form must be gone — otherwise items in + # the same trace can land in different tiers. + assert "cityHash64(item_id" not in sql + + +def test_subset_property_via_modular_arithmetic() -> None: + # The MV uses `WHERE cityHash64(reinterpretAsUInt128(trace_id)) % w = 0`. + # If H % 512 == 0 then H % 64 == 0 and H % 8 == 0, since 8 | 64 | 512. + # Verify the divisibility chain that the SQL relies on for the subset + # guarantee. + weights = sorted(_migration.sampling_weights) + for smaller, larger in zip(weights, weights[1:]): + assert larger % smaller == 0, ( + f"sampling_weight {larger} must be a multiple of {smaller} " + "for tier subset property to hold" + ) + + # Spot-check with concrete hash values. Any H that lands in the tightest + # tier (largest weight) must also land in every looser tier. + largest = max(_migration.sampling_weights) + for k in range(10): + h = k * largest # H % largest == 0 by construction + for w in _migration.sampling_weights: + assert h % w == 0, ( + f"hash {h} passes tier {largest} but not tier {w}; subset property violated" + ) From ddfb2e46a014e9d097155bb52baf98d3f037d151 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 15 Jun 2026 09:50:13 -0700 Subject: [PATCH 2/6] fix(eap): renumber downsample tiers migration to 0057 Migrations 0055 and 0056 landed on master, colliding with this PR's 0055_sample_downsample_tiers_by_trace. Bump it to 0057 and update the test import path. Co-Authored-By: Claude Opus 4.8 (1M context) Agent transcript: https://claudescope.sentry.dev/share/a4n8ne-FeVHxgip0LTtvUecLmEJ3EXkM9ToaC5Ocymc --- ...s_by_trace.py => 0057_sample_downsample_tiers_by_trace.py} | 0 tests/migrations/test_eap_subset_sampling.py | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename snuba/snuba_migrations/events_analytics_platform/{0055_sample_downsample_tiers_by_trace.py => 0057_sample_downsample_tiers_by_trace.py} (100%) diff --git a/snuba/snuba_migrations/events_analytics_platform/0055_sample_downsample_tiers_by_trace.py b/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py similarity index 100% rename from snuba/snuba_migrations/events_analytics_platform/0055_sample_downsample_tiers_by_trace.py rename to snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py diff --git a/tests/migrations/test_eap_subset_sampling.py b/tests/migrations/test_eap_subset_sampling.py index cc7c7a74db8..9655f0ab138 100644 --- a/tests/migrations/test_eap_subset_sampling.py +++ b/tests/migrations/test_eap_subset_sampling.py @@ -1,5 +1,5 @@ """Unit tests for the EAP downsample materialized view introduced in -migration 0055_sample_downsample_tiers_by_trace. +migration 0057_sample_downsample_tiers_by_trace. These check: - the new MV samples on `trace_id` (so every item in a trace lands in the @@ -12,7 +12,7 @@ from importlib import import_module _migration = import_module( - "snuba.snuba_migrations.events_analytics_platform.0055_sample_downsample_tiers_by_trace" + "snuba.snuba_migrations.events_analytics_platform.0057_sample_downsample_tiers_by_trace" ) From 132212d87919243624fd809969d60c1865a56215 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 15 Jun 2026 09:55:10 -0700 Subject: [PATCH 3/6] ref(eap): extract shared downsample MV regeneration helpers EAP migrations repeatedly copy-paste the eap_items column list, the downsample materialized view SELECT, and the create/drop-per-weight loop. Add snuba_migrations/events_analytics_platform/templates.py with: - get_eap_items_columns(num_attr_buckets): a fresh, customizable column list - downsample_mv_select(): builds the MV SELECT from a sampling predicate - swap_downsample_materialized_views(): create-new/drop-old ops per weight Adopt them in 0057 so the migration only describes the sampling predicate and version bump. Forwards SQL is unchanged. Existing applied migrations are left as-is (immutable history); new migrations can use these helpers. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../0057_sample_downsample_tiers_by_trace.py | 188 +++--------------- .../events_analytics_platform/templates.py | 168 ++++++++++++++++ 2 files changed, 200 insertions(+), 156 deletions(-) create mode 100644 snuba/snuba_migrations/events_analytics_platform/templates.py diff --git a/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py b/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py index 10837f27c71..5358761094f 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py +++ b/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py @@ -1,108 +1,29 @@ -from typing import List, Sequence - -from snuba.clickhouse.columns import Column, UInt -from snuba.clusters.storage_sets import StorageSetKey -from snuba.migrations import migration, operations -from snuba.migrations.columns import MigrationModifiers as Modifiers -from snuba.migrations.operations import OperationTarget, SqlOperation -from snuba.utils.schemas import UUID, Bool, DateTime, Float, Int, Map, String +from typing import Sequence + +from snuba.migrations import migration +from snuba.migrations.operations import SqlOperation +from snuba.snuba_migrations.events_analytics_platform.templates import ( + SAMPLING_WEIGHTS, + downsample_mv_select, + get_eap_items_columns, + swap_downsample_materialized_views, +) -num_attr_buckets = 40 -storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM -sampling_weights = [8, 8**2, 8**3] +sampling_weights = SAMPLING_WEIGHTS old_version = 4 new_version = old_version + 1 -columns: List[Column[Modifiers]] = [ - Column("organization_id", UInt(64)), - Column("project_id", UInt(64)), - Column("item_type", UInt(8)), - Column("timestamp", DateTime(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))), - Column("trace_id", UUID()), - Column("item_id", UInt(128)), - Column("sampling_weight", UInt(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))), - Column("sampling_factor", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))), - Column( - "retention_days", - UInt(16, modifiers=Modifiers(codecs=["T64", "ZSTD(1)"])), - ), - Column( - "attributes_bool", - Map( - String(), - Bool(), - ), - ), - Column( - "attributes_int", - Map( - String(), - Int(64), - ), - ), -] - - -columns.extend( - [ - Column( - f"attributes_string_{i}", - Map( - String(), - String(), - modifiers=Modifiers( - codecs=["ZSTD(1)"], - ), - ), - ) - for i in range(num_attr_buckets) - ] -) - -columns.extend( - [ - Column( - f"attributes_float_{i}", - Map( - String(), - Float(64), - modifiers=Modifiers( - codecs=["ZSTD(1)"], - ), - ), - ) - for i in range(num_attr_buckets) - ] -) - - -def should_include_column(name: str) -> bool: - return name not in { - "sampling_weight", - "sampling_factor", - "retention_days", - "client_sample_rate", - "server_sample_rate", - } +columns = get_eap_items_columns() # Per-item sampling on `item_id`. Each tier picks independently so the tiers # are not subsets of each other, and items belonging to the same trace can # end up in different tiers. def generate_old_materialized_view_expression(sampling_weight: int) -> str: - column_names_str = ", ".join([c.name for c in columns if should_include_column(c.name)]) - return " ".join( - [ - "SELECT", - f"{column_names_str},", - "downsampled_retention_days AS retention_days,", - f"sampling_weight * {sampling_weight} AS sampling_weight,", - f"sampling_factor / {sampling_weight} AS sampling_factor,", - f"client_sample_rate / {sampling_weight} AS client_sample_rate,", - f"server_sample_rate / {sampling_weight} AS server_sample_rate", - "FROM eap_items_1_local", - f"WHERE (cityHash64(item_id + {sampling_weight}) % {sampling_weight}) = 0", - ] + return downsample_mv_select( + columns, + sampling_weight, + where_predicate=f"cityHash64(item_id + {sampling_weight}) % {sampling_weight}", ) @@ -112,19 +33,10 @@ def generate_old_materialized_view_expression(sampling_weight: int) -> str: # sampling weights are 8 / 64 / 512 (each divides the next), an item that # satisfies `H % 512 == 0` also satisfies `H % 64 == 0` and `H % 8 == 0`. def generate_new_materialized_view_expression(sampling_weight: int) -> str: - column_names_str = ", ".join([c.name for c in columns if should_include_column(c.name)]) - return " ".join( - [ - "SELECT", - f"{column_names_str},", - "downsampled_retention_days AS retention_days,", - f"sampling_weight * {sampling_weight} AS sampling_weight,", - f"sampling_factor / {sampling_weight} AS sampling_factor,", - f"client_sample_rate / {sampling_weight} AS client_sample_rate,", - f"server_sample_rate / {sampling_weight} AS server_sample_rate", - "FROM eap_items_1_local", - f"WHERE (cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}) = 0", - ] + return downsample_mv_select( + columns, + sampling_weight, + where_predicate=f"cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}", ) @@ -132,53 +44,17 @@ class Migration(migration.ClickhouseNodeMigration): blocking = False def forwards_ops(self) -> Sequence[SqlOperation]: - ops: List[SqlOperation] = [] - - for sampling_weight in sampling_weights: - local_table_name = f"eap_items_1_downsample_{sampling_weight}_local" - - ops.extend( - [ - operations.CreateMaterializedView( - storage_set=storage_set_key, - view_name=f"eap_items_1_downsample_{sampling_weight}_mv_{new_version}", - columns=columns, - destination_table_name=local_table_name, - target=OperationTarget.LOCAL, - query=generate_new_materialized_view_expression(sampling_weight), - ), - operations.DropTable( - storage_set=storage_set_key, - table_name=f"eap_items_1_downsample_{sampling_weight}_mv_{old_version}", - target=OperationTarget.LOCAL, - ), - ] - ) - - return ops + return swap_downsample_materialized_views( + columns=columns, + create_version=new_version, + drop_version=old_version, + query_for_weight=generate_new_materialized_view_expression, + ) def backwards_ops(self) -> Sequence[SqlOperation]: - ops: List[SqlOperation] = [] - - for sampling_weight in sampling_weights: - local_table_name = f"eap_items_1_downsample_{sampling_weight}_local" - - ops.extend( - [ - operations.CreateMaterializedView( - storage_set=storage_set_key, - view_name=f"eap_items_1_downsample_{sampling_weight}_mv_{old_version}", - columns=columns, - destination_table_name=local_table_name, - target=OperationTarget.LOCAL, - query=generate_old_materialized_view_expression(sampling_weight), - ), - operations.DropTable( - storage_set=storage_set_key, - table_name=f"eap_items_1_downsample_{sampling_weight}_mv_{new_version}", - target=OperationTarget.LOCAL, - ), - ] - ) - - return ops + return swap_downsample_materialized_views( + columns=columns, + create_version=old_version, + drop_version=new_version, + query_for_weight=generate_old_materialized_view_expression, + ) diff --git a/snuba/snuba_migrations/events_analytics_platform/templates.py b/snuba/snuba_migrations/events_analytics_platform/templates.py new file mode 100644 index 00000000000..907ff98c684 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/templates.py @@ -0,0 +1,168 @@ +"""Shared helpers for events_analytics_platform migrations. + +Many EAP migrations regenerate the ``eap_items`` downsample materialized +views. Historically each migration copy-pasted the full column list, the +``SELECT`` expression, and the create/drop loop. This module centralises +those pieces so a new migration only has to describe what actually changes +(usually the sampling predicate and the version it bumps to). +""" + +from typing import Callable, List, Sequence + +from snuba.clickhouse.columns import Column, UInt +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import operations +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget, SqlOperation +from snuba.utils.schemas import UUID, Bool, DateTime, Float, Int, Map, String + +EAP_STORAGE_SET_KEY = StorageSetKey.EVENTS_ANALYTICS_PLATFORM + +#: Number of bucketed attribute map columns (``attributes_string_0`` ..) the +#: ``eap_items`` schema uses. +DEFAULT_NUM_ATTR_BUCKETS = 40 + +#: Sampling weights for the downsample tiers. Each weight divides the next, +#: which is what lets a trace-based hash keep the tiers as nested subsets. +SAMPLING_WEIGHTS = [8, 8**2, 8**3] + +#: The source table the downsample materialized views read from. +EAP_ITEMS_LOCAL_TABLE = "eap_items_1_local" + +#: Columns that are re-projected (transformed) by the downsample views rather +#: than copied straight through, so they are dropped from the passthrough +#: ``SELECT`` column list. +TRANSFORMED_COLUMNS = frozenset( + { + "sampling_weight", + "sampling_factor", + "retention_days", + "client_sample_rate", + "server_sample_rate", + } +) + + +def get_eap_items_columns( + num_attr_buckets: int = DEFAULT_NUM_ATTR_BUCKETS, +) -> List[Column[Modifiers]]: + """Return a fresh ``eap_items`` column list. + + A new list is returned on every call so callers can freely append or + tweak columns without mutating shared state. + """ + columns: List[Column[Modifiers]] = [ + Column("organization_id", UInt(64)), + Column("project_id", UInt(64)), + Column("item_type", UInt(8)), + Column("timestamp", DateTime(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))), + Column("trace_id", UUID()), + Column("item_id", UInt(128)), + Column("sampling_weight", UInt(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))), + Column("sampling_factor", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))), + Column("retention_days", UInt(16, modifiers=Modifiers(codecs=["T64", "ZSTD(1)"]))), + Column("attributes_bool", Map(String(), Bool())), + Column("attributes_int", Map(String(), Int(64))), + ] + columns.extend( + Column( + f"attributes_string_{i}", + Map(String(), String(), modifiers=Modifiers(codecs=["ZSTD(1)"])), + ) + for i in range(num_attr_buckets) + ) + columns.extend( + Column( + f"attributes_float_{i}", + Map(String(), Float(64), modifiers=Modifiers(codecs=["ZSTD(1)"])), + ) + for i in range(num_attr_buckets) + ) + return columns + + +def downsample_mv_select( + columns: Sequence[Column[Modifiers]], + sampling_weight: int, + *, + where_predicate: str, + source_table: str = EAP_ITEMS_LOCAL_TABLE, + include_sample_rates: bool = True, +) -> str: + """Build the ``SELECT`` body of a downsample materialized view. + + ``columns`` is the destination column list; the transformed columns + (see :data:`TRANSFORMED_COLUMNS`) are projected explicitly and the rest + are copied through. + + ``where_predicate`` is the body of the sampling filter, e.g. + ``f"cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}"``; + it is wrapped as ``WHERE ({where_predicate}) = 0``. + + ``include_sample_rates`` controls whether ``client_sample_rate`` and + ``server_sample_rate`` are scaled in the projection (older views did not + carry them). + """ + passthrough = ", ".join(c.name for c in columns if c.name not in TRANSFORMED_COLUMNS) + select_terms = [ + f"{passthrough},", + "downsampled_retention_days AS retention_days,", + f"sampling_weight * {sampling_weight} AS sampling_weight,", + f"sampling_factor / {sampling_weight} AS sampling_factor" + + ("," if include_sample_rates else ""), + ] + if include_sample_rates: + select_terms.extend( + [ + f"client_sample_rate / {sampling_weight} AS client_sample_rate,", + f"server_sample_rate / {sampling_weight} AS server_sample_rate", + ] + ) + return " ".join( + [ + "SELECT", + *select_terms, + f"FROM {source_table}", + f"WHERE ({where_predicate}) = 0", + ] + ) + + +def swap_downsample_materialized_views( + *, + columns: Sequence[Column[Modifiers]], + create_version: int, + drop_version: int, + query_for_weight: Callable[[int], str], + sampling_weights: Sequence[int] = SAMPLING_WEIGHTS, + storage_set: StorageSetKey = EAP_STORAGE_SET_KEY, + table_prefix: str = "eap_items_1_downsample", +) -> List[SqlOperation]: + """Create one downsample materialized view per sampling weight and drop + the previous version. + + Both ``forwards_ops`` and ``backwards_ops`` are this same operation with + the create/drop versions and the query swapped, so a migration can call + this for each direction. + """ + ops: List[SqlOperation] = [] + for sampling_weight in sampling_weights: + view_base = f"{table_prefix}_{sampling_weight}" + ops.extend( + [ + operations.CreateMaterializedView( + storage_set=storage_set, + view_name=f"{view_base}_mv_{create_version}", + columns=list(columns), + destination_table_name=f"{view_base}_local", + target=OperationTarget.LOCAL, + query=query_for_weight(sampling_weight), + ), + operations.DropTable( + storage_set=storage_set, + table_name=f"{view_base}_mv_{drop_version}", + target=OperationTarget.LOCAL, + ), + ] + ) + return ops From 1b71b981924c0f2fc26f55f41b824512030f5b4e Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 15 Jun 2026 10:58:40 -0700 Subject: [PATCH 4/6] feat(eap): make downsample tiers nested via unperturbed per-item hash MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sample each downsample tier with a single, un-perturbed hash of item_id (cityHash64(item_id) % weight) instead of hashing trace_id. Because the weights are 8 / 64 / 512 (each divides the next) and every tier hashes the same value, tier 512 is a subset of tier 64 is a subset of tier 8 — while inclusion stays independent across items, preserving the Bernoulli independence assumption the RPC extrapolation variance math relies on. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../0057_sample_downsample_tiers_by_trace.py | 21 +++++++----- tests/migrations/test_eap_subset_sampling.py | 34 ++++++++++++------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py b/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py index 5358761094f..94046ef7505 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py +++ b/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py @@ -16,9 +16,10 @@ columns = get_eap_items_columns() -# Per-item sampling on `item_id`. Each tier picks independently so the tiers -# are not subsets of each other, and items belonging to the same trace can -# end up in different tiers. +# Per-item sampling on `item_id`, but the hash is perturbed by the sampling +# weight (`item_id + weight`). Because each tier hashes a different value, +# the tiers pick independently and are not subsets of each other: an item +# can land in tier 8 but not tier 64. def generate_old_materialized_view_expression(sampling_weight: int) -> str: return downsample_mv_select( columns, @@ -27,16 +28,18 @@ def generate_old_materialized_view_expression(sampling_weight: int) -> str: ) -# Trace-based sampling that keeps each tier a strict subset of the tier -# above. Hashing `trace_id` (and not perturbing the hash per tier) makes -# every item in a trace land in the same set of tiers, and because the -# sampling weights are 8 / 64 / 512 (each divides the next), an item that -# satisfies `H % 512 == 0` also satisfies `H % 64 == 0` and `H % 8 == 0`. +# Per-item sampling on `item_id` with a single, un-perturbed hash. Sampling +# stays independent across items (so the extrapolation variance math keeps +# its Bernoulli-independence assumption), but because the sampling weights +# are 8 / 64 / 512 (each divides the next) and every tier hashes the same +# value, an item that satisfies `H % 512 == 0` also satisfies `H % 64 == 0` +# and `H % 8 == 0`. That makes the tiers strict subsets: tier 512 ⊆ tier 64 +# ⊆ tier 8. def generate_new_materialized_view_expression(sampling_weight: int) -> str: return downsample_mv_select( columns, sampling_weight, - where_predicate=f"cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}", + where_predicate=f"cityHash64(item_id) % {sampling_weight}", ) diff --git a/tests/migrations/test_eap_subset_sampling.py b/tests/migrations/test_eap_subset_sampling.py index 9655f0ab138..9265e966fbf 100644 --- a/tests/migrations/test_eap_subset_sampling.py +++ b/tests/migrations/test_eap_subset_sampling.py @@ -2,11 +2,13 @@ migration 0057_sample_downsample_tiers_by_trace. These check: -- the new MV samples on `trace_id` (so every item in a trace lands in the - same set of tiers), and -- because the sampling weights are 8 / 64 / 512 (each divides the next), - any hash value that satisfies `H % 512 == 0` also satisfies - `H % 64 == 0` and `H % 8 == 0` — i.e. tier 512 ⊆ tier 64 ⊆ tier 8. +- the new MV samples per item via a single, un-perturbed hash of `item_id` + (so inclusion stays independent across items and the extrapolation + variance math keeps its Bernoulli-independence assumption), and +- because the sampling weights are 8 / 64 / 512 (each divides the next) and + every tier hashes the same value, any hash value that satisfies + `H % 512 == 0` also satisfies `H % 64 == 0` and `H % 8 == 0` — i.e. + tier 512 ⊆ tier 64 ⊆ tier 8. """ from importlib import import_module @@ -16,19 +18,27 @@ ) -def test_new_mv_samples_on_trace_id() -> None: +def test_new_mv_samples_per_item_without_perturbation() -> None: for sampling_weight in _migration.sampling_weights: sql = _migration.generate_new_materialized_view_expression(sampling_weight) - assert f"cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}" in sql, ( - f"sampling weight {sampling_weight} should hash trace_id, got: {sql}" + # Hash `item_id` directly (no `+ weight` perturbation) so every tier + # hashes the same value and the tiers nest as subsets. + assert f"cityHash64(item_id) % {sampling_weight}" in sql, ( + f"sampling weight {sampling_weight} should hash item_id unperturbed, got: {sql}" ) - # The previous per-item-id form must be gone — otherwise items in - # the same trace can land in different tiers. - assert "cityHash64(item_id" not in sql + # The perturbed per-tier form must be gone — otherwise the tiers are + # not subsets of each other. + assert f"item_id + {sampling_weight}" not in sql + # Sampling must stay per-item, not per-trace, to preserve the + # independence assumption in the extrapolation variance math. + # (`trace_id` still appears as a passed-through SELECT column, so we + # only assert it is not the hashed/sampled value.) + assert "reinterpretAsUInt128(trace_id)" not in sql + assert "cityHash64(trace_id)" not in sql def test_subset_property_via_modular_arithmetic() -> None: - # The MV uses `WHERE cityHash64(reinterpretAsUInt128(trace_id)) % w = 0`. + # The MV uses `WHERE cityHash64(item_id) % w = 0`. # If H % 512 == 0 then H % 64 == 0 and H % 8 == 0, since 8 | 64 | 512. # Verify the divisibility chain that the SQL relies on for the subset # guarantee. From 2f0b442d1def83b6de4560c6eaa2a4a71906bce8 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 15 Jun 2026 11:31:52 -0700 Subject: [PATCH 5/6] ref(eap): rename migration to 0057_nest_downsample_tiers The migration no longer samples by trace, so rename it from 0057_sample_downsample_tiers_by_trace to 0057_nest_downsample_tiers to match what it does, and update the test import. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...sample_tiers_by_trace.py => 0057_nest_downsample_tiers.py} | 0 tests/migrations/test_eap_subset_sampling.py | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename snuba/snuba_migrations/events_analytics_platform/{0057_sample_downsample_tiers_by_trace.py => 0057_nest_downsample_tiers.py} (100%) diff --git a/snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py b/snuba/snuba_migrations/events_analytics_platform/0057_nest_downsample_tiers.py similarity index 100% rename from snuba/snuba_migrations/events_analytics_platform/0057_sample_downsample_tiers_by_trace.py rename to snuba/snuba_migrations/events_analytics_platform/0057_nest_downsample_tiers.py diff --git a/tests/migrations/test_eap_subset_sampling.py b/tests/migrations/test_eap_subset_sampling.py index 9265e966fbf..b21551780c5 100644 --- a/tests/migrations/test_eap_subset_sampling.py +++ b/tests/migrations/test_eap_subset_sampling.py @@ -1,5 +1,5 @@ """Unit tests for the EAP downsample materialized view introduced in -migration 0057_sample_downsample_tiers_by_trace. +migration 0057_nest_downsample_tiers. These check: - the new MV samples per item via a single, un-perturbed hash of `item_id` @@ -14,7 +14,7 @@ from importlib import import_module _migration = import_module( - "snuba.snuba_migrations.events_analytics_platform.0057_sample_downsample_tiers_by_trace" + "snuba.snuba_migrations.events_analytics_platform.0057_nest_downsample_tiers" ) From 187966a298f56850ba20c2c9a7e36d7eb8b13c67 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Tue, 16 Jun 2026 11:37:45 -0700 Subject: [PATCH 6/6] ref(eap): renumber nest-tiers migration to 0058 after merging master Master merged 0057_add_name_column_and_index (mv_5), which took both the 0057 slot and the mv_5 version this branch targeted. Resolve the collision: - Rename 0057_nest_downsample_tiers -> 0058_nest_downsample_tiers. - Bump the version swap from mv_4->mv_5 to mv_5->mv_6, since mv_5 now exists with a different definition on master. - Add the `indexed_name` column to the shared templates column list so the regenerated views keep projecting it (master's mv_5 added it); otherwise mv_6 would silently drop it from the downsampled read path. - Update the test import to the new module name. The only behavioral difference between master's mv_5 and the new mv_6 is the intended one: the un-perturbed `cityHash64(item_id)` sampling predicate that makes the downsample tiers nested subsets. The backwards op restores mv_5 with the perturbed hash, matching master. Co-Authored-By: Claude --- ...st_downsample_tiers.py => 0058_nest_downsample_tiers.py} | 6 +++++- .../snuba_migrations/events_analytics_platform/templates.py | 4 ++++ tests/migrations/test_eap_subset_sampling.py | 4 ++-- 3 files changed, 11 insertions(+), 3 deletions(-) rename snuba/snuba_migrations/events_analytics_platform/{0057_nest_downsample_tiers.py => 0058_nest_downsample_tiers.py} (89%) diff --git a/snuba/snuba_migrations/events_analytics_platform/0057_nest_downsample_tiers.py b/snuba/snuba_migrations/events_analytics_platform/0058_nest_downsample_tiers.py similarity index 89% rename from snuba/snuba_migrations/events_analytics_platform/0057_nest_downsample_tiers.py rename to snuba/snuba_migrations/events_analytics_platform/0058_nest_downsample_tiers.py index 94046ef7505..a8e627044a0 100644 --- a/snuba/snuba_migrations/events_analytics_platform/0057_nest_downsample_tiers.py +++ b/snuba/snuba_migrations/events_analytics_platform/0058_nest_downsample_tiers.py @@ -10,7 +10,11 @@ ) sampling_weights = SAMPLING_WEIGHTS -old_version = 4 +# master is at mv_5 (migration 0057_add_name_column_and_index added the +# `indexed_name` column to the downsample MVs), so this migration bumps 5 -> 6. +# The restored `old_version` (mv_5) keeps the perturbed hash, matching the +# definition currently on master. +old_version = 5 new_version = old_version + 1 columns = get_eap_items_columns() diff --git a/snuba/snuba_migrations/events_analytics_platform/templates.py b/snuba/snuba_migrations/events_analytics_platform/templates.py index 907ff98c684..ce53e82c102 100644 --- a/snuba/snuba_migrations/events_analytics_platform/templates.py +++ b/snuba/snuba_migrations/events_analytics_platform/templates.py @@ -61,6 +61,10 @@ def get_eap_items_columns( Column("sampling_weight", UInt(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))), Column("sampling_factor", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))), Column("retention_days", UInt(16, modifiers=Modifiers(codecs=["T64", "ZSTD(1)"]))), + # Added to the downsample MVs by migration 0057_add_name_column_and_index + # (mv_5). Kept here so regenerated views keep projecting it; it is copied + # straight through (not in TRANSFORMED_COLUMNS). + Column("indexed_name", String(modifiers=Modifiers(codecs=["ZSTD(1)"]))), Column("attributes_bool", Map(String(), Bool())), Column("attributes_int", Map(String(), Int(64))), ] diff --git a/tests/migrations/test_eap_subset_sampling.py b/tests/migrations/test_eap_subset_sampling.py index b21551780c5..9f31af2a687 100644 --- a/tests/migrations/test_eap_subset_sampling.py +++ b/tests/migrations/test_eap_subset_sampling.py @@ -1,5 +1,5 @@ """Unit tests for the EAP downsample materialized view introduced in -migration 0057_nest_downsample_tiers. +migration 0058_nest_downsample_tiers. These check: - the new MV samples per item via a single, un-perturbed hash of `item_id` @@ -14,7 +14,7 @@ from importlib import import_module _migration = import_module( - "snuba.snuba_migrations.events_analytics_platform.0057_nest_downsample_tiers" + "snuba.snuba_migrations.events_analytics_platform.0058_nest_downsample_tiers" )