Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from typing import Sequence

from snuba.migrations import migration
from snuba.migrations.operations import SqlOperation
from snuba.snuba_migrations.events_analytics_platform.templates import (
SAMPLING_WEIGHTS,
downsample_mv_select,
get_eap_items_columns,
swap_downsample_materialized_views,
)

# Column list handed both to the SELECT builder and to the create-view
# operations of this migration.
columns = get_eap_items_columns()

# One materialized view exists per weight. Exposed under this module so the
# unit tests can iterate the tiers through the migration itself.
sampling_weights = SAMPLING_WEIGHTS

# Version suffixes of the materialized views: forwards replaces the
# ``old_version`` views with ``new_version`` ones, backwards does the reverse.
old_version = 4
new_version = old_version + 1


def generate_old_materialized_view_expression(sampling_weight: int) -> str:
    """Return the SELECT used by the ``old_version`` view of one tier.

    Sampling is per item on ``item_id``, but the hashed value is perturbed by
    the sampling weight (``item_id + weight``). Because each tier hashes a
    different value, the tiers pick independently and are not subsets of
    each other: an item can land in tier 64 without being in tier 8.

    Used by ``backwards_ops`` to restore the previous views.
    """
    perturbed_predicate = f"cityHash64(item_id + {sampling_weight}) % {sampling_weight}"
    return downsample_mv_select(columns, sampling_weight, where_predicate=perturbed_predicate)


def generate_new_materialized_view_expression(sampling_weight: int) -> str:
    """Return the SELECT used by the ``new_version`` view of one tier.

    Sampling is per item on ``item_id`` with a single, un-perturbed hash, so
    inclusion stays independent across items (the extrapolation variance
    math keeps its Bernoulli-independence assumption). Every tier hashes the
    same value and the sampling weights are 8 / 64 / 512 (each divides the
    next), so an item with ``H % 512 == 0`` also has ``H % 64 == 0`` and
    ``H % 8 == 0``. The tiers are therefore nested:
    tier 512 ⊆ tier 64 ⊆ tier 8.
    """
    shared_hash_predicate = f"cityHash64(item_id) % {sampling_weight}"
    return downsample_mv_select(columns, sampling_weight, where_predicate=shared_hash_predicate)


class Migration(migration.ClickhouseNodeMigration):
    """Swap the downsample materialized views between the weight-perturbed
    and the un-perturbed ``item_id`` hash predicates."""

    blocking = False

    def forwards_ops(self) -> Sequence[SqlOperation]:
        """Create the ``new_version`` views and drop the ``old_version`` ones."""
        upgrade_ops = swap_downsample_materialized_views(
            query_for_weight=generate_new_materialized_view_expression,
            create_version=new_version,
            drop_version=old_version,
            columns=columns,
        )
        return upgrade_ops

    def backwards_ops(self) -> Sequence[SqlOperation]:
        """Re-create the ``old_version`` views and drop the ``new_version`` ones."""
        rollback_ops = swap_downsample_materialized_views(
            query_for_weight=generate_old_materialized_view_expression,
            create_version=old_version,
            drop_version=new_version,
            columns=columns,
        )
        return rollback_ops
168 changes: 168 additions & 0 deletions snuba/snuba_migrations/events_analytics_platform/templates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""Shared helpers for events_analytics_platform migrations.

Many EAP migrations regenerate the ``eap_items`` downsample materialized
views. Historically each migration copy-pasted the full column list, the
``SELECT`` expression, and the create/drop loop. This module centralises
those pieces so a new migration only has to describe what actually changes
(usually the sampling predicate and the version it bumps to).
"""

from typing import Callable, List, Sequence

from snuba.clickhouse.columns import Column, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import operations
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation
from snuba.utils.schemas import UUID, Bool, DateTime, Float, Int, Map, String

#: Storage set used as the default target of the operations built by the
#: helpers in this module.
EAP_STORAGE_SET_KEY = StorageSetKey.EVENTS_ANALYTICS_PLATFORM

#: How many bucketed attribute map columns of each kind
#: (``attributes_string_0`` .., ``attributes_float_0`` ..) are generated by
#: default.
DEFAULT_NUM_ATTR_BUCKETS = 40

#: Sampling weights of the downsample tiers: 8, 64 and 512. Each weight
#: divides the next, so whenever every tier filters the *same* hash value
#: with ``hash % weight = 0`` the tiers come out as nested subsets.
SAMPLING_WEIGHTS = [8**tier for tier in (1, 2, 3)]

#: Name of the source table the downsample materialized views select from.
EAP_ITEMS_LOCAL_TABLE = "eap_items_1_local"

#: Columns the downsample views project through an explicit expression
#: instead of copying them as-is; they are left out of the passthrough part
#: of the ``SELECT`` column list.
TRANSFORMED_COLUMNS = frozenset(
    (
        "sampling_weight",
        "sampling_factor",
        "retention_days",
        "client_sample_rate",
        "server_sample_rate",
    )
)


def get_eap_items_columns(
    num_attr_buckets: int = DEFAULT_NUM_ATTR_BUCKETS,
) -> List[Column[Modifiers]]:
    """Build and return a brand-new ``eap_items`` column list.

    The list holds the fixed columns first, then ``num_attr_buckets``
    ``attributes_string_<i>`` map columns, then ``num_attr_buckets``
    ``attributes_float_<i>`` map columns. Nothing is cached: every call
    constructs new ``Column`` objects, so callers may append to or modify the
    result without affecting anyone else.

    NOTE(review): ``client_sample_rate`` and ``server_sample_rate`` are not
    declared here although the downsample SELECT builder projects them by
    default -- confirm whether the views need them in their column list.
    """

    def zstd() -> Modifiers:
        # A fresh modifiers object per column, so no state is shared.
        return Modifiers(codecs=["ZSTD(1)"])

    fixed_columns: List[Column[Modifiers]] = [
        Column("organization_id", UInt(64)),
        Column("project_id", UInt(64)),
        Column("item_type", UInt(8)),
        Column("timestamp", DateTime(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))),
        Column("trace_id", UUID()),
        Column("item_id", UInt(128)),
        Column("sampling_weight", UInt(64, modifiers=zstd())),
        Column("sampling_factor", Float(64, modifiers=zstd())),
        Column("retention_days", UInt(16, modifiers=Modifiers(codecs=["T64", "ZSTD(1)"]))),
        Column("attributes_bool", Map(String(), Bool())),
        Column("attributes_int", Map(String(), Int(64))),
    ]
    string_buckets: List[Column[Modifiers]] = [
        Column(f"attributes_string_{bucket}", Map(String(), String(), modifiers=zstd()))
        for bucket in range(num_attr_buckets)
    ]
    float_buckets: List[Column[Modifiers]] = [
        Column(f"attributes_float_{bucket}", Map(String(), Float(64), modifiers=zstd()))
        for bucket in range(num_attr_buckets)
    ]
    return [*fixed_columns, *string_buckets, *float_buckets]


def downsample_mv_select(
    columns: Sequence[Column[Modifiers]],
    sampling_weight: int,
    *,
    where_predicate: str,
    source_table: str = EAP_ITEMS_LOCAL_TABLE,
    include_sample_rates: bool = True,
) -> str:
    """Build the ``SELECT`` body of a downsample materialized view.

    Args:
        columns: Destination column list. Columns named in
            :data:`TRANSFORMED_COLUMNS` are projected through explicit
            expressions; all others are copied through by name, in order.
        sampling_weight: Tier weight. ``sampling_weight`` is multiplied by
            it and ``sampling_factor`` (plus the sample rates, when
            included) is divided by it.
        where_predicate: Body of the sampling filter, e.g.
            ``f"cityHash64(reinterpretAsUInt128(trace_id)) % {sampling_weight}"``.
            It is wrapped as ``WHERE ({where_predicate}) = 0`` and
            interpolated verbatim, so it must come from migration code, never
            from external input.
        source_table: Table the view selects from.
        include_sample_rates: Whether ``client_sample_rate`` and
            ``server_sample_rate`` are scaled in the projection (older views
            did not carry them).

    Returns:
        The query as a single-line string.

    The projection is assembled as a list of terms and joined once, so the
    statement stays well-formed even when ``columns`` contributes no
    passthrough column (a leading ``SELECT ,`` is never produced).
    """
    # Copied-through columns come first, in destination order.
    projections = [col.name for col in columns if col.name not in TRANSFORMED_COLUMNS]
    # ``downsampled_retention_days`` is presumably a source-table column that
    # is not part of the destination list -- verify against the table schema.
    projections.append("downsampled_retention_days AS retention_days")
    projections.append(f"sampling_weight * {sampling_weight} AS sampling_weight")
    projections.append(f"sampling_factor / {sampling_weight} AS sampling_factor")
    if include_sample_rates:
        projections.append(f"client_sample_rate / {sampling_weight} AS client_sample_rate")
        projections.append(f"server_sample_rate / {sampling_weight} AS server_sample_rate")

    select_list = ", ".join(projections)
    return f"SELECT {select_list} FROM {source_table} WHERE ({where_predicate}) = 0"


def swap_downsample_materialized_views(
    *,
    columns: Sequence[Column[Modifiers]],
    create_version: int,
    drop_version: int,
    query_for_weight: Callable[[int], str],
    sampling_weights: Sequence[int] = SAMPLING_WEIGHTS,
    storage_set: StorageSetKey = EAP_STORAGE_SET_KEY,
    table_prefix: str = "eap_items_1_downsample",
) -> List[SqlOperation]:
    """Return the operations that replace one version of the downsample
    materialized views with another.

    For every weight in ``sampling_weights``, in order, two operations are
    emitted: first the creation of ``<table_prefix>_<weight>_mv_<create_version>``
    (writing to ``<table_prefix>_<weight>_local`` with the query returned by
    ``query_for_weight(weight)``), then the drop of
    ``<table_prefix>_<weight>_mv_<drop_version>``. Both target the local
    tables of ``storage_set``.

    A migration's ``forwards_ops`` and ``backwards_ops`` are the same call
    with the create/drop versions and the query builder exchanged.
    """

    def swap_ops_for(weight: int) -> List[SqlOperation]:
        # Create/drop pair for a single tier; creation comes first.
        tier_prefix = f"{table_prefix}_{weight}"
        create_new = operations.CreateMaterializedView(
            storage_set=storage_set,
            view_name=f"{tier_prefix}_mv_{create_version}",
            columns=list(columns),
            destination_table_name=f"{tier_prefix}_local",
            target=OperationTarget.LOCAL,
            query=query_for_weight(weight),
        )
        drop_previous = operations.DropTable(
            storage_set=storage_set,
            table_name=f"{tier_prefix}_mv_{drop_version}",
            target=OperationTarget.LOCAL,
        )
        return [create_new, drop_previous]

    return [op for weight in sampling_weights for op in swap_ops_for(weight)]
60 changes: 60 additions & 0 deletions tests/migrations/test_eap_subset_sampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Unit tests for the EAP downsample materialized view introduced in
migration 0057_nest_downsample_tiers.

These check:
- the new MV samples per item via a single, un-perturbed hash of `item_id`
(so inclusion stays independent across items and the extrapolation
variance math keeps its Bernoulli-independence assumption), and
- because the sampling weights are 8 / 64 / 512 (each divides the next) and
every tier hashes the same value, any hash value that satisfies
`H % 512 == 0` also satisfies `H % 64 == 0` and `H % 8 == 0` — i.e.
tier 512 ⊆ tier 64 ⊆ tier 8.
"""

from importlib import import_module

# The migration's module name starts with a digit, so it cannot be named in a
# regular ``import`` statement; it is loaded by its dotted path instead.
_MIGRATION_MODULE = "snuba.snuba_migrations.events_analytics_platform.0057_nest_downsample_tiers"
_migration = import_module(_MIGRATION_MODULE)


def test_new_mv_samples_per_item_without_perturbation() -> None:
    """Every tier's new query hashes ``item_id`` directly, not a perturbed
    or trace-based value."""
    build_query = _migration.generate_new_materialized_view_expression
    # `trace_id` still appears as a passed-through SELECT column, so only the
    # hashed/sampled forms of it are rejected below.
    trace_based_hashes = ("reinterpretAsUInt128(trace_id)", "cityHash64(trace_id)")

    for sampling_weight in _migration.sampling_weights:
        sql = build_query(sampling_weight)

        # All tiers must hash the same value (no `+ weight` perturbation) so
        # that they nest as subsets.
        unperturbed = f"cityHash64(item_id) % {sampling_weight}"
        assert unperturbed in sql, (
            f"sampling weight {sampling_weight} should hash item_id unperturbed, got: {sql}"
        )

        # The per-tier perturbed form must be gone, otherwise the tiers are
        # not subsets of each other.
        perturbed = f"item_id + {sampling_weight}"
        assert perturbed not in sql

        # Sampling must stay per-item rather than per-trace to preserve the
        # independence assumption of the extrapolation variance math.
        for trace_hash in trace_based_hashes:
            assert trace_hash not in sql


def test_subset_property_via_modular_arithmetic() -> None:
    """The sampling weights form a divisibility chain, which is what the
    ``WHERE cityHash64(item_id) % w = 0`` filter relies on for nesting."""
    tiers = list(_migration.sampling_weights)

    # Since 8 | 64 | 512, H % 512 == 0 implies H % 64 == 0 and H % 8 == 0.
    # Check that each weight is a multiple of the one before it.
    ascending = sorted(tiers)
    for smaller, larger in zip(ascending[:-1], ascending[1:]):
        assert larger % smaller == 0, (
            f"sampling_weight {larger} must be a multiple of {smaller} "
            "for tier subset property to hold"
        )

    # Spot-check concrete hash values: anything accepted by the tightest tier
    # (largest weight) must be accepted by every looser tier too.
    largest = max(tiers)
    multiples_of_largest = (k * largest for k in range(10))
    for h in multiples_of_largest:
        for w in tiers:
            assert h % w == 0, (
                f"hash {h} passes tier {largest} but not tier {w}; subset property violated"
            )
Loading