Skip to content

Commit fac0152

Browse files
authored
Deprecate Elastic search logs for Fireworks Tracing logs (#286)
* Deprecate Elastic search logs for Fireworks Tracing logs * fix tests * update naming * remove old test * fixing tests * don't skip * fix remote rollout processor
1 parent dab32f9 commit fac0152

File tree

11 files changed

+161
-168
lines changed

11 files changed

+161
-168
lines changed

.github/workflows/elasticsearch-tests.yml renamed to .github/workflows/fireworks-tracing-tests.yml

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Elasticsearch Tests
1+
name: Fireworks Tracing Tests
22

33
on:
44
push:
@@ -13,8 +13,8 @@ on:
1313
workflow_dispatch: # Allow manual triggering
1414

1515
jobs:
16-
elasticsearch-tests:
17-
name: Elasticsearch Integration Tests
16+
fireworks-tracing-tests:
17+
name: Fireworks Tracing Integration Tests
1818
runs-on: ubuntu-latest
1919

2020
steps:
@@ -36,14 +36,15 @@ jobs:
3636
- name: Install the project
3737
run: uv sync --locked --all-extras --dev
3838

39-
- name: Run Elasticsearch Tests
39+
- name: Run Fireworks Tracing Tests
4040
env:
4141
FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }}
4242
PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning"
4343
run: |
44-
# Run Elasticsearch direct HTTP handler tests
45-
uv run pytest tests/logging/test_elasticsearch_direct_http_handler.py -v --tb=short
44+
# Run RemoteRolloutProcessor End-to-End Test (auto server startup)
45+
uv run pytest tests/remote_server/test_remote_fireworks.py::test_remote_rollout_and_fetch_fireworks \
46+
-v --tb=short
4647
47-
# Run RemoteRolloutProcessor Propagate Status Smoke Test (also uses Elasticsearch)
48+
# Run RemoteRolloutProcessor Propagate Status Test (auto server startup)
4849
uv run pytest tests/remote_server/test_remote_fireworks_propagate_status.py::test_remote_rollout_and_fetch_fireworks_propagate_status \
4950
-v --tb=short

eval_protocol/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
filter_longest_conversation,
3737
)
3838
from .pytest import evaluation_test, SingleTurnRolloutProcessor, RemoteRolloutProcessor, GithubActionRolloutProcessor
39-
from .pytest.remote_rollout_processor import create_elasticsearch_config_from_env
4039
from .pytest.parameterize import DefaultParameterIdGenerator
4140
from .log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler
4241
from .log_utils.rollout_id_filter import RolloutIdFilter
@@ -90,7 +89,6 @@
9089
warnings.filterwarnings("default", category=DeprecationWarning, module="eval_protocol")
9190

9291
__all__ = [
93-
"create_elasticsearch_config_from_env",
9492
"ElasticsearchConfig",
9593
"ElasticsearchDirectHttpHandler",
9694
"RolloutIdFilter",

eval_protocol/adapters/fireworks_tracing.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -
309309
"message": e.get("message"),
310310
"severity": e.get("severity", "INFO"),
311311
"tags": e.get("tags", []),
312+
"status": e.get("status"),
312313
}
313314
)
314315
return results

eval_protocol/cli_commands/logs.py

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -39,49 +39,6 @@ def logs_command(args):
3939
or os.environ.get("GATEWAY_URL")
4040
or "https://tracing.fireworks.ai"
4141
)
42-
try:
43-
if not use_fireworks:
44-
if getattr(args, "use_env_elasticsearch_config", False):
45-
# Use environment variables for configuration
46-
print("⚙️ Using environment variables for Elasticsearch config")
47-
from eval_protocol.pytest.remote_rollout_processor import (
48-
create_elasticsearch_config_from_env,
49-
)
50-
51-
elasticsearch_config = create_elasticsearch_config_from_env()
52-
# Ensure index exists with correct mapping, mirroring Docker setup path
53-
try:
54-
from eval_protocol.log_utils.elasticsearch_index_manager import (
55-
ElasticsearchIndexManager,
56-
)
57-
58-
index_manager = ElasticsearchIndexManager(
59-
elasticsearch_config.url,
60-
elasticsearch_config.index_name,
61-
elasticsearch_config.api_key,
62-
)
63-
created = index_manager.create_logging_index_mapping()
64-
if created:
65-
print(
66-
f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)"
67-
)
68-
else:
69-
print(
70-
f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly."
71-
)
72-
except Exception as e:
73-
print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}")
74-
elif not getattr(args, "disable_elasticsearch_setup", False):
75-
# Default behavior: start or connect to local Elasticsearch via Docker helper
76-
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup
77-
78-
print("🧰 Auto-configuring local Elasticsearch (Docker)")
79-
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
80-
else:
81-
print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration")
82-
except Exception as e:
83-
print(f"❌ Failed to configure Elasticsearch: {e}")
84-
return 1
8542

8643
try:
8744
serve_logs(

eval_protocol/log_utils/fireworks_tracing_http_handler.py

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,34 @@ def _get_rollout_id(self, record: logging.LogRecord) -> Optional[str]:
7979
return str(cast(Any, getattr(record, "rollout_id")))
8080
return os.getenv(self.rollout_id_env)
8181

82+
def _get_status_info(self, record: logging.LogRecord) -> Optional[Dict[str, Any]]:
83+
"""Extract status information from the log record's extra data."""
84+
# Check if 'status' is in the extra data (passed via extra parameter)
85+
if hasattr(record, "status") and record.status is not None: # type: ignore
86+
status = record.status # type: ignore
87+
88+
# Handle Status class instances (Pydantic BaseModel)
89+
if hasattr(status, "code") and hasattr(status, "message"):
90+
# Status object - extract code and message
91+
status_code = status.code
92+
# Handle both enum values and direct integer values
93+
if hasattr(status_code, "value"):
94+
status_code = status_code.value
95+
96+
return {
97+
"code": status_code,
98+
"message": status.message,
99+
"details": getattr(status, "details", []),
100+
}
101+
elif isinstance(status, dict):
102+
# Dictionary representation of status
103+
return {
104+
"code": status.get("code"),
105+
"message": status.get("message"),
106+
"details": status.get("details", []),
107+
}
108+
return None
109+
82110
def _build_payload(self, record: logging.LogRecord, rollout_id: str) -> Dict[str, Any]:
83111
timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
84112
message = record.getMessage()
@@ -96,28 +124,12 @@ def _build_payload(self, record: logging.LogRecord, rollout_id: str) -> Dict[str
96124
except Exception:
97125
pass
98126
program = cast(Optional[str], getattr(record, "program", None)) or "eval_protocol"
99-
status_val = cast(Any, getattr(record, "status", None))
100-
status = status_val if isinstance(status_val, str) else None
101-
# Capture optional structured status fields if present
102-
metadata: Dict[str, Any] = {}
103-
status_code = cast(Any, getattr(record, "status_code", None))
104-
if isinstance(status_code, int):
105-
metadata["status_code"] = status_code
106-
status_message = cast(Any, getattr(record, "status_message", None))
107-
if isinstance(status_message, str):
108-
metadata["status_message"] = status_message
109-
status_details = getattr(record, "status_details", None)
110-
if status_details is not None:
111-
metadata["status_details"] = status_details
112-
extra_metadata = cast(Any, getattr(record, "metadata", None))
113-
if isinstance(extra_metadata, dict):
114-
metadata.update(extra_metadata)
127+
115128
return {
116129
"program": program,
117-
"status": status,
130+
"status": self._get_status_info(record),
118131
"message": message,
119132
"tags": tags,
120-
"metadata": metadata or None,
121133
"extras": {
122134
"logger_name": record.name,
123135
"level": record.levelname,

eval_protocol/pytest/remote_rollout_processor.py

Lines changed: 37 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,15 @@
44

55
import requests
66

7-
from eval_protocol.log_utils.elasticsearch_client import ElasticsearchClient
87
from eval_protocol.models import EvaluationRow, Status
98
from eval_protocol.data_loader.dynamic_data_loader import DynamicDataLoader
109
from eval_protocol.types.remote_rollout_processor import (
1110
DataLoaderConfig,
12-
ElasticsearchConfig,
1311
)
12+
from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter
13+
1414
from .rollout_processor import RolloutProcessor
1515
from .types import RolloutProcessorConfig
16-
from .elasticsearch_setup import ElasticsearchSetup
1716
from .tracing_utils import default_fireworks_output_data_loader, build_init_request, update_row_with_remote_trace
1817
import logging
1918

@@ -22,25 +21,6 @@
2221
logger = logging.getLogger(__name__)
2322

2423

25-
def create_elasticsearch_config_from_env() -> ElasticsearchConfig:
26-
"""Setup Elasticsearch config from environment variables."""
27-
url = os.getenv("ELASTICSEARCH_URL")
28-
api_key = os.getenv("ELASTICSEARCH_API_KEY")
29-
index_name = os.getenv("ELASTICSEARCH_INDEX_NAME")
30-
31-
if url is None:
32-
raise ValueError("ELASTICSEARCH_URL must be set")
33-
if api_key is None:
34-
raise ValueError("ELASTICSEARCH_API_KEY must be set")
35-
if index_name is None:
36-
raise ValueError("ELASTICSEARCH_INDEX_NAME must be set")
37-
return ElasticsearchConfig(
38-
url=url,
39-
api_key=api_key,
40-
index_name=index_name,
41-
)
42-
43-
4424
class RemoteRolloutProcessor(RolloutProcessor):
4525
"""
4626
Rollout processor that triggers a remote HTTP server to perform the rollout.
@@ -59,8 +39,6 @@ def __init__(
5939
poll_interval: float = 1.0,
6040
timeout_seconds: float = 120.0,
6141
output_data_loader: Optional[Callable[[DataLoaderConfig], DynamicDataLoader]] = None,
62-
disable_elastic_search_setup: bool = False,
63-
elastic_search_config: Optional[ElasticsearchConfig] = None,
6442
):
6543
# Prefer constructor-provided configuration. These can be overridden via
6644
# config.kwargs at call time for backward compatibility.
@@ -74,21 +52,7 @@ def __init__(
7452
self._poll_interval = poll_interval
7553
self._timeout_seconds = timeout_seconds
7654
self._output_data_loader = output_data_loader or default_fireworks_output_data_loader
77-
self._disable_elastic_search_setup = disable_elastic_search_setup
78-
self._elastic_search_config = elastic_search_config
79-
80-
def setup(self) -> None:
81-
if self._disable_elastic_search_setup:
82-
logger.info("Elasticsearch is disabled, skipping setup")
83-
return
84-
logger.info("Setting up Elasticsearch")
85-
self._elastic_search_config = self._setup_elastic_search()
86-
logger.info("Elasticsearch setup complete")
87-
88-
def _setup_elastic_search(self) -> ElasticsearchConfig:
89-
"""Set up Elasticsearch using the dedicated setup module."""
90-
setup = ElasticsearchSetup()
91-
return setup.setup_elasticsearch()
55+
self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url)
9256

9357
def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]:
9458
tasks: List[asyncio.Task[EvaluationRow]] = []
@@ -123,7 +87,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
12387
if row.input_metadata.row_id is None:
12488
raise ValueError("Row ID is required in RemoteRolloutProcessor")
12589

126-
init_payload = build_init_request(row, config, model_base_url, self._elastic_search_config)
90+
init_payload = build_init_request(row, config, model_base_url)
12791

12892
# Fire-and-poll
12993
def _post_init() -> None:
@@ -153,10 +117,6 @@ def _get_status() -> Dict[str, Any]:
153117
r.raise_for_status()
154118
return r.json()
155119

156-
elasticsearch_client = (
157-
ElasticsearchClient(self._elastic_search_config) if self._elastic_search_config else None
158-
)
159-
160120
continue_polling_status = True
161121
while time.time() < deadline:
162122
try:
@@ -178,29 +138,41 @@ def _get_status() -> Dict[str, Any]:
178138
# For all other exceptions, raise them
179139
raise
180140

181-
if not elasticsearch_client:
182-
continue
183-
184-
search_results = elasticsearch_client.search_by_status_code_not_in(
185-
row.execution_metadata.rollout_id, [Status.Code.RUNNING]
141+
# Search Fireworks tracing logs for completion
142+
completed_logs = self._tracing_adapter.search_logs(
143+
tags=[f"rollout_id:{row.execution_metadata.rollout_id}"]
186144
)
187-
hits = search_results["hits"]["hits"] if search_results else []
145+
# Filter for logs that actually have status information
146+
status_logs = []
147+
for log in completed_logs:
148+
status_dict = log.get("status")
149+
if status_dict and isinstance(status_dict, dict) and "code" in status_dict:
150+
status_logs.append(log)
151+
152+
if status_logs:
153+
# Use the first log with status information
154+
status_log = status_logs[0]
155+
status_dict = status_log.get("status")
156+
157+
logger.info(
158+
f"Found status log for rollout {row.execution_metadata.rollout_id}: {status_log.get('message', '')}"
159+
)
188160

189-
if hits:
190-
# log all statuses found and update rollout status from the last hit
191-
for hit in hits:
192-
document = hit["_source"]
193-
logger.info(
194-
f"Found log for rollout {row.execution_metadata.rollout_id} with status code {document['status_code']}"
195-
)
196-
# Update rollout status from the document
197-
if "status_code" in document:
198-
row.rollout_status = Status(
199-
code=Status.Code(document["status_code"]),
200-
message=document.get("status_message", ""),
201-
details=document.get("status_details", []),
202-
)
203-
logger.info("Stopping status polling for rollout %s", row.execution_metadata.rollout_id)
161+
status_code = status_dict.get("code")
162+
status_message = status_dict.get("message", "")
163+
status_details = status_dict.get("details", [])
164+
165+
logger.info(
166+
f"Found Fireworks log for rollout {row.execution_metadata.rollout_id} with status code {status_code}"
167+
)
168+
169+
row.rollout_status = Status(
170+
code=Status.Code(status_code),
171+
message=status_message,
172+
details=status_details,
173+
)
174+
175+
logger.info("Stopping polling for rollout %s", row.execution_metadata.rollout_id)
204176
break
205177

206178
await asyncio.sleep(poll_interval)

eval_protocol/pytest/tracing_utils.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ def build_init_request(
5656
row: EvaluationRow,
5757
config: RolloutProcessorConfig,
5858
model_base_url: str,
59-
elastic_search_config: Optional[Any] = None,
6059
) -> InitRequest:
6160
"""Build an InitRequest from an EvaluationRow and config (shared logic)."""
6261
# Validation
@@ -129,7 +128,6 @@ def build_init_request(
129128
tools=row.tools,
130129
metadata=meta,
131130
model_base_url=final_model_base_url,
132-
elastic_search_config=elastic_search_config,
133131
)
134132

135133

0 commit comments

Comments
 (0)