diff --git a/api/app/settings/common.py b/api/app/settings/common.py index ba394155f34f..827f5eeb3e33 100644 --- a/api/app/settings/common.py +++ b/api/app/settings/common.py @@ -1477,6 +1477,10 @@ CLICKHOUSE_ENABLED = bool(CLICKHOUSE_URL or CLICKHOUSE_HOST) +SEGMENT_MEMBERSHIP_REFRESH_INTERVAL_HOURS = env.int( + "SEGMENT_MEMBERSHIP_REFRESH_INTERVAL_HOURS", default=6 +) + # Always installed: the router fences the `clickhouse` app's migrations off # the default Postgres database whether or not a CH alias is configured. DATABASE_ROUTERS.append("app.routers.ClickHouseRouter") diff --git a/api/segment_membership/admin.py b/api/segment_membership/admin.py new file mode 100644 index 000000000000..c1177582749c --- /dev/null +++ b/api/segment_membership/admin.py @@ -0,0 +1,24 @@ +from django.contrib import admin +from django.db.models import QuerySet +from django.http import HttpRequest + +from segment_membership.models import SegmentMembershipSeed +from segment_membership.tasks import seed_organisation_identities + + +@admin.register(SegmentMembershipSeed) +class SegmentMembershipSeedAdmin(admin.ModelAdmin[SegmentMembershipSeed]): + actions = ["force_reseed"] + list_display = ("organisation", "seeded_at") + readonly_fields = ("seeded_at",) + autocomplete_fields = ("organisation",) + + @admin.action(description="Force re-seed (clears the marker)") + def force_reseed( + self, + request: HttpRequest, + queryset: QuerySet[SegmentMembershipSeed], + ) -> None: + queryset.update(seeded_at=None) + for seed in queryset: + seed_organisation_identities.delay(args=(seed.organisation_id,)) diff --git a/api/segment_membership/mappers.py b/api/segment_membership/mappers.py index 7702f0566eee..52f4880e51fb 100644 --- a/api/segment_membership/mappers.py +++ b/api/segment_membership/mappers.py @@ -1,3 +1,4 @@ +from datetime import datetime from decimal import Decimal from flagsmith_schemas import dynamodb @@ -8,9 +9,9 @@ def map_identity_document_to_clickhouse_row( env_key: str, identity_doc: dynamodb.Identity, + inserted_at: datetime, ) -> ClickHouseIdentityRow: - """Project a Dynamo identity document onto an IDENTITIES row tuple - `(environment_id, identifier, identity_key, traits)`.""" + """Project a Dynamo identity document onto an IDENTITIES row tuple.""" identifier = identity_doc["identifier"] composite_key = identity_doc["composite_key"] raw_traits = identity_doc.get("identity_traits") @@ -20,6 +21,7 @@ def map_identity_document_to_clickhouse_row( identifier, composite_key, traits, + inserted_at, ) diff --git a/api/segment_membership/migrations/0002_segment_membership_seed.py b/api/segment_membership/migrations/0002_segment_membership_seed.py new file mode 100644 index 000000000000..1be421fc12bb --- /dev/null +++ b/api/segment_membership/migrations/0002_segment_membership_seed.py @@ -0,0 +1,38 @@ +# Generated by Django 5.2.15 on 2026-06-27 18:51 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("organisations", "0058_update_audit_and_history_limits_in_sub_cache"), + ("segment_membership", "0001_initial"), + ] + + operations = [ + migrations.CreateModel( + name="SegmentMembershipSeed", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("seeded_at", models.DateTimeField(null=True)), + ( + "organisation", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + related_name="+", + to="organisations.organisation", + ), + ), + ], + ), + ] diff --git a/api/segment_membership/models.py b/api/segment_membership/models.py index a53fe3de0b72..a0eef2946d2f 100644 --- a/api/segment_membership/models.py +++ b/api/segment_membership/models.py @@ -1,6 +1,7 @@ from django.db import models from environments.models import Environment +from organisations.models import Organisation from segments.models import Segment @@ -27,3 +28,17 @@ class Meta: name="segment_membership_count_unique_segment_environment", ), ] + + +class SegmentMembershipSeed(models.Model): + """Tracks whether an organisation's existing identities have been mirrored + into ClickHouse. + + `seeded_at` is null while a backfill is outstanding.""" + + organisation = models.OneToOneField( + Organisation, + on_delete=models.CASCADE, + related_name="+", + ) + seeded_at = models.DateTimeField(null=True) diff --git a/api/segment_membership/tasks.py b/api/segment_membership/tasks.py index 704ef7782ab6..ab5deabd31c2 100644 --- a/api/segment_membership/tasks.py +++ b/api/segment_membership/tasks.py @@ -1,10 +1,3 @@ -"""Daily backfill of IDENTITIES from Dynamo to ClickHouse, then per-project -refresh of `SegmentMembershipCount` rows. Each backfill fans out the refresh -so the count read always sees the fresh snapshot. Both tasks short-circuit -when `CLICKHOUSE_ENABLED` is False or the org's `segment_membership_inspection` -flag is off. -""" - from datetime import timedelta from typing import cast @@ -16,8 +9,10 @@ register_recurring_task, register_task_handler, ) +from task_processor.models import Task from environments.dynamodb.wrappers.identity_wrapper import DynamoIdentityWrapper +from organisations.models import Organisation from projects.models import Project from segment_membership.mappers import map_identity_document_to_clickhouse_row from segment_membership.metrics import ( @@ -26,13 +21,15 @@ flagsmith_segment_membership_refresh_duration_seconds, flagsmith_segment_membership_refresh_failures_total, ) -from segment_membership.models import SegmentMembershipCount +from segment_membership.models import SegmentMembershipCount, SegmentMembershipSeed from segment_membership.services import ( compute_segment_counts_for_project, + enqueue_membership_refresh, get_projects_to_process, is_membership_enabled, open_clickhouse_cursor, ) +from segments.models import Segment from util.util import batched logger = structlog.get_logger("segment_membership") @@ -45,6 +42,7 @@ "identifier", "identity_key", "traits", + "inserted_at", ) _INSERT_IDENTITIES_SQL = ( @@ -52,30 +50,40 @@ ) -@register_recurring_task( - run_every=timedelta(days=1), +@register_task_handler( # 4h fits several large environments back-to-back at SaaS scale. timeout=timedelta(hours=4), ) -def backfill_identities_to_clickhouse() -> None: - """Insert each relevant environment's current Dynamo state into - IDENTITIES, dispatching one refresh per project as its backfill - completes so the refresh enqueue rate tracks the backfill rate - rather than spiking in one burst at the end. +def seed_organisation_identities(organisation_id: int) -> None: + """Mirror one organisation's current Dynamo identities into IDENTITIES, + dispatching a refresh per project as each completes. + + Rows are versioned at scan start via `inserted_at` + so writes arriving mid-scan win ReplacingMergeTree dedup over the seeded row. """ + log = logger.bind(organisation__id=organisation_id) if not settings.CLICKHOUSE_ENABLED: - logger.info("backfill.skipped", reason="clickhouse_not_configured") + log.info("seed.skipped", reason="clickhouse_not_configured") + return + + organisation = Organisation.objects.get(pk=organisation_id) + if not is_membership_enabled(organisation): + log.info("seed.skipped", reason="ff_disabled") return wrapper = DynamoIdentityWrapper() if not wrapper.is_enabled: - logger.info("backfill.skipped", reason="dynamo_disabled") + log.info("seed.skipped", reason="dynamo_disabled") return - for project in get_projects_to_process(): + scan_started_at = timezone.now() + project_ids = Segment.live_objects.filter( + project__organisation=organisation + ).values_list("project_id", flat=True) + for project in Project.objects.filter(id__in=project_ids).iterator(): log_comment = ( "flagsmith:segment_membership:backfill" - f":org_{project.organisation_id}" + f":org_{organisation_id}" f":project_{project.id}" ) with open_clickhouse_cursor(log_comment=log_comment) as cursor: @@ -90,7 +98,9 @@ def backfill_identities_to_clickhouse() -> None: ): rows = [ map_identity_document_to_clickhouse_row( - env_key, cast(DynamoIdentity, doc) + env_key, + cast(DynamoIdentity, doc), + scan_started_at, ) for doc in batch ] @@ -100,20 +110,78 @@ def backfill_identities_to_clickhouse() -> None: cursor.executemany(_INSERT_IDENTITIES_SQL, rows) # type: ignore[arg-type] row_count += len(rows) except Exception: - logger.exception( - "backfill.environment.failed", + log.exception( + "seed.environment.failed", project__id=project.id, environment__id=env.id, ) continue flagsmith_segment_membership_backfill_identities_total.inc(row_count) - logger.info( - "backfill.environment.completed", + log.info( + "seed.environment.completed", project__id=project.id, environment__id=env.id, rows__count=row_count, ) - refresh_project_segment_counts.delay(args=(project.id,)) + enqueue_membership_refresh(project) + + SegmentMembershipSeed.objects.update_or_create( + organisation=organisation, + defaults={"seeded_at": timezone.now()}, + ) + + +@register_recurring_task( + run_every=timedelta(hours=1), + timeout=timedelta(minutes=5), +) +def reconcile_segment_membership_seeds() -> None: + """Enqueue a backfill for each opted-in organisation that owns live + segments and hasn't been seeded yet, debouncing orgs whose seed is already + pending. + """ + if not settings.CLICKHOUSE_ENABLED: + return + + seeded_organisation_ids = set( + SegmentMembershipSeed.objects.filter(seeded_at__isnull=False).values_list( + "organisation_id", flat=True + ) + ) + organisation_ids = { + project.organisation_id for project in get_projects_to_process() + } - seeded_organisation_ids + + for organisation_id in organisation_ids: + if Task.objects.filter( + task_identifier=seed_organisation_identities.task_identifier, + completed=False, + num_failures__lt=3, + serialized_args=Task.serialize_data((organisation_id,)), + ).exists(): + continue + seed_organisation_identities.delay(args=(organisation_id,)) + + +@register_recurring_task( + run_every=timedelta(hours=settings.SEGMENT_MEMBERSHIP_REFRESH_INTERVAL_HOURS), + timeout=timedelta(minutes=10), +) +def refresh_all_segment_counts() -> None: + """Refresh counts for every project with a live segment on a slow cadence so + cached counts track identities ingested via CDC between segment edits. + `enqueue_membership_refresh` is the single flag + debounce gate. + """ + if not settings.CLICKHOUSE_ENABLED: + return + + project_ids = Segment.live_objects.values_list("project_id", flat=True) + for project in ( + Project.objects.filter(id__in=project_ids) + .select_related("organisation") + .iterator() + ): + enqueue_membership_refresh(project) @register_task_handler( diff --git a/api/segment_membership/types.py b/api/segment_membership/types.py index 51085acd19c5..8af9d71f6cde 100644 --- a/api/segment_membership/types.py +++ b/api/segment_membership/types.py @@ -1,7 +1,8 @@ +from datetime import datetime from typing import Any, TypedDict -# (environment_key, identifier, identity_key, traits) -ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None] +# (environment_key, identifier, identity_key, traits, inserted_at) +ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None, datetime] # (identifier, identity_key, traits) ClickHouseReadIdentityRow = tuple[str, str, dict[str, object] | None] diff --git a/api/tests/integration/segment_membership/test_segment_membership_clickhouse.py b/api/tests/integration/segment_membership/test_segment_membership_clickhouse.py index 8f61abc738aa..e067f9dc16cb 100644 --- a/api/tests/integration/segment_membership/test_segment_membership_clickhouse.py +++ b/api/tests/integration/segment_membership/test_segment_membership_clickhouse.py @@ -12,8 +12,8 @@ open_clickhouse_cursor, ) from segment_membership.tasks import ( - backfill_identities_to_clickhouse, refresh_project_segment_counts, + seed_organisation_identities, ) from tests.types import EnableFeaturesFixture @@ -65,7 +65,7 @@ def test_refresh_project_segment_counts__matching_identities__upserts_real_count @pytest.mark.clickhouse -def test_backfill_identities_to_clickhouse__happy_path__rows_land_in_clickhouse( +def test_seed_organisation_identities__happy_path__rows_land_in_clickhouse( clickhouse_db: None, settings: SettingsWrapper, mocker: MockerFixture, @@ -106,8 +106,8 @@ def test_backfill_identities_to_clickhouse__happy_path__rows_land_in_clickhouse( ) mocker.patch("segment_membership.tasks.DynamoIdentityWrapper", return_value=wrapper) - # When the backfill task runs end-to-end against real ClickHouse - backfill_identities_to_clickhouse() + # When the seed task runs end-to-end against real ClickHouse + seed_organisation_identities(Project.objects.get(pk=project).organisation_id) # Then both identities actually land in IDENTITIES, keyed by env api key with open_clickhouse_cursor() as cursor: @@ -121,7 +121,7 @@ def test_backfill_identities_to_clickhouse__happy_path__rows_land_in_clickhouse( # and the project's count refresh is dispatched refresh_dispatch.delay.assert_called_once_with(args=(project,)) assert any( - e["event"] == "backfill.environment.completed" and e["rows__count"] == 2 + e["event"] == "seed.environment.completed" and e["rows__count"] == 2 for e in log.events ) diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_admin.py b/api/tests/unit/segment_membership/test_unit_segment_membership_admin.py new file mode 100644 index 000000000000..9a04a0af4637 --- /dev/null +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_admin.py @@ -0,0 +1,29 @@ +from django.contrib.admin.sites import AdminSite +from django.utils import timezone +from pytest_mock import MockerFixture + +from organisations.models import Organisation +from segment_membership.admin import SegmentMembershipSeedAdmin +from segment_membership.models import SegmentMembershipSeed + + +def test_segment_membership_seed_admin_force_reseed__queryset__clears_marker_and_enqueues_seed( + organisation: Organisation, + mocker: MockerFixture, +) -> None: + # Given + seed = SegmentMembershipSeed.objects.create( + organisation=organisation, seeded_at=timezone.now() + ) + seed_task = mocker.patch("segment_membership.admin.seed_organisation_identities") + admin = SegmentMembershipSeedAdmin(SegmentMembershipSeed, AdminSite()) + + # When + admin.force_reseed( + request=mocker.MagicMock(), queryset=SegmentMembershipSeed.objects.all() + ) + + # Then + seed.refresh_from_db() + assert seed.seeded_at is None + seed_task.delay.assert_called_once_with(args=(organisation.id,)) diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py index bd3cf464a27e..01f671f5f661 100644 --- a/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py @@ -1,11 +1,14 @@ +from datetime import datetime, timezone from decimal import Decimal import pytest from flagsmith_schemas.dynamodb import Identity as DynamoIdentity from segment_membership.mappers import map_identity_document_to_clickhouse_row +from segment_membership.types import ClickHouseIdentityRow UUID_A = "f47ac10b-58cc-4372-a567-0e02b2c3d479" +INSERTED_AT = datetime(2026, 5, 8, 12, 0, 0, tzinfo=timezone.utc) @pytest.mark.parametrize( @@ -22,7 +25,7 @@ {"trait_key": "plan", "trait_value": "growth"}, ], }, - ("env-key", "alice", "env_x_alice", {"plan": "growth"}), + ("env-key", "alice", "env_x_alice", {"plan": "growth"}, INSERTED_AT), id="single string trait", ), pytest.param( @@ -34,7 +37,7 @@ "created_date": "2026-05-08T00:00:00Z", "identity_traits": [], }, - ("env-key", "alice", "env_x_alice", None), + ("env-key", "alice", "env_x_alice", None, INSERTED_AT), id="empty traits collapse to NULL", ), pytest.param( @@ -48,7 +51,7 @@ {"trait_key": "age", "trait_value": Decimal("18")}, ], }, - ("env-key", "alice", "env_x_alice", {"age": 18}), + ("env-key", "alice", "env_x_alice", {"age": 18}, INSERTED_AT), id="whole-number Decimal narrows to int", ), pytest.param( @@ -62,7 +65,7 @@ {"trait_key": "score", "trait_value": Decimal("1.5")}, ], }, - ("env-key", "alice", "env_x_alice", {"score": 1.5}), + ("env-key", "alice", "env_x_alice", {"score": 1.5}, INSERTED_AT), id="fractional Decimal narrows to float", ), pytest.param( @@ -82,6 +85,7 @@ "alice", "env_x_alice", {"plan": "growth", "team": "alpha"}, + INSERTED_AT, ), id="multiple traits flatten to a single dict", ), @@ -89,9 +93,11 @@ ) def test_map_identity_document_to_clickhouse_row__cases__return_expected( doc: DynamoIdentity, - expected: tuple[str, str, str, dict[str, object] | None], + expected: ClickHouseIdentityRow, ) -> None: - # Given a Dynamo identity document - # When mapped onto an IDENTITIES row - # Then it lines up positionally with the IDENTITIES schema - assert map_identity_document_to_clickhouse_row("env-key", doc) == expected + # Given + # When + # Then + assert ( + map_identity_document_to_clickhouse_row("env-key", doc, INSERTED_AT) == expected + ) diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py b/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py index a4cb467ce4cf..1589cb9ff361 100644 --- a/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py @@ -1,25 +1,54 @@ +from datetime import datetime +from datetime import timezone as dt_timezone from unittest.mock import MagicMock +import pytest +from django.db import connections from django.utils import timezone +from mypy_boto3_dynamodb.service_resource import Table from pytest_django.fixtures import SettingsWrapper from pytest_mock import MockerFixture from pytest_structlog import StructuredLogCapture +from task_processor.models import Task +from task_processor.task_run_method import TaskRunMethod from environments.models import Environment from projects.models import Project from segment_membership import tasks -from segment_membership.models import SegmentMembershipCount +from segment_membership.models import SegmentMembershipCount, SegmentMembershipSeed from segment_membership.tasks import ( - backfill_identities_to_clickhouse, + reconcile_segment_membership_seeds, + refresh_all_segment_counts, refresh_project_segment_counts, + seed_organisation_identities, ) from segments.models import Segment from tests.types import EnableFeaturesFixture +SCAN_START = datetime(2026, 6, 1, 12, 0, 0, tzinfo=dt_timezone.utc) -def test_backfill_identities_to_clickhouse__no_clickhouse_creds__skips( + +@pytest.fixture +def dynamo_identities( + flagsmith_identities_table: Table, + environment: Environment, +) -> None: + for identifier, trait_value in (("alice", "bar"), ("carol", "baz")): + flagsmith_identities_table.put_item( + Item={ + "composite_key": f"{environment.api_key}_{identifier}", + "environment_api_key": environment.api_key, + "identifier": identifier, + "identity_uuid": f"f47ac10b-58cc-4372-a567-0e02b2c3d47{identifier[0]}", + "identity_traits": [{"trait_key": "foo", "trait_value": trait_value}], + } + ) + + +def test_seed_organisation_identities__no_clickhouse_creds__skips( mocker: MockerFixture, settings: SettingsWrapper, + project: Project, log: StructuredLogCapture, ) -> None: # Given @@ -27,18 +56,29 @@ def test_backfill_identities_to_clickhouse__no_clickhouse_creds__skips( spy = mocker.patch.object(tasks, "open_clickhouse_cursor") # When - backfill_identities_to_clickhouse() + seed_organisation_identities(project.organisation_id) # Then spy.assert_not_called() - assert any(e["event"] == "backfill.skipped" for e in log.events) + assert log.events == [ + { + "level": "info", + "event": "seed.skipped", + "organisation__id": project.organisation_id, + "reason": "clickhouse_not_configured", + } + ] -def test_backfill_identities_to_clickhouse__dynamo_disabled__skips( +def test_seed_organisation_identities__dynamo_disabled__skips( mocker: MockerFixture, settings: SettingsWrapper, + project: Project, + segment: Segment, + enable_features: EnableFeaturesFixture, ) -> None: # Given + enable_features("segment_membership_inspection") settings.CLICKHOUSE_ENABLED = True spy = mocker.patch.object(tasks, "open_clickhouse_cursor") mocker.patch.object( @@ -48,18 +88,40 @@ def test_backfill_identities_to_clickhouse__dynamo_disabled__skips( ) # When - backfill_identities_to_clickhouse() + seed_organisation_identities(project.organisation_id) + + # Then + spy.assert_not_called() + + +def test_seed_organisation_identities__flag_off__skips( + mocker: MockerFixture, + settings: SettingsWrapper, + project: Project, + segment: Segment, +) -> None: + # Given + settings.CLICKHOUSE_ENABLED = True + spy = mocker.patch.object(tasks, "open_clickhouse_cursor") + + # When + seed_organisation_identities(project.organisation_id) # Then spy.assert_not_called() + assert not SegmentMembershipSeed.objects.filter( + organisation=project.organisation, seeded_at__isnull=False + ).exists() -def test_backfill_identities_to_clickhouse__insert_fails__logs_and_continues( +def test_seed_organisation_identities__insert_fails__logs_and_continues( mocker: MockerFixture, settings: SettingsWrapper, project: Project, environment: Environment, segment: Segment, + flagsmith_identities_table: Table, + dynamo_identities: None, enable_features: EnableFeaturesFixture, log: StructuredLogCapture, ) -> None: @@ -70,55 +132,269 @@ def test_backfill_identities_to_clickhouse__insert_fails__logs_and_continues( cursor.executemany.side_effect = RuntimeError("boom") open_cursor = mocker.patch.object(tasks, "open_clickhouse_cursor") open_cursor.return_value.__enter__.return_value = cursor - wrapper = MagicMock(is_enabled=True) - wrapper.iter_all_items_paginated.return_value = iter( - [ - { - "identity_uuid": "f47ac10b-58cc-4372-a567-0e02b2c3d479", - "identifier": "a", - "composite_key": "k1", - "environment_api_key": environment.api_key, - "created_date": "2026-05-08T00:00:00Z", - "identity_traits": [], - } - ] - ) - mocker.patch.object(tasks, "DynamoIdentityWrapper", return_value=wrapper) + mocker.patch.object(tasks, "enqueue_membership_refresh") # When - backfill_identities_to_clickhouse() + seed_organisation_identities(project.organisation_id) # Then - assert any(e["event"] == "backfill.environment.failed" for e in log.events) + assert log.events == [ + { + "event": "seed.environment.failed", + "level": "error", + "exc_info": mocker.ANY, + "organisation__id": project.organisation_id, + "project__id": project.id, + "environment__id": environment.id, + } + ] + + +@pytest.mark.clickhouse +def test_seed_organisation_identities__matching_identities__inserts_rows_versioned_at_scan_start( + mocker: MockerFixture, + clickhouse_db: None, + settings: SettingsWrapper, + project: Project, + environment: Environment, + segment: Segment, + dynamo_identities: None, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features("segment_membership_inspection") + settings.CLICKHOUSE_ENABLED = True + mocker.patch("segment_membership.tasks.timezone.now", return_value=SCAN_START) + mocker.patch.object(tasks, "enqueue_membership_refresh") + # When + seed_organisation_identities(project.organisation_id) -def test_backfill_identities_to_clickhouse__multiple_projects__fans_out_refresh_per_project( + # Then + with connections["clickhouse"].cursor() as cursor: + cursor.execute( + "SELECT identifier, identity_key, traits, inserted_at " + "FROM IDENTITIES FINAL WHERE environment_id = %(key)s " + "ORDER BY identifier", + {"key": environment.api_key}, + ) + rows = cursor.fetchall() + assert rows == [ + ( + "alice", + f"{environment.api_key}_alice", + {"foo": "bar"}, + SCAN_START.replace(tzinfo=None), + ), + ( + "carol", + f"{environment.api_key}_carol", + {"foo": "baz"}, + SCAN_START.replace(tzinfo=None), + ), + ] + + +@pytest.mark.clickhouse +def test_seed_organisation_identities__success__marks_org_seeded( mocker: MockerFixture, + clickhouse_db: None, settings: SettingsWrapper, project: Project, - project_b: Project, segment: Segment, + flagsmith_identities_table: Table, enable_features: EnableFeaturesFixture, ) -> None: # Given enable_features("segment_membership_inspection") + settings.CLICKHOUSE_ENABLED = True + mocker.patch.object(tasks, "enqueue_membership_refresh") + + # When + seed_organisation_identities(project.organisation_id) + + # Then + assert SegmentMembershipSeed.objects.filter( + organisation=project.organisation, seeded_at__isnull=False + ).exists() + + +@pytest.mark.clickhouse +def test_seed_organisation_identities__success__fans_out_refresh_per_project( + mocker: MockerFixture, + clickhouse_db: None, + settings: SettingsWrapper, + project: Project, + segment: Segment, + flagsmith_identities_table: Table, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features("segment_membership_inspection") + project_b = Project.objects.create( + name="project-b", organisation=project.organisation + ) Segment.objects.create(name="seg-b", project=project_b) settings.CLICKHOUSE_ENABLED = True - cursor = MagicMock() - open_cursor = mocker.patch.object(tasks, "open_clickhouse_cursor") - open_cursor.return_value.__enter__.return_value = cursor - refresh_dispatch = mocker.patch.object(tasks, "refresh_project_segment_counts") - wrapper = MagicMock(is_enabled=True) - wrapper.iter_all_items_paginated.return_value = iter([]) - mocker.patch.object(tasks, "DynamoIdentityWrapper", return_value=wrapper) + enqueue = mocker.patch.object(tasks, "enqueue_membership_refresh") # When - backfill_identities_to_clickhouse() + seed_organisation_identities(project.organisation_id) # Then - dispatched_ids = { - call.kwargs["args"][0] for call in refresh_dispatch.delay.call_args_list - } + dispatched_ids = {call.args[0].id for call in enqueue.call_args_list} + assert dispatched_ids == {project.id, project_b.id} + + +def test_reconcile_segment_membership_seeds__no_clickhouse_creds__skips( + settings: SettingsWrapper, + project: Project, + segment: Segment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features("segment_membership_inspection") + settings.CLICKHOUSE_ENABLED = False + + # When + reconcile_segment_membership_seeds() + + # Then + assert not Task.objects.filter( + task_identifier=seed_organisation_identities.task_identifier, + serialized_args=Task.serialize_data((project.organisation_id,)), + ).exists() + + +def test_reconcile_segment_membership_seeds__flagged_unseeded_org__enqueues_seed( + settings: SettingsWrapper, + project: Project, + segment: Segment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features("segment_membership_inspection") + settings.CLICKHOUSE_ENABLED = True + settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR + + # When + reconcile_segment_membership_seeds() + + # Then + assert ( + Task.objects.filter( + task_identifier=seed_organisation_identities.task_identifier, + completed=False, + serialized_args=Task.serialize_data((project.organisation_id,)), + ).count() + == 1 + ) + + +def test_reconcile_segment_membership_seeds__flag_off__does_not_enqueue( + settings: SettingsWrapper, + project: Project, + segment: Segment, +) -> None: + # Given + settings.CLICKHOUSE_ENABLED = True + + # When + reconcile_segment_membership_seeds() + + # Then + assert not Task.objects.filter( + task_identifier=seed_organisation_identities.task_identifier, + serialized_args=Task.serialize_data((project.organisation_id,)), + ).exists() + + +def test_reconcile_segment_membership_seeds__already_seeded__does_not_enqueue( + settings: SettingsWrapper, + project: Project, + segment: Segment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features("segment_membership_inspection") + settings.CLICKHOUSE_ENABLED = True + SegmentMembershipSeed.objects.create( + organisation=project.organisation, seeded_at=timezone.now() + ) + + # When + reconcile_segment_membership_seeds() + + # Then + assert not Task.objects.filter( + task_identifier=seed_organisation_identities.task_identifier, + serialized_args=Task.serialize_data((project.organisation_id,)), + ).exists() + + +def test_reconcile_segment_membership_seeds__seed_already_pending__does_not_enqueue( + settings: SettingsWrapper, + project: Project, + segment: Segment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given a seed for the org is already in flight (a large org still loading) + enable_features("segment_membership_inspection") + settings.CLICKHOUSE_ENABLED = True + settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR + seed_organisation_identities.delay(args=(project.organisation_id,)) + + # When + reconcile_segment_membership_seeds() + + # Then + assert ( + Task.objects.filter( + task_identifier=seed_organisation_identities.task_identifier, + completed=False, + serialized_args=Task.serialize_data((project.organisation_id,)), + ).count() + == 1 + ) + + +def test_refresh_all_segment_counts__no_clickhouse_creds__skips( + mocker: MockerFixture, + settings: SettingsWrapper, + project: Project, + segment: Segment, +) -> None: + # Given + settings.CLICKHOUSE_ENABLED = False + enqueue = mocker.patch.object(tasks, "enqueue_membership_refresh") + + # When + refresh_all_segment_counts() + + # Then + enqueue.assert_not_called() + + +def test_refresh_all_segment_counts__live_segment_projects__delegates_to_enqueue( + mocker: MockerFixture, + settings: SettingsWrapper, + project: Project, + segment: Segment, +) -> None: + # Given two projects with live segments (flag + debounce gating is the + # helper's job, so refresh_all just delegates one call per project) + project_b = Project.objects.create( + name="project-b", organisation=project.organisation + ) + Segment.objects.create(name="seg-b", project=project_b) + settings.CLICKHOUSE_ENABLED = True + enqueue = mocker.patch.object(tasks, "enqueue_membership_refresh") + + # When + refresh_all_segment_counts() + + # Then + dispatched_ids = {call.args[0].id for call in enqueue.call_args_list} assert dispatched_ids == {project.id, project_b.id} @@ -210,15 +486,12 @@ def test_refresh_project_segment_counts__previously_matching_pair_drops_to_zero_ cursor = MagicMock() open_cursor = mocker.patch.object(tasks, "open_clickhouse_cursor") open_cursor.return_value.__enter__.return_value = cursor - # ... and a new compute that returns no matches for the same pair (the - # rule was edited, the identity set drifted, etc.). mocker.patch.object(tasks, "compute_segment_counts_for_project", return_value=[]) # When refresh_project_segment_counts(project.id) - # Then the stale row is gone -- pairs that no longer match drop out of - # the table entirely rather than lingering at the previous count. + # Then the stale row is gone -- pairs that no longer match drop out entirely assert not SegmentMembershipCount.objects.filter( segment=segment, environment=environment ).exists() @@ -243,8 +516,7 @@ def test_refresh_project_segment_counts__never_matched_pair__no_row_written( # When refresh_project_segment_counts(project.id) - # Then no row is written: refresh upserts matches, drops misses, and - # leaves never-matched pairs untouched. + # Then assert not SegmentMembershipCount.objects.filter( segment=segment, environment=environment ).exists() diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index df5302978896..b4328caa8a21 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -365,34 +365,6 @@ Logged at `warning` from: Attributes: -### `segment_membership.backfill.environment.completed` - -Logged at `info` from: - - `api/segment_membership/tasks.py:110` - -Attributes: - - `environment.id` - - `project.id` - - `rows.count` - -### `segment_membership.backfill.environment.failed` - -Logged at `exception` from: - - `api/segment_membership/tasks.py:103` - -Attributes: - - `environment.id` - - `project.id` - -### `segment_membership.backfill.skipped` - -Logged at `info` from: - - `api/segment_membership/tasks.py:67` - - `api/segment_membership/tasks.py:72` - -Attributes: - - `reason` - ### `segment_membership.compute.segment.skipped` Logged at `error` from: @@ -415,7 +387,7 @@ Attributes: ### `segment_membership.refresh.project.completed` Logged at `info` from: - - `api/segment_membership/tasks.py:185` + - `api/segment_membership/tasks.py:253` Attributes: - `membership_counts.count` @@ -425,7 +397,7 @@ Attributes: ### `segment_membership.refresh.project.failed` Logged at `exception` from: - - `api/segment_membership/tasks.py:158` + - `api/segment_membership/tasks.py:226` Attributes: - `project.id` @@ -433,13 +405,45 @@ Attributes: ### `segment_membership.refresh.project.skipped` Logged at `info` from: - - `api/segment_membership/tasks.py:129` - - `api/segment_membership/tasks.py:138` + - `api/segment_membership/tasks.py:197` + - `api/segment_membership/tasks.py:206` Attributes: - `project.id` - `reason` +### `segment_membership.seed.environment.completed` + +Logged at `info` from: + - `api/segment_membership/tasks.py:120` + +Attributes: + - `environment.id` + - `organisation.id` + - `project.id` + - `rows.count` + +### `segment_membership.seed.environment.failed` + +Logged at `exception` from: + - `api/segment_membership/tasks.py:113` + +Attributes: + - `environment.id` + - `organisation.id` + - `project.id` + +### `segment_membership.seed.skipped` + +Logged at `info` from: + - `api/segment_membership/tasks.py:66` + - `api/segment_membership/tasks.py:71` + - `api/segment_membership/tasks.py:76` + +Attributes: + - `organisation.id` + - `reason` + ### `segments.serializers.segment_revision_created` Logged at `info` from: