|
1 | 1 | from typing import List |
2 | | -import yaml |
3 | 2 | from eval_protocol.data_loader.dynamic_data_loader import DynamicDataLoader |
4 | | -from eval_protocol.models import EvaluationRow, Message, EvaluateResult, MetricResult |
| 3 | +from eval_protocol.models import EvaluationRow, EvaluateResult, MetricResult |
5 | 4 | from eval_protocol.pytest import evaluation_test |
6 | | -from eval_protocol.pytest.remote_rollout_processor import RemoteRolloutProcessor |
| 5 | +from eval_protocol.pytest.remote_rollout_processor import RemoteRolloutProcessor, create_elasticsearch_config_from_env |
7 | 6 | from eval_protocol.pytest.tracing_utils import default_fireworks_output_data_loader |
8 | | -import json |
9 | | -from pathlib import Path |
10 | 7 |
|
11 | 8 |
|
12 | 9 | def rows_from_indices(count: int) -> List[EvaluationRow]: |
@@ -39,82 +36,55 @@ def rows() -> List[EvaluationRow]: |
39 | 36 | model_base_url="https://tracing.fireworks.ai", |
40 | 37 | timeout_seconds=1800, |
41 | 38 | output_data_loader=default_fireworks_output_data_loader, |
| 39 | + disable_elastic_search_setup=True, |
| 40 | + elastic_search_config=create_elasticsearch_config_from_env(), |
42 | 41 | ), |
43 | 42 | completion_params=[{"model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b"}], |
44 | 43 | max_concurrent_rollouts=3, |
45 | 44 | ) |
async def test_swebench_remote(row: EvaluationRow) -> EvaluationRow:
    """Evaluate a SWE-bench instance by reading results from Elasticsearch.

    The remote rollout server logs a ``results`` document (keyed by
    ``rollout_id``) to Elasticsearch; this evaluator looks that document up
    and converts its ``resolved`` flag into an ``EvaluateResult`` score.

    Args:
        row: Evaluation row produced by the remote rollout processor.

    Returns:
        The same row, with ``evaluation_result`` attached when a results
        document with a ``resolved`` verdict was found; otherwise the row
        is returned unchanged (unscored).
    """
    import logging

    logger = logging.getLogger(__name__)

    # Without a rollout id there is nothing to look up in Elasticsearch.
    # Guard against execution_metadata itself being absent so we fall back
    # to "unscored" instead of raising AttributeError.
    metadata = row.execution_metadata
    rollout_id = metadata.rollout_id if metadata is not None else None
    if not rollout_id:
        return row

    # Best-effort lookup: any Elasticsearch failure leaves the row unscored
    # rather than failing the whole evaluation run.
    try:
        from eval_protocol.log_utils.elasticsearch_client import ElasticsearchClient

        es_config = create_elasticsearch_config_from_env()
        es_client = ElasticsearchClient(es_config)

        # Match the single document for this rollout that carries a
        # `results` payload (logged by the server after the harness ran).
        query = {
            "bool": {
                "must": [
                    {"term": {"rollout_id.keyword": rollout_id}},
                    {"exists": {"field": "results"}},
                ]
            }
        }

        search_results = es_client.es.search(index=es_config.index_name, query=query, size=1)

        if search_results["hits"]["total"]["value"] > 0:
            hit = search_results["hits"]["hits"][0]["_source"]
            results_data = hit.get("results", {})
            resolved = results_data.get("resolved")
            instance_id = results_data.get("instance_id")

            # `resolved` may legitimately be False, so only a missing value
            # (None) is treated as "no verdict available".
            if resolved is not None:
                score = 1.0 if resolved else 0.0
                row.evaluation_result = EvaluateResult(
                    score=score,
                    reason=f"instance={instance_id}, resolved={resolved}",
                    is_score_valid=True,
                    metrics={
                        "resolved": MetricResult(
                            score=score,
                            is_score_valid=True,
                            reason=f"resolved={resolved}",
                            value=int(resolved),
                        )
                    },
                )
    except Exception as e:
        # Lazy %-formatting: the message is only built if the warning is emitted.
        logger.warning("Could not read results from Elasticsearch: %s", e)

    return row
0 commit comments