def child_process_worker(test_message: str, result_queue: Queue) -> None:
    """Run inside a child process: log *test_message* using only inherited config.

    Deliberately attaches no handler of its own — the record must propagate to
    the root logger (and its ElasticsearchDirectHttpHandler) configured in the
    parent process, which is exactly what the test is verifying.

    Puts the string "success" on *result_queue* when logging completed, or an
    "error: <message>" string if anything raised.
    """
    try:
        child_logger = logging.getLogger("child_process_logger")
        child_logger.setLevel(logging.INFO)
        # No handlers.clear() / addHandler here on purpose — inheritance only.

        child_logger.info(test_message)

        # Give the handler time to ship the record before the process exits.
        time.sleep(2)

        result_queue.put("success")
    except Exception as e:
        result_queue.put(f"error: {str(e)}")


def test_elasticsearch_direct_http_handler_multiprocessing_configuration_inheritance(
    elasticsearch_client: ElasticsearchClient, elasticsearch_config: ElasticsearchConfig, rollout_id: str
):
    """Verify ElasticsearchDirectHttpHandler configuration set on the parent's
    root logger is inherited by child processes — no reconfiguration and no
    addHandler call in the child."""
    from queue import Empty  # multiprocessing.Queue.get raises queue.Empty on timeout

    def _join_or_fail(proc: Process, label: str) -> None:
        """Join *proc* with a bounded timeout; escalate terminate -> kill and fail."""
        # FIX: part 1 originally used join(timeout=100_000) — one hundred
        # thousand SECONDS (~28 hours). 30 s matches the multi-child section.
        proc.join(timeout=30)
        if proc.is_alive():
            proc.terminate()
            proc.join(timeout=5)
            if proc.is_alive():
                proc.kill()
            # FIX: the multi-child section previously killed stragglers
            # silently and let the test continue; a timed-out child is a failure.
            pytest.fail(f"{label} did not complete within timeout")

    def _collect_result(q: Queue, label: str) -> None:
        """Fetch the worker's status from *q*; fail loudly on error or no result."""
        # FIX: the original guarded this with `if not q.empty():`, silently
        # skipping every assertion when the result had not arrived yet (racy).
        try:
            result = q.get(timeout=10)
        except Empty:
            pytest.fail(f"{label} produced no result")
        if result.startswith("error:"):
            pytest.fail(f"{label} failed: {result}")
        assert result == "success", f"Expected 'success' from {label}, got '{result}'"

    # Test 1: Configuration inheritance without reconfiguration or addHandler
    print("Testing configuration inheritance without reconfiguration or addHandler...")

    # Unique payloads so Elasticsearch lookups are unambiguous across runs.
    parent_message = f"Parent process message at {time.time()}"
    child_message = f"Child process message at {time.time()}"

    # Configure the ROOT logger in the parent; children must inherit it.
    handler = ElasticsearchDirectHttpHandler(elasticsearch_config)
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.handlers.clear()
    root_logger.addHandler(handler)

    # FIX: the original only closed the handler on the success path and never
    # detached it from the (global) root logger, leaking test state into every
    # later test. Clean up in finally regardless of outcome.
    try:
        parent_logger = logging.getLogger("parent_process_logger")
        parent_logger.info(parent_message)

        # Child process must log via the inherited configuration only.
        result_queue = Queue()
        child_process = Process(target=child_process_worker, args=(child_message, result_queue))
        child_process.start()

        _join_or_fail(child_process, "Child process")
        _collect_result(result_queue, "Child process")

        # Give Elasticsearch time to process all documents
        time.sleep(3)

        # FIX: the messages are multi-word strings on an (analyzed) text field;
        # part 2 of this same test uses search_by_match for identical data, so
        # use it here too instead of search_by_term. NOTE(review): confirm the
        # "message" field mapping — a term query would only match keyword fields.
        parent_search_results = elasticsearch_client.search_by_match("message", parent_message, size=1)
        child_search_results = elasticsearch_client.search_by_match("message", child_message, size=1)

        # Verify parent message
        assert parent_search_results is not None, "Parent search should return results"
        parent_hits = parent_search_results["hits"]["hits"]
        assert len(parent_hits) > 0, "Expected to find parent message"
        parent_document = parent_hits[0]["_source"]
        assert parent_document["message"] == parent_message, "Parent message should match"
        assert parent_document["logger_name"] == "parent_process_logger", "Parent logger name should match"

        # Verify child message
        assert child_search_results is not None, "Child search should return results"
        child_hits = child_search_results["hits"]["hits"]
        assert len(child_hits) > 0, "Expected to find child message"
        child_document = child_hits[0]["_source"]
        assert child_document["message"] == child_message, "Child message should match"
        assert child_document["logger_name"] == "child_process_logger", "Child logger name should match"

        # Both records must carry the rollout_id inherited from the environment.
        assert parent_document["rollout_id"] == rollout_id, "Parent message should have correct rollout_id"
        assert child_document["rollout_id"] == rollout_id, "Child message should have correct rollout_id"

        print("✓ Configuration inheritance without reconfiguration or addHandler verified")

        # Test 2: Multiple child processes with inherited configuration
        print("Testing multiple child processes with inherited configuration...")

        child_messages = [f"Child {i} message at {time.time()}" for i in range(3)]

        child_processes = []
        result_queues = []
        for child_msg in child_messages:
            q = Queue()
            proc = Process(target=child_process_worker, args=(child_msg, q))
            child_processes.append(proc)
            result_queues.append(q)
            proc.start()

        for i, proc in enumerate(child_processes):
            _join_or_fail(proc, f"Child process {i}")

        for i, q in enumerate(result_queues):
            _collect_result(q, f"Child process {i}")

        # Give Elasticsearch time to process all documents
        time.sleep(3)

        # Every child's record must have landed with the inherited metadata.
        for i, child_msg in enumerate(child_messages):
            search_results = elasticsearch_client.search_by_match("message", child_msg, size=1)
            assert search_results is not None, f"Child {i} search should return results"
            hits = search_results["hits"]["hits"]
            assert len(hits) > 0, f"Expected to find child {i} message"
            document = hits[0]["_source"]
            assert document["message"] == child_msg, f"Child {i} message should match"
            assert document["logger_name"] == "child_process_logger", f"Child {i} logger name should match"
            assert document["rollout_id"] == rollout_id, f"Child {i} should have correct rollout_id"

        print("✓ Multiple child processes with inherited configuration verified")

        print(
            "Successfully verified ElasticsearchDirectHttpHandler multiprocessing configuration inheritance without reconfiguration or addHandler"
        )
    finally:
        # Restore the global root logger and release the handler's resources
        # even when an assertion above fails.
        root_logger.removeHandler(handler)
        handler.close()