Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions crates/flagd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ rpc = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:hype
# REST evaluation mode - uses HTTP/OFREP to connect to flagd service
rest = ["dep:reqwest"]
# In-process evaluation mode - local evaluation with gRPC sync or file-based configuration
in-process = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:datalogic-rs", "dep:murmurhash3", "dep:semver"]
in-process = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:datalogic-rs", "dep:murmurhash3", "dep:semver", "dep:notify"]
# OpenTelemetry instrumentation - adds tracing spans and context propagation
otel = ["dep:opentelemetry", "dep:tracing-opentelemetry", "dep:pin-project-lite", "dep:http-body", "dep:http"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -38,36 +40,40 @@ tonic-prost-build = "0.14"

[dev-dependencies]
cucumber = "0.22"
tokio-stream = "0.1"
futures-core = "0.3"
testcontainers = { version = "0.26.0", features = ["http_wait", "blocking"] }
wiremock = "0.6.5"
tempfile = "3.23.0"
serial_test = "3.2"
test-log = { version = "0.2", features = ["trace"] }
fake-opentelemetry-collector = "0.33"
insta = { version = "1", features = ["redactions", "yaml"] }
assert2 = "0.3"
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "testing"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
bytes = "1"


[dependencies]
open-feature = "0.2"
async-trait = "0.1"
tokio = { version = "1.48", features = ["full"] }
tokio = { version = "1.48", features = ["sync", "time", "fs", "rt", "rt-multi-thread", "macros", "net"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
lru = "0.16"
futures = "0.3"
tokio-stream = "0.1"
tracing = "0.1"
anyhow = "1.0.100"
thiserror = "2.0"

# RPC and In-Process shared dependencies (gRPC)
tonic = { version = "0.14", optional = true }
tonic = { version = "0.14", default-features = false, features = ["transport", "codegen"], optional = true }
tonic-prost = { version = "0.14", optional = true }
prost = { version = "0.14", optional = true }
prost-types = { version = "0.14", optional = true }

# RPC-specific dependencies
hyper-util = { version = "0.1", features = ["tokio"], optional = true }
tower = { version = "0.5", optional = true }
tower = { version = "0.5", default-features = false, features = ["util"], optional = true }

# REST-specific dependencies
reqwest = { version = "0.12", default-features = false, features = ["json", "stream", "rustls-tls"], optional = true }
Expand All @@ -76,3 +82,11 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "str
datalogic-rs = { version = "4.0.4", optional = true }
murmurhash3 = { version = "0.0.5", optional = true }
semver = { version = "1.0.27", optional = true }
notify = { version = "8.2", optional = true }

# OpenTelemetry instrumentation dependencies
opentelemetry = { version = "0.31", default-features = false, features = ["trace", "metrics"], optional = true }
tracing-opentelemetry = { version = "0.32", optional = true }
pin-project-lite = { version = "0.2", optional = true }
http-body = { version = "1", optional = true }
http = { version = "1", optional = true }
80 changes: 79 additions & 1 deletion crates/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ keeping the dependency footprint minimal. By default, all features are enabled.
| `rpc` | gRPC-based remote evaluation via flagd service | ✅ |
| `rest` | HTTP/OFREP-based remote evaluation | ✅ |
| `in-process` | Local evaluation with embedded engine (includes File mode) | ✅ |
| `otel` | OpenTelemetry instrumentation (tracing spans & context propagation) | ❌ |

#### Using Specific Features

Expand All @@ -62,6 +63,83 @@ open-feature-flagd = { version = "0.0.8", default-features = false, features = [

# RPC and REST (no local evaluation engine)
open-feature-flagd = { version = "0.0.8", default-features = false, features = ["rpc", "rest"] }

# With OpenTelemetry instrumentation
open-feature-flagd = { version = "0.0.8", features = ["otel"] }
```

#### OpenTelemetry Instrumentation

The `otel` feature enables OpenTelemetry instrumentation for distributed tracing. When enabled, the provider:

- Creates tracing spans for flag evaluations and gRPC/HTTP calls
- Propagates trace context across service boundaries via HTTP headers
- Records semantic attributes following OpenTelemetry conventions

To use OpenTelemetry, configure a tracer provider in your application and use `tracing-opentelemetry`:

```rust
use opentelemetry::global;
use opentelemetry_sdk::trace::TracerProvider;

// Configure your OpenTelemetry exporter (e.g., OTLP, Jaeger, Zipkin)
let provider = TracerProvider::builder()
.with_simple_exporter(/* your exporter */)
.build();
global::set_tracer_provider(provider);
```

The instrumentation integrates with the `tracing` crate, so traces will automatically
flow through your existing tracing infrastructure when using `tracing-opentelemetry`.

##### Emitted Spans by Evaluation Mode

**All Modes** emit a flag evaluation span:

| Span Name | Attributes |
|-----------|------------|
| `evaluate {flag_key}` | `feature_flag.key`, `feature_flag.provider_name`, `feature_flag.provider_version`, `feature_flag.variant`, `resolver_type`, `otel.status_code`, `error.type` |

**RPC Mode** additionally emits gRPC client spans:

| Span Name | Attributes |
|-----------|------------|
| `{service}/{method}` | `rpc.system`, `rpc.service`, `rpc.method`, `server.address`, `server.port`, `rpc.grpc.status_code`, `otel.status_code`, `error.type` |

**REST Mode** additionally emits HTTP client spans:

| Span Name | Attributes |
|-----------|------------|
| `{method} {url}` | `http.request.method`, `url.full`, `server.address`, `http.response.status_code`, `otel.status_code`, `error.type` |

**In-Process Mode** additionally emits gRPC client spans (for sync stream):

| Span Name | Attributes |
|-----------|------------|
| `{service}/{method}` | `rpc.system`, `rpc.service`, `rpc.method`, `server.address`, `server.port`, `rpc.grpc.status_code`, `otel.status_code`, `error.type` |

**File Mode** emits only the flag evaluation span (no network calls).

##### Emitted Metrics

The following OpenTelemetry metrics are emitted for all evaluation modes:

| Metric Name | Type | Description | Attributes |
|-------------|------|-------------|------------|
| `feature_flag.evaluation_total` | Counter | Total number of flag evaluations | `feature_flag.key`, `feature_flag.provider_name`, `feature_flag.provider_version`, `feature_flag.variant`, `feature_flag.reason`, `resolver_type` |
| `feature_flag.evaluation_duration` | Histogram | Duration of flag evaluations (seconds) | Same as above |
| `feature_flag.evaluation_error_total` | Counter | Total number of failed evaluations | `feature_flag.key`, `feature_flag.provider_name`, `feature_flag.provider_version`, `resolver_type`, `error.type` |

To use metrics, configure a meter provider in your application:

```rust
use opentelemetry::global;
use opentelemetry_sdk::metrics::SdkMeterProvider;

let provider = SdkMeterProvider::builder()
.with_reader(/* your reader/exporter */)
.build();
global::set_meter_provider(provider);
```

Then integrate it into your application:
Expand Down Expand Up @@ -199,7 +277,7 @@ Configurations can be provided as constructor options or via environment variabl
| TLS | FLAGD_TLS | boolean | false | RPC, In-Process |
| Socket Path | FLAGD_SOCKET_PATH | string | "" | RPC |
| Certificate Path | FLAGD_SERVER_CERT_PATH | string | "" | RPC, In-Process |
| Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | lru | RPC, In-Process, File |
| Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | In-Process: disabled, others: lru | RPC, In-Process, File |
| Cache TTL (Seconds) | FLAGD_CACHE_TTL | number | 60 | RPC, In-Process, File |
| Max Cache Size | FLAGD_MAX_CACHE_SIZE | number | 1000 | RPC, In-Process, File |
| Offline File Path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | string | "" | File |
Expand Down
42 changes: 42 additions & 0 deletions crates/flagd/examples/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//! Common utilities for flagd examples

use std::time::Duration;
use testcontainers::{
ContainerAsync, GenericImage, ImageExt,
core::{ContainerPort, Mount, WaitFor, logs::LogSource, wait::LogWaitStrategy},
runners::AsyncRunner,
};

pub const FLAGD_SYNC_PORT: u16 = 8015;

/// Start a flagd container configured for in-process sync (port 8015)
pub async fn start_flagd_sync(
flags_path: &str,
flags_file: &str,
) -> Result<(ContainerAsync<GenericImage>, u16), Box<dyn std::error::Error>> {
// Use fsnotify provider for faster file change detection
let sources_config = format!(
r#"[{{"uri":"/flags/{}","provider":"fsnotify"}}]"#,
flags_file
);

let container = GenericImage::new("ghcr.io/open-feature/flagd", "latest")
.with_exposed_port(ContainerPort::Tcp(FLAGD_SYNC_PORT))
.with_wait_for(WaitFor::Log(LogWaitStrategy::new(
LogSource::StdErr,
"Flag IResolver listening at",
)))
.with_mount(Mount::bind_mount(flags_path.to_string(), "/flags"))
.with_cmd(["start", "--sources", &sources_config])
.start()
.await?;

let sync_port = container
.get_host_port_ipv4(ContainerPort::Tcp(FLAGD_SYNC_PORT))
.await?;

// Give flagd a moment to fully initialize
tokio::time::sleep(Duration::from_millis(500)).await;

Ok((container, sync_port))
}
14 changes: 14 additions & 0 deletions crates/flagd/examples/flags/basic-flags.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "https://flagd.dev/schema/v0/flags.json",
"flags": {
"basic-boolean": {
"state": "ENABLED",
"defaultVariant": "false",
"variants": {
"true": true,
"false": false
},
"targeting": {}
}
}
}
55 changes: 55 additions & 0 deletions crates/flagd/examples/in_process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//! In-process evaluation example using testcontainers with flagd
//!
//! This example demonstrates in-process flag evaluation by periodically
//! evaluating a boolean flag. Edit `examples/flags/basic-flags.json` while
//! running to see live flag updates.
//!
//! Run with: cargo run --example in_process --all-features
//!
//! Then edit basic-flags.json and change "defaultVariant": "false" to "true"
//! to see the flag value change in real-time.

mod common;

use common::start_flagd_sync;
use open_feature::EvaluationContext;
use open_feature::provider::FeatureProvider;
use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let manifest_dir = env!("CARGO_MANIFEST_DIR");
let flags_path = format!("{}/examples/flags", manifest_dir);

println!("Starting flagd container...");
let (_container, sync_port) = start_flagd_sync(&flags_path, "basic-flags.json").await?;
println!("flagd sync service available on port {}", sync_port);

// Configure the flagd provider for in-process evaluation
let provider = FlagdProvider::new(FlagdOptions {
host: "localhost".to_string(),
port: sync_port,
resolver_type: ResolverType::InProcess,
..Default::default()
})
.await
.expect("Failed to create provider");

let ctx = EvaluationContext::default();

println!("\nEvaluating 'basic-boolean' flag every 2 seconds...");
println!("Edit examples/flags/basic-flags.json to change the flag value.");
println!("Press Ctrl+C to stop.\n");

loop {
let result = provider
.resolve_bool_value("basic-boolean", &ctx)
.await
.expect("Failed to resolve flag");

println!("basic-boolean = {}", result.value);

tokio::time::sleep(Duration::from_secs(2)).await;
}
}
26 changes: 22 additions & 4 deletions crates/flagd/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use thiserror::Error;

/// Error type for flagd operations
#[derive(Error, Debug)]
pub enum FlagdError {
#[error("Provider error: {0}")]
Expand All @@ -8,9 +9,20 @@ pub enum FlagdError {
Connection(String),
#[error("Invalid configuration: {0}")]
Config(String),
#[error("Sync error: {0}")]
Sync(String),
#[error("Parse error: {0}")]
Parse(String),
#[error("Timeout: {0}")]
Timeout(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("Channel send error: {0}")]
Channel(String),
}

// Add implementations for error conversion
impl From<Box<dyn std::error::Error>> for FlagdError {
fn from(error: Box<dyn std::error::Error>) -> Self {
FlagdError::Provider(error.to_string())
Expand All @@ -23,8 +35,14 @@ impl From<Box<dyn std::error::Error + Send + Sync>> for FlagdError {
}
}

impl From<anyhow::Error> for FlagdError {
fn from(error: anyhow::Error) -> Self {
FlagdError::Provider(error.to_string())
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for FlagdError {
fn from(error: tokio::sync::mpsc::error::SendError<T>) -> Self {
FlagdError::Channel(error.to_string())
}
}

impl From<tokio::time::error::Elapsed> for FlagdError {
fn from(error: tokio::time::error::Elapsed) -> Self {
FlagdError::Timeout(error.to_string())
}
}
Loading