Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions eval_protocol/logging/elasticsearch_direct_http_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import asyncio
import json
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse

import requests

from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig


class ElasticsearchDirectHttpHandler(logging.Handler):
    """Logging handler that ships records to Elasticsearch over plain HTTP.

    Each record is serialized to a small JSON document and POSTed to the
    ``/<index>/_doc`` endpoint on a single background worker thread, so the
    emitting thread never blocks on network I/O.
    """

    #: Seconds before a POST to Elasticsearch is abandoned; without this the
    #: single worker thread can hang forever on an unresponsive endpoint.
    _REQUEST_TIMEOUT: float = 10.0

    def __init__(self, elasticsearch_config: ElasticSearchConfig) -> None:
        """Initialize the handler from an ``ElasticSearchConfig``.

        Args:
            elasticsearch_config: Carries the base URL, target index name,
                and API key used for every request.
        """
        super().__init__()
        self.base_url: str = elasticsearch_config.url.rstrip("/")
        self.index_name: str = elasticsearch_config.index_name
        self.api_key: str = elasticsearch_config.api_key
        self.url: str = f"{self.base_url}/{self.index_name}/_doc"
        self.formatter: logging.Formatter = logging.Formatter()
        # Created lazily so handlers that never emit don't spawn a thread.
        self._executor: Optional[ThreadPoolExecutor] = None
        self._executor_lock = threading.Lock()  # guards lazy executor creation

        # Only verify TLS certificates when the endpoint is actually HTTPS.
        parsed_url = urlparse(elasticsearch_config.url)
        self.verify_ssl = parsed_url.scheme == "https"

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record by scheduling it for async transmission."""
        try:
            # record.created is a POSIX timestamp; format it in UTC so the
            # trailing "Z" (Zulu/UTC designator) is actually correct. Using
            # the local timezone here would mislabel every timestamp on
            # non-UTC hosts.
            timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

            data: Dict[str, Any] = {
                "@timestamp": timestamp,
                "level": record.levelname,
                "message": record.getMessage(),
                "logger_name": record.name,
                # Add other relevant record attributes if needed
            }

            # Schedule the HTTP request to run asynchronously
            self._schedule_async_send(data, record)
        except Exception as e:
            self.handleError(record)
            # print (not logging) to avoid a recursive logging loop.
            print(f"Error preparing log for Elasticsearch: {e}")

    def _schedule_async_send(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
        """Schedule an async task to send the log data to Elasticsearch."""
        # Double-checked locking: emit() may be called from multiple threads,
        # and unguarded lazy creation could race and leak an extra executor.
        if self._executor is None:
            with self._executor_lock:
                if self._executor is None:
                    self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="elasticsearch-logger")

        # Submit the HTTP request to the thread pool.
        future = self._executor.submit(self._send_to_elasticsearch, data, record)

        # Surface transport errors without blocking the emitting thread.
        future.add_done_callback(lambda f: self._handle_async_result(f, record))

    def _send_to_elasticsearch(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
        """Send data to Elasticsearch (runs in the worker thread).

        Raises on transport or HTTP errors; the exception propagates through
        the future and is routed to ``_handle_async_result``.
        """
        response: requests.Response = requests.post(
            self.url,
            headers={"Content-Type": "application/json", "Authorization": f"ApiKey {self.api_key}"},
            data=json.dumps(data),
            verify=self.verify_ssl,  # If using HTTPS, verify SSL certificate
            timeout=self._REQUEST_TIMEOUT,
        )
        response.raise_for_status()  # Raise an exception for HTTP errors

    def _handle_async_result(self, future, record: logging.LogRecord) -> None:
        """Handle the result of the async send operation."""
        try:
            future.result()  # Re-raises any exception from the worker thread.
        except Exception as e:
            self.handleError(record)
            # print (not logging) to prevent a logging loop.
            response = getattr(e, "response", None)
            if response is not None:
                print(f"Error sending log to Elasticsearch: {e}")
                print(f"Response content: {response.text}")
            else:
                print(f"Error sending log to Elasticsearch: {e}")

    def close(self) -> None:
        """Flush pending sends and release the worker thread."""
        # Shut the executor down *before* the base-class close so queued
        # documents are flushed while the handler is still functional.
        if self._executor:
            self._executor.shutdown(wait=True)
        super().close()
187 changes: 187 additions & 0 deletions eval_protocol/logging/elasticsearch_index_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import requests
from typing import Dict, Any, Optional
from urllib.parse import urlparse


class ElasticsearchIndexManager:
    """Manages Elasticsearch index creation and mapping configuration."""

    #: Seconds before any request to Elasticsearch is abandoned; without a
    #: timeout a dead endpoint would hang the caller indefinitely.
    _REQUEST_TIMEOUT: float = 10.0

    def __init__(self, base_url: str, index_name: str, api_key: str) -> None:
        """Initialize the Elasticsearch index manager.

        Args:
            base_url: Elasticsearch base URL (e.g., "https://localhost:9200")
            index_name: Name of the index to manage
            api_key: API key for authentication
        """
        self.base_url: str = base_url.rstrip("/")
        self.index_name: str = index_name
        self.api_key: str = api_key
        self.index_url: str = f"{self.base_url}/{self.index_name}"
        # Memoized so repeated calls skip redundant round-trips.
        self._mapping_created: bool = False

        # Parse URL to determine if we should verify SSL
        parsed_url = urlparse(base_url)
        self.verify_ssl = parsed_url.scheme == "https"

    def _auth_headers(self) -> Dict[str, str]:
        """Return the Authorization header shared by every request."""
        return {"Authorization": f"ApiKey {self.api_key}"}

    def create_logging_index_mapping(self) -> bool:
        """Create index with proper mapping for logging data.

        If the index already exists with the wrong mapping it is deleted and
        recreated (existing documents in it are lost).

        Returns:
            bool: True if mapping was created successfully, False otherwise.
        """
        if self._mapping_created:
            return True

        try:
            # Check if index exists and has correct mapping
            if self._index_exists_with_correct_mapping():
                self._mapping_created = True
                return True

            # If index exists but has wrong mapping, delete and recreate it
            if self.index_exists():
                print(f"Warning: Index {self.index_name} exists with incorrect mapping. Deleting and recreating...")
                if not self.delete_index():
                    print(f"Warning: Failed to delete existing index {self.index_name}")
                    return False

            # Create index with proper mapping
            mapping = self._get_logging_mapping()
            response = requests.put(
                self.index_url,
                headers={"Content-Type": "application/json", **self._auth_headers()},
                json=mapping,
                verify=self.verify_ssl,
                timeout=self._REQUEST_TIMEOUT,
            )

            if response.status_code in [200, 201]:
                self._mapping_created = True
                return True
            else:
                print(f"Warning: Failed to create index mapping: {response.status_code} - {response.text}")
                return False

        except Exception as e:
            # Best-effort: index management failures degrade to a warning so
            # logging setup never crashes the application.
            print(f"Warning: Failed to create index mapping: {e}")
            return False

    def _index_exists_with_correct_mapping(self) -> bool:
        """Check if index exists and has the correct @timestamp mapping.

        Returns:
            bool: True if index exists with correct mapping, False otherwise.
        """
        try:
            # Reuse index_exists() instead of duplicating the HEAD request.
            if not self.index_exists():
                return False

            # Check if mapping is correct
            mapping_response = requests.get(
                f"{self.index_url}/_mapping",
                headers=self._auth_headers(),
                verify=self.verify_ssl,
                timeout=self._REQUEST_TIMEOUT,
            )

            if mapping_response.status_code != 200:
                return False

            return self._has_correct_timestamp_mapping(mapping_response.json())

        except Exception:
            return False

    def _has_correct_timestamp_mapping(self, mapping_data: Dict[str, Any]) -> bool:
        """Check if the mapping has @timestamp as a date field.

        Args:
            mapping_data: Elasticsearch mapping response data

        Returns:
            bool: True if @timestamp is correctly mapped as date field
        """
        try:
            return (
                self.index_name in mapping_data
                and "mappings" in mapping_data[self.index_name]
                and "properties" in mapping_data[self.index_name]["mappings"]
                and "@timestamp" in mapping_data[self.index_name]["mappings"]["properties"]
                and mapping_data[self.index_name]["mappings"]["properties"]["@timestamp"].get("type") == "date"
            )
        except (KeyError, TypeError):
            return False

    def _get_logging_mapping(self) -> Dict[str, Any]:
        """Get the standard mapping for logging data.

        Returns:
            Dict containing the index mapping configuration
        """
        return {
            "mappings": {
                "properties": {
                    "@timestamp": {"type": "date", "format": "strict_date_optional_time||epoch_millis"},
                    "level": {"type": "keyword"},
                    "message": {"type": "text"},
                    "logger_name": {"type": "keyword"},
                }
            }
        }

    def delete_index(self) -> bool:
        """Delete the managed index.

        Returns:
            bool: True if index was deleted successfully, False otherwise.
        """
        try:
            response = requests.delete(
                self.index_url,
                headers=self._auth_headers(),
                verify=self.verify_ssl,
                timeout=self._REQUEST_TIMEOUT,
            )
            if response.status_code in [200, 404]:  # 404 means index doesn't exist, which is fine
                self._mapping_created = False
                return True
            else:
                print(f"Warning: Failed to delete index: {response.status_code} - {response.text}")
                return False
        except Exception as e:
            print(f"Warning: Failed to delete index: {e}")
            return False

    def index_exists(self) -> bool:
        """Check if the index exists.

        Returns:
            bool: True if index exists, False otherwise.
        """
        try:
            response = requests.head(
                self.index_url,
                headers=self._auth_headers(),
                verify=self.verify_ssl,
                timeout=self._REQUEST_TIMEOUT,
            )
            return response.status_code == 200
        except Exception:
            return False

    def get_index_stats(self) -> Optional[Dict[str, Any]]:
        """Get statistics about the index.

        Returns:
            Dict containing index statistics, or None if failed
        """
        try:
            response = requests.get(
                f"{self.index_url}/_stats",
                headers=self._auth_headers(),
                verify=self.verify_ssl,
                timeout=self._REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json()
            return None
        except Exception:
            return None
Loading
Loading