50 changes: 50 additions & 0 deletions README.md
@@ -319,6 +319,56 @@ client.is_enabled("testFlag")

```

### Impact metrics

**Member:** Thank you!

Impact metrics are lightweight, application-level time-series metrics stored and visualized directly inside Unleash. They allow you to connect specific application data, such as request counts, error rates, or latency, to your feature flags and release plans.

These metrics help validate feature impact and automate release processes. For instance, you can monitor usage patterns or performance to determine if a feature meets its goals.

The SDK automatically attaches two context labels to every metric: `appName` and `environment`. Each recording call can also attach labels derived from feature flags, as shown in the example after the histogram section.

#### Counters

Use counters for cumulative values that only increase (total requests, errors):

```python
client.impact_metrics.define_counter(
"request_count",
"Total number of HTTP requests processed"
)

client.impact_metrics.increment_counter("request_count")
```

#### Gauges

Use gauges for point-in-time values that can go up or down:

```python
client.impact_metrics.define_gauge(
"total_users",
"Total number of registered users"
)

client.impact_metrics.update_gauge("total_users", user_count)
```

#### Histograms

Use histograms to measure the distribution of values (request duration, response size):

```python
client.impact_metrics.define_histogram(
"request_time_ms",
"Time taken to process a request in milliseconds",
[50, 100, 200, 500, 1000]
)

client.impact_metrics.observe_histogram("request_time_ms", 125)
```
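
Each recording method also accepts an optional flag context (`MetricFlagContext`, defined in `UnleashClient/impact_metrics.py` in this PR), which resolves the listed flags for a given user context and attaches the result (variant name, `enabled`, or `disabled`) as extra labels. A minimal sketch; the flag name and user context values are illustrative:

```python
from UnleashClient.impact_metrics import MetricFlagContext

client.impact_metrics.increment_counter(
    "request_count",
    flag_context=MetricFlagContext(
        flag_names=["checkoutFlow"],  # hypothetical flag name
        context={"userId": "42"},     # hypothetical user context
    ),
)
```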

Impact metrics are batched and sent on the same interval as standard SDK metrics.

### Custom cache

By default, the Python SDK stores feature flags in an on-disk cache using fcache. If you need a different storage backend, for example, Redis, memory-only, or a custom database, you can provide your own cache implementation.
11 changes: 11 additions & 0 deletions UnleashClient/__init__.py
@@ -35,12 +35,14 @@
    SDK_NAME,
    SDK_VERSION,
)
from UnleashClient.environment_resolver import extract_environment_from_headers
from UnleashClient.events import (
    BaseEvent,
    UnleashEvent,
    UnleashEventType,
    UnleashReadyEvent,
)
from UnleashClient.impact_metrics import ImpactMetrics
from UnleashClient.periodic_tasks import (
    aggregate_and_send_metrics,
)
@@ -206,6 +208,15 @@ def __init__(
        self.metric_job: Job = None
        self.engine = UnleashEngine()

        # Prefer the environment embedded in the API token (Authorization
        # header) over the configured environment, falling back when absent.
        impact_metrics_environment = self.unleash_environment
        extracted_env = extract_environment_from_headers(self.unleash_custom_headers)
        if extracted_env:
            impact_metrics_environment = extracted_env

        self.impact_metrics = ImpactMetrics(
            self.engine, self.unleash_app_name, impact_metrics_environment
        )

        self.cache = cache or FileCache(
            self.unleash_app_name, directory=cache_directory
        )
26 changes: 26 additions & 0 deletions UnleashClient/environment_resolver.py
@@ -0,0 +1,26 @@
from typing import Dict, Optional


def extract_environment_from_headers(
    headers: Optional[Dict[str, str]],
) -> Optional[str]:
    """Extract the environment segment from an Unleash API token in the
    Authorization header (tokens look like ``project:environment.hash``).

    Returns None when no Authorization header, token separator, or
    environment segment is present.
    """
    if not headers:
        return None

    auth_key = next(
        (key for key in headers if key.lower() == "authorization"),
        None,
    )
    if not auth_key:
        return None

    auth_value = headers.get(auth_key)
    if not auth_value:
        return None

    _, sep, after_colon = auth_value.partition(":")
    if not sep:
        return None

    environment, _, _ = after_colon.partition(".")
    return environment or None
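
For illustration, the extraction applied to a made-up token (mirroring the unit tests at the end of this diff):

```python
headers = {"Authorization": "demo:development.abc123"}  # hypothetical token
assert extract_environment_from_headers(headers) == "development"
```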
86 changes: 86 additions & 0 deletions UnleashClient/impact_metrics.py
@@ -0,0 +1,86 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from yggdrasil_engine.engine import UnleashEngine


@dataclass
class MetricFlagContext:
    """Context for resolving feature flag values as metric labels."""

    flag_names: List[str] = field(default_factory=list)
    context: Dict[str, Any] = field(default_factory=dict)

**Member:** Getting some mixed messages from this field. The class is a context, but so is this. Is it contexts all the way down?

Weirdly enough, because of this I have no context for what this actually represents.

**Collaborator (author):** This is consistent with what we called it in the Node SDK. This is our user context, so that we can evaluate flag_names to enabled/disabled/variant_name for a given user. I agree it may be confusing here, since in Node, Context has a proper shape:

export interface Context {
  [key: string]: string | Date | undefined | number | Properties;
  currentTime?: Date;
  userId?: string;
  sessionId?: string;
  remoteAddress?: string;
  environment?: string;
  appName?: string;
  properties?: Properties;
}

**Collaborator (author), @kwasniew, Jan 21, 2026:** I think this SDK uses a plain dict for user context everywhere, and I don't want to make a big change in this PR.

**Member:** Ah! It's the actual context. Yep, makes sense, let's keep it the same. Context needs a proper type some day; today is not that day.

class ImpactMetrics:
    """
    Provides methods to define and record metrics (counters, gauges, histograms)
    with optional feature flag context that gets resolved to labels.
    """

    def __init__(self, engine: UnleashEngine, app_name: str, environment: str):
        self._engine = engine
        self._base_labels = {
            "appName": app_name,
            "environment": environment,
        }

    def define_counter(self, name: str, help_text: str) -> None:
        self._engine.define_counter(name, help_text)

    def increment_counter(
        self,
        name: str,
        value: int = 1,
        flag_context: Optional[MetricFlagContext] = None,
    ) -> None:
        labels = self._resolve_labels(flag_context)
        self._engine.inc_counter(name, value, labels)

    def define_gauge(self, name: str, help_text: str) -> None:
        self._engine.define_gauge(name, help_text)

    def update_gauge(
        self,
        name: str,
        value: float,
        flag_context: Optional[MetricFlagContext] = None,
    ) -> None:
        labels = self._resolve_labels(flag_context)
        self._engine.set_gauge(name, value, labels)

    def define_histogram(
        self, name: str, help_text: str, buckets: Optional[List[float]] = None
    ) -> None:
        self._engine.define_histogram(name, help_text, buckets)

    def observe_histogram(
        self,
        name: str,
        value: float,
        flag_context: Optional[MetricFlagContext] = None,
    ) -> None:
        labels = self._resolve_labels(flag_context)
        self._engine.observe_histogram(name, value, labels)

    def _variant_label(self, flag_name: str, context: Dict[str, Any]) -> str:
        variant = self._engine.get_variant(flag_name, context)
        if variant and variant.enabled:
            return variant.name
        if variant and variant.feature_enabled:
            return "enabled"
        return "disabled"

    def _resolve_labels(

**Member:** I don't think this is wrong, but it does look like "Java developer slipped, fell, and landed in the Python runtime" to me. What you have is clear, so I'm going to make some recommendations based on what I would do given my Python background, and you can pick and choose some or none of this.

I think the creation of the dict here is a bit unfortunate. We close over the environment and appName fields but literally only use them here to new this dict up. Smells like a way to work around Python's lack of destructuring. Good news: Python does have destructuring!

This is also definitely begging to be a dict comprehension with destructuring. That may or may not read okay to someone without a Python background, but it reads fine to me.

This is how I would have done it:

def __init__(self, engine: UnleashEngine, app_name: str, environment: str):
    self._engine = engine
    self._base_labels = {
        "appName": app_name,
        "environment": environment,
    }


def _variant_label(self, flag_name: str, ctx) -> str:
    variant = self._engine.get_variant(flag_name, ctx)
    if variant and variant.enabled:
        return variant.name
    if variant and variant.feature_enabled:
        return "enabled"
    return "disabled"


def _resolve_labels(
    self, flag_context: Optional[MetricFlagContext]
) -> Dict[str, str]:
    if not flag_context:
        # Just a lil defensive copying so we don't leak mutable state
        return dict(self._base_labels)

    return {
        **self._base_labels,
        **{
            flag: self._variant_label(flag, flag_context.context)
            for flag in flag_context.flag_names
        },
    }

**Collaborator (author):** Yup, I like your version more. Thanks for pointing it toward more idiomatic Python.

        self, flag_context: Optional[MetricFlagContext]
    ) -> Dict[str, str]:
        if not flag_context:
            return dict(self._base_labels)

        return {
            **self._base_labels,
            **{
                flag: self._variant_label(flag, flag_context.context)
                for flag in flag_context.flag_names
            },
        }
17 changes: 15 additions & 2 deletions UnleashClient/periodic_tasks/send_metrics.py
@@ -20,6 +20,12 @@ def aggregate_and_send_metrics(
) -> None:
    metrics_bucket = engine.get_metrics()

    try:
        impact_metrics = engine.collect_impact_metrics()
    except Exception as exc:
        LOGGER.warning("Failed to collect impact metrics: %s", exc)
        impact_metrics = None

    metrics_request = {
        "appName": app_name,
        "instanceId": instance_id,
@@ -31,7 +37,14 @@
"specVersion": CLIENT_SPEC_VERSION,
}

-    if metrics_bucket:
-        send_metrics(url, metrics_request, headers, custom_options, request_timeout)
+    if impact_metrics:
+        metrics_request["impactMetrics"] = impact_metrics
+
+    if metrics_bucket or impact_metrics:
+        success = send_metrics(
+            url, metrics_request, headers, custom_options, request_timeout
+        )
+        if not success and impact_metrics:
+            engine.restore_impact_metrics(impact_metrics)
     else:
         LOGGER.debug("No feature flags with metrics, skipping metrics submission.")
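
The effect of the new branch: a failed POST no longer drops impact metrics. A minimal sketch of that contract, using the engine calls from this diff (`post` is a hypothetical stand-in for `send_metrics`):

```python
collected = engine.collect_impact_metrics()  # drains the engine's buffer
if collected:
    ok = post(collected)  # hypothetical transport call
    if not ok:
        # Put the drained metrics back so the next interval retries them.
        engine.restore_impact_metrics(collected)
```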
2 changes: 1 addition & 1 deletion requirements.txt
@@ -6,7 +6,7 @@ mmhash3
python-dateutil
requests
semver
-yggdrasil-engine>=1.0.0
+yggdrasil-engine>=1.2.1
launchdarkly-eventsource

# Development packages
35 changes: 35 additions & 0 deletions tests/unit_tests/test_environment_resolver.py
@@ -0,0 +1,35 @@
from UnleashClient.environment_resolver import extract_environment_from_headers


def test_valid_headers():
    custom_headers = {
        "Authorization": "project:environment.hash",
        "Content-Type": "application/json",
    }

    result = extract_environment_from_headers(custom_headers)
    assert result == "environment"


def test_case_insensitive_header_keys():
    custom_headers = {
        "AUTHORIZATION": "project:environment.hash",
        "Content-Type": "application/json",
    }

    result = extract_environment_from_headers(custom_headers)
    assert result == "environment"


def test_authorization_header_not_present():
    result = extract_environment_from_headers({})
    assert result is None


def test_environment_part_is_empty():
    custom_headers = {
        "Authorization": "project:.hash",
    }

    result = extract_environment_from_headers(custom_headers)
    assert result is None