Skip to content

Commit 1fd66f7

Browse files
author
Dylan Huang
committed
Implement evaluator upload and status polling in create commands
- Added `upload_and_ensure_evaluator` function to handle evaluator uploads and ensure the latest version is ACTIVE. - Updated `create_evj_command` and `create_rft_command` to utilize the new upload function. - Removed redundant polling logic from `create_rft.py` and `create_evj.py`, centralizing it in the new utility function. - Adjusted tests to mock the new upload function correctly.
1 parent 17eb18f commit 1fd66f7

File tree

4 files changed

+174
-138
lines changed

4 files changed

+174
-138
lines changed

eval_protocol/cli_commands/create_evj.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
_ensure_account_id,
1515
_extract_terminal_segment,
1616
resolve_evaluator,
17+
upload_and_ensure_evaluator,
1718
validate_evaluator_locally,
1819
)
1920
from .create_rft import (
@@ -211,7 +212,19 @@ def create_evj_command(args) -> int:
211212
if not input_dataset_id or not input_dataset_resource:
212213
return 1
213214

214-
# 6) Create the Evaluation Job
215+
# 6) Ensure evaluator exists and its latest version is ACTIVE (upload + poll if needed)
216+
if not dry_run:
217+
if not upload_and_ensure_evaluator(
218+
project_root=project_root,
219+
evaluator_id=evaluator_id,
220+
api_key=api_key,
221+
api_base=api_base,
222+
selected_test_file_path=selected_test_file_path,
223+
selected_test_func_name=selected_test_func_name,
224+
):
225+
return 1
226+
227+
# 7) Create the Evaluation Job
215228
return _create_evj_job(
216229
account_id=account_id,
217230
api_key=api_key,

eval_protocol/cli_commands/create_rft.py

Lines changed: 5 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
_ensure_account_id,
2929
_extract_terminal_segment,
3030
_normalize_evaluator_id,
31+
_poll_evaluator_version_status,
3132
_print_links,
3233
_resolve_selected_test,
3334
load_module_from_file_path,
3435
resolve_evaluator,
36+
upload_and_ensure_evaluator,
3537
validate_evaluator_locally,
3638
)
3739
from .local_test import run_evaluator_test
@@ -222,71 +224,6 @@ def _extract_jsonl_from_input_dataset(test_file_path: str, test_func_name: str)
222224
return None
223225

224226

225-
def _poll_evaluator_version_status(
    evaluator_id: str,
    version_id: str,
    api_key: str,
    api_base: str,
    timeout_minutes: int = 10,
) -> bool:
    """Wait for a specific evaluator version to reach ACTIVE, polling its build state.

    Fetches the given version through the Fireworks SDK in a loop, reporting
    progress until the version is ACTIVE, the build fails, or the timeout
    elapses.

    Args:
        evaluator_id: The evaluator ID (not full resource name)
        version_id: The specific version ID to poll
        api_key: Fireworks API key
        api_base: Fireworks API base URL
        timeout_minutes: Maximum time to wait in minutes

    Returns:
        True if evaluator version becomes ACTIVE, False if timeout or BUILD_FAILED
    """
    interval = 10  # seconds between successive status checks
    budget = timeout_minutes * 60
    started = time.time()

    print(
        f"Polling evaluator version '{version_id}' status (timeout: {timeout_minutes}m, interval: {interval}s)..."
    )

    client = create_fireworks_client(api_key=api_key, base_url=api_base)

    while (time.time() - started) < budget:
        try:
            version = client.evaluator_versions.get(version_id, evaluator_id=evaluator_id)
            state = version.state or "STATE_UNSPECIFIED"
            message = version.status.message if (version.status and version.status.message) else ""

            if state == "ACTIVE":
                print("✅ Evaluator version is ACTIVE and ready!")
                return True
            if state == "BUILD_FAILED":
                print(f"❌ Evaluator version build failed. Status: {message}")
                return False
            if state == "BUILDING":
                print(f"⏳ Evaluator version is still building... ({(time.time() - started) / 60:.1f}m elapsed)")
            else:
                print(f"⏳ Evaluator version state: {state}, status: {message}")
        except Exception as e:
            # Transient lookup failures are reported but do not abort the wait.
            print(f"Warning: Failed to check evaluator version status: {e}")

        time.sleep(interval)  # wait before the next status check

    # Budget exhausted without reaching ACTIVE.
    print(f"⏰ Timeout after {(time.time() - started) / 60:.1f}m - evaluator version is not yet ACTIVE")
    return False
288-
289-
290227
def _validate_dataset_jsonl(jsonl_path: str, sample_limit: int = 50) -> bool:
291228
"""Validate that a JSONL file contains rows compatible with EvaluationRow.
292229
@@ -503,71 +440,6 @@ def upload_dataset(
503440
return None, None
504441

505442

506-
def _upload_and_ensure_evaluator(
    project_root: str,
    evaluator_id: str,
    api_key: str,
    api_base: str,
) -> bool:
    """Upload evaluator and ensure its version becomes ACTIVE.

    Creates/updates the evaluator and uploads the code, then polls the specific
    version until it becomes ACTIVE.

    Args:
        project_root: Path to the project root directory.
        evaluator_id: The evaluator ID.
        api_key: Fireworks API key.
        api_base: Fireworks API base URL.

    Returns:
        True if the evaluator was uploaded and its version became ACTIVE,
        False otherwise.
    """
    # Imported lazily; presumably avoids a circular import — TODO confirm.
    from eval_protocol.evaluation import create_evaluation

    try:
        tests = _discover_tests(project_root)
        selected_entry: Optional[str] = None
        st_path, st_func = _resolve_selected_test(project_root, evaluator_id, selected_tests=tests)
        if st_path and st_func:
            selected_entry = _build_entry_point(project_root, st_path, st_func)
        # If still unresolved and multiple tests exist, fail fast to avoid uploading unintended evaluators
        if selected_entry is None and len(tests) > 1:
            print(
                f"Error: Multiple evaluation tests found, and the selected evaluator {evaluator_id} does not match any discovered test.\n"
                " Please re-run specifying the evaluator.\n"
                " Hints:\n"
                " - eval-protocol create rft --evaluator <existing-evaluator-id>\n"
            )
            return False

        print(f"\nUploading evaluator '{evaluator_id}'...")
        # Only the version id is needed below; the created evaluator object is unused.
        _, version_id = create_evaluation(
            evaluator_id=evaluator_id,
            display_name=evaluator_id,
            description=f"Evaluator for {evaluator_id}",
            entry_point=selected_entry,
        )

        if not version_id:
            print("Warning: Evaluator created but version upload failed.")
            return False

        print(f"✓ Uploaded evaluator: {evaluator_id} (version: {version_id})")

        # Poll for the specific evaluator version status
        print(f"Waiting for evaluator '{evaluator_id}' version '{version_id}' to become ACTIVE...")
        is_active = _poll_evaluator_version_status(
            evaluator_id=evaluator_id,
            version_id=version_id,
            api_key=api_key,
            api_base=api_base,
            timeout_minutes=10,
        )

        if not is_active:
            # Point the user at the dashboard so they can watch the build finish.
            dashboard_url = _build_evaluator_dashboard_url(evaluator_id)
            print("\n❌ Evaluator version is not ready within the timeout period.")
            print(f"📊 Please check the evaluator status at: {dashboard_url}")
            print(" Wait for it to become ACTIVE, then run 'eval-protocol create rft' again.")
            return False
        return True
    except Exception as e:
        # Best-effort: report the failure and let the caller abort via the False return.
        print(f"Warning: Failed to upload evaluator automatically: {e}")
        return False
569-
570-
571443
def _create_rft_job(
572444
account_id: str,
573445
api_key: str,
@@ -720,11 +592,13 @@ def create_rft_command(args) -> int:
720592
return 1
721593

722594
# 5) Ensure evaluator exists and its latest version is ACTIVE (upload + poll if needed)
723-
if not _upload_and_ensure_evaluator(
595+
if not upload_and_ensure_evaluator(
724596
project_root=project_root,
725597
evaluator_id=evaluator_id,
726598
api_key=api_key,
727599
api_base=api_base,
600+
selected_test_file_path=selected_test_file_path,
601+
selected_test_func_name=selected_test_func_name,
728602
):
729603
return 1
730604

eval_protocol/cli_commands/utils.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
get_fireworks_api_key,
2424
verify_api_key_and_get_account_id,
2525
)
26+
from ..fireworks_client import create_fireworks_client
2627
from ..fireworks_rft import _map_api_host_to_app_host
2728

2829

@@ -854,3 +855,151 @@ def resolve_evaluator(
854855
evaluator_resource_name = f"accounts/{account_id}/evaluators/{evaluator_id}"
855856

856857
return evaluator_id, evaluator_resource_name, selected_test_file_path, selected_test_func_name
858+
859+
860+
def _poll_evaluator_version_status(
    evaluator_id: str,
    version_id: str,
    api_key: str,
    api_base: str,
    timeout_minutes: int = 10,
) -> bool:
    """
    Poll a specific evaluator version status until it becomes ACTIVE or times out.

    Uses the Fireworks SDK to get the specified version of the evaluator and checks
    its build state.

    Args:
        evaluator_id: The evaluator ID (not full resource name)
        version_id: The specific version ID to poll
        api_key: Fireworks API key
        api_base: Fireworks API base URL
        timeout_minutes: Maximum time to wait in minutes

    Returns:
        True if evaluator version becomes ACTIVE, False if timeout or BUILD_FAILED
    """
    timeout_seconds = timeout_minutes * 60
    poll_interval = 10  # seconds between status checks
    start_time = time.time()
    deadline = start_time + timeout_seconds

    print(
        f"Polling evaluator version '{version_id}' status (timeout: {timeout_minutes}m, interval: {poll_interval}s)..."
    )

    client = create_fireworks_client(api_key=api_key, base_url=api_base)

    while time.time() < deadline:
        try:
            version = client.evaluator_versions.get(version_id, evaluator_id=evaluator_id)
            state = version.state or "STATE_UNSPECIFIED"
            status_msg = ""
            if version.status and version.status.message:
                status_msg = version.status.message

            if state == "ACTIVE":
                print("✅ Evaluator version is ACTIVE and ready!")
                return True
            elif state == "BUILD_FAILED":
                print(f"❌ Evaluator version build failed. Status: {status_msg}")
                return False
            elif state == "BUILDING":
                elapsed_minutes = (time.time() - start_time) / 60
                print(f"⏳ Evaluator version is still building... ({elapsed_minutes:.1f}m elapsed)")
            else:
                print(f"⏳ Evaluator version state: {state}, status: {status_msg}")

        except Exception as e:
            # Transient API errors are tolerated; the next iteration retries.
            print(f"Warning: Failed to check evaluator version status: {e}")

        # Sleep only for the time remaining so the wait never overruns the
        # configured timeout by up to a full poll interval.
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        time.sleep(min(poll_interval, remaining))

    # Timeout reached
    elapsed_minutes = (time.time() - start_time) / 60
    print(f"⏰ Timeout after {elapsed_minutes:.1f}m - evaluator version is not yet ACTIVE")
    return False
923+
924+
925+
def upload_and_ensure_evaluator(
    project_root: str,
    evaluator_id: str,
    api_key: str,
    api_base: str,
    selected_test_file_path: Optional[str] = None,
    selected_test_func_name: Optional[str] = None,
) -> bool:
    """Upload evaluator and ensure its version becomes ACTIVE.

    Creates/updates the evaluator and uploads the code, then polls the specific
    version until it becomes ACTIVE. This is the shared implementation used by
    the 'ep upload', 'ep create rft', and 'ep create evj' commands.

    Args:
        project_root: Path to the project root directory.
        evaluator_id: The evaluator ID.
        api_key: Fireworks API key.
        api_base: Fireworks API base URL.
        selected_test_file_path: Optional path to the selected test file.
        selected_test_func_name: Optional name of the selected test function.

    Returns:
        True if evaluator was uploaded and became ACTIVE, False otherwise.
    """
    # Imported lazily; presumably avoids a circular import — TODO confirm.
    from eval_protocol.evaluation import create_evaluation

    try:
        tests = _discover_tests(project_root)
        selected_entry: Optional[str] = None

        # Use provided test info if available, otherwise try to resolve
        if selected_test_file_path and selected_test_func_name:
            selected_entry = _build_entry_point(project_root, selected_test_file_path, selected_test_func_name)
        else:
            st_path, st_func = _resolve_selected_test(project_root, evaluator_id, selected_tests=tests)
            if st_path and st_func:
                selected_entry = _build_entry_point(project_root, st_path, st_func)

        # If still unresolved and multiple tests exist, fail fast to avoid uploading unintended evaluators
        if selected_entry is None and len(tests) > 1:
            print(
                f"Error: Multiple evaluation tests found, and the selected evaluator {evaluator_id} does not match any discovered test.\n"
                " Please re-run specifying the evaluator.\n"
            )
            return False

        print(f"\nUploading evaluator '{evaluator_id}'...")
        # Only the version id is needed below; the created evaluator object is unused.
        _, version_id = create_evaluation(
            evaluator_id=evaluator_id,
            display_name=evaluator_id,
            description=f"Evaluator for {evaluator_id}",
            entry_point=selected_entry,
        )

        if not version_id:
            print("Warning: Evaluator created but version upload failed.")
            return False

        print(f"✓ Uploaded evaluator: {evaluator_id} (version: {version_id})")

        # Poll for the specific evaluator version status
        print(f"Waiting for evaluator '{evaluator_id}' version '{version_id}' to become ACTIVE...")
        is_active = _poll_evaluator_version_status(
            evaluator_id=evaluator_id,
            version_id=version_id,
            api_key=api_key,
            api_base=api_base,
            timeout_minutes=10,
        )

        if not is_active:
            # Point the user at the dashboard so they can watch the build finish.
            dashboard_url = _build_evaluator_dashboard_url(evaluator_id)
            print("\n❌ Evaluator version is not ready within the timeout period.")
            print(f"📊 Please check the evaluator status at: {dashboard_url}")
            print(" Wait for it to become ACTIVE, then run the command again.")
            return False
        return True
    except Exception as e:
        # Best-effort: report the failure and let the caller abort via the False return.
        print(f"Warning: Failed to upload evaluator automatically: {e}")
        return False

0 commit comments

Comments
 (0)