feat: add EvaluationClient with run() for on-demand session evaluation#300

Open
jariy17 wants to merge 1 commit into main from feat/evaluation_client

Conversation


@jariy17 jariy17 commented Mar 6, 2026

Summary

  • Add EvaluationClient with run() method that collects spans from CloudWatch and calls the evaluate API with level-aware batching (SESSION/TRACE/TOOL_CALL)
  • Add internal _agent_span_collector package with CloudWatchAgentSpanCollector for span collection with retry/polling
  • Add optional query_string and end_time parameters to CloudWatchSpanHelper to support collector delegation

Details

  • run() accepts evaluator_ids, session_id, and agent_id or log_group_name
  • Auto-derives log group as /aws/bedrock-agentcore/runtimes/{agent_id}-DEFAULT
  • CloudWatch query filters by attributes.session.id + ispresent(scope.name)
  • Auto-batches evaluate requests (max 10 target IDs per request)
  • Caches evaluator level lookups via control plane
  • Operational logging at INFO/DEBUG levels for debugging
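
The log-group derivation and max-10 batching described above can be sketched as follows. This is a minimal illustration, not the client's actual internals; `log_group_for` and `batch_target_ids` are hypothetical names.

```python
from typing import Iterator, List

MAX_TARGETS_PER_REQUEST = 10  # per-request cap stated in the PR description


def log_group_for(agent_id: str) -> str:
    """Derive the CloudWatch log group name from an agent ID."""
    return f"/aws/bedrock-agentcore/runtimes/{agent_id}-DEFAULT"


def batch_target_ids(target_ids: List[str]) -> Iterator[List[str]]:
    """Yield successive chunks of at most MAX_TARGETS_PER_REQUEST target IDs."""
    for start in range(0, len(target_ids), MAX_TARGETS_PER_REQUEST):
        yield target_ids[start:start + MAX_TARGETS_PER_REQUEST]


# 20 TRACE-level target IDs split into 2 evaluate requests of 10 each
batches = list(batch_target_ids([f"trace-{i}" for i in range(20)]))
```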

Test plan

  • Unit tests: python -m pytest tests/bedrock_agentcore/evaluation/test_client.py -v (35 tests)
  • Full evaluation suite: python -m pytest tests/bedrock_agentcore/evaluation/ -v (111 tests)
  • Manual integration test with real agent (see PR comment for test script)

@jariy17 jariy17 requested a review from a team March 6, 2026 22:02

jariy17 commented Mar 6, 2026

Manual Integration Test Script

Save as test_client_real.py at repo root and run with python test_client_real.py. Requires AWS credentials with access to the HealthcareAgent runtime and CloudWatch.

This test invokes 20 turns to trigger batching (>10 trace IDs), waits 180 s for CloudWatch ingestion, then runs EvaluationClient.run().

"""Temporary real test for EvaluationClient.run() batching — delete after testing."""

import json
import logging
import time
import uuid

import boto3

from bedrock_agentcore.evaluation import EvaluationClient

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("botocore").setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

AGENT_ARN = "arn:aws:bedrock-agentcore:us-west-2:363376058968:runtime/HealthcareAgent_HealthCareAgent-Pv2decFQqQ"
AGENT_ID = "HealthcareAgent_HealthCareAgent-Pv2decFQqQ"
REGION = "us-west-2"


def invoke_agent(session_id: str, prompt: str) -> str:
    dp_client = boto3.client("bedrock-agentcore", region_name=REGION)
    payload = json.dumps({"prompt": prompt}).encode()
    response = dp_client.invoke_agent_runtime(
        agentRuntimeArn=AGENT_ARN, runtimeSessionId=session_id, payload=payload,
    )
    raw_output = response["response"].read().decode("utf-8")
    # The runtime streams SSE-style lines; collect the "data: " payloads.
    text_parts = []
    for line in raw_output.splitlines():
        if line.startswith("data: "):
            chunk = line[len("data: "):]
            # JSON-encoded string chunks arrive quoted; decode them.
            if chunk.startswith('"') and chunk.endswith('"'):
                chunk = json.loads(chunk)
            text_parts.append(chunk)
    return "".join(text_parts) if text_parts else raw_output


TURNS = [
    "What are the symptoms of the flu?",
    "How is the flu treated?",
    "When should I see a doctor for the flu?",
    "What causes high blood pressure?",
    "What are the symptoms of diabetes?",
    "How is type 2 diabetes diagnosed?",
    "What are common treatments for asthma?",
    "What causes migraines?",
    "How can I prevent heart disease?",
    "What are the side effects of ibuprofen?",
    "What is the difference between a cold and the flu?",
    "How does pneumonia spread?",
    "What vaccines do adults need?",
    "What are the early signs of arthritis?",
    "How is strep throat diagnosed?",
    "What causes kidney stones?",
    "How can I lower my cholesterol naturally?",
    "What are the symptoms of anemia?",
    "How is a urinary tract infection treated?",
    "What are the warning signs of a stroke?",
]


def main():
    session_id = f"test-batch-{uuid.uuid4()}"
    print(f"Session ID: {session_id}")
    print(f"Turns: {len(TURNS)}")

    for i, prompt in enumerate(TURNS):
        print(f"\n  Turn {i + 1}/{len(TURNS)}: {prompt}")
        response = invoke_agent(session_id, prompt)
        print(f"  Response: {response[:150]}...")

    print("\n--- Waiting 180s for spans to land in CloudWatch ---")
    time.sleep(180)

    print(f"\n{'='*60}")
    print(f"Running EvaluationClient.run()")
    print(f"{'='*60}")
    client = EvaluationClient(region_name=REGION)
    results = client.run(
        evaluator_ids=["Builtin.Helpfulness"],
        session_id=session_id,
        agent_id=AGENT_ID,
    )

    print(f"\n--- Results ({len(results)} total) ---")
    for r in results:
        print(json.dumps(r, indent=4, default=str))


if __name__ == "__main__":
    main()

Expected output

  • 163 spans collected
  • Evaluator resolved to TRACE level
  • Split into 2 batched requests (20 trace IDs > max 10 per request)
  • 20 evaluation results, each scored ~0.83 ("Very Helpful")
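
The 2-request expectation follows from ceiling division of the trace-ID count by the 10-ID per-request cap; a quick sanity check (the helper name is illustrative, not part of the library):

```python
import math


def expected_requests(n_targets: int, max_per_request: int = 10) -> int:
    """Number of evaluate calls needed for n_targets under the per-request cap."""
    return math.ceil(n_targets / max_per_request)


# 20 trace IDs with a cap of 10 -> 2 batched requests
print(expected_requests(20))
```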

EvaluationClient collects spans from CloudWatch and calls the evaluate
API with level-aware batching (SESSION/TRACE/TOOL_CALL). Accepts
evaluator_ids, session_id, and agent_id or log_group_name. Auto-derives
log group from agent_id, caches evaluator level lookups, and batches
evaluate requests at max 10 target IDs per request.
@jariy17 jariy17 force-pushed the feat/evaluation_client branch from e6b25d2 to 5615fb0 on March 6, 2026 22:29
@jariy17 jariy17 deployed to auto-approve March 6, 2026 22:30 with GitHub Actions