Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,10 @@

CLICKHOUSE_ENABLED = bool(CLICKHOUSE_URL or CLICKHOUSE_HOST)

# Hours between runs of the recurring `refresh_all_segment_counts` task, which
# passes this value to its `run_every` schedule. Read from the environment
# variable of the same name; defaults to 6.
SEGMENT_MEMBERSHIP_REFRESH_INTERVAL_HOURS = env.int(
    "SEGMENT_MEMBERSHIP_REFRESH_INTERVAL_HOURS", default=6
)

# Always installed: the router fences the `clickhouse` app's migrations off
# the default Postgres database whether or not a CH alias is configured.
DATABASE_ROUTERS.append("app.routers.ClickHouseRouter")
Expand Down
24 changes: 24 additions & 0 deletions api/segment_membership/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from django.contrib import admin
from django.db.models import QuerySet
from django.http import HttpRequest

from segment_membership.models import SegmentMembershipSeed
from segment_membership.tasks import seed_organisation_identities


@admin.register(SegmentMembershipSeed)
class SegmentMembershipSeedAdmin(admin.ModelAdmin[SegmentMembershipSeed]):
    """Admin for per-organisation seed markers.

    `seeded_at` is read-only in the form; the only way to reset it from the
    admin is the `force_reseed` action.
    """

    actions = ["force_reseed"]
    list_display = ("organisation", "seeded_at")
    readonly_fields = ("seeded_at",)
    autocomplete_fields = ("organisation",)

    @admin.action(description="Force re-seed (clears the marker)")
    def force_reseed(
        self,
        request: HttpRequest,
        queryset: QuerySet[SegmentMembershipSeed],
    ) -> None:
        """Clear `seeded_at` on the selected rows and enqueue one
        `seed_organisation_identities` task per affected organisation.

        Args:
            request: The admin request that triggered the action (unused).
            queryset: The seed markers selected in the changelist.
        """
        # Snapshot the targets *before* mutating. The action queryset is lazy
        # and inherits any changelist filters, so if it is filtered on
        # `seeded_at` (e.g. `?seeded_at__isnull=False`), re-evaluating it after
        # the update would match no rows and silently skip the enqueue.
        organisation_ids = list(queryset.values_list("organisation_id", flat=True))
        queryset.update(seeded_at=None)
        for organisation_id in organisation_ids:
            seed_organisation_identities.delay(args=(organisation_id,))
6 changes: 4 additions & 2 deletions api/segment_membership/mappers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from decimal import Decimal

from flagsmith_schemas import dynamodb
Expand All @@ -8,9 +9,9 @@
def map_identity_document_to_clickhouse_row(
env_key: str,
identity_doc: dynamodb.Identity,
inserted_at: datetime,
) -> ClickHouseIdentityRow:
"""Project a Dynamo identity document onto an IDENTITIES row tuple
`(environment_id, identifier, identity_key, traits)`."""
"""Project a Dynamo identity document onto an IDENTITIES row tuple."""
identifier = identity_doc["identifier"]
composite_key = identity_doc["composite_key"]
raw_traits = identity_doc.get("identity_traits")
Expand All @@ -20,6 +21,7 @@ def map_identity_document_to_clickhouse_row(
identifier,
composite_key,
traits,
inserted_at,
)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Generated by Django 5.2.15 on 2026-06-27 18:51

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
    # Adds the `SegmentMembershipSeed` table: one row per organisation holding
    # a nullable `seeded_at` timestamp.

    dependencies = [
        ("organisations", "0058_update_audit_and_history_limits_in_sub_cache"),
        ("segment_membership", "0001_initial"),
    ]

    operations = [
        migrations.CreateModel(
            name="SegmentMembershipSeed",
            fields=[
                (
                    "id",
                    models.AutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name="ID",
                    ),
                ),
                # Nullable timestamp column.
                ("seeded_at", models.DateTimeField(null=True)),
                (
                    # One-to-one with the organisation; the row is removed with
                    # it (CASCADE), and `related_name="+"` means no reverse
                    # accessor is added to Organisation.
                    "organisation",
                    models.OneToOneField(
                        on_delete=django.db.models.deletion.CASCADE,
                        related_name="+",
                        to="organisations.organisation",
                    ),
                ),
            ],
        ),
    ]
15 changes: 15 additions & 0 deletions api/segment_membership/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from django.db import models

from environments.models import Environment
from organisations.models import Organisation
from segments.models import Segment


Expand All @@ -27,3 +28,17 @@ class Meta:
name="segment_membership_count_unique_segment_environment",
),
]


class SegmentMembershipSeed(models.Model):
    """Track whether an organisation's existing identities have been mirrored
    into ClickHouse.

    `seeded_at` is null while a backfill is outstanding and holds the
    completion timestamp once a seed has finished.
    """

    # At most one marker per organisation; deleted together with it. The "+"
    # related name tells Django not to create a reverse accessor on
    # Organisation.
    organisation = models.OneToOneField(
        Organisation,
        on_delete=models.CASCADE,
        related_name="+",
    )
    # Null means "not seeded yet / re-seed requested"; non-null means seeded.
    seeded_at = models.DateTimeField(null=True)
118 changes: 93 additions & 25 deletions api/segment_membership/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
"""Daily backfill of IDENTITIES from Dynamo to ClickHouse, then per-project
refresh of `SegmentMembershipCount` rows. Each backfill fans out the refresh
so the count read always sees the fresh snapshot. Both tasks short-circuit
when `CLICKHOUSE_ENABLED` is False or the org's `segment_membership_inspection`
flag is off.
"""

from datetime import timedelta
from typing import cast

Expand All @@ -16,8 +9,10 @@
register_recurring_task,
register_task_handler,
)
from task_processor.models import Task

from environments.dynamodb.wrappers.identity_wrapper import DynamoIdentityWrapper
from organisations.models import Organisation
from projects.models import Project
from segment_membership.mappers import map_identity_document_to_clickhouse_row
from segment_membership.metrics import (
Expand All @@ -26,13 +21,15 @@
flagsmith_segment_membership_refresh_duration_seconds,
flagsmith_segment_membership_refresh_failures_total,
)
from segment_membership.models import SegmentMembershipCount
from segment_membership.models import SegmentMembershipCount, SegmentMembershipSeed
from segment_membership.services import (
compute_segment_counts_for_project,
enqueue_membership_refresh,
get_projects_to_process,
is_membership_enabled,
open_clickhouse_cursor,
)
from segments.models import Segment
from util.util import batched

logger = structlog.get_logger("segment_membership")
Expand All @@ -45,37 +42,48 @@
"identifier",
"identity_key",
"traits",
"inserted_at",
)

_INSERT_IDENTITIES_SQL = (
f"INSERT INTO IDENTITIES ({', '.join(_IDENTITIES_COLUMN_NAMES)}) VALUES"
)


@register_recurring_task(
run_every=timedelta(days=1),
@register_task_handler(
# 4h fits several large environments back-to-back at SaaS scale.
timeout=timedelta(hours=4),
)
def backfill_identities_to_clickhouse() -> None:
"""Insert each relevant environment's current Dynamo state into
IDENTITIES, dispatching one refresh per project as its backfill
completes so the refresh enqueue rate tracks the backfill rate
rather than spiking in one burst at the end.
def seed_organisation_identities(organisation_id: int) -> None:
"""Mirror one organisation's current Dynamo identities into IDENTITIES,
dispatching a refresh per project as each completes.

Rows are versioned at scan start via `inserted_at`
so writes arriving mid-scan win ReplacingMergeTree dedup over the seeded row.
"""
log = logger.bind(organisation__id=organisation_id)
if not settings.CLICKHOUSE_ENABLED:
logger.info("backfill.skipped", reason="clickhouse_not_configured")
log.info("seed.skipped", reason="clickhouse_not_configured")
return

organisation = Organisation.objects.get(pk=organisation_id)
if not is_membership_enabled(organisation):
log.info("seed.skipped", reason="ff_disabled")
return

wrapper = DynamoIdentityWrapper()
if not wrapper.is_enabled:
logger.info("backfill.skipped", reason="dynamo_disabled")
log.info("seed.skipped", reason="dynamo_disabled")
return

for project in get_projects_to_process():
scan_started_at = timezone.now()
project_ids = Segment.live_objects.filter(
project__organisation=organisation
).values_list("project_id", flat=True)
for project in Project.objects.filter(id__in=project_ids).iterator():
log_comment = (
"flagsmith:segment_membership:backfill"
f":org_{project.organisation_id}"
f":org_{organisation_id}"
f":project_{project.id}"
)
with open_clickhouse_cursor(log_comment=log_comment) as cursor:
Expand All @@ -90,7 +98,9 @@ def backfill_identities_to_clickhouse() -> None:
):
rows = [
map_identity_document_to_clickhouse_row(
env_key, cast(DynamoIdentity, doc)
env_key,
cast(DynamoIdentity, doc),
scan_started_at,
)
for doc in batch
]
Expand All @@ -100,20 +110,78 @@ def backfill_identities_to_clickhouse() -> None:
cursor.executemany(_INSERT_IDENTITIES_SQL, rows) # type: ignore[arg-type]
row_count += len(rows)
except Exception:
logger.exception(
"backfill.environment.failed",
log.exception(
"seed.environment.failed",
project__id=project.id,
environment__id=env.id,
)
continue
flagsmith_segment_membership_backfill_identities_total.inc(row_count)
logger.info(
"backfill.environment.completed",
log.info(
"seed.environment.completed",
project__id=project.id,
environment__id=env.id,
rows__count=row_count,
)
refresh_project_segment_counts.delay(args=(project.id,))
enqueue_membership_refresh(project)

SegmentMembershipSeed.objects.update_or_create(
organisation=organisation,
defaults={"seeded_at": timezone.now()},
)


@register_recurring_task(
    run_every=timedelta(hours=1),
    timeout=timedelta(minutes=5),
)
def reconcile_segment_membership_seeds() -> None:
    """Schedule a seed for every organisation that still needs one.

    Candidate organisations are those behind the projects yielded by
    `get_projects_to_process()`, minus the ones whose seed marker already has
    a `seeded_at` timestamp. An organisation is skipped when a matching seed
    task is already queued (not completed, fewer than 3 failures, same
    serialized args), so repeated runs do not pile up duplicates.

    Does nothing when ClickHouse is not enabled.
    """
    if not settings.CLICKHOUSE_ENABLED:
        return

    already_seeded = set(
        SegmentMembershipSeed.objects.filter(seeded_at__isnull=False).values_list(
            "organisation_id", flat=True
        )
    )
    candidates = {
        project.organisation_id for project in get_projects_to_process()
    }
    pending_organisation_ids = candidates - already_seeded

    for organisation_id in pending_organisation_ids:
        # Debounce: look for an in-flight task carrying exactly these args.
        in_flight = Task.objects.filter(
            task_identifier=seed_organisation_identities.task_identifier,
            completed=False,
            num_failures__lt=3,
            serialized_args=Task.serialize_data((organisation_id,)),
        )
        if not in_flight.exists():
            seed_organisation_identities.delay(args=(organisation_id,))


@register_recurring_task(
    run_every=timedelta(hours=settings.SEGMENT_MEMBERSHIP_REFRESH_INTERVAL_HOURS),
    timeout=timedelta(minutes=10),
)
def refresh_all_segment_counts() -> None:
    """Periodically request a count refresh for each project that owns a live
    segment, so cached counts keep up with identities ingested via CDC between
    segment edits.

    Flag checks and debouncing are left entirely to
    `enqueue_membership_refresh`; this task only walks the projects. Does
    nothing when ClickHouse is not enabled.
    """
    if not settings.CLICKHOUSE_ENABLED:
        return

    live_segment_project_ids = Segment.live_objects.values_list(
        "project_id", flat=True
    )
    projects = Project.objects.filter(
        id__in=live_segment_project_ids
    ).select_related("organisation")
    for project in projects.iterator():
        enqueue_membership_refresh(project)


@register_task_handler(
Expand Down
5 changes: 3 additions & 2 deletions api/segment_membership/types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from datetime import datetime
from typing import Any, TypedDict

# (environment_key, identifier, identity_key, traits)
ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None]
# (environment_key, identifier, identity_key, traits, inserted_at)
ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None, datetime]

# (identifier, identity_key, traits)
ClickHouseReadIdentityRow = tuple[str, str, dict[str, object] | None]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
open_clickhouse_cursor,
)
from segment_membership.tasks import (
backfill_identities_to_clickhouse,
refresh_project_segment_counts,
seed_organisation_identities,
)
from tests.types import EnableFeaturesFixture

Expand Down Expand Up @@ -65,7 +65,7 @@ def test_refresh_project_segment_counts__matching_identities__upserts_real_count


@pytest.mark.clickhouse
def test_backfill_identities_to_clickhouse__happy_path__rows_land_in_clickhouse(
def test_seed_organisation_identities__happy_path__rows_land_in_clickhouse(
clickhouse_db: None,
settings: SettingsWrapper,
mocker: MockerFixture,
Expand Down Expand Up @@ -106,8 +106,8 @@ def test_backfill_identities_to_clickhouse__happy_path__rows_land_in_clickhouse(
)
mocker.patch("segment_membership.tasks.DynamoIdentityWrapper", return_value=wrapper)

# When the backfill task runs end-to-end against real ClickHouse
backfill_identities_to_clickhouse()
# When the seed task runs end-to-end against real ClickHouse
seed_organisation_identities(Project.objects.get(pk=project).organisation_id)

# Then both identities actually land in IDENTITIES, keyed by env api key
with open_clickhouse_cursor() as cursor:
Expand All @@ -121,7 +121,7 @@ def test_backfill_identities_to_clickhouse__happy_path__rows_land_in_clickhouse(
# and the project's count refresh is dispatched
refresh_dispatch.delay.assert_called_once_with(args=(project,))
assert any(
e["event"] == "backfill.environment.completed" and e["rows__count"] == 2
e["event"] == "seed.environment.completed" and e["rows__count"] == 2
for e in log.events
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from django.contrib.admin.sites import AdminSite
from django.utils import timezone
from pytest_mock import MockerFixture

from organisations.models import Organisation
from segment_membership.admin import SegmentMembershipSeedAdmin
from segment_membership.models import SegmentMembershipSeed


def test_segment_membership_seed_admin_force_reseed__queryset__clears_marker_and_enqueues_seed(
    organisation: Organisation,
    mocker: MockerFixture,
) -> None:
    """The force-reseed admin action nulls the marker and enqueues one seed
    task for the marker's organisation."""
    # Given an organisation that is already marked as seeded
    marker = SegmentMembershipSeed.objects.create(
        organisation=organisation,
        seeded_at=timezone.now(),
    )
    mocked_seed_task = mocker.patch(
        "segment_membership.admin.seed_organisation_identities"
    )
    model_admin = SegmentMembershipSeedAdmin(SegmentMembershipSeed, AdminSite())

    # When the action runs over every marker
    all_markers = SegmentMembershipSeed.objects.all()
    model_admin.force_reseed(request=mocker.MagicMock(), queryset=all_markers)

    # Then the marker is cleared and a seed is enqueued for the organisation
    marker.refresh_from_db()
    assert marker.seeded_at is None
    mocked_seed_task.delay.assert_called_once_with(args=(organisation.id,))
Loading
Loading