Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions tests/logging/test_elasticsearch_direct_http_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
import pytest
from datetime import datetime, timezone
from multiprocessing import Process, Queue

from eval_protocol.log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler
from eval_protocol.log_utils.elasticsearch_client import ElasticsearchClient
Expand Down Expand Up @@ -624,3 +625,158 @@ def test_elasticsearch_direct_http_handler_timestamp_format(
print(f"Successfully verified timestamp format: {timestamp_str}")
print(f"Parsed timestamp: {parsed_timestamp} (UTC)")
print(f"Timestamp is within expected time range: {before_log_time} <= {parsed_timestamp} <= {after_log_time}")


def child_process_worker(test_message: str, result_queue: Queue):
"""Worker function that runs in a child process and logs a message using inherited handler configuration."""
try:
# Create a child logger with parent set to the configured logger
child_logger = logging.getLogger("child_process_logger")
child_logger.setLevel(logging.INFO)
# Don't clear handlers or addHandler - should inherit from parent logger setup

# Log a message from the child process
child_logger.info(test_message)

# Give some time for the log to be processed
time.sleep(2)

# Signal completion
result_queue.put("success")

except Exception as e:
result_queue.put(f"error: {str(e)}")


def test_elasticsearch_direct_http_handler_multiprocessing_configuration_inheritance(
elasticsearch_client: ElasticsearchClient, elasticsearch_config: ElasticsearchConfig, rollout_id: str
):
"""Test that ElasticsearchDirectHttpHandler configuration is automatically inherited by child processes without reconfiguration or addHandler calls."""

# Test 1: Configuration inheritance without reconfiguration or addHandler
print("Testing configuration inheritance without reconfiguration or addHandler...")

# Generate unique test messages
parent_message = f"Parent process message at {time.time()}"
child_message = f"Child process message at {time.time()}"

# Set up parent process logging with configured handler on a specific logger
handler = ElasticsearchDirectHttpHandler(elasticsearch_config)
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.handlers.clear()
root_logger.addHandler(handler)

parent_logger = logging.getLogger("parent_process_logger")

# Log from parent process
parent_logger.info(parent_message)

# Start child process - should inherit configuration without calling configure()
result_queue = Queue()
child_process = Process(target=child_process_worker, args=(child_message, result_queue))
child_process.start()

# Wait for child process to complete
child_process.join(timeout=100_000)

# Check if child process completed successfully
if child_process.is_alive():
child_process.terminate()
child_process.join(timeout=5)
if child_process.is_alive():
child_process.kill()
pytest.fail("Child process did not complete within timeout")

# Check child process result
if not result_queue.empty():
result = result_queue.get()
if result.startswith("error:"):
pytest.fail(f"Child process failed: {result}")
assert result == "success", f"Expected 'success', got '{result}'"

# Give Elasticsearch time to process all documents
time.sleep(3)

# Verify both parent and child messages are in Elasticsearch
parent_search_results = elasticsearch_client.search_by_term("message", parent_message, size=1)
child_search_results = elasticsearch_client.search_by_term("message", child_message, size=1)

# Verify parent message
assert parent_search_results is not None, "Parent search should return results"
parent_hits = parent_search_results["hits"]["hits"]
assert len(parent_hits) > 0, "Expected to find parent message"
parent_document = parent_hits[0]["_source"]
assert parent_document["message"] == parent_message, "Parent message should match"
assert parent_document["logger_name"] == "parent_process_logger", "Parent logger name should match"

# Verify child message
assert child_search_results is not None, "Child search should return results"
child_hits = child_search_results["hits"]["hits"]
assert len(child_hits) > 0, "Expected to find child message"
child_document = child_hits[0]["_source"]
assert child_document["message"] == child_message, "Child message should match"
assert child_document["logger_name"] == "child_process_logger", "Child logger name should match"

# Verify both messages have the same rollout_id (inherited from environment)
assert parent_document["rollout_id"] == rollout_id, "Parent message should have correct rollout_id"
assert child_document["rollout_id"] == rollout_id, "Child message should have correct rollout_id"

print("✓ Configuration inheritance without reconfiguration or addHandler verified")

# Test 2: Multiple child processes with inherited configuration
print("Testing multiple child processes with inherited configuration...")

# Create additional test messages for multiple children
child_messages = [f"Child {i} message at {time.time()}" for i in range(3)]

# Start multiple child processes
child_processes = []
result_queues = []

for i, child_msg in enumerate(child_messages):
result_queue = Queue()
child_process = Process(target=child_process_worker, args=(child_msg, result_queue))
child_processes.append(child_process)
result_queues.append(result_queue)
child_process.start()

# Wait for all child processes to complete
for child_process in child_processes:
child_process.join(timeout=30)
if child_process.is_alive():
child_process.terminate()
child_process.join(timeout=5)
if child_process.is_alive():
child_process.kill()

# Check all child process results
for i, result_queue in enumerate(result_queues):
if not result_queue.empty():
result = result_queue.get()
if result.startswith("error:"):
pytest.fail(f"Child process {i} failed: {result}")
assert result == "success", f"Expected 'success' from child {i}, got '{result}'"

# Give Elasticsearch time to process all documents
time.sleep(3)

# Verify all child messages are in Elasticsearch
for i, child_msg in enumerate(child_messages):
search_results = elasticsearch_client.search_by_match("message", child_msg, size=1)
assert search_results is not None, f"Child {i} search should return results"
hits = search_results["hits"]["hits"]
assert len(hits) > 0, f"Expected to find child {i} message"
document = hits[0]["_source"]
assert document["message"] == child_msg, f"Child {i} message should match"
assert document["logger_name"] == "child_process_logger", f"Child {i} logger name should match"
assert document["rollout_id"] == rollout_id, f"Child {i} should have correct rollout_id"

print("✓ Multiple child processes with inherited configuration verified")

print(
"Successfully verified ElasticsearchDirectHttpHandler multiprocessing configuration inheritance without reconfiguration or addHandler"
)

# Clean up the parent handler
handler.close()
Loading