88from requests .adapters import HTTPAdapter
99from urllib3 import Retry
1010
11- from flagsmith .analytics import AnalyticsProcessor
11+ from flagsmith .analytics import (
12+ AnalyticsProcessor ,
13+ PipelineAnalyticsConfig ,
14+ PipelineAnalyticsProcessor ,
15+ )
1216from flagsmith .exceptions import FlagsmithAPIError , FlagsmithClientError
1317from flagsmith .mappers import (
1418 map_context_and_identity_data_to_context ,
@@ -63,6 +67,7 @@ def __init__(
6367 environment_refresh_interval_seconds : typing .Union [int , float ] = 60 ,
6468 retries : typing .Optional [Retry ] = None ,
6569 enable_analytics : bool = False ,
70+ pipeline_analytics_config : typing .Optional [PipelineAnalyticsConfig ] = None ,
6671 default_flag_handler : typing .Optional [
6772 typing .Callable [[str ], DefaultFlag ]
6873 ] = None ,
@@ -108,6 +113,7 @@ def __init__(
108113 self .default_flag_handler = default_flag_handler
109114 self .enable_realtime_updates = enable_realtime_updates
110115 self ._analytics_processor : typing .Optional [AnalyticsProcessor ] = None
116+ self ._pipeline_analytics_processor : typing .Optional [PipelineAnalyticsProcessor ] = None
111117 self ._evaluation_context : typing .Optional [SDKEvaluationContext ] = None
112118 self ._environment_updated_at : typing .Optional [datetime ] = None
113119
@@ -175,6 +181,13 @@ def __init__(
175181 environment_key , self .api_url , timeout = self .request_timeout_seconds
176182 )
177183
184+ if pipeline_analytics_config :
185+ self ._pipeline_analytics_processor = PipelineAnalyticsProcessor (
186+ config = pipeline_analytics_config ,
187+ environment_key = environment_key ,
188+ )
189+ self ._pipeline_analytics_processor .start ()
190+
178191 def _initialise_local_evaluation (self ) -> None :
179192 # To ensure that the environment is set before allowing subsequent
180193 # method calls, update the environment manually.
@@ -290,6 +303,36 @@ def get_identity_segments(
290303
291304 return map_segment_results_to_identity_segments (evaluation_result ["segments" ])
292305
306+ def track_event (
307+ self ,
308+ event_name : str ,
309+ identity_identifier : typing .Optional [str ] = None ,
310+ traits : typing .Optional [TraitMapping ] = None ,
311+ metadata : typing .Optional [typing .Dict [str , typing .Any ]] = None ,
312+ ) -> None :
313+ if not self ._pipeline_analytics_processor :
314+ raise ValueError (
315+ "Pipeline analytics is not configured. "
316+ "Provide pipeline_analytics_config to use track_event."
317+ )
318+ self ._pipeline_analytics_processor .record_custom_event (
319+ event_name = event_name ,
320+ identity_identifier = identity_identifier ,
321+ traits = self ._resolve_traits (traits ),
322+ metadata = metadata ,
323+ )
324+
325+ @staticmethod
326+ def _resolve_traits (
327+ traits : typing .Optional [TraitMapping ],
328+ ) -> typing .Optional [typing .Dict [str , typing .Any ]]:
329+ if not traits :
330+ return None
331+ return {
332+ key : (val ["value" ] if isinstance (val , dict ) else val )
333+ for key , val in traits .items ()
334+ }
335+
293336 def update_environment (self ) -> None :
294337 try :
295338 environment_data = self ._get_json_response (
@@ -345,6 +388,7 @@ def _get_environment_flags_from_document(self) -> Flags:
345388 evaluation_result = evaluation_result ,
346389 analytics_processor = self ._analytics_processor ,
347390 default_flag_handler = self .default_flag_handler ,
391+ pipeline_analytics_processor = self ._pipeline_analytics_processor ,
348392 )
349393
350394 def _get_identity_flags_from_document (
@@ -368,6 +412,9 @@ def _get_identity_flags_from_document(
368412 evaluation_result = evaluation_result ,
369413 analytics_processor = self ._analytics_processor ,
370414 default_flag_handler = self .default_flag_handler ,
415+ pipeline_analytics_processor = self ._pipeline_analytics_processor ,
416+ identity_identifier = identifier ,
417+ traits = self ._resolve_traits (traits ),
371418 )
372419
373420 def _get_environment_flags_from_api (self ) -> Flags :
@@ -379,6 +426,7 @@ def _get_environment_flags_from_api(self) -> Flags:
379426 api_flags = json_response ,
380427 analytics_processor = self ._analytics_processor ,
381428 default_flag_handler = self .default_flag_handler ,
429+ pipeline_analytics_processor = self ._pipeline_analytics_processor ,
382430 )
383431 except FlagsmithAPIError :
384432 if self .offline_handler :
@@ -411,6 +459,9 @@ def _get_identity_flags_from_api(
411459 api_flags = json_response ["flags" ],
412460 analytics_processor = self ._analytics_processor ,
413461 default_flag_handler = self .default_flag_handler ,
462+ pipeline_analytics_processor = self ._pipeline_analytics_processor ,
463+ identity_identifier = identifier ,
464+ traits = self ._resolve_traits (traits ),
414465 )
415466 except FlagsmithAPIError :
416467 if self .offline_handler :
@@ -443,3 +494,6 @@ def __del__(self) -> None:
443494
444495 if hasattr (self , "event_stream_thread" ):
445496 self .event_stream_thread .stop ()
497+
498+ if hasattr (self , "_pipeline_analytics_processor" ) and self ._pipeline_analytics_processor :
499+ self ._pipeline_analytics_processor .stop ()
0 commit comments