Skip to content

Commit 3070147

Browse files
committed
feat/add-pipeline-analytics-processor-for-batching-events
1 parent ccced7f commit 3070147

3 files changed

Lines changed: 366 additions & 1 deletion

File tree

flagsmith/analytics.py

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
import json
2+
import logging
3+
import threading
4+
import time
25
import typing
6+
from dataclasses import dataclass
37
from datetime import datetime
48

59
from requests_futures.sessions import FuturesSession # type: ignore
610

11+
from flagsmith.version import __version__
12+
13+
logger = logging.getLogger(__name__)
14+
715
ANALYTICS_ENDPOINT: typing.Final[str] = "analytics/flags/"
816

917
# Used to control how often we send data (in seconds)
@@ -60,3 +68,147 @@ def track_feature(self, feature_name: str) -> None:
6068
self.analytics_data[feature_name] = self.analytics_data.get(feature_name, 0) + 1
6169
if (datetime.now() - self._last_flushed).seconds > ANALYTICS_TIMER:
6270
self.flush()
71+
72+
73+
@dataclass
class PipelineAnalyticsConfig:
    """Settings for the batching pipeline analytics processor.

    Raises:
        ValueError: if ``max_buffer`` or ``flush_interval_seconds`` is not
            strictly positive.
    """

    # Base URL of the analytics server; a trailing slash is optional.
    analytics_server_url: str
    # Number of buffered events that triggers an immediate flush.
    max_buffer: int = 1000
    # Delay, in seconds, between two timer-driven flushes.
    flush_interval_seconds: float = 10.0

    def __post_init__(self) -> None:
        # A non-positive buffer size would flush on every event and truncate
        # re-queued events to nothing.
        if self.max_buffer <= 0:
            raise ValueError(
                f"max_buffer must be a positive integer, got {self.max_buffer!r}"
            )
        # threading.Timer fires immediately for an interval <= 0, which would
        # turn the periodic flush into a busy loop of short-lived threads.
        if self.flush_interval_seconds <= 0:
            raise ValueError(
                "flush_interval_seconds must be greater than zero, "
                f"got {self.flush_interval_seconds!r}"
            )
78+
79+
80+
class PipelineAnalyticsProcessor:
    """Buffer analytics events and post them in batches to the analytics server.

    Events are accumulated in memory and sent to ``<url>/v1/analytics/batch``
    when the buffer reaches ``max_buffer`` entries, when the periodic timer
    armed by :meth:`start` fires, or when :meth:`flush` / :meth:`stop` is
    called explicitly. Batches whose delivery fails are put back at the front
    of the buffer (capped at ``max_buffer`` entries).
    """

    def __init__(
        self,
        config: "PipelineAnalyticsConfig",
        environment_key: str,
    ) -> None:
        """Create a processor.

        Args:
            config: Server URL, buffer size and flush interval.
            environment_key: Sent in the ``X-Environment-Key`` header and in
                the body of every batch.
        """
        url = config.analytics_server_url
        if not url.endswith("/"):
            url = f"{url}/"
        self._batch_endpoint = f"{url}v1/analytics/batch"
        self._environment_key = environment_key
        self._max_buffer = config.max_buffer
        self._flush_interval_seconds = config.flush_interval_seconds

        self._buffer: typing.List[typing.Dict[str, typing.Any]] = []
        # Last fingerprint recorded per flag key since the previous flush.
        self._dedup_keys: typing.Dict[str, str] = {}
        self._lock = threading.Lock()
        self._timer: typing.Optional[threading.Timer] = None
        # Guards ``_running`` / ``_timer`` so start(), stop() and the timer
        # callback cannot interleave while (re)arming the timer.
        self._timer_lock = threading.Lock()
        self._running = False

    def record_evaluation_event(
        self,
        flag_key: str,
        enabled: bool,
        value: typing.Any,
        identity_identifier: typing.Optional[str] = None,
        traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
    ) -> None:
        """Buffer a flag evaluation, skipping immediate repeats.

        An evaluation is dropped when its identity/enabled/value fingerprint
        equals the last one recorded for ``flag_key`` since the previous
        flush. Reaching ``max_buffer`` entries triggers a flush.
        """
        fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}"

        with self._lock:
            if self._dedup_keys.get(flag_key) == fingerprint:
                return
            self._dedup_keys[flag_key] = fingerprint
            should_flush = self._append_locked(
                self._build_event(
                    event_id=flag_key,
                    event_type="flag_evaluation",
                    identity_identifier=identity_identifier,
                    enabled=enabled,
                    value=value,
                    traits=traits,
                    metadata={"sdk_version": __version__},
                )
            )

        # flush() takes the (non-reentrant) lock itself, so call it outside.
        if should_flush:
            self.flush()

    def record_custom_event(
        self,
        event_name: str,
        identity_identifier: typing.Optional[str] = None,
        traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
        metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
    ) -> None:
        """Buffer a custom event; custom events are never deduplicated.

        ``sdk_version`` is always written into the event metadata and
        overrides a caller-supplied key of the same name. Reaching
        ``max_buffer`` entries triggers a flush.
        """
        with self._lock:
            should_flush = self._append_locked(
                self._build_event(
                    event_id=event_name,
                    event_type="custom_event",
                    identity_identifier=identity_identifier,
                    enabled=None,
                    value=None,
                    traits=traits,
                    metadata={**(metadata or {}), "sdk_version": __version__},
                )
            )

        if should_flush:
            self.flush()

    @staticmethod
    def _build_event(
        event_id: str,
        event_type: str,
        identity_identifier: typing.Optional[str],
        enabled: typing.Optional[bool],
        value: typing.Any,
        traits: typing.Optional[typing.Dict[str, typing.Any]],
        metadata: typing.Dict[str, typing.Any],
    ) -> typing.Dict[str, typing.Any]:
        """Return the wire representation shared by every event type."""
        return {
            "event_id": event_id,
            "event_type": event_type,
            "evaluated_at": int(time.time() * 1000),  # epoch milliseconds
            "identity_identifier": identity_identifier,
            "enabled": enabled,
            "value": value,
            # Copy so later mutation by the caller cannot alter the event.
            "traits": dict(traits) if traits else None,
            "metadata": metadata,
        }

    def _append_locked(self, event: typing.Dict[str, typing.Any]) -> bool:
        """Append ``event`` (caller holds ``_lock``); return True when full."""
        self._buffer.append(event)
        return len(self._buffer) >= self._max_buffer

    def flush(self) -> None:
        """Send every buffered event as one batch; no-op when empty.

        The buffer and the dedup state are reset before the request is
        issued. Events that cannot be JSON-encoded are dropped with a
        warning; a batch that cannot be submitted or delivered is re-queued.
        """
        with self._lock:
            if not self._buffer:
                return
            events = self._buffer
            self._buffer = []
            self._dedup_keys.clear()

        try:
            payload = self._serialize(events)
        except (TypeError, ValueError):
            # ``value`` / ``traits`` / ``metadata`` are caller supplied and
            # may not be JSON-encodable. Keep the events that are, instead of
            # losing the batch and raising into the caller or timer thread.
            events = self._drop_unserializable(events)
            if not events:
                return
            payload = self._serialize(events)

        try:
            future = session.post(
                self._batch_endpoint,
                data=payload,
                timeout=3,
                headers={
                    "Content-Type": "application/json; charset=utf-8",
                    "X-Environment-Key": self._environment_key,
                    "Flagsmith-SDK-User-Agent": f"flagsmith-python-client/{__version__}",
                },
            )
        except Exception:
            # Submitting can fail synchronously (e.g. the executor was shut
            # down); the events are already detached, so put them back.
            logger.warning(
                "Failed to flush pipeline analytics, re-queuing events",
                exc_info=True,
            )
            self._requeue(events)
            return

        future.add_done_callback(lambda f: self._handle_flush_result(f, events))

    def _serialize(self, events: typing.List[typing.Dict[str, typing.Any]]) -> str:
        """Encode a batch request body as JSON."""
        return json.dumps(
            {"events": events, "environment_key": self._environment_key}
        )

    @staticmethod
    def _drop_unserializable(
        events: typing.List[typing.Dict[str, typing.Any]],
    ) -> typing.List[typing.Dict[str, typing.Any]]:
        """Return the events that can be JSON-encoded, logging the others."""
        kept = []
        for event in events:
            try:
                json.dumps(event)
            except (TypeError, ValueError):
                logger.warning(
                    "Dropping pipeline analytics event %r: not JSON serializable",
                    event.get("event_id"),
                )
            else:
                kept.append(event)
        return kept

    def _requeue(self, events: typing.List[typing.Dict[str, typing.Any]]) -> None:
        """Put ``events`` back ahead of newer ones, capped at ``max_buffer``."""
        with self._lock:
            self._buffer = (events + self._buffer)[: self._max_buffer]

    def _handle_flush_result(
        self,
        future: typing.Any,
        events: typing.List[typing.Dict[str, typing.Any]],
    ) -> None:
        """Done-callback for a batch request: re-queue ``events`` on failure."""
        try:
            response = future.result()
            response.raise_for_status()
        except Exception:
            logger.warning(
                "Failed to flush pipeline analytics, re-queuing events",
                exc_info=True,
            )
            self._requeue(events)

    def start(self) -> None:
        """Begin periodic flushing; calling it again while running is a no-op."""
        with self._timer_lock:
            if self._running:
                return
            self._running = True
        self._schedule_flush()

    def stop(self) -> None:
        """Stop periodic flushing and flush whatever is still buffered."""
        with self._timer_lock:
            # Clearing the flag first prevents a timer callback that is
            # already executing from re-arming itself after this returns.
            self._running = False
            if self._timer is not None:
                self._timer.cancel()
        self.flush()

    def _schedule_flush(self) -> None:
        """Arm the next timer-driven flush, unless the processor is stopped."""
        with self._timer_lock:
            if not self._running:
                return
            if self._timer is not None:
                # No-op for a timer that already fired; cancels a pending one
                # so a stop()/start() cycle cannot leave two timer chains.
                self._timer.cancel()
            timer = threading.Timer(
                self._flush_interval_seconds, self._timer_flush
            )
            timer.daemon = True
            self._timer = timer
            timer.start()

    def _timer_flush(self) -> None:
        """Timer callback: flush, then always re-arm the timer."""
        try:
            self.flush()
        except Exception:
            logger.exception("Unexpected error while flushing pipeline analytics")
        finally:
            # Re-arm even after a failure, otherwise one bad flush would
            # silently end periodic flushing for the rest of the process.
            self._schedule_flush()

tests/conftest.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
from pytest_mock import MockerFixture
1212

1313
from flagsmith import Flagsmith
14-
from flagsmith.analytics import AnalyticsProcessor
14+
from flagsmith.analytics import (
15+
AnalyticsProcessor,
16+
PipelineAnalyticsConfig,
17+
PipelineAnalyticsProcessor,
18+
)
1519
from flagsmith.api.types import EnvironmentModel
1620
from flagsmith.mappers import map_environment_document_to_context
1721
from flagsmith.types import SDKEvaluationContext
@@ -26,6 +30,21 @@ def analytics_processor() -> AnalyticsProcessor:
2630
)
2731

2832

33+
@pytest.fixture()
def pipeline_analytics_config() -> PipelineAnalyticsConfig:
    """Provide a config that targets a placeholder analytics host."""
    config = PipelineAnalyticsConfig(analytics_server_url="http://test_analytics/")
    return config
36+
37+
38+
@pytest.fixture()
def pipeline_analytics_processor(
    pipeline_analytics_config: PipelineAnalyticsConfig,
) -> PipelineAnalyticsProcessor:
    """Provide a processor built from the shared config and a fixed key."""
    processor = PipelineAnalyticsProcessor(
        environment_key="test_key",
        config=pipeline_analytics_config,
    )
    return processor
46+
47+
2948
@pytest.fixture(scope="session")
def api_key() -> str:
    """Generate one random 20-letter API key for the whole test session."""
    letters = random.sample(string.ascii_letters, 20)
    return "".join(letters)

tests/test_pipeline_analytics.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
import json
2+
from concurrent.futures import Future
3+
from unittest import mock
4+
5+
import pytest
6+
7+
from flagsmith.analytics import (
8+
PipelineAnalyticsConfig,
9+
PipelineAnalyticsProcessor,
10+
)
11+
12+
13+
def test_record_evaluation_event_buffers_event(
    pipeline_analytics_processor: PipelineAnalyticsProcessor,
) -> None:
    """A recorded evaluation lands in the buffer with every field populated."""
    # Given / When
    pipeline_analytics_processor.record_evaluation_event(
        flag_key="my_flag",
        enabled=True,
        value="variant_a",
        identity_identifier="user123",
        traits={"plan": "premium"},
    )

    # Then
    buffered = pipeline_analytics_processor._buffer
    assert len(buffered) == 1
    recorded = buffered[0]

    expected_fields = {
        "event_id": "my_flag",
        "event_type": "flag_evaluation",
        "identity_identifier": "user123",
    }
    for field_name, expected in expected_fields.items():
        assert recorded[field_name] == expected

    assert recorded["enabled"] is True
    assert recorded["value"] == "variant_a"
    assert recorded["traits"] == {"plan": "premium"}
    assert "sdk_version" in recorded["metadata"]
    assert isinstance(recorded["evaluated_at"], int)
34+
35+
36+
@pytest.mark.parametrize(
    "second_enabled, expected_count",
    [
        (True, 1),  # same fingerprint -> deduplicated
        (False, 2),  # different fingerprint -> both kept
    ],
)
def test_evaluation_event_deduplication(
    pipeline_analytics_processor: PipelineAnalyticsProcessor,
    second_enabled: bool,
    expected_count: int,
) -> None:
    """Two evaluations of one flag are merged only when the fingerprint matches."""
    # Both evaluations share everything except, possibly, the enabled state.
    for enabled_state in (True, second_enabled):
        pipeline_analytics_processor.record_evaluation_event(
            flag_key="my_flag",
            enabled=enabled_state,
            value="v1",
            identity_identifier="user1",
        )

    buffered_count = len(pipeline_analytics_processor._buffer)
    assert buffered_count == expected_count
56+
57+
58+
def test_dedup_keys_cleared_after_flush(
    pipeline_analytics_processor: PipelineAnalyticsProcessor,
) -> None:
    """An evaluation identical to one already flushed is buffered again."""
    processor = pipeline_analytics_processor
    evaluation = {
        "flag_key": "my_flag",
        "enabled": True,
        "value": "v1",
        "identity_identifier": "user1",
    }

    with mock.patch("flagsmith.analytics.session"):
        processor.record_evaluation_event(**evaluation)
        processor.flush()

    # Same fingerprint as before the flush: must not be treated as a duplicate.
    processor.record_evaluation_event(**evaluation)

    assert len(processor._buffer) == 1
72+
73+
74+
def test_auto_flush_on_buffer_full() -> None:
    """Reaching max_buffer events flushes the buffer automatically."""
    processor = PipelineAnalyticsProcessor(
        config=PipelineAnalyticsConfig(
            analytics_server_url="http://test/", max_buffer=5
        ),
        environment_key="key",
    )

    with mock.patch("flagsmith.analytics.session"):
        # Five distinct flags: none are deduplicated, the fifth fills the buffer.
        for index in range(5):
            processor.record_evaluation_event(
                flag_key=f"flag_{index}", enabled=True, value=str(index)
            )

    assert len(processor._buffer) == 0
87+
88+
89+
def test_flush_sends_correct_http_request(
    pipeline_analytics_processor: PipelineAnalyticsProcessor,
) -> None:
    """flush() posts one batch to the endpoint with the expected headers and body."""
    with mock.patch("flagsmith.analytics.session") as mock_session:
        pipeline_analytics_processor.record_evaluation_event(
            flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1"
        )
        pipeline_analytics_processor.flush()

    mock_session.post.assert_called_once()
    positional, keyword = mock_session.post.call_args
    assert positional[0] == "http://test_analytics/v1/analytics/batch"

    sent_headers = keyword["headers"]
    assert sent_headers["X-Environment-Key"] == "test_key"
    assert sent_headers["Content-Type"] == "application/json; charset=utf-8"
    assert "flagsmith-python-client/" in sent_headers["Flagsmith-SDK-User-Agent"]

    sent_body = json.loads(keyword["data"])
    assert sent_body["environment_key"] == "test_key"
    sent_events = sent_body["events"]
    assert len(sent_events) == 1
    assert sent_events[0]["event_id"] == "my_flag"
111+
112+
113+
def test_flush_noop_when_empty(
    pipeline_analytics_processor: PipelineAnalyticsProcessor,
) -> None:
    """Flushing an empty buffer issues no HTTP request."""
    session_patch = mock.patch("flagsmith.analytics.session")
    with session_patch as mock_session:
        pipeline_analytics_processor.flush()

    mock_session.post.assert_not_called()
120+
121+
122+
def test_failed_flush_requeues_events(
    pipeline_analytics_processor: PipelineAnalyticsProcessor,
) -> None:
    """Events from a batch whose request failed are put back in the buffer."""
    # Given a request future that has already failed
    failed_request: Future[None] = Future()
    failed_request.set_exception(Exception("connection error"))

    # When the processor flushes through a session returning that future
    with mock.patch("flagsmith.analytics.session") as mock_session:
        mock_session.post.return_value = failed_request
        pipeline_analytics_processor.record_evaluation_event(
            flag_key="my_flag", enabled=True, value="v1"
        )
        pipeline_analytics_processor.flush()

    # Then the event is back in the buffer
    requeued = pipeline_analytics_processor._buffer
    assert len(requeued) == 1
    assert requeued[0]["event_id"] == "my_flag"
137+
138+
139+
@pytest.mark.parametrize(
    "url, expected_endpoint",
    [
        ("http://example.com", "http://example.com/v1/analytics/batch"),
        ("http://example.com/", "http://example.com/v1/analytics/batch"),
    ],
)
def test_url_trailing_slash_handling(url: str, expected_endpoint: str) -> None:
    """The batch endpoint is the same with or without a trailing slash on the URL."""
    processor = PipelineAnalyticsProcessor(
        config=PipelineAnalyticsConfig(analytics_server_url=url),
        environment_key="key",
    )

    actual_endpoint = processor._batch_endpoint
    assert actual_endpoint == expected_endpoint
150+
151+
152+
def test_record_custom_event(
    pipeline_analytics_processor: PipelineAnalyticsProcessor,
) -> None:
    """Custom events are buffered as-is, and repeats are kept."""
    processor = pipeline_analytics_processor

    processor.record_custom_event(
        event_name="purchase",
        identity_identifier="user1",
        traits={"plan": "premium"},
        metadata={"amount": 99},
    )
    # Custom events are never deduplicated
    processor.record_custom_event(
        event_name="purchase",
        identity_identifier="user1",
    )

    assert len(processor._buffer) == 2

    first_event = processor._buffer[0]
    assert first_event["event_id"] == "purchase"
    assert first_event["event_type"] == "custom_event"
    for unset_field in ("enabled", "value"):
        assert first_event[unset_field] is None
    assert first_event["traits"] == {"plan": "premium"}

    first_metadata = first_event["metadata"]
    assert first_metadata["amount"] == 99
    assert "sdk_version" in first_metadata
176+
177+
178+
def test_start_stop_lifecycle() -> None:
    """start() arms the flush timer; stop() cancels it and flushes the buffer."""
    config = PipelineAnalyticsConfig(
        analytics_server_url="http://test/", flush_interval_seconds=100
    )
    processor = PipelineAnalyticsProcessor(config=config, environment_key="key")

    processor.start()
    # Keep a handle on the armed timer so its cancellation can be verified.
    timer = processor._timer
    assert timer is not None
    assert timer.is_alive()

    with mock.patch("flagsmith.analytics.session"):
        processor.record_evaluation_event(
            flag_key="my_flag", enabled=True, value="v1"
        )
        processor.stop()

    assert len(processor._buffer) == 0
    # cancel() only wakes the timer thread; join briefly so the liveness check
    # below is not racy. Without cancellation the 100 s timer would still run.
    timer.join(timeout=1)
    assert not timer.is_alive()

0 commit comments

Comments
 (0)