Skip to content

Commit 4c27734

Browse files
author
Dylan Huang
committed
Enhance Elasticsearch logging to include status information
- Added a method to extract status information from log records in ElasticSearchDirectHttpHandler. - Updated the data structure sent to Elasticsearch to include status_code, status_message, and status_details if present. - Modified ElasticsearchIndexManager to validate the mapping of new status fields. - Implemented tests to verify logging of status information and searching by status code in Elasticsearch.
1 parent f4d80ec commit 4c27734

File tree

3 files changed

+217
-3
lines changed

3 files changed

+217
-3
lines changed

eval_protocol/logging/elasticsearch_direct_http_handler.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def emit(self, record: logging.LogRecord) -> None:
3333
timestamp = datetime.fromtimestamp(record.created).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
3434

3535
rollout_id = self._get_rollout_id(record)
36+
status_info = self._get_status_info(record)
3637

3738
data: Dict[str, Any] = {
3839
"@timestamp": timestamp,
@@ -42,6 +43,10 @@ def emit(self, record: logging.LogRecord) -> None:
4243
"rollout_id": rollout_id,
4344
}
4445

46+
# Add status information if present
47+
if status_info:
48+
data.update(status_info)
49+
4550
# Schedule the HTTP request to run asynchronously
4651
self._schedule_async_send(data, record)
4752
except Exception as e:
@@ -57,6 +62,34 @@ def _get_rollout_id(self, record: logging.LogRecord) -> str:
5762
)
5863
return rollout_id
5964

65+
def _get_status_info(self, record: logging.LogRecord) -> Optional[Dict[str, Any]]:
66+
"""Extract status information from the log record's extra data."""
67+
# Check if 'status' is in the extra data (passed via extra parameter)
68+
if hasattr(record, "status") and record.status is not None: # type: ignore
69+
status = record.status # type: ignore
70+
71+
# Handle Status class instances (Pydantic BaseModel)
72+
if hasattr(status, "code") and hasattr(status, "message"):
73+
# Status object - extract code and message
74+
status_code = status.code
75+
# Handle both enum values and direct integer values
76+
if hasattr(status_code, "value"):
77+
status_code = status_code.value
78+
79+
return {
80+
"status_code": status_code,
81+
"status_message": status.message,
82+
"status_details": getattr(status, "details", []),
83+
}
84+
elif isinstance(status, dict):
85+
# Dictionary representation of status
86+
return {
87+
"status_code": status.get("code"),
88+
"status_message": status.get("message"),
89+
"status_details": status.get("details", []),
90+
}
91+
return None
92+
6093
def _schedule_async_send(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
6194
"""Schedule an async task to send the log data to Elasticsearch."""
6295
if self._executor is None:

eval_protocol/logging/elasticsearch_index_manager.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,13 @@ def _index_exists_with_correct_mapping(self) -> bool:
9898
return False
9999

100100
def _has_correct_timestamp_mapping(self, mapping_data: Dict[str, Any]) -> bool:
101-
"""Check if the mapping has @timestamp as a date field and rollout_id as a keyword field.
101+
"""Check if the mapping has @timestamp as a date field, rollout_id as a keyword field, and status fields.
102102
103103
Args:
104104
mapping_data: Elasticsearch mapping response data
105105
106106
Returns:
107-
bool: True if @timestamp is correctly mapped as date field and rollout_id as keyword field
107+
bool: True if all required fields are correctly mapped
108108
"""
109109
try:
110110
if not (
@@ -122,7 +122,12 @@ def _has_correct_timestamp_mapping(self, mapping_data: Dict[str, Any]) -> bool:
122122
# Check rollout_id is mapped as keyword
123123
rollout_id_ok = "rollout_id" in properties and properties["rollout_id"].get("type") == "keyword"
124124

125-
return timestamp_ok and rollout_id_ok
125+
# Check status fields are mapped correctly
126+
status_code_ok = "status_code" in properties and properties["status_code"].get("type") == "integer"
127+
status_message_ok = "status_message" in properties and properties["status_message"].get("type") == "text"
128+
status_details_ok = "status_details" in properties and properties["status_details"].get("type") == "object"
129+
130+
return timestamp_ok and rollout_id_ok and status_code_ok and status_message_ok and status_details_ok
126131
except (KeyError, TypeError):
127132
return False
128133

@@ -140,6 +145,9 @@ def _get_logging_mapping(self) -> Dict[str, Any]:
140145
"message": {"type": "text"},
141146
"logger_name": {"type": "keyword"},
142147
"rollout_id": {"type": "keyword"},
148+
"status_code": {"type": "integer"},
149+
"status_message": {"type": "text"},
150+
"status_details": {"type": "object"},
143151
}
144152
}
145153
}

tests/logging/test_elasticsearch_direct_http_handler.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,3 +402,176 @@ def test_elasticsearch_direct_http_handler_search_by_rollout_id(
402402

403403
print(f"Successfully verified search by rollout_id '{rollout_id}' found {len(hits)} log messages")
404404
print("Verified that search for different rollout_id returns 0 results")
405+
406+
407+
@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Only run this test locally (skipped in CI)")
408+
def test_elasticsearch_direct_http_handler_logs_status_info(
409+
elasticsearch_config: ElasticSearchConfig, test_logger: logging.Logger, rollout_id: str
410+
):
411+
"""Test that ElasticsearchDirectHttpHandler logs Status class instances and can search by status code."""
412+
from eval_protocol import Status
413+
414+
# Create a Status instance
415+
test_status = Status.rollout_running()
416+
417+
# Generate a unique test message
418+
test_message = f"Status logging test message at {time.time()}"
419+
420+
# Log with Status instance in extra data
421+
test_logger.info(test_message, extra={"status": test_status})
422+
423+
# Give Elasticsearch time to process the document
424+
time.sleep(3)
425+
426+
# Query Elasticsearch to verify the document was received with status info
427+
parsed_url = urlparse(elasticsearch_config.url)
428+
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
429+
search_url = f"{base_url}/{elasticsearch_config.index_name}/_search"
430+
431+
# Search for logs with our specific status code
432+
search_query = {
433+
"query": {"term": {"status_code": test_status.code.value}},
434+
"sort": [{"@timestamp": {"order": "desc"}}],
435+
"size": 1,
436+
}
437+
438+
# Execute the search
439+
response = requests.post(
440+
search_url,
441+
headers={"Content-Type": "application/json", "Authorization": f"ApiKey {elasticsearch_config.api_key}"},
442+
json=search_query,
443+
verify=parsed_url.scheme == "https",
444+
)
445+
446+
# Check for errors
447+
if response.status_code != 200:
448+
print(f"Elasticsearch search failed with status {response.status_code}")
449+
print(f"Response: {response.text}")
450+
response.raise_for_status()
451+
452+
search_results = response.json()
453+
454+
# Assert that we found our log message
455+
assert "hits" in search_results, "Search response should contain 'hits'"
456+
assert "total" in search_results["hits"], "Search hits should contain 'total'"
457+
458+
total_hits = search_results["hits"]["total"]
459+
if isinstance(total_hits, dict):
460+
total_count = total_hits["value"]
461+
else:
462+
total_count = total_hits
463+
464+
assert total_count > 0, f"Expected to find at least 1 log message, but found {total_count}"
465+
466+
# Verify the content of the found document
467+
hits = search_results["hits"]["hits"]
468+
assert len(hits) > 0, "Expected at least one hit"
469+
470+
found_document = hits[0]["_source"]
471+
472+
# Verify the status fields are present and correct
473+
assert "status_code" in found_document, "Expected document to contain 'status_code' field"
474+
assert found_document["status_code"] == test_status.code.value, (
475+
f"Expected status_code {test_status.code.value}, got {found_document['status_code']}"
476+
)
477+
assert "status_message" in found_document, "Expected document to contain 'status_message' field"
478+
assert found_document["status_message"] == test_status.message, (
479+
f"Expected status_message '{test_status.message}', got '{found_document['status_message']}'"
480+
)
481+
assert "status_details" in found_document, "Expected document to contain 'status_details' field"
482+
assert found_document["status_details"] == test_status.details, (
483+
f"Expected status_details {test_status.details}, got {found_document['status_details']}"
484+
)
485+
486+
# Verify other expected fields are still present
487+
assert found_document["message"] == test_message, (
488+
f"Expected message '{test_message}', got '{found_document['message']}'"
489+
)
490+
assert found_document["rollout_id"] == rollout_id, (
491+
f"Expected rollout_id '{rollout_id}', got '{found_document['rollout_id']}'"
492+
)
493+
494+
print(f"Successfully verified Status logging with code {test_status.code.value} in Elasticsearch: {test_message}")
495+
496+
497+
@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Only run this test locally (skipped in CI)")
498+
def test_elasticsearch_direct_http_handler_search_by_status_code(
499+
elasticsearch_config: ElasticSearchConfig, test_logger: logging.Logger, rollout_id: str
500+
):
501+
"""Test that logs can be searched by status code in Elasticsearch."""
502+
from eval_protocol.models import Status
503+
504+
# Create different Status instances for testing
505+
statuses = [
506+
Status.rollout_running(),
507+
Status.eval_finished(),
508+
Status.error("Test error message"),
509+
]
510+
511+
# Generate unique test messages
512+
test_messages = []
513+
for i, status in enumerate(statuses):
514+
message = f"Status search test message {i} at {time.time()}"
515+
test_messages.append((message, status))
516+
test_logger.info(message, extra={"status": status})
517+
time.sleep(0.1) # Small delay to ensure different timestamps
518+
519+
# Give Elasticsearch time to process all documents
520+
time.sleep(3)
521+
522+
# Query Elasticsearch to search by specific status code
523+
parsed_url = urlparse(elasticsearch_config.url)
524+
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
525+
search_url = f"{base_url}/{elasticsearch_config.index_name}/_search"
526+
527+
# Search for logs with RUNNING status code
528+
running_status = Status.Code.RUNNING
529+
search_query = {
530+
"query": {"term": {"status_code": running_status.value}},
531+
"sort": [{"@timestamp": {"order": "desc"}}],
532+
"size": 10,
533+
}
534+
535+
# Execute the search
536+
response = requests.post(
537+
search_url,
538+
headers={"Content-Type": "application/json", "Authorization": f"ApiKey {elasticsearch_config.api_key}"},
539+
json=search_query,
540+
verify=parsed_url.scheme == "https",
541+
)
542+
543+
# Check for errors
544+
if response.status_code != 200:
545+
print(f"Elasticsearch search failed with status {response.status_code}")
546+
print(f"Response: {response.text}")
547+
response.raise_for_status()
548+
549+
search_results = response.json()
550+
551+
# Assert that we found our log messages
552+
assert "hits" in search_results, "Search response should contain 'hits'"
553+
assert "total" in search_results["hits"], "Search hits should contain 'total'"
554+
555+
total_hits = search_results["hits"]["total"]
556+
if isinstance(total_hits, dict):
557+
total_count = total_hits["value"]
558+
else:
559+
total_count = total_hits
560+
561+
assert total_count >= 1, f"Expected to find at least 1 log message with RUNNING status, but found {total_count}"
562+
563+
# Verify the content of the found documents
564+
hits = search_results["hits"]["hits"]
565+
assert len(hits) >= 1, f"Expected at least 1 hit, found {len(hits)}"
566+
567+
# Verify all found documents have the correct status code
568+
for hit in hits:
569+
document = hit["_source"]
570+
assert document["status_code"] == running_status.value, (
571+
f"Expected status_code {running_status.value}, got {document['status_code']}"
572+
)
573+
assert document["rollout_id"] == rollout_id, (
574+
f"Expected rollout_id '{rollout_id}', got '{document['rollout_id']}'"
575+
)
576+
577+
print(f"Successfully verified search by status code {running_status.value} found {len(hits)} log messages")

0 commit comments

Comments
 (0)