From f35c88b215e4a398dbec488cb6ec01b686ade50d Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Wed, 8 Oct 2025 13:12:40 -0700 Subject: [PATCH 1/2] write test to validate inheritance of handler from parent logger to child logger --- .../test_elasticsearch_direct_http_handler.py | 159 ++++++++++++++++++ 1 file changed, 159 insertions(+) diff --git a/tests/logging/test_elasticsearch_direct_http_handler.py b/tests/logging/test_elasticsearch_direct_http_handler.py index df59f9de..586a614d 100644 --- a/tests/logging/test_elasticsearch_direct_http_handler.py +++ b/tests/logging/test_elasticsearch_direct_http_handler.py @@ -3,6 +3,7 @@ import time import pytest from datetime import datetime, timezone +from multiprocessing import Process, Queue from eval_protocol.log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler from eval_protocol.log_utils.elasticsearch_client import ElasticsearchClient @@ -624,3 +625,161 @@ def test_elasticsearch_direct_http_handler_timestamp_format( print(f"Successfully verified timestamp format: {timestamp_str}") print(f"Parsed timestamp: {parsed_timestamp} (UTC)") print(f"Timestamp is within expected time range: {before_log_time} <= {parsed_timestamp} <= {after_log_time}") + + +def child_process_worker(test_message: str, result_queue: Queue): + """Worker function that runs in a child process and logs a message using inherited handler configuration.""" + try: + # Create a child logger with parent set to the configured logger + child_logger = logging.getLogger("parent_process_logger.child_process_logger") + child_logger.setLevel(logging.INFO) + # Don't clear handlers or addHandler - should inherit from parent logger setup + + # Log a message from the child process + child_logger.info(test_message) + + # Give some time for the log to be processed + time.sleep(2) + + # Signal completion + result_queue.put("success") + + except Exception as e: + result_queue.put(f"error: {str(e)}") + + +def test_elasticsearch_direct_http_handler_multiprocessing_configuration_inheritance( + elasticsearch_client: ElasticsearchClient, elasticsearch_config: ElasticsearchConfig, rollout_id: str +): + """Test that ElasticsearchDirectHttpHandler configuration is automatically inherited by child processes without reconfiguration or addHandler calls.""" + + # Test 1: Configuration inheritance without reconfiguration or addHandler + print("Testing configuration inheritance without reconfiguration or addHandler...") + + # Generate unique test messages + parent_message = f"Parent process message at {time.time()}" + child_message = f"Child process message at {time.time()}" + + # Set up parent process logging with configured handler on a specific logger + parent_handler = ElasticsearchDirectHttpHandler(elasticsearch_config) + parent_logger = logging.getLogger("parent_process_logger") + parent_logger.setLevel(logging.INFO) + parent_logger.handlers.clear() + parent_logger.addHandler(parent_handler) + parent_logger.propagate = False # Prevent propagation to avoid affecting other tests + + # Log from parent process + parent_logger.info(parent_message) + + # Start child process - should inherit configuration without calling configure() + result_queue = Queue() + child_process = Process(target=child_process_worker, args=(child_message, result_queue)) + child_process.start() + + # Wait for child process to complete + child_process.join(timeout=30) + + # Check if child process completed successfully + if child_process.is_alive(): + child_process.terminate() + child_process.join(timeout=5) + if child_process.is_alive(): + child_process.kill() + pytest.fail("Child process did not complete within timeout") + + # Check child process result + if not result_queue.empty(): + result = result_queue.get() + if result.startswith("error:"): + pytest.fail(f"Child process failed: {result}") + assert result == "success", f"Expected 'success', got '{result}'" + + # Give Elasticsearch time to process all documents + time.sleep(3) + + # Verify both parent and child messages are in Elasticsearch + parent_search_results = elasticsearch_client.search_by_term("message", parent_message, size=1) + child_search_results = elasticsearch_client.search_by_term("message", child_message, size=1) + + # Verify parent message + assert parent_search_results is not None, "Parent search should return results" + parent_hits = parent_search_results["hits"]["hits"] + assert len(parent_hits) > 0, "Expected to find parent message" + parent_document = parent_hits[0]["_source"] + assert parent_document["message"] == parent_message, "Parent message should match" + assert parent_document["logger_name"] == "parent_process_logger", "Parent logger name should match" + + # Verify child message + assert child_search_results is not None, "Child search should return results" + child_hits = child_search_results["hits"]["hits"] + assert len(child_hits) > 0, "Expected to find child message" + child_document = child_hits[0]["_source"] + assert child_document["message"] == child_message, "Child message should match" + assert child_document["logger_name"] == "parent_process_logger.child_process_logger", ( + "Child logger name should match" + ) + + # Verify both messages have the same rollout_id (inherited from environment) + assert parent_document["rollout_id"] == rollout_id, "Parent message should have correct rollout_id" + assert child_document["rollout_id"] == rollout_id, "Child message should have correct rollout_id" + + print("✓ Configuration inheritance without reconfiguration or addHandler verified") + + # Test 2: Multiple child processes with inherited configuration + print("Testing multiple child processes with inherited configuration...") + + # Create additional test messages for multiple children + child_messages = [f"Child {i} message at {time.time()}" for i in range(3)] + + # Start multiple child processes + child_processes = [] + result_queues = [] + + for i, child_msg in enumerate(child_messages): + result_queue = Queue() + child_process = Process(target=child_process_worker, args=(child_msg, result_queue)) + child_processes.append(child_process) + result_queues.append(result_queue) + child_process.start() + + # Wait for all child processes to complete + for child_process in child_processes: + child_process.join(timeout=30) + if child_process.is_alive(): + child_process.terminate() + child_process.join(timeout=5) + if child_process.is_alive(): + child_process.kill() + + # Check all child process results + for i, result_queue in enumerate(result_queues): + if not result_queue.empty(): + result = result_queue.get() + if result.startswith("error:"): + pytest.fail(f"Child process {i} failed: {result}") + assert result == "success", f"Expected 'success' from child {i}, got '{result}'" + + # Give Elasticsearch time to process all documents + time.sleep(3) + + # Verify all child messages are in Elasticsearch + for i, child_msg in enumerate(child_messages): + search_results = elasticsearch_client.search_by_match("message", child_msg, size=1) + assert search_results is not None, f"Child {i} search should return results" + hits = search_results["hits"]["hits"] + assert len(hits) > 0, f"Expected to find child {i} message" + document = hits[0]["_source"] + assert document["message"] == child_msg, f"Child {i} message should match" + assert document["logger_name"] == "parent_process_logger.child_process_logger", ( + f"Child {i} logger name should match" + ) + assert document["rollout_id"] == rollout_id, f"Child {i} should have correct rollout_id" + + print("✓ Multiple child processes with inherited configuration verified") + + print( + "Successfully verified ElasticsearchDirectHttpHandler multiprocessing configuration inheritance without reconfiguration or addHandler" + ) + + # Clean up the parent handler + parent_handler.close() From aa2063418af0a5e14e1a0d6f60a0c5e84d30dd98 Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Wed, 8 Oct 2025 14:37:28 -0700 Subject: [PATCH 2/2] save --- .../test_elasticsearch_direct_http_handler.py | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/tests/logging/test_elasticsearch_direct_http_handler.py b/tests/logging/test_elasticsearch_direct_http_handler.py index 586a614d..0244248b 100644 --- a/tests/logging/test_elasticsearch_direct_http_handler.py +++ b/tests/logging/test_elasticsearch_direct_http_handler.py @@ -631,7 +631,7 @@ def child_process_worker(test_message: str, result_queue: Queue): """Worker function that runs in a child process and logs a message using inherited handler configuration.""" try: # Create a child logger with parent set to the configured logger - child_logger = logging.getLogger("parent_process_logger.child_process_logger") + child_logger = logging.getLogger("child_process_logger") child_logger.setLevel(logging.INFO) # Don't clear handlers or addHandler - should inherit from parent logger setup @@ -661,12 +661,13 @@ def test_elasticsearch_direct_http_handler_multiprocessing_configuration_inherit child_message = f"Child process message at {time.time()}" # Set up parent process logging with configured handler on a specific logger - parent_handler = ElasticsearchDirectHttpHandler(elasticsearch_config) + handler = ElasticsearchDirectHttpHandler(elasticsearch_config) + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + root_logger.handlers.clear() + root_logger.addHandler(handler) + parent_logger = logging.getLogger("parent_process_logger") - parent_logger.setLevel(logging.INFO) - parent_logger.handlers.clear() - parent_logger.addHandler(parent_handler) - parent_logger.propagate = False # Prevent propagation to avoid affecting other tests # Log from parent process parent_logger.info(parent_message) @@ -677,7 +678,7 @@ def test_elasticsearch_direct_http_handler_multiprocessing_configuration_inherit child_process.start() # Wait for child process to complete - child_process.join(timeout=30) + child_process.join(timeout=100_000) # Check if child process completed successfully if child_process.is_alive(): @@ -715,9 +716,7 @@ def test_elasticsearch_direct_http_handler_multiprocessing_configuration_inherit assert len(child_hits) > 0, "Expected to find child message" child_document = child_hits[0]["_source"] assert child_document["message"] == child_message, "Child message should match" - assert child_document["logger_name"] == "parent_process_logger.child_process_logger", ( - "Child logger name should match" - ) + assert child_document["logger_name"] == "child_process_logger", "Child logger name should match" # Verify both messages have the same rollout_id (inherited from environment) assert parent_document["rollout_id"] == rollout_id, "Parent message should have correct rollout_id" @@ -770,9 +769,7 @@ def test_elasticsearch_direct_http_handler_multiprocessing_configuration_inherit assert len(hits) > 0, f"Expected to find child {i} message" document = hits[0]["_source"] assert document["message"] == child_msg, f"Child {i} message should match" - assert document["logger_name"] == "parent_process_logger.child_process_logger", ( - f"Child {i} logger name should match" - ) + assert document["logger_name"] == "child_process_logger", f"Child {i} logger name should match" assert document["rollout_id"] == rollout_id, f"Child {i} should have correct rollout_id" print("✓ Multiple child processes with inherited configuration verified") @@ -782,4 +779,4 @@ def test_elasticsearch_direct_http_handler_multiprocessing_configuration_inherit ) # Clean up the parent handler - parent_handler.close() + handler.close()