|
3 | 3 | import time |
4 | 4 | import pytest |
5 | 5 | from datetime import datetime, timezone |
| 6 | +from multiprocessing import Process, Queue |
6 | 7 |
|
7 | 8 | from eval_protocol.log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler |
8 | 9 | from eval_protocol.log_utils.elasticsearch_client import ElasticsearchClient |
@@ -624,3 +625,161 @@ def test_elasticsearch_direct_http_handler_timestamp_format( |
624 | 625 | print(f"Successfully verified timestamp format: {timestamp_str}") |
625 | 626 | print(f"Parsed timestamp: {parsed_timestamp} (UTC)") |
626 | 627 | print(f"Timestamp is within expected time range: {before_log_time} <= {parsed_timestamp} <= {after_log_time}") |
| 628 | + |
| 629 | + |
| 630 | +def child_process_worker(test_message: str, result_queue: Queue): |
| 631 | + """Worker function that runs in a child process and logs a message using inherited handler configuration.""" |
| 632 | + try: |
| 633 | + # Create a child logger with parent set to the configured logger |
| 634 | + child_logger = logging.getLogger("parent_process_logger.child_process_logger") |
| 635 | + child_logger.setLevel(logging.INFO) |
| 636 | + # Don't clear handlers or addHandler - should inherit from parent logger setup |
| 637 | + |
| 638 | + # Log a message from the child process |
| 639 | + child_logger.info(test_message) |
| 640 | + |
| 641 | + # Give some time for the log to be processed |
| 642 | + time.sleep(2) |
| 643 | + |
| 644 | + # Signal completion |
| 645 | + result_queue.put("success") |
| 646 | + |
| 647 | + except Exception as e: |
| 648 | + result_queue.put(f"error: {str(e)}") |
| 649 | + |
| 650 | + |
| 651 | +def test_elasticsearch_direct_http_handler_multiprocessing_configuration_inheritance( |
| 652 | + elasticsearch_client: ElasticsearchClient, elasticsearch_config: ElasticsearchConfig, rollout_id: str |
| 653 | +): |
| 654 | + """Test that ElasticsearchDirectHttpHandler configuration is automatically inherited by child processes without reconfiguration or addHandler calls.""" |
| 655 | + |
| 656 | + # Test 1: Configuration inheritance without reconfiguration or addHandler |
| 657 | + print("Testing configuration inheritance without reconfiguration or addHandler...") |
| 658 | + |
| 659 | + # Generate unique test messages |
| 660 | + parent_message = f"Parent process message at {time.time()}" |
| 661 | + child_message = f"Child process message at {time.time()}" |
| 662 | + |
| 663 | + # Set up parent process logging with configured handler on a specific logger |
| 664 | + parent_handler = ElasticsearchDirectHttpHandler(elasticsearch_config) |
| 665 | + parent_logger = logging.getLogger("parent_process_logger") |
| 666 | + parent_logger.setLevel(logging.INFO) |
| 667 | + parent_logger.handlers.clear() |
| 668 | + parent_logger.addHandler(parent_handler) |
| 669 | + parent_logger.propagate = False # Prevent propagation to avoid affecting other tests |
| 670 | + |
| 671 | + # Log from parent process |
| 672 | + parent_logger.info(parent_message) |
| 673 | + |
| 674 | + # Start child process - should inherit configuration without calling configure() |
| 675 | + result_queue = Queue() |
| 676 | + child_process = Process(target=child_process_worker, args=(child_message, result_queue)) |
| 677 | + child_process.start() |
| 678 | + |
| 679 | + # Wait for child process to complete |
| 680 | + child_process.join(timeout=30) |
| 681 | + |
| 682 | + # Check if child process completed successfully |
| 683 | + if child_process.is_alive(): |
| 684 | + child_process.terminate() |
| 685 | + child_process.join(timeout=5) |
| 686 | + if child_process.is_alive(): |
| 687 | + child_process.kill() |
| 688 | + pytest.fail("Child process did not complete within timeout") |
| 689 | + |
| 690 | + # Check child process result |
| 691 | + if not result_queue.empty(): |
| 692 | + result = result_queue.get() |
| 693 | + if result.startswith("error:"): |
| 694 | + pytest.fail(f"Child process failed: {result}") |
| 695 | + assert result == "success", f"Expected 'success', got '{result}'" |
| 696 | + |
| 697 | + # Give Elasticsearch time to process all documents |
| 698 | + time.sleep(3) |
| 699 | + |
| 700 | + # Verify both parent and child messages are in Elasticsearch |
| 701 | + parent_search_results = elasticsearch_client.search_by_term("message", parent_message, size=1) |
| 702 | + child_search_results = elasticsearch_client.search_by_term("message", child_message, size=1) |
| 703 | + |
| 704 | + # Verify parent message |
| 705 | + assert parent_search_results is not None, "Parent search should return results" |
| 706 | + parent_hits = parent_search_results["hits"]["hits"] |
| 707 | + assert len(parent_hits) > 0, "Expected to find parent message" |
| 708 | + parent_document = parent_hits[0]["_source"] |
| 709 | + assert parent_document["message"] == parent_message, "Parent message should match" |
| 710 | + assert parent_document["logger_name"] == "parent_process_logger", "Parent logger name should match" |
| 711 | + |
| 712 | + # Verify child message |
| 713 | + assert child_search_results is not None, "Child search should return results" |
| 714 | + child_hits = child_search_results["hits"]["hits"] |
| 715 | + assert len(child_hits) > 0, "Expected to find child message" |
| 716 | + child_document = child_hits[0]["_source"] |
| 717 | + assert child_document["message"] == child_message, "Child message should match" |
| 718 | + assert child_document["logger_name"] == "parent_process_logger.child_process_logger", ( |
| 719 | + "Child logger name should match" |
| 720 | + ) |
| 721 | + |
| 722 | + # Verify both messages have the same rollout_id (inherited from environment) |
| 723 | + assert parent_document["rollout_id"] == rollout_id, "Parent message should have correct rollout_id" |
| 724 | + assert child_document["rollout_id"] == rollout_id, "Child message should have correct rollout_id" |
| 725 | + |
| 726 | + print("✓ Configuration inheritance without reconfiguration or addHandler verified") |
| 727 | + |
| 728 | + # Test 2: Multiple child processes with inherited configuration |
| 729 | + print("Testing multiple child processes with inherited configuration...") |
| 730 | + |
| 731 | + # Create additional test messages for multiple children |
| 732 | + child_messages = [f"Child {i} message at {time.time()}" for i in range(3)] |
| 733 | + |
| 734 | + # Start multiple child processes |
| 735 | + child_processes = [] |
| 736 | + result_queues = [] |
| 737 | + |
| 738 | + for i, child_msg in enumerate(child_messages): |
| 739 | + result_queue = Queue() |
| 740 | + child_process = Process(target=child_process_worker, args=(child_msg, result_queue)) |
| 741 | + child_processes.append(child_process) |
| 742 | + result_queues.append(result_queue) |
| 743 | + child_process.start() |
| 744 | + |
| 745 | + # Wait for all child processes to complete |
| 746 | + for child_process in child_processes: |
| 747 | + child_process.join(timeout=30) |
| 748 | + if child_process.is_alive(): |
| 749 | + child_process.terminate() |
| 750 | + child_process.join(timeout=5) |
| 751 | + if child_process.is_alive(): |
| 752 | + child_process.kill() |
| 753 | + |
| 754 | + # Check all child process results |
| 755 | + for i, result_queue in enumerate(result_queues): |
| 756 | + if not result_queue.empty(): |
| 757 | + result = result_queue.get() |
| 758 | + if result.startswith("error:"): |
| 759 | + pytest.fail(f"Child process {i} failed: {result}") |
| 760 | + assert result == "success", f"Expected 'success' from child {i}, got '{result}'" |
| 761 | + |
| 762 | + # Give Elasticsearch time to process all documents |
| 763 | + time.sleep(3) |
| 764 | + |
| 765 | + # Verify all child messages are in Elasticsearch |
| 766 | + for i, child_msg in enumerate(child_messages): |
| 767 | + search_results = elasticsearch_client.search_by_match("message", child_msg, size=1) |
| 768 | + assert search_results is not None, f"Child {i} search should return results" |
| 769 | + hits = search_results["hits"]["hits"] |
| 770 | + assert len(hits) > 0, f"Expected to find child {i} message" |
| 771 | + document = hits[0]["_source"] |
| 772 | + assert document["message"] == child_msg, f"Child {i} message should match" |
| 773 | + assert document["logger_name"] == "parent_process_logger.child_process_logger", ( |
| 774 | + f"Child {i} logger name should match" |
| 775 | + ) |
| 776 | + assert document["rollout_id"] == rollout_id, f"Child {i} should have correct rollout_id" |
| 777 | + |
| 778 | + print("✓ Multiple child processes with inherited configuration verified") |
| 779 | + |
| 780 | + print( |
| 781 | + "Successfully verified ElasticsearchDirectHttpHandler multiprocessing configuration inheritance without reconfiguration or addHandler" |
| 782 | + ) |
| 783 | + |
| 784 | + # Clean up the parent handler |
| 785 | + parent_handler.close() |
0 commit comments