# Previous (mv_5) sampling rule, rebuilt only when this migration is rolled
# back. Sampling is per item on `item_id`, but the value fed to the hash is
# shifted by the tier's own weight (`item_id + weight`). Every tier therefore
# hashes a different value and selects its items independently of the other
# tiers, so the tiers do not nest: an item can land in tier 64 without
# landing in tier 8.
def generate_old_materialized_view_expression(sampling_weight: int) -> str:
    # Same weight appears twice on purpose: once as the perturbation added to
    # `item_id`, once as the modulus.
    perturbed_predicate = f"cityHash64(item_id + {sampling_weight}) % {sampling_weight}"
    return downsample_mv_select(
        columns,
        sampling_weight,
        where_predicate=perturbed_predicate,
    )
# New (mv_6) sampling rule: per-item sampling on `item_id` through one hash
# that is NOT perturbed by the tier weight. Inclusion is still decided item by
# item (the extrapolation variance math keeps its Bernoulli-independence
# assumption), but every tier now hashes the same value H. The weights are
# 8 / 64 / 512 and each divides the next, so `H % 512 == 0` implies
# `H % 64 == 0`, which implies `H % 8 == 0`. The tiers are nested:
# tier 512 ⊆ tier 64 ⊆ tier 8.
def generate_new_materialized_view_expression(sampling_weight: int) -> str:
    nested_predicate = f"cityHash64(item_id) % {sampling_weight}"
    return downsample_mv_select(
        columns,
        sampling_weight,
        where_predicate=nested_predicate,
    )


class Migration(migration.ClickhouseNodeMigration):
    # Replaces the `mv_{old_version}` downsample views with `mv_{new_version}`
    # views that use the un-perturbed hash; rolling back restores the
    # perturbed-hash views under the old version number.
    blocking = False

    def forwards_ops(self) -> Sequence[SqlOperation]:
        # Forward: create the new-version views, drop the old-version ones.
        forward_ops = swap_downsample_materialized_views(
            query_for_weight=generate_new_materialized_view_expression,
            drop_version=old_version,
            create_version=new_version,
            columns=columns,
        )
        return forward_ops

    def backwards_ops(self) -> Sequence[SqlOperation]:
        # Backward: the mirror image, using the old (perturbed) predicate.
        rollback_ops = swap_downsample_materialized_views(
            query_for_weight=generate_old_materialized_view_expression,
            drop_version=new_version,
            create_version=old_version,
            columns=columns,
        )
        return rollback_ops
+""" + +from typing import Callable, List, Sequence + +from snuba.clickhouse.columns import Column, UInt +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import operations +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget, SqlOperation +from snuba.utils.schemas import UUID, Bool, DateTime, Float, Int, Map, String + +EAP_STORAGE_SET_KEY = StorageSetKey.EVENTS_ANALYTICS_PLATFORM + +#: Number of bucketed attribute map columns (``attributes_string_0`` ..) the +#: ``eap_items`` schema uses. +DEFAULT_NUM_ATTR_BUCKETS = 40 + +#: Sampling weights for the downsample tiers. Each weight divides the next, +#: which is what lets tiers that hash the same value nest as subsets. +SAMPLING_WEIGHTS = [8, 8**2, 8**3] + +#: The source table the downsample materialized views read from. +EAP_ITEMS_LOCAL_TABLE = "eap_items_1_local" + +#: Columns that are re-projected (transformed) by the downsample views rather +#: than copied straight through, so they are dropped from the passthrough +#: ``SELECT`` column list. +TRANSFORMED_COLUMNS = frozenset( + { + "sampling_weight", + "sampling_factor", + "retention_days", + "client_sample_rate", + "server_sample_rate", + } +) + + +def get_eap_items_columns( + num_attr_buckets: int = DEFAULT_NUM_ATTR_BUCKETS, +) -> List[Column[Modifiers]]: + """Return a fresh ``eap_items`` column list. + + A new list is returned on every call so callers can freely append or + tweak columns without mutating shared state. 
def downsample_mv_select(
    columns: Sequence[Column[Modifiers]],
    sampling_weight: int,
    *,
    where_predicate: str,
    source_table: str = EAP_ITEMS_LOCAL_TABLE,
    include_sample_rates: bool = True,
) -> str:
    """Build the ``SELECT`` body of a downsample materialized view.

    The result is a single-line statement of the form
    ``SELECT <projection> FROM <source_table> WHERE (<where_predicate>) = 0``.

    ``columns`` is the destination column list. Columns named in
    :data:`TRANSFORMED_COLUMNS` are never copied through; the rest are
    projected by name, in the order given. The transformed columns are then
    projected explicitly: ``retention_days`` is read from
    ``downsampled_retention_days``, ``sampling_weight`` is multiplied by the
    tier weight and ``sampling_factor`` is divided by it.

    ``sampling_weight`` is the tier weight (e.g. 8, 64, 512) interpolated
    into those expressions.

    ``where_predicate`` is the body of the sampling filter, e.g.
    ``f"cityHash64(item_id) % {sampling_weight}"``; it is wrapped as
    ``WHERE ({where_predicate}) = 0``.

    ``source_table`` is the table the view reads from.

    ``include_sample_rates`` controls whether ``client_sample_rate`` and
    ``server_sample_rate`` are scaled in the projection (older views did not
    carry them).

    :raises ValueError: if ``sampling_weight`` is not positive (it is used as
        a divisor in the generated SQL) or ``where_predicate`` is blank
        (which would render ``WHERE () = 0``).
    """
    if sampling_weight <= 0:
        raise ValueError(f"sampling_weight must be a positive integer, got {sampling_weight!r}")
    if not where_predicate.strip():
        raise ValueError("where_predicate must be a non-empty SQL expression")

    # Collect the projection as separator-free terms and join once at the
    # end. Embedding trailing commas in each term (as before) rendered
    # "SELECT , ..." whenever no pass-through column survived the filter.
    select_terms = [c.name for c in columns if c.name not in TRANSFORMED_COLUMNS]
    select_terms += [
        "downsampled_retention_days AS retention_days",
        f"sampling_weight * {sampling_weight} AS sampling_weight",
        f"sampling_factor / {sampling_weight} AS sampling_factor",
    ]
    if include_sample_rates:
        # NOTE(review): get_eap_items_columns() in this module does not
        # declare client_sample_rate / server_sample_rate even though they are
        # projected here by default. Confirm the view's declared column list
        # and the destination table carry them.
        select_terms += [
            f"client_sample_rate / {sampling_weight} AS client_sample_rate",
            f"server_sample_rate / {sampling_weight} AS server_sample_rate",
        ]

    projection = ", ".join(select_terms)
    return f"SELECT {projection} FROM {source_table} WHERE ({where_predicate}) = 0"
def test_new_mv_samples_per_item_without_perturbation() -> None:
    # Fragments that would mean the view samples per trace instead of per
    # item. `trace_id` itself still appears as a passed-through SELECT column,
    # so only the hashed forms are rejected.
    per_trace_fragments = ("reinterpretAsUInt128(trace_id)", "cityHash64(trace_id)")

    for weight in _migration.sampling_weights:
        rendered = _migration.generate_new_materialized_view_expression(weight)

        # Every tier must hash plain `item_id`, so that all tiers hash the
        # same value and nest as subsets.
        unperturbed = f"cityHash64(item_id) % {weight}"
        assert unperturbed in rendered, (
            f"sampling weight {weight} should hash item_id unperturbed, got: {rendered}"
        )

        # The old per-tier perturbation (`item_id + weight`) must be gone;
        # with it the tiers would not be subsets of each other.
        perturbed = f"item_id + {weight}"
        assert perturbed not in rendered

        # Sampling stays per-item to preserve the independence assumption in
        # the extrapolation variance math.
        for fragment in per_trace_fragments:
            assert fragment not in rendered


def test_subset_property_via_modular_arithmetic() -> None:
    # The view filters with `WHERE cityHash64(item_id) % w = 0`. The subset
    # guarantee (tier 512 ⊆ tier 64 ⊆ tier 8) relies on each weight dividing
    # the next one, so check that divisibility chain first.
    ascending = sorted(_migration.sampling_weights)
    for smaller, larger in zip(ascending[:-1], ascending[1:]):
        assert larger % smaller == 0, (
            f"sampling_weight {larger} must be a multiple of {smaller} "
            "for tier subset property to hold"
        )

    # Then spot-check concrete hash values: multiples of the largest weight
    # pass the tightest tier by construction and must pass every looser tier.
    largest = max(_migration.sampling_weights)
    candidate_hashes = [multiplier * largest for multiplier in range(10)]
    for h in candidate_hashes:
        for w in _migration.sampling_weights:
            assert h % w == 0, (
                f"hash {h} passes tier {largest} but not tier {w}; subset property violated"
            )