Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ A comprehensive framework for evaluating GenAI applications.
- **API Integration**: Direct integration with external API for real-time data generation (if enabled)
- **Setup/Cleanup Scripts**: Support for running setup and cleanup scripts before/after each conversation evaluation (applicable when API is enabled)
- **Token Usage Tracking**: Track input/output tokens for both API calls and Judge LLM evaluations
- **Streaming Performance Metrics**: Capture time-to-first-token (TTFT), streaming duration, and tokens/second when using streaming endpoint
- **Statistical Analysis**: Statistics for every metric with score distribution analysis
- **Rich Output**: CSV, JSON, TXT reports + visualization graphs (pass rates, distributions, heatmaps)
- **Flexible Configuration**: Configurable environment & metric metadata, Global defaults with per-conversation/per-turn metric overrides
Expand Down Expand Up @@ -355,6 +356,21 @@ export API_KEY="your-api-endpoint-key"
- **Actual Reasons**: Explanation recorded for each evaluation status/result
- **Score Statistics**: Mean, median, standard deviation, min/max for every metric

### Streaming Performance Metrics

When using the streaming endpoint (`api.endpoint_type: streaming`), the framework captures additional performance metrics:

| Metric | Description |
|--------|-------------|
| `time_to_first_token` | Time in seconds from request start to first content token received |
| `streaming_duration` | Total time in seconds to receive all tokens |
| `tokens_per_second` | Output throughput (tokens generated per second, excluding TTFT) |

These metrics are included in:
- **CSV output**: Per-result columns for each metric
- **JSON output**: Per-result fields and aggregate statistics in `streaming_performance`
- **TXT output**: Aggregate statistics (mean, median, min/max) in the summary

## 🧪 Development

### Development Tools
Expand Down
4 changes: 4 additions & 0 deletions config/system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ output:
- "response"
- "api_input_tokens"
- "api_output_tokens"
# Streaming performance metrics (only populated when using streaming endpoint)
- "time_to_first_token" # Time to first token in seconds
- "streaming_duration" # Total streaming duration in seconds
- "tokens_per_second" # Output tokens per second throughput
- "judge_llm_input_tokens"
- "judge_llm_output_tokens"
- "tool_calls"
Comment thread
bsatapat-jpg marked this conversation as resolved.
Expand Down
181 changes: 138 additions & 43 deletions src/lightspeed_evaluation/core/api/streaming_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,170 @@

import json
import logging
import time
from typing import Any, Optional

import httpx
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr

logger = logging.getLogger(__name__)

DATA_PREFIX = "data: "
CONTENT_EVENTS = ("token", "tool_call", "turn_complete")


class _PerformanceTracker(BaseModel):
    """Measures streaming timing: time-to-first-token and throughput inputs.

    The clock starts when the tracker is instantiated (i.e. when streaming
    begins), using the monotonic ``time.perf_counter`` clock.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Monotonic timestamp taken at construction; all durations are relative to it.
    stream_start_time: float = Field(default_factory=time.perf_counter)
    # Seconds until the first content event arrived; None until captured.
    time_to_first_token: Optional[float] = Field(default=None)
    # Internal latch so only the first content event records TTFT.
    _first_content_received: bool = PrivateAttr(default=False)

    def capture_ttft(self) -> None:
        """Record time-to-first-token once; later calls are no-ops."""
        if self._first_content_received:
            return
        self._first_content_received = True
        self.time_to_first_token = time.perf_counter() - self.stream_start_time
        logger.debug("Time to first token: %.3f seconds", self.time_to_first_token)

    def get_metrics(self, output_tokens: int) -> tuple[float, Optional[float]]:
        """Return (total streaming duration, tokens/second or None)."""
        elapsed = time.perf_counter() - self.stream_start_time
        throughput = _calculate_tokens_per_second(
            output_tokens, self.time_to_first_token, elapsed
        )
        return elapsed, throughput


class StreamingContext(BaseModel):
    """Accumulates data parsed from a streaming response.

    Mutated in place by the event handlers while the SSE stream is consumed,
    then converted to the final response payload via ``to_response_dict``.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    conversation_id: str = Field(default="")
    final_response: str = Field(default="")
    tool_calls: list[dict[str, Any]] = Field(default_factory=list)
    input_tokens: int = Field(default=0)
    output_tokens: int = Field(default=0)
    perf: _PerformanceTracker = Field(default_factory=_PerformanceTracker)

    def to_response_dict(self) -> dict[str, Any]:
        """Assemble the final response payload, including timing metrics."""
        duration, throughput = self.perf.get_metrics(self.output_tokens)
        payload: dict[str, Any] = {
            "response": self.final_response,
            "tool_calls": _format_tool_sequences(self.tool_calls),
            "conversation_id": self.conversation_id,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
        }
        payload["time_to_first_token"] = self.perf.time_to_first_token
        payload["streaming_duration"] = duration
        payload["tokens_per_second"] = throughput
        return payload


def _calculate_tokens_per_second(
    output_tokens: int, ttft: Optional[float], total_duration: float
) -> Optional[float]:
    """Return output throughput in tokens/second, or None when undefined.

    Throughput is measured over the generation window only — the time after
    the first token arrived — so TTFT is subtracted from the total duration.
    None is returned when no tokens were produced, TTFT was never captured,
    or the generation window is not positive.
    """
    if ttft is None or output_tokens <= 0:
        return None
    window = total_duration - ttft
    if window <= 0:
        return None
    rate = output_tokens / window
    logger.debug(
        "Streaming performance: %.3f tokens/sec (%d tokens in %.3f sec)",
        rate,
        output_tokens,
        window,
    )
    return rate


def parse_streaming_response(
    response: httpx.Response,
) -> dict[str, Any]:
    """Parse a streaming (SSE) response and extract data.

    Also captures performance metrics:
    - Time to First Token (TTFT): time from request start to the first
      content event (token / tool_call / turn_complete)
    - Streaming duration: total time to receive all events
    - Tokens per second: output throughput over the generation window

    Args:
        response: The httpx streaming response object to parse.

    Returns:
        Dictionary containing parsed response data with keys:
        - response: Final response text
        - conversation_id: Conversation tracking ID
        - tool_calls: List of tool call sequences
        - input_tokens: Number of input tokens used
        - output_tokens: Number of output tokens generated
        - time_to_first_token: TTFT in seconds (None if not captured)
        - streaming_duration: Total streaming time in seconds
        - tokens_per_second: Output throughput (None if not calculable)

    Raises:
        ValueError: If an error event is received from the streaming API,
            or if the final response / conversation ID is missing.
    """
    ctx = StreamingContext()

    for raw_line in response.iter_lines():
        line = raw_line.strip()
        if not line.startswith(DATA_PREFIX):
            continue

        # Strip only the leading "data: " marker by length; str.replace would
        # also mangle any occurrence of the marker inside the payload itself.
        parsed_data = _parse_sse_line(line[len(DATA_PREFIX):])
        if not parsed_data:
            continue

        event, event_data = parsed_data

        # Any content-bearing event marks the arrival of the first token.
        if event in CONTENT_EVENTS:
            ctx.perf.capture_ttft()

        _process_event(ctx, event, event_data)

    _validate_response(ctx)
    return ctx.to_response_dict()
Comment thread
bsatapat-jpg marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def _process_event(ctx: StreamingContext, event: str, event_data: dict) -> None:
    """Apply one SSE event to the streaming context, mutating it in place.

    Raises:
        ValueError: When an ``error`` event with a message token is received.
    """
    if event == "error" and "token" in event_data:
        message = event_data["token"]
        logger.error("Received error event from streaming API: %s", message)
        raise ValueError(f"Streaming API error: {message}")
    if event == "start" and "conversation_id" in event_data:
        ctx.conversation_id = event_data["conversation_id"].strip()
        logger.debug("Found conversation_id: %s", ctx.conversation_id)
        return
    if event == "turn_complete" and "token" in event_data:
        ctx.final_response = event_data["token"].strip()
        logger.debug("Found final response (%d characters)", len(ctx.final_response))
        return
    if event == "tool_call" and "token" in event_data:
        parsed_call = _parse_tool_call(event_data["token"])
        if parsed_call:
            ctx.tool_calls.append(parsed_call)
            logger.debug("Found tool call: %s", parsed_call)
        return
    if event == "end":
        # Token counts are provided by lightspeed-stack on the terminal event.
        ctx.input_tokens = event_data.get("input_tokens", 0)
        ctx.output_tokens = event_data.get("output_tokens", 0)


def _validate_response(ctx: StreamingContext) -> None:
    """Validate that required response fields are present.

    Raises:
        ValueError: If the final response text or the conversation ID is
            missing from the parsed streaming output.
    """
    if not ctx.final_response:
        raise ValueError("No final response found in streaming output")
    if not ctx.conversation_id:
        raise ValueError("No Conversation ID found")


def _parse_sse_line(json_data: str) -> Optional[tuple[str, dict[str, Any]]]:
"""Parse a SSE line and return event and data."""
Expand Down
4 changes: 4 additions & 0 deletions src/lightspeed_evaluation/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
"api_output_tokens",
"judge_llm_input_tokens",
"judge_llm_output_tokens",
# Streaming performance metrics
"time_to_first_token",
"streaming_duration",
"tokens_per_second",
"tool_calls",
"contexts",
"expected_response",
Expand Down
3 changes: 3 additions & 0 deletions src/lightspeed_evaluation/core/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
EvaluationScope,
TurnData,
)
from lightspeed_evaluation.core.models.mixins import StreamingMetricsMixin
from lightspeed_evaluation.core.models.system import (
APIConfig,
CoreConfig,
Expand Down Expand Up @@ -43,4 +44,6 @@
"APIRequest",
"APIResponse",
"AttachmentData",
# Mixins
"StreamingMetricsMixin",
]
12 changes: 11 additions & 1 deletion src/lightspeed_evaluation/core/models/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from pydantic import BaseModel, ConfigDict, Field

from lightspeed_evaluation.core.models.mixins import StreamingMetricsMixin


class RAGChunk(BaseModel):
"""RAG chunk data from lightspeed-stack API."""
Expand Down Expand Up @@ -77,7 +79,7 @@ def create(
)


class APIResponse(BaseModel):
class APIResponse(StreamingMetricsMixin):
"""API response model."""

model_config = ConfigDict(extra="forbid")
Expand Down Expand Up @@ -114,11 +116,19 @@ def from_raw_response(cls, raw_data: dict[str, Any]) -> "APIResponse":
input_tokens = raw_data.get("input_tokens", 0)
output_tokens = raw_data.get("output_tokens", 0)

# Extract streaming performance metrics (only available for streaming endpoint)
time_to_first_token = raw_data.get("time_to_first_token")
streaming_duration = raw_data.get("streaming_duration")
tokens_per_second = raw_data.get("tokens_per_second")

return cls(
response=raw_data["response"].strip(),
conversation_id=conversation_id,
tool_calls=tool_call_sequences,
contexts=contexts,
input_tokens=input_tokens,
output_tokens=output_tokens,
time_to_first_token=time_to_first_token,
streaming_duration=streaming_duration,
tokens_per_second=tokens_per_second,
)
6 changes: 4 additions & 2 deletions src/lightspeed_evaluation/core/models/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pydantic import BaseModel, ConfigDict, Field, field_validator

from lightspeed_evaluation.core.constants import SUPPORTED_RESULT_STATUSES
from lightspeed_evaluation.core.models.mixins import StreamingMetricsMixin

logger = logging.getLogger(__name__)

Expand All @@ -32,7 +33,7 @@ def _validate_and_deduplicate_metrics(
return deduplicated


class TurnData(BaseModel):
class TurnData(StreamingMetricsMixin):
"""Individual turn data within a conversation."""

model_config = ConfigDict(extra="forbid")
Expand Down Expand Up @@ -380,7 +381,7 @@ def validate_conversation_metrics(
return v


class EvaluationResult(BaseModel):
class EvaluationResult(StreamingMetricsMixin):
"""Single evaluation result."""

model_config = ConfigDict(extra="forbid")
Expand Down Expand Up @@ -422,6 +423,7 @@ class EvaluationResult(BaseModel):
judge_llm_output_tokens: int = Field(
default=0, ge=0, description="Judge LLM output tokens used"
)

tool_calls: Optional[str] = Field(
default=None, description="Actual tool calls formatted as string"
)
Expand Down
33 changes: 33 additions & 0 deletions src/lightspeed_evaluation/core/models/mixins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Shared model mixins for the evaluation framework."""

from typing import Optional

from pydantic import BaseModel, Field


class StreamingMetricsMixin(BaseModel):
    """Mixin adding the streaming performance metric fields to a model.

    Models that inherit this mixin gain three optional, non-negative float
    fields populated only when the streaming query endpoint is used:
    time_to_first_token (seconds until the first content token),
    streaming_duration (total seconds to receive all tokens), and
    tokens_per_second (output throughput). All three remain None when the
    non-streaming query endpoint is used.
    """

    # TTFT in seconds; None for non-streaming responses.
    time_to_first_token: Optional[float] = Field(
        default=None,
        description="Time to first token in seconds (streaming only)",
        ge=0,
    )
    # End-to-end streaming wall-clock time in seconds.
    streaming_duration: Optional[float] = Field(
        default=None,
        description="Total streaming duration in seconds (streaming only)",
        ge=0,
    )
    # Output tokens per second over the generation window.
    tokens_per_second: Optional[float] = Field(
        default=None,
        description="Output tokens per second throughput (streaming only)",
        ge=0,
    )
Loading
Loading