import json
import logging
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timezone
from typing import Any, Dict, Optional, TYPE_CHECKING
from urllib.parse import urlparse

if TYPE_CHECKING:
    # Imported for type checking only so constructing the handler does not
    # require the rest of the package (or its transitive deps) at runtime.
    from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig


class ElasticsearchDirectHttpHandler(logging.Handler):
    """Logging handler that ships each record to Elasticsearch over HTTP.

    Every record is converted to a small JSON document and POSTed to the
    ``/<index>/_doc`` endpoint on a single background thread, so emitting a
    log line never blocks the calling thread on network I/O.
    """

    def __init__(self, elasticsearch_config: "ElasticSearchConfig") -> None:
        super().__init__()
        self.base_url: str = elasticsearch_config.url.rstrip("/")
        self.index_name: str = elasticsearch_config.index_name
        self.api_key: str = elasticsearch_config.api_key
        # Document-creation endpoint for the configured index.
        self.url: str = f"{self.base_url}/{self.index_name}/_doc"
        self.formatter: logging.Formatter = logging.Formatter()
        self._executor: Optional[ThreadPoolExecutor] = None
        # emit() may be called concurrently from many threads; this lock makes
        # the lazy executor creation race-free (the original could create two
        # executors and orphan one of them).
        self._executor_lock = threading.Lock()

        # Only verify TLS certificates when the endpoint is actually HTTPS.
        parsed_url = urlparse(elasticsearch_config.url)
        self.verify_ssl = parsed_url.scheme == "https"

    @staticmethod
    def _format_timestamp(created: float) -> str:
        """Return *created* (epoch seconds) as an ISO 8601 UTC timestamp.

        Bug fix: the original called ``datetime.fromtimestamp`` without a
        timezone, which yields *local* time while still appending ``Z``
        (which asserts UTC). Converting explicitly to UTC keeps the suffix
        honest and makes cross-host timestamps comparable.
        """
        return datetime.fromtimestamp(created, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record by scheduling it for async transmission."""
        try:
            data: Dict[str, Any] = {
                "@timestamp": self._format_timestamp(record.created),
                "level": record.levelname,
                "message": record.getMessage(),
                "logger_name": record.name,
                # Add other relevant record attributes if needed
            }
            # Hand off to the background thread; never block the caller.
            self._schedule_async_send(data, record)
        except Exception as e:
            self.handleError(record)
            # print (not logging) so a failure here cannot recurse into us.
            print(f"Error preparing log for Elasticsearch: {e}")

    def _schedule_async_send(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
        """Submit the HTTP POST to the single-threaded background executor."""
        with self._executor_lock:
            if self._executor is None:
                self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="elasticsearch-logger")
            executor = self._executor

        future = executor.submit(self._send_to_elasticsearch, data, record)
        # Surface send failures without blocking the emitting thread.
        future.add_done_callback(lambda f: self._handle_async_result(f, record))

    def _send_to_elasticsearch(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
        """POST one document to Elasticsearch (runs in the worker thread).

        Raises on HTTP or transport errors; the done-callback reports them.
        """
        # Imported lazily so the handler can be constructed (and unit-tested)
        # in environments where the HTTP dependency is absent.
        import requests

        response = requests.post(
            self.url,
            headers={"Content-Type": "application/json", "Authorization": f"ApiKey {self.api_key}"},
            data=json.dumps(data),
            verify=self.verify_ssl,  # If using HTTPS, verify SSL certificate
        )
        response.raise_for_status()  # Raise an exception for HTTP errors

    def _handle_async_result(self, future: "Future[None]", record: logging.LogRecord) -> None:
        """Report the outcome of an async send without raising in the worker."""
        try:
            future.result()  # re-raises any exception from the send
        except Exception as e:
            self.handleError(record)
            # print (not logging) to prevent a logging loop.
            print(f"Error sending log to Elasticsearch: {e}")
            response = getattr(e, "response", None)
            if response is not None:
                print(f"Response content: {response.text}")

    def close(self) -> None:
        """Flush pending sends and release the background worker thread."""
        super().close()
        if self._executor:
            self._executor.shutdown(wait=True)
+ """ + if self._mapping_created: + return True + + try: + # Check if index exists and has correct mapping + if self._index_exists_with_correct_mapping(): + self._mapping_created = True + return True + + # If index exists but has wrong mapping, delete and recreate it + if self.index_exists(): + print(f"Warning: Index {self.index_name} exists with incorrect mapping. Deleting and recreating...") + if not self.delete_index(): + print(f"Warning: Failed to delete existing index {self.index_name}") + return False + + # Create index with proper mapping + mapping = self._get_logging_mapping() + response = requests.put( + self.index_url, + headers={"Content-Type": "application/json", "Authorization": f"ApiKey {self.api_key}"}, + json=mapping, + verify=self.verify_ssl, + ) + + if response.status_code in [200, 201]: + self._mapping_created = True + return True + else: + print(f"Warning: Failed to create index mapping: {response.status_code} - {response.text}") + return False + + except Exception as e: + print(f"Warning: Failed to create index mapping: {e}") + return False + + def _index_exists_with_correct_mapping(self) -> bool: + """Check if index exists and has the correct @timestamp mapping. + + Returns: + bool: True if index exists with correct mapping, False otherwise. 
+ """ + try: + # Check if index exists + response = requests.head( + self.index_url, headers={"Authorization": f"ApiKey {self.api_key}"}, verify=self.verify_ssl + ) + + if response.status_code != 200: + return False + + # Check if mapping is correct + mapping_response = requests.get( + f"{self.index_url}/_mapping", + headers={"Authorization": f"ApiKey {self.api_key}"}, + verify=self.verify_ssl, + ) + + if mapping_response.status_code != 200: + return False + + mapping_data = mapping_response.json() + return self._has_correct_timestamp_mapping(mapping_data) + + except Exception: + return False + + def _has_correct_timestamp_mapping(self, mapping_data: Dict[str, Any]) -> bool: + """Check if the mapping has @timestamp as a date field. + + Args: + mapping_data: Elasticsearch mapping response data + + Returns: + bool: True if @timestamp is correctly mapped as date field + """ + try: + return ( + self.index_name in mapping_data + and "mappings" in mapping_data[self.index_name] + and "properties" in mapping_data[self.index_name]["mappings"] + and "@timestamp" in mapping_data[self.index_name]["mappings"]["properties"] + and mapping_data[self.index_name]["mappings"]["properties"]["@timestamp"].get("type") == "date" + ) + except (KeyError, TypeError): + return False + + def _get_logging_mapping(self) -> Dict[str, Any]: + """Get the standard mapping for logging data. + + Returns: + Dict containing the index mapping configuration + """ + return { + "mappings": { + "properties": { + "@timestamp": {"type": "date", "format": "strict_date_optional_time||epoch_millis"}, + "level": {"type": "keyword"}, + "message": {"type": "text"}, + "logger_name": {"type": "keyword"}, + } + } + } + + def delete_index(self) -> bool: + """Delete the managed index. + + Returns: + bool: True if index was deleted successfully, False otherwise. 
+ """ + try: + response = requests.delete( + self.index_url, headers={"Authorization": f"ApiKey {self.api_key}"}, verify=self.verify_ssl + ) + if response.status_code in [200, 404]: # 404 means index doesn't exist, which is fine + self._mapping_created = False + return True + else: + print(f"Warning: Failed to delete index: {response.status_code} - {response.text}") + return False + except Exception as e: + print(f"Warning: Failed to delete index: {e}") + return False + + def index_exists(self) -> bool: + """Check if the index exists. + + Returns: + bool: True if index exists, False otherwise. + """ + try: + response = requests.head( + self.index_url, headers={"Authorization": f"ApiKey {self.api_key}"}, verify=self.verify_ssl + ) + return response.status_code == 200 + except Exception: + return False + + def get_index_stats(self) -> Optional[Dict[str, Any]]: + """Get statistics about the index. + + Returns: + Dict containing index statistics, or None if failed + """ + try: + response = requests.get( + f"{self.index_url}/_stats", + headers={"Authorization": f"ApiKey {self.api_key}"}, + verify=self.verify_ssl, + ) + if response.status_code == 200: + return response.json() + return None + except Exception: + return None diff --git a/eval_protocol/pytest/elasticsearch_setup.py b/eval_protocol/pytest/elasticsearch_setup.py new file mode 100644 index 00000000..1f3af3fc --- /dev/null +++ b/eval_protocol/pytest/elasticsearch_setup.py @@ -0,0 +1,167 @@ +import os +import subprocess +import tempfile +import logging +from typing import Optional + +from dotenv import load_dotenv +from eval_protocol.directory_utils import find_eval_protocol_dir +from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig +from eval_protocol.logging.elasticsearch_index_manager import ElasticsearchIndexManager + +logger = logging.getLogger(__name__) + + +class ElasticsearchSetupError(Exception): + """Exception raised when Elasticsearch setup fails.""" + + pass + + +class 
import logging
import os
import subprocess
import tempfile

logger = logging.getLogger(__name__)


class ElasticsearchSetupError(Exception):
    """Raised when a local Elasticsearch instance cannot be started."""


class ElasticsearchSetup:
    """Handles Elasticsearch setup with retry logic for existing containers."""

    def __init__(self) -> None:
        # Project-local import kept inside the constructor so importing this
        # module stays cheap and dependency-light.
        from eval_protocol.directory_utils import find_eval_protocol_dir

        self.eval_protocol_dir = find_eval_protocol_dir()

    def setup_elasticsearch(self, index_name: str = "default-logs") -> "ElasticSearchConfig":
        """Set up Elasticsearch, handling both local and remote scenarios.

        Args:
            index_name: Name of the Elasticsearch index to use for logging

        Returns:
            ElasticSearchConfig for the running instance with that index name.
        """
        from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig

        elastic_start_local_dir = os.path.join(self.eval_protocol_dir, "elastic-start-local")
        env_file_path = os.path.join(elastic_start_local_dir, ".env")

        # Reuse the existing Docker start script when it is already checked
        # out; otherwise bootstrap the whole Docker setup from scratch.
        if os.path.exists(elastic_start_local_dir):
            config = self._setup_existing_docker_elasticsearch(elastic_start_local_dir, env_file_path)
        else:
            config = self._setup_initialized_docker_elasticsearch(env_file_path)

        # Make sure the logging index exists with the proper @timestamp mapping.
        self.create_logging_index(index_name)

        # Return a config pointing at the requested index.
        return ElasticSearchConfig(url=config.url, api_key=config.api_key, index_name=index_name)

    def _setup_existing_docker_elasticsearch(
        self, elastic_start_local_dir: str, env_file_path: str
    ) -> "ElasticSearchConfig":
        """Start Elasticsearch via the already-checked-out start.sh script."""
        from eval_protocol.utils.subprocess_utils import run_script_and_wait

        run_script_and_wait(
            script_name="start.sh",
            working_directory=elastic_start_local_dir,
            inherit_stdout=True,
        )
        return self._parse_elastic_env_file(env_file_path)

    def _setup_initialized_docker_elasticsearch(self, env_file_path: str) -> "ElasticSearchConfig":
        """Bootstrap Elasticsearch from scratch via elastic.co's installer, with retries."""
        max_retries = 2
        for attempt in range(max_retries):
            # Tee the installer output into a temp file so we can both show it
            # live and inspect it afterwards for the "already running" marker.
            with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
                temp_file_path = temp_file.name

            try:
                # ``set -o pipefail`` propagates the first failing command's
                # exit status through the pipe. Fix: run under bash — on
                # Debian/Ubuntu ``sh`` is dash, which rejects ``-o pipefail``
                # and made this command fail unconditionally.
                # NOTE(security): this pipes a remote script straight into the
                # shell; acceptable for a local dev bootstrap, but the URL is
                # trusted implicitly.
                process = subprocess.Popen(
                    [
                        "bash",
                        "-c",
                        f"set -o pipefail; curl -fsSL https://elastic.co/start-local | sh -s -- --esonly | tee {temp_file_path}",
                    ],
                    cwd=self.eval_protocol_dir,
                )
                returncode = process.wait()

                # Read back everything the installer printed.
                with open(temp_file_path, "r") as f:
                    stdout = f.read()

                if returncode == 0:
                    return self._parse_elastic_env_file(env_file_path)

                # A leftover container from a previous run? Stop it and retry.
                if self._handle_existing_elasticsearch_container(stdout):
                    logger.info(f"Retrying Elasticsearch setup (attempt {attempt + 1}/{max_retries})")
                    continue

                # Any other failure is fatal.
                raise ElasticsearchSetupError(
                    f"Failed to start Elasticsearch (attempt {attempt + 1}/{max_retries}): {stdout}"
                )
            finally:
                # Clean up the temporary file regardless of outcome.
                try:
                    os.unlink(temp_file_path)
                except OSError:
                    pass

        raise ElasticsearchSetupError(f"Failed to start Elasticsearch after {max_retries} attempts")

    def _handle_existing_elasticsearch_container(self, output: str) -> bool:
        """Detect and stop a leftover 'es-local-dev' container.

        Returns True when the container was stopped and the caller should
        retry the setup; False otherwise.
        """
        if "docker stop es-local-dev" in output:
            logger.info("Elasticsearch container 'es-local-dev' is already running. Stopping it...")
            try:
                subprocess.run(["docker", "stop", "es-local-dev"], check=True, capture_output=True, text=True)
                logger.info("Successfully stopped existing Elasticsearch container")
                return True  # Indicate retry is needed
            except subprocess.CalledProcessError as e:
                logger.warning(f"Failed to stop existing container: {e}")
                return False
        return False

    def _parse_elastic_env_file(self, env_file_path: str) -> "ElasticSearchConfig":
        """Parse ES_LOCAL_API_KEY and ES_LOCAL_URL from the generated .env file."""
        # Lazy imports: dotenv is an optional dependency and the config type
        # is project-local; neither is needed unless this path runs.
        from dotenv import load_dotenv
        from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig

        if not load_dotenv(env_file_path):
            raise ElasticsearchSetupError("Failed to load .env file")

        api_key = os.getenv("ES_LOCAL_API_KEY")
        url = os.getenv("ES_LOCAL_URL")

        if not url or not api_key:
            raise ElasticsearchSetupError("Failed to parse ES_LOCAL_API_KEY and ES_LOCAL_URL from .env file")

        return ElasticSearchConfig(url=url, api_key=api_key, index_name="default-logs")
+ """ + try: + # Get the config from the .env file + config = self._parse_elastic_env_file(self._get_env_file_path()) + + # Create index manager and set up mapping + index_manager = ElasticsearchIndexManager(config.url, index_name, config.api_key) + return index_manager.create_logging_index_mapping() + + except Exception as e: + logger.error(f"Failed to create logging index {index_name}: {e}") + return False + + def _get_env_file_path(self) -> str: + """Get the path to the .env file.""" + elastic_start_local_dir = os.path.join(self.eval_protocol_dir, "elastic-start-local") + return os.path.join(elastic_start_local_dir, ".env") diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index 8a468843..4b475738 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -367,6 +367,8 @@ def _log_eval_error(status: Status, rows: list[EvaluationRow] | None, passed: bo exception_handler_config=exception_handler_config, ) + rollout_processor.setup() + async def execute_run(run_idx: int, config: RolloutProcessorConfig): nonlocal all_results diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index e7369ce8..4fd442e4 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -6,11 +6,16 @@ from eval_protocol.models import EvaluationRow, Status from eval_protocol.data_loader.dynamic_data_loader import DynamicDataLoader -from eval_protocol.types.remote_rollout_processor import InitRequest, RolloutMetadata +from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig, InitRequest, RolloutMetadata from .rollout_processor import RolloutProcessor from .types import RolloutProcessorConfig +from .elasticsearch_setup import ElasticsearchSetup +import logging + import os +logger = logging.getLogger(__name__) + class RemoteRolloutProcessor(RolloutProcessor): """ @@ -27,6 
+32,8 @@ def __init__( poll_interval: float = 1.0, timeout_seconds: float = 120.0, output_data_loader: Callable[[str], DynamicDataLoader], + disable_elastic_search: bool = False, + elastic_search_config: Optional[ElasticSearchConfig] = None, ): # Prefer constructor-provided configuration. These can be overridden via # config.kwargs at call time for backward compatibility. @@ -37,6 +44,21 @@ def __init__( self._poll_interval = poll_interval self._timeout_seconds = timeout_seconds self._output_data_loader = output_data_loader + self._disable_elastic_search = disable_elastic_search + self._elastic_search_config = elastic_search_config + + def setup(self) -> None: + if self._disable_elastic_search: + logger.info("Elasticsearch is disabled, skipping setup") + return + logger.info("Setting up Elasticsearch") + self._elastic_search_config = self._setup_elastic_search() + logger.info("Elasticsearch setup complete") + + def _setup_elastic_search(self) -> ElasticSearchConfig: + """Set up Elasticsearch using the dedicated setup module.""" + setup = ElasticsearchSetup() + return setup.setup_elasticsearch() def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: tasks: List[asyncio.Task[EvaluationRow]] = [] @@ -119,6 +141,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: tools=row.tools, metadata=meta, model_base_url=model_base_url, + elastic_search_config=self._elastic_search_config, ) # Fire-and-poll @@ -197,6 +220,11 @@ def _load_data(): langfuse_row.input_metadata.dataset_info = row.input_metadata.dataset_info langfuse_row.eval_metadata = row.eval_metadata langfuse_row.ground_truth = row.ground_truth + + # this is useful to detect stopped evaluations so we can update + # the status in the logs server + langfuse_row.pid = row.pid + return langfuse_row else: raise ValueError("RemoteRolloutProcessor's output_data_loader should return exactly one row.") diff --git 
a/eval_protocol/pytest/rollout_processor.py b/eval_protocol/pytest/rollout_processor.py index 313f1768..95fbfa1b 100644 --- a/eval_protocol/pytest/rollout_processor.py +++ b/eval_protocol/pytest/rollout_processor.py @@ -10,6 +10,10 @@ class RolloutProcessor(ABC): Abstract base class for all rollout processor strategies. """ + def setup(self) -> None: + """Setup resources. Override in subclasses if setup is needed. Executed once per invocation.""" + pass + @abstractmethod def __call__(self, rows: list[EvaluationRow], config: RolloutProcessorConfig) -> list[asyncio.Task[EvaluationRow]]: """Process evaluation rows and return async tasks. Must be implemented by subclasses.""" diff --git a/eval_protocol/types/remote_rollout_processor.py b/eval_protocol/types/remote_rollout_processor.py index 817d1c3f..e8ccdf75 100644 --- a/eval_protocol/types/remote_rollout_processor.py +++ b/eval_protocol/types/remote_rollout_processor.py @@ -7,6 +7,16 @@ from eval_protocol.models import Message, Status +class ElasticSearchConfig(BaseModel): + """ + Configuration for Elasticsearch. 
+ """ + + url: str + api_key: str + index_name: str + + class RolloutMetadata(BaseModel): """Metadata for rollout execution.""" @@ -21,6 +31,7 @@ class InitRequest(BaseModel): """Request model for POST /init endpoint.""" model: str + elastic_search_config: Optional[ElasticSearchConfig] = None messages: Optional[List[Message]] = None tools: Optional[List[Dict[str, Any]]] = None diff --git a/eval_protocol/utils/subprocess_utils.py b/eval_protocol/utils/subprocess_utils.py new file mode 100644 index 00000000..277eaf87 --- /dev/null +++ b/eval_protocol/utils/subprocess_utils.py @@ -0,0 +1,118 @@ +"""Cross-platform subprocess utilities for running scripts and commands.""" + +import os +import platform +import subprocess +from typing import Optional + + +def run_script_cross_platform( + script_name: str, + working_directory: str, + capture_output: bool = True, + print_output: bool = False, + inherit_stdout: bool = False, +) -> subprocess.Popen: + """ + Run a script in a cross-platform manner. 
+ + Args: + script_name: Name of the script to run (e.g., "start.sh") + working_directory: Directory to run the script in + capture_output: Whether to capture stdout/stderr + print_output: Whether to print output in real-time + inherit_stdout: Whether to inherit stdout from parent process + + Returns: + subprocess.Popen object for the running process + + Raises: + RuntimeError: If the script fails to start or execute + """ + script_path = os.path.join(working_directory, script_name) + + if not os.path.exists(script_path): + raise FileNotFoundError(f"Script not found: {script_path}") + + # Determine stdout handling + if inherit_stdout: + stdout = None # Inherit from parent process + stderr = subprocess.STDOUT # Still capture stderr + elif capture_output: + stdout = subprocess.PIPE + stderr = subprocess.STDOUT + else: + stdout = None + stderr = None + + if platform.system() == "Windows": + # On Windows, use cmd.exe to run the script + cmd = ["cmd.exe", "/c", script_name] + process = subprocess.Popen( + cmd, + cwd=working_directory, + stdout=stdout, + stderr=stderr, + text=True, + ) + else: + # On Unix-like systems, make executable and run with proper shebang + os.chmod(script_path, 0o755) + + # Use the full path to the script with shell=True + process = subprocess.Popen( + script_path, + stdout=stdout, + stderr=stderr, + text=True, + shell=True, + ) + + # Print output in real-time if requested + if print_output and capture_output and process.stdout: + for line in process.stdout: + print(line, end="") + + return process + + +def run_script_and_wait( + script_name: str, + working_directory: str, + print_output: bool = False, + inherit_stdout: bool = False, + timeout: Optional[int] = None, +) -> int: + """ + Run a script and wait for it to complete. 
+ + Args: + script_name: Name of the script to run + working_directory: Directory to run the script in + print_output: Whether to print output in real-time + inherit_stdout: Whether to inherit stdout from parent process + timeout: Maximum time to wait for the script to complete + + Returns: + Return code of the script + + Raises: + RuntimeError: If the script fails to execute + subprocess.TimeoutExpired: If the script times out + """ + process = run_script_cross_platform( + script_name=script_name, + working_directory=working_directory, + capture_output=print_output and not inherit_stdout, + print_output=print_output, + inherit_stdout=inherit_stdout, + ) + + try: + returncode = process.wait(timeout=timeout) + if returncode != 0: + raise RuntimeError(f"Script '{script_name}' failed with return code {returncode}") + return returncode + except subprocess.TimeoutExpired: + process.kill() + raise diff --git a/tests/logging/test_elasticsearch_direct_http_handler.py b/tests/logging/test_elasticsearch_direct_http_handler.py new file mode 100644 index 00000000..8b7eb892 --- /dev/null +++ b/tests/logging/test_elasticsearch_direct_http_handler.py @@ -0,0 +1,196 @@ +import os +import logging +import time +import requests +import pytest +from urllib.parse import urlparse + +from eval_protocol.logging.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler +from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup +from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig + + +@pytest.fixture +def elasticsearch_config(): + """Set up Elasticsearch and return configuration.""" + import time + + index_name = f"test-logs-{int(time.time())}" + setup = ElasticsearchSetup() + config = setup.setup_elasticsearch(index_name) + return config + + +@pytest.fixture +def elasticsearch_handler(elasticsearch_config: ElasticSearchConfig): + """Create and configure ElasticsearchDirectHttpHandler.""" + # Use a unique test-specific index name with 
import os
import logging
import time
import requests
import pytest
from urllib.parse import urlparse

from eval_protocol.logging.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup
from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig


@pytest.fixture
def elasticsearch_config():
    """Set up Elasticsearch and return its configuration.

    A timestamped index name keeps repeated runs from seeing each other's
    documents. (Fix: dropped the redundant local ``import time`` — the
    module already imports it.)
    """
    index_name = f"test-logs-{int(time.time())}"
    setup = ElasticsearchSetup()
    return setup.setup_elasticsearch(index_name)


@pytest.fixture
def elasticsearch_handler(elasticsearch_config: ElasticSearchConfig):
    """Create and configure an ElasticsearchDirectHttpHandler for the test index."""
    # (Fix: removed a stale comment claiming this fixture builds a unique
    # index name — the unique name comes from elasticsearch_config.)
    handler = ElasticsearchDirectHttpHandler(elasticsearch_config)

    # Only INFO and above should reach Elasticsearch.
    handler.setLevel(logging.INFO)

    return handler


@pytest.fixture
def test_logger(elasticsearch_handler, elasticsearch_config):
    """Set up a test logger with the Elasticsearch handler, with teardown."""
    # Ensure the handler's index exists with the proper @timestamp mapping.
    setup = ElasticsearchSetup()
    setup.create_logging_index(elasticsearch_handler.index_name)

    logger = logging.getLogger("test_elasticsearch_logger")
    logger.setLevel(logging.INFO)

    # Clear any existing handlers
    logger.handlers.clear()

    # Add our Elasticsearch handler
    logger.addHandler(elasticsearch_handler)

    # Prevent propagation to avoid duplicate logs
    logger.propagate = False

    yield logger

    # Teardown (fix): the original never detached or closed the handler, so
    # its background executor thread leaked across tests.
    logger.removeHandler(elasticsearch_handler)
    elasticsearch_handler.close()


@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Only run this test locally (skipped in CI)")
def test_elasticsearch_direct_http_handler_sends_logs(
    elasticsearch_config: ElasticSearchConfig, test_logger: logging.Logger
):
    """Test that ElasticsearchDirectHttpHandler successfully sends logs to Elasticsearch."""

    # Generate a unique test message to avoid conflicts
    test_message = f"Test log message at {time.time()}"

    # Send the log message
    test_logger.info(test_message)

    # Give Elasticsearch a moment to index the document
    time.sleep(3)

    # Query Elasticsearch to verify the document was received
    parsed_url = urlparse(elasticsearch_config.url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
    search_url = f"{base_url}/{elasticsearch_config.index_name}/_search"

    # Match our exact message, newest first
    search_query = {
        "query": {"match": {"message": test_message}},
        "sort": [{"@timestamp": {"order": "desc"}}],
        "size": 1,
    }

    response = requests.post(
        search_url,
        headers={"Content-Type": "application/json", "Authorization": f"ApiKey {elasticsearch_config.api_key}"},
        json=search_query,
        verify=parsed_url.scheme == "https",
    )

    # Check for errors and provide better debugging
    if response.status_code != 200:
        print(f"Elasticsearch search failed with status {response.status_code}")
        print(f"Response: {response.text}")
        response.raise_for_status()

    search_results = response.json()

    # Assert that we found our log message
    assert "hits" in search_results, "Search response should contain 'hits'"
    assert "total" in search_results["hits"], "Search hits should contain 'total'"

    total_hits = search_results["hits"]["total"]
    if isinstance(total_hits, dict):
        # Elasticsearch 7+ format
        total_count = total_hits["value"]
    else:
        # Elasticsearch 6 format
        total_count = total_hits

    assert total_count > 0, f"Expected to find at least 1 log message, but found {total_count}"

    # Verify the content of the found document
    hits = search_results["hits"]["hits"]
    assert len(hits) > 0, "Expected at least one hit"

    found_document = hits[0]["_source"]
    assert found_document["message"] == test_message, (
        f"Expected message '{test_message}', got '{found_document['message']}'"
    )
    assert found_document["level"] == "INFO", f"Expected level 'INFO', got '{found_document['level']}'"
    assert found_document["logger_name"] == "test_elasticsearch_logger", (
        f"Expected logger name 'test_elasticsearch_logger', got '{found_document['logger_name']}'"
    )
    assert "@timestamp" in found_document, "Expected document to contain '@timestamp' field"

    print(f"Successfully verified log message in Elasticsearch: {test_message}")


@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Only run this test locally (skipped in CI)")
def test_elasticsearch_direct_http_handler_sorts_logs_chronologically(
    elasticsearch_config: ElasticSearchConfig, test_logger: logging.Logger
):
    """Test that logs can be sorted chronologically by timestamp."""

    # Send multiple log messages with small delays to ensure distinct timestamps
    test_messages = []
    for i in range(3):
        message = f"Chronological test message {i} at {time.time()}"
        test_messages.append(message)
        test_logger.info(message)
        time.sleep(0.1)  # Small delay to ensure different timestamps

    # Give Elasticsearch time to index all documents
    time.sleep(2)

    # Query Elasticsearch for all our test messages, oldest first
    parsed_url = urlparse(elasticsearch_config.url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
    search_url = f"{base_url}/{elasticsearch_config.index_name}/_search"

    search_query = {
        "query": {"match_phrase_prefix": {"message": "Chronological test message"}},
        "sort": [{"@timestamp": {"order": "asc"}}],  # Ascending order (oldest first)
        "size": 10,
    }

    response = requests.post(
        search_url,
        headers={"Content-Type": "application/json", "Authorization": f"ApiKey {elasticsearch_config.api_key}"},
        json=search_query,
        verify=parsed_url.scheme == "https",
    )

    if response.status_code != 200:
        print(f"Elasticsearch search failed with status {response.status_code}")
        print(f"Response: {response.text}")
        response.raise_for_status()

    search_results = response.json()

    # Verify we found our messages
    hits = search_results["hits"]["hits"]
    assert len(hits) >= 3, f"Expected at least 3 messages, found {len(hits)}"

    # Extract messages and timestamps in returned order
    found_messages = [hit["_source"]["message"] for hit in hits]
    found_timestamps = [hit["_source"]["@timestamp"] for hit in hits]

    # Verify all our test messages are present
    for test_message in test_messages:
        assert test_message in found_messages, f"Expected message '{test_message}' not found in results"

    # Verify timestamps are in ascending (chronological) order
    for i in range(1, len(found_timestamps)):
        assert found_timestamps[i - 1] <= found_timestamps[i], (
            f"Timestamps not in chronological order: {found_timestamps[i - 1]} > {found_timestamps[i]}"
        )

    print(f"Successfully verified chronological sorting of {len(hits)} log messages")
    print(f"Timestamps in order: {found_timestamps}")