From cc4ef0f12401bb0bcc7fefbe09a07e20926a11af Mon Sep 17 00:00:00 2001 From: ActivePeter <1020401660@qq.com> Date: Thu, 2 Jul 2026 17:55:51 +0800 Subject: [PATCH 1/2] test --- .../test_api_chan_mpmc_base.py | 227 +++++--- ...pi_chan_mpmc_quick_and_weighted_consume.py | 200 +++---- .../test_api_chan_mpmc/test_rebind_client.py | 233 ++++---- fluxon_py/tests/test_lib.py | 88 ++- fluxon_py/tests/test_pyo3_etcd.py | 129 +++++ fluxon_release/test_rsc/source/prepare.yaml | 4 + fluxon_rs/fluxon_pyo3/build.rs | 148 ++++- fluxon_rs/fluxon_pyo3/src/etcd.rs | 529 +++++++++++++++++- fluxon_rs/fluxon_pyo3/src/lib.rs | 8 +- .../tests/test_test_rsc_prepare_yaml.py | 2 + .../tests/test_top_attention_index_helper.py | 6 + .../top_attention_test_index/_kv_py_core.py | 1 + 12 files changed, 1282 insertions(+), 293 deletions(-) create mode 100644 fluxon_py/tests/test_pyo3_etcd.py diff --git a/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_base.py b/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_base.py index f992c2d..ce2d56b 100644 --- a/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_base.py +++ b/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_base.py @@ -44,9 +44,7 @@ def _find_project_root(start: Path) -> Path: if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) -from typing import Dict, List, Optional, Tuple - -import etcd3 +from typing import Any, Dict, List, Optional, Tuple from fluxon_py.api_ext_chan import ( # noqa: E402 MPMCChanConsumer, @@ -82,12 +80,12 @@ def _find_project_root(start: Path) -> Path: KV_SVC_TYPE, CHAN_CONFIG_TEST, TEST_TIMEOUT_SECONDS, - ETCD_HOST, - ETCD_PORT, setup_test_environment, new_test_consumer, new_test_producer, run_with_argmatrix, + is_transient_etcd_control_failure as _is_transient_etcd_failure, + etcd_control_call_with_retry as _etcd_call_with_retry, ) from fluxon_py.tests.test_lib import pre_kill_existing_test_processes_by_script_name # noqa: E402 @@ -264,7 +262,6 @@ def run_producer(env: "ChannelState", args: argparse.Namespace) -> None: ) assert isinstance(producer, MPMCChanProducer) print(f"[Producer-{args.producer_id}] Started") - etcd_client = producer.etcd_client try: for index in range(args.message_count): unique_id = str(uuid.uuid4()) @@ -324,11 +321,14 @@ def run_consumer(env: "ChannelState", args: argparse.Namespace) -> None: f"{consumer.mpmc_channel.mpmc_member_id}", flush=True, ) - etcd_client = consumer.etcd_client - etcd_client.put( - f"/test_mpmc_consumer/{args.consumer_id}", - b"dummy_value", - consumer.mpmc_channel.mpmc_global_lease, + _etcd_call_with_retry( + f"publish consumer presence for consumer_id={args.consumer_id}", + lambda client: client.put( + f"/test_mpmc_consumer/{args.consumer_id}", + b"dummy_value", + lease_id=int(consumer.mpmc_channel.mpmc_global_lease.id), + ), + max_attempts=3, ) consumed_count = 0 @@ -344,13 +344,19 @@ def run_consumer(env: "ChannelState", args: argparse.Namespace) -> None: # Periodically check whether all producers are done. now = time.time() if now - last_producer_check >= producer_done_check_interval: - stop_flag, _ = etcd_client.get(f"/test_mpmc_stop_producer") + stop_flag = _etcd_call_with_retry( + "read producer stop flag", + lambda client: client.get("/test_mpmc_stop_producer"), + max_attempts=3, + ) all_producers_done = stop_flag is not None last_producer_check = now # External stop signal from harness. - stop_flag, _ = etcd_client.get( - f"/test_mpmc_stop_consumer/{args.consumer_id}" + stop_flag = _etcd_call_with_retry( + f"read consumer stop flag for consumer_id={args.consumer_id}", + lambda client: client.get(f"/test_mpmc_stop_consumer/{args.consumer_id}"), + max_attempts=3, ) if stop_flag: logging.info( @@ -424,7 +430,11 @@ def run_consumer(env: "ChannelState", args: argparse.Namespace) -> None: f" {consumed_count} messages" ) _atomic_stdout_write_line(f"{CONSUMER_NORMAL_EXIT_MARKER} {args.consumer_id}") - etcd_client.delete(f"/test_mpmc_consumer/{args.consumer_id}") + _etcd_call_with_retry( + f"delete consumer presence for consumer_id={args.consumer_id}", + lambda client: client.delete(f"/test_mpmc_consumer/{args.consumer_id}"), + max_attempts=3, + ) consumer.close().unwrap() finally: configure_backend(env, backend_type=prev_type, backend_ip=prev_ip) @@ -432,16 +442,17 @@ def run_consumer(env: "ChannelState", args: argparse.Namespace) -> None: def clean_etcd() -> None: - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: + def _clean(etcd_client: Any) -> None: etcd_client.delete_prefix("/mpmc_channels") etcd_client.delete_prefix("/channels") etcd_client.delete_prefix("/test_mpmc_stop_consumer") etcd_client.delete_prefix("/test_mpmc_consumer") etcd_client.delete_prefix("/test_mpmc_stop_producer") + _etcd_call_with_retry("clean MPMC test etcd prefixes", _clean) + def _wait_until_lease_revoked( - etcd_client: etcd3.Etcd3Client, lease_id: int, *, timeout_s: float = 10.0, @@ -449,7 +460,11 @@ def _wait_until_lease_revoked( deadline = time.time() + timeout_s while True: try: - info = etcd_client.get_lease_info(int(lease_id)) + ttl_val = _etcd_call_with_retry( + f"read lease ttl for lease_id={lease_id}", + lambda client: client.lease_ttl(int(lease_id)), + max_attempts=3, + ) except Exception as exc: # noqa: BLE001 msg = str(exc).lower() if "not found" in msg or "requested lease not found" in msg: @@ -459,7 +474,6 @@ def _wait_until_lease_revoked( f"lease revoke verification failed for lease_id={lease_id}: {exc}" ) from exc else: - ttl_val = getattr(info, "TTL", None) if not isinstance(ttl_val, int): raise RuntimeError( f"invalid TTL returned for lease_id={lease_id}: {ttl_val!r}" @@ -493,14 +507,19 @@ def test_mpmc_member_lease_expiry_closes_owner() -> None: chan_id = producer.get_chan_id() lease_id = int(producer.mpmc_channel.mpmc_member_lease.id) - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: - mpsc_meta_before = list(etcd_client.get_prefix("/channels/meta/")) - assert len(mpsc_meta_before) == 0, ( - "fresh MPMC producer should not create any sub-MPSC metadata before the first put, " - f"found {len(mpsc_meta_before)} keys" - ) - etcd_client.revoke_lease(lease_id) - _wait_until_lease_revoked(etcd_client, lease_id) + mpsc_meta_before = _etcd_call_with_retry( + "list MPSC metadata before member lease revoke", + lambda client: list(client.get_prefix("/channels/meta/")), + ) + assert len(mpsc_meta_before) == 0, ( + "fresh MPMC producer should not create any sub-MPSC metadata before the first put, " + f"found {len(mpsc_meta_before)} keys" + ) + _etcd_call_with_retry( + f"revoke member lease lease_id={lease_id}", + lambda client: client.revoke_lease(lease_id), + ) + _wait_until_lease_revoked(lease_id) first_put = producer.put_data( { @@ -516,11 +535,13 @@ def test_mpmc_member_lease_expiry_closes_owner() -> None: assert first_err.channel_id == chan_id assert producer.shutdown_ctl.closed, "producer must mark itself closed after member lease loss" - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: - mpsc_meta_after = list(etcd_client.get_prefix("/channels/meta/")) - assert len(mpsc_meta_after) == 0, ( - "dead member lease must stop sub-MPSC creation before any new channel meta is published" - ) + mpsc_meta_after = _etcd_call_with_retry( + "list MPSC metadata after member lease revoke", + lambda client: list(client.get_prefix("/channels/meta/")), + ) + assert len(mpsc_meta_after) == 0, ( + "dead member lease must stop sub-MPSC creation before any new channel meta is published" + ) second_put = producer.put_data( { @@ -643,11 +664,12 @@ def scenario_dynamic_producer_consumer( # Map all process handles by identifier (producer_id/consumer_id) process_handles_by_id: Dict[str, Tuple[str, subprocess.Popen, str]] = {} joined_ids: set[str] = set() - etcd_client = etcd3.client(ETCD_HOST, ETCD_PORT) initial_consumers_id: List[str] = [] dyn_consumers: List[str] = [] recovered_consumers: List[str] = [] test_mpmc_id: Optional[str] = None + scan_stop_event = threading.Event() + scan_thread: Optional[threading.Thread] = None def fail_fast_on_subprocess_error(*, process_type_filter: Optional[str] = None) -> None: for identifier, (process_type, proc, log_file) in process_handles_by_id.items(): @@ -716,9 +738,13 @@ def _extract_mpmc_member_id_from_consumer_log(log_file: str) -> Optional[int]: def _count_ready_keys_for_member_id(member_id: int) -> int: assert test_mpmc_id is not None, "test_mpmc_id must be initialized before counting ready keys" - ready_chans_kvs = list(etcd_client.get_prefix(_new_mpmc_ready_channels_prefix(test_mpmc_id))) + ready_prefix = _new_mpmc_ready_channels_prefix(test_mpmc_id) + ready_chans_kvs = _etcd_call_with_retry( + f"count ready keys for member_id={member_id}", + lambda client: list(client.get_prefix(ready_prefix)), + ) count = 0 - for value, _meta in ready_chans_kvs: + for _key, value in ready_chans_kvs: if value is None: continue if value.decode() == str(member_id): @@ -728,17 +754,21 @@ def _count_ready_keys_for_member_id(member_id: int) -> int: def _list_ready_keys_for_member_id(member_id: int) -> List[str]: assert test_mpmc_id is not None, "test_mpmc_id must be initialized before listing ready keys" member_id_str = str(member_id) - ready_chans_kvs = list(etcd_client.get_prefix(_new_mpmc_ready_channels_prefix(test_mpmc_id))) + ready_prefix = _new_mpmc_ready_channels_prefix(test_mpmc_id) + ready_chans_kvs = _etcd_call_with_retry( + f"list ready keys for member_id={member_id}", + lambda client: list(client.get_prefix(ready_prefix)), + ) keys: List[str] = [] - for value, meta in ready_chans_kvs: + for key, value in ready_chans_kvs: if value is None: continue if value.decode() != member_id_str: continue try: - keys.append(meta.key.decode()) + keys.append(key.decode()) except Exception: - keys.append(repr(meta.key)) + keys.append(repr(key)) return keys def _wait_ready_keys_cleared_for_member_id(member_id: int, timeout_s: float) -> List[str]: @@ -755,12 +785,19 @@ def _wait_ready_keys_cleared_for_member_id(member_id: int, timeout_s: float) -> def _wait_for_test_mpmc_id() -> str: deadline = time.time() + float(TEST_TIMEOUT_SECONDS) while True: - value, _meta = etcd_client.get(NEW_OR_BIND_MAPPING_KEY) + value = _etcd_call_with_retry( + f"read MPMC mapping key {NEW_OR_BIND_MAPPING_KEY}", + lambda client: client.get(NEW_OR_BIND_MAPPING_KEY), + ) if value is not None: raw = value.decode().strip() if raw.isdigit(): candidate = raw - meta_val, _meta = etcd_client.get(_new_mpmc_meta_key(candidate)) + meta_key = _new_mpmc_meta_key(candidate) + meta_val = _etcd_call_with_retry( + f"read MPMC meta key {meta_key}", + lambda client: client.get(meta_key), + ) if meta_val is not None: return candidate logging.warning( @@ -782,7 +819,10 @@ def _wait_for_test_mpmc_id() -> str: def _assert_test_mpmc_id_stable() -> None: assert test_mpmc_id is not None - value, _meta = etcd_client.get(NEW_OR_BIND_MAPPING_KEY) + value = _etcd_call_with_retry( + f"assert stable MPMC mapping key {NEW_OR_BIND_MAPPING_KEY}", + lambda client: client.get(NEW_OR_BIND_MAPPING_KEY), + ) if value is None: raise RuntimeError( f"etcd key {NEW_OR_BIND_MAPPING_KEY!r} disappeared during test; expected mpmc_id={test_mpmc_id}" @@ -872,22 +912,25 @@ def consumer_process_cmd(consumer_id: str) -> List[str]: str(prefetch), ] - def scan_producer_offset() -> None: + def scan_producer_offset(stop_event: threading.Event) -> None: # Use dedicated scan logger (single file mpmc_scan_offset.log) logger = _scan_logger mpsc_producer_offset_pair: Dict[int, Dict[int, List[int]]] = {} - scan_client = etcd3.client(ETCD_HOST, ETCD_PORT) mpsc_chans: List[int] = [] def get_chans() -> List[int]: prefix = mpsc._new_etcd_meta_key_prefix() + meta_kvs = _etcd_call_with_retry( + "scan MPSC channel metadata keys", + lambda client: list(client.get_prefix(prefix)), + ) return [ - int(meta.key.decode().split("/")[-1]) - for _, meta in scan_client.get_prefix(prefix) + int(key.decode().split("/")[-1]) + for key, _value in meta_kvs ] try: - while True: + while not stop_event.is_set(): if mpsc_producer_offset_pair: sum_unconsumed_count = 0 sum_consumed_count = 0 @@ -925,11 +968,14 @@ def get_chans() -> List[int]: ) logger.info(">>> sum_unconsumed_count: %s", sum_unconsumed_count) logger.info(">>> sum_consumed_count: %s", sum_consumed_count) - ready_chans_kvs = list(scan_client.get_prefix(_READY_CHANNELS_BASE_PREFIX)) + ready_chans_kvs = _etcd_call_with_retry( + "scan all MPMC ready channel keys", + lambda client: list(client.get_prefix(_READY_CHANNELS_BASE_PREFIX)), + ) mpmc_ids: set[str] = set() ready_mpscs: List[str] = [] - for value, meta in ready_chans_kvs: - key = meta.key.decode() + for key_bytes, value in ready_chans_kvs: + key = key_bytes.decode() mpmc_id = key.split("/")[-2] mpsc_id = key.split("/")[-1] mpmc_ids.add(mpmc_id) @@ -950,10 +996,10 @@ def get_chans() -> List[int]: logger.info("all_mpscs: %s", mpsc_chans) for mpsc_id in mpsc_chans: mpsc_producer_offset_pair.setdefault(mpsc_id, {}) - producer_offset_kvs = list( - scan_client.get_prefix( - mpsc._new_produce_offset_of_all_producer_key(mpsc_id) - ) + producer_offset_prefix = mpsc._new_produce_offset_of_all_producer_key(mpsc_id) + producer_offset_kvs = _etcd_call_with_retry( + f"scan producer offsets for mpsc_id={mpsc_id}", + lambda client: list(client.get_prefix(producer_offset_prefix)), ) logger.info( "mpsc %s producer_offset_kvs: %s", @@ -961,8 +1007,8 @@ def get_chans() -> List[int]: producer_offset_kvs, ) mpsc_producer_offsets = { - int(meta.key.decode().split("/")[-1]): int(value.decode()) - for value, meta in producer_offset_kvs + int(key.decode().split("/")[-1]): int(value.decode()) + for key, value in producer_offset_kvs } logger.info( "mpsc %s producer_offsets dict: %s", @@ -981,7 +1027,10 @@ def get_chans() -> List[int]: consume_offset_key = mpsc._new_consume_offset_of_one_producer_key( mpsc_id, str(mpsc_producer_key) ) - consume_value, _ = scan_client.get(consume_offset_key) + consume_value = _etcd_call_with_retry( + f"read consume offset {consume_offset_key}", + lambda client: client.get(consume_offset_key), + ) logger.info( "mpsc_id: %s mpsc_producer_key: %s consume_offset_key: %s " "mpsc_consume_offset: %s", @@ -994,9 +1043,12 @@ def get_chans() -> List[int]: mpsc_producer_offset_pair[mpsc_id][mpsc_producer_key][1] = int( consume_value.decode() ) - time.sleep(5) - finally: - scan_client.close() + stop_event.wait(5) + except Exception as exc: # noqa: BLE001 + if stop_event.is_set() and _is_transient_etcd_failure(exc): + logger.info("scan_producer_offset stopped during transient etcd failure: %s", exc) + return + logger.exception("scan_producer_offset stopped unexpectedly") try: # NOTE: `new_or_bind` has a race window during first-time channel creation when multiple @@ -1022,7 +1074,11 @@ def get_chans() -> List[int]: initial_consumers_id.append(consumer_id) start_processes() - scan_thread = threading.Thread(target=scan_producer_offset, daemon=True) + scan_thread = threading.Thread( + target=scan_producer_offset, + args=(scan_stop_event,), + daemon=True, + ) scan_thread.start() print("=== Starting dynamic management phase ===") @@ -1071,8 +1127,11 @@ def get_chans() -> List[int]: consumes_to_stop.append(consumer_id) for consumer_id in consumes_to_stop: - etcd_client.put( - f"/test_mpmc_stop_consumer/{consumer_id}", b"dummy_value" + _etcd_call_with_retry( + f"publish stop flag for consumer_id={consumer_id}", + lambda client, consumer_id=consumer_id: client.put( + f"/test_mpmc_stop_consumer/{consumer_id}", b"dummy_value" + ), ) for consumer_id in consumes_to_stop: @@ -1102,7 +1161,10 @@ def get_chans() -> List[int]: member_id = _extract_mpmc_member_id_from_consumer_log(log_file) # Verify that ready keys exist (global view) before join - ready_keys_before = list(etcd_client.get_prefix(_READY_CHANNELS_BASE_PREFIX)) + ready_keys_before = _etcd_call_with_retry( + f"list ready keys before joining consumer_id={consumer_id}", + lambda client: list(client.get_prefix(_READY_CHANNELS_BASE_PREFIX)), + ) logging.info( "Before join: total ready keys=%d (all mpmc) for consumer %s", len(ready_keys_before), @@ -1110,8 +1172,10 @@ def get_chans() -> List[int]: ) if member_id is not None: assert test_mpmc_id is not None - ready_keys_under_test = list( - etcd_client.get_prefix(_new_mpmc_ready_channels_prefix(test_mpmc_id)) + test_ready_prefix = _new_mpmc_ready_channels_prefix(test_mpmc_id) + ready_keys_under_test = _etcd_call_with_retry( + f"list test ready keys before joining consumer_id={consumer_id}", + lambda client: list(client.get_prefix(test_ready_prefix)), ) logging.info( "Before join: test_mpmc_id=%s ready_keys=%d, consumer %s mpmc_member_id=%s ready_keys_by_member=%d", @@ -1151,7 +1215,10 @@ def get_chans() -> List[int]: leftover_keys = [] # Verify that ready keys have been deleted for this consumer after sleep - ready_keys_after = list(etcd_client.get_prefix(_READY_CHANNELS_BASE_PREFIX)) + ready_keys_after = _etcd_call_with_retry( + f"list ready keys after joining consumer_id={consumer_id}", + lambda client: list(client.get_prefix(_READY_CHANNELS_BASE_PREFIX)), + ) logging.info( "After join and sleep: total ready keys=%d (all mpmc) for consumer %s", len(ready_keys_after), @@ -1159,8 +1226,10 @@ def get_chans() -> List[int]: ) if member_id is not None: assert test_mpmc_id is not None - ready_keys_under_test_after = list( - etcd_client.get_prefix(_new_mpmc_ready_channels_prefix(test_mpmc_id)) + test_ready_prefix_after = _new_mpmc_ready_channels_prefix(test_mpmc_id) + ready_keys_under_test_after = _etcd_call_with_retry( + f"list test ready keys after joining consumer_id={consumer_id}", + lambda client: list(client.get_prefix(test_ready_prefix_after)), ) left = _count_ready_keys_for_member_id(member_id) logging.info( @@ -1205,9 +1274,12 @@ def debug_all_ready_channels() -> None: "debug_all_ready_channels after close consumers %s", consumes_to_stop, ) - ready_chans_kvs = list(etcd_client.get_prefix(_READY_CHANNELS_BASE_PREFIX)) - for value, meta in ready_chans_kvs: - key = meta.key.decode() + ready_chans_kvs = _etcd_call_with_retry( + "debug all ready channels after closing consumers", + lambda client: list(client.get_prefix(_READY_CHANNELS_BASE_PREFIX)), + ) + for key_bytes, value in ready_chans_kvs: + key = key_bytes.decode() mpmc_id = key.split("/")[-2] mpsc_id = key.split("/")[-1] logging.info( @@ -1241,7 +1313,10 @@ def debug_all_ready_channels() -> None: wait_all_of_type("producer", timeout_s=join_timeout_s) # 2) Notify consumers that no more producers will publish; they will exit after idle timeout. - etcd_client.put("/test_mpmc_stop_producer", b"dummy_value") + _etcd_call_with_retry( + "publish producer stop flag", + lambda client: client.put("/test_mpmc_stop_producer", b"dummy_value"), + ) # 3) Wait remaining consumers to drain and exit (or fail fast). wait_all_of_type("consumer", timeout_s=join_timeout_s) @@ -1251,7 +1326,11 @@ def debug_all_ready_channels() -> None: verify_exit_status(subprocesses) print("=== MPMC Dynamic Test PASSED ===") finally: - etcd_client.close() + scan_stop_event.set() + if scan_thread is not None: + scan_thread.join(timeout=10) + if scan_thread.is_alive(): + logging.warning("scan_producer_offset did not stop before timeout") def verify_production_consumption_counts( diff --git a/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_quick_and_weighted_consume.py b/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_quick_and_weighted_consume.py index e234cce..693abf4 100644 --- a/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_quick_and_weighted_consume.py +++ b/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_quick_and_weighted_consume.py @@ -17,8 +17,6 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple -import etcd3 - # Ensure absolute imports work when running this file directly import os as _os import sys as _sys @@ -37,14 +35,13 @@ ) from fluxon_py.tests.test_lib import ( # noqa: E402 CHAN_CONFIG_TEST, - ETCD_HOST, - ETCD_PORT, TEST_TIMEOUT_SECONDS, setup_test_environment, new_test_consumer, new_test_producer, load_test_fluxon_cluster_name, run_with_argmatrix, + etcd_control_call_with_retry as _etcd_call_with_retry, ) from fluxon_py.api_ext_chan import ( # noqa: E402 _new_unique_lock_key, @@ -267,7 +264,10 @@ def wait_for_processes(processes: List[Tuple[str, subprocess.Popen, str]]) -> No if proc.returncode != 0: raise RuntimeError( f"Process {process_type} failed (log: {log_file})," - f" return code: {proc.returncode}" + f" return code: {proc.returncode}\n" + "--- child log tail ---\n" + f"{_read_log_tail(log_file)}\n" + "--- end child log tail ---" ) @@ -448,27 +448,29 @@ def consumer_fairness_tolerance(total_consumed: int) -> int: def clean_namespace() -> None: if LOG_DIR.exists(): shutil.rmtree(LOG_DIR) - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: + + def _clean(etcd_client: Any) -> None: # Delete unique mapping and its lock key to keep this scenario deterministic across reruns. etcd_client.delete(_new_unique_mapping_key(CHANNEL_KEY)) etcd_client.delete(_new_unique_lock_key(CHANNEL_KEY)) etcd_client.delete_prefix("/mpmc_channels") etcd_client.delete_prefix("/channels") - # Delete all producer done keys with correct format for p_idx in range(PRODUCER_COUNT): producer_id = f"P{p_idx}" etcd_client.delete(f"{PRODUCER_DONE_KEY}_{producer_id}") + _etcd_call_with_retry("clean quick fair consume namespace", _clean) + def reset_producer_done_flag() -> None: - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: - # Delete legacy unsuffixed key (if any) + def _reset(etcd_client: Any) -> None: etcd_client.delete(PRODUCER_DONE_KEY) - # Delete all per-producer done keys for p_idx in range(PRODUCER_COUNT): producer_id = f"P{p_idx}" etcd_client.delete(f"{PRODUCER_DONE_KEY}_{producer_id}") + _etcd_call_with_retry("reset producer done flags", _reset) + def _read_log_tail(path: str, *, max_lines: int = 80) -> str: try: @@ -505,31 +507,34 @@ def _wait_unique_key_mapping( bootstrap_log: str, ) -> str: deadline = time.time() + float(timeout_seconds) - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: - while time.time() < deadline: - bootstrap_state = _bootstrap_process_state( - bootstrap_proc=bootstrap_proc, - bootstrap_log=bootstrap_log, + mapping_key = _new_unique_mapping_key(CHANNEL_KEY) + while time.time() < deadline: + bootstrap_state = _bootstrap_process_state( + bootstrap_proc=bootstrap_proc, + bootstrap_log=bootstrap_log, + ) + if bootstrap_state is not None: + raise RuntimeError( + "Bootstrap producer exited before publishing channel mapping: " + f"unique_key={CHANNEL_KEY!r} {bootstrap_state}" ) - if bootstrap_state is not None: - raise RuntimeError( - "Bootstrap producer exited before publishing channel mapping: " - f"unique_key={CHANNEL_KEY!r} {bootstrap_state}" - ) - value, _ = etcd_client.get(_new_unique_mapping_key(CHANNEL_KEY)) - if value is not None: - try: - chan_id = value.decode("utf-8") - except Exception as err: # noqa: BLE001 - raise RuntimeError( - f"Invalid channel mapping value for unique_key={CHANNEL_KEY!r}: {value!r}, err={err}" - ) from None - if chan_id.isdigit(): - return chan_id + value = _etcd_call_with_retry( + f"wait channel mapping unique_key={CHANNEL_KEY!r}", + lambda etcd_client: etcd_client.get(mapping_key), + ) + if value is not None: + try: + chan_id = value.decode("utf-8") + except Exception as err: # noqa: BLE001 raise RuntimeError( - f"Invalid channel mapping for unique_key={CHANNEL_KEY!r}: {chan_id!r} (expected digit-only chan_id)" - ) - time.sleep(0.2) + f"Invalid channel mapping value for unique_key={CHANNEL_KEY!r}: {value!r}, err={err}" + ) from None + if chan_id.isdigit(): + return chan_id + raise RuntimeError( + f"Invalid channel mapping for unique_key={CHANNEL_KEY!r}: {chan_id!r} (expected digit-only chan_id)" + ) + time.sleep(0.2) bootstrap_state = _bootstrap_process_state( bootstrap_proc=bootstrap_proc, bootstrap_log=bootstrap_log, @@ -610,8 +615,11 @@ def run_producer(env, args: Dict[str, Any]) -> None: # Close returns Result[OkNone, ApiError]; consume explicitly producer.close().unwrap() finally: - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: - etcd_client.put(f"{PRODUCER_DONE_KEY}_{producer_id}", str(produced)) + done_key = f"{PRODUCER_DONE_KEY}_{producer_id}" + _etcd_call_with_retry( + f"mark producer {producer_id} done", + lambda etcd_client: etcd_client.put(done_key, str(produced).encode()), + ) finally: configure_backend(env, backend_type=prev_type, backend_ip=prev_ip) @@ -647,74 +655,76 @@ def run_consumer(env, args: Dict[str, Any]) -> None: start_time = time.monotonic() max_deadline = start_time + MAX_CONSUMER_RUNTIME try: - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: - all_producers_done = False - last_producer_check = time.monotonic() - producer_check_interval = 0.5 + all_producers_done = False + last_producer_check = time.monotonic() + producer_check_interval = 0.5 + + while True: + now = time.monotonic() + + if now - last_producer_check >= producer_check_interval: + if not all_producers_done: + done_count = 0 + for p_idx in range(PRODUCER_COUNT): + producer_id = f"P{p_idx}" + done_key = f"{PRODUCER_DONE_KEY}_{producer_id}" + value = _etcd_call_with_retry( + f"read producer {producer_id} done flag", + lambda etcd_client, key=done_key: etcd_client.get(key), + ) + if value is not None: + done_count += 1 + all_producers_done = done_count == PRODUCER_COUNT + if all_producers_done: + msg = f"🎉 Consumer {consumer_id}: All {PRODUCER_COUNT} producers done! consumed={consumed}" + print(msg, file=sys.stdout, flush=True) + last_producer_check = now - while True: - now = time.monotonic() + res = consumer.get_data(batch_size=1, try_time=1) - if now - last_producer_check >= producer_check_interval: - if not all_producers_done: - done_count = 0 - for p_idx in range(PRODUCER_COUNT): - producer_id = f"P{p_idx}" - value, _metadata = etcd_client.get(f"{PRODUCER_DONE_KEY}_{producer_id}") - if value is not None: - done_count += 1 - all_producers_done = done_count == PRODUCER_COUNT - if all_producers_done: - import sys - msg = f"🎉 Consumer {consumer_id}: All {PRODUCER_COUNT} producers done! consumed={consumed}" - print(msg, file=sys.stdout, flush=True) - last_producer_check = now - - res = consumer.get_data(batch_size=1, try_time=1) - - if res is None: - now = time.monotonic() + if res is None: + now = time.monotonic() + if now >= max_deadline: + raise RuntimeError( + f"Consumer {consumer_id} get_data returned None unexpectedly" + ) + elif res.is_ok(): + success = res.unwrap() + now = time.monotonic() + if isinstance(success, list) and success: + consumed += 1 + last_activity = now + if isinstance(success[0], dict): + msg_key = str(success[0]["unique_id"]) + if msg_key.startswith("quick-msg-"): + parts = msg_key.split("-") + if len(parts) >= 3: + producer_id_str = parts[2] + producer_consumed_counts[producer_id_str] = producer_consumed_counts.get(producer_id_str, 0) + 1 + else: + err = res.unwrap_error() + now = time.monotonic() + if isinstance(err, MessageConsumptionNoNewMessageError): if now >= max_deadline: raise RuntimeError( - f"Consumer {consumer_id} get_data returned None unexpectedly" + f"Consumer {consumer_id} exceeded max runtime with no new message" ) - elif res.is_ok(): - success = res.unwrap() - now = time.monotonic() - if isinstance(success, list) and success: - consumed += 1 - last_activity = now - if isinstance(success[0], dict): - msg_key = str(success[0]["unique_id"]) - if msg_key.startswith("quick-msg-"): - parts = msg_key.split("-") - if len(parts) >= 3: - producer_id_str = parts[2] - producer_consumed_counts[producer_id_str] = producer_consumed_counts.get(producer_id_str, 0) + 1 else: - err = res.unwrap_error() - now = time.monotonic() - if isinstance(err, MessageConsumptionNoNewMessageError): - if now >= max_deadline: - raise RuntimeError( - f"Consumer {consumer_id} exceeded max runtime with no new message" - ) - else: - raise RuntimeError( - f"Consumer {consumer_id} get_data failed: {err}" - ) - - idle_time = now - last_activity - if all_producers_done and idle_time >= idle_timeout: - print( - f"✅ Consumer {consumer_id} exiting: all producers done and idle for {idle_time:.1f}s (consumed {consumed} messages)" - ) - break - if now >= max_deadline: raise RuntimeError( - f"Consumer {consumer_id} exceeded max runtime" + f"Consumer {consumer_id} get_data failed: {err}" ) - time.sleep(PRODUCER_DONE_POLL_INTERVAL) + + idle_time = now - last_activity + if all_producers_done and idle_time >= idle_timeout: + print( + f"✅ Consumer {consumer_id} exiting: all producers done and idle for {idle_time:.1f}s (consumed {consumed} messages)" + ) + break + if now >= max_deadline: + raise RuntimeError( + f"Consumer {consumer_id} exceeded max runtime" + ) + time.sleep(PRODUCER_DONE_POLL_INTERVAL) if consumed == 0: raise AssertionError( f"Consumer {consumer_id} did not receive any message" diff --git a/fluxon_py/tests/test_api_chan_mpmc/test_rebind_client.py b/fluxon_py/tests/test_api_chan_mpmc/test_rebind_client.py index 5f42fab..92a967b 100644 --- a/fluxon_py/tests/test_api_chan_mpmc/test_rebind_client.py +++ b/fluxon_py/tests/test_api_chan_mpmc/test_rebind_client.py @@ -12,9 +12,7 @@ import sys import time from pathlib import Path -from typing import List, Tuple - -import etcd3 +from typing import Any, List, Optional, Tuple # Bootstrap import path to project root so absolute imports always work CURRENT_DIR = Path(__file__).resolve().parent @@ -33,8 +31,6 @@ def _find_project_root(start: Path) -> Path: from fluxon_py.api_error import MessageConsumptionNoNewMessageError # noqa: E402 from fluxon_py.logging import init_logger # noqa: E402 from fluxon_py.tests.test_lib import ( # noqa: E402 - ETCD_HOST, - ETCD_PORT, KV_SVC_TYPE, KV_SVC_IP, setup_test_environment, @@ -44,6 +40,10 @@ def _find_project_root(start: Path) -> Path: new_shared_stores, load_test_fluxon_cluster_name, run_with_argmatrix, + etcd_control_call_with_retry as _etcd_call_with_retry, + etcd_control_delete_prefix as _etcd_delete_prefix, + etcd_control_get as _etcd_get, + etcd_control_put as _etcd_put, ) from fluxon_py.kvclient import KvClientType, new_store # noqa: E402 from fluxon_py.kvclient.kvclient_interface import KvClient # noqa: E402 @@ -116,22 +116,20 @@ def _wait_fluxon_member_absent(instance_key: str, *, timeout_s: int = 45) -> Non cluster = load_test_fluxon_cluster_name() key = f"/fluxon_kv_member_base/{cluster}/members/{instance_key}" deadline = time.time() + float(timeout_s) - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: - while True: - val = etcd_client.get(key)[0] - if val is None: - return - if time.time() >= deadline: - raise RuntimeError( - f"member key still exists after wait: {key}. Previous lease not expired" - ) - # Progress logging is handled by the caller; keep quiet here. - time.sleep(1.0) + while True: + val = _etcd_get(key) + if val is None: + return + if time.time() >= deadline: + raise RuntimeError( + f"member key still exists after wait: {key}. Previous lease not expired" + ) + # Progress logging is handled by the caller; keep quiet here. + time.sleep(1.0) # ------------------- Local CLI for subprocess workers ------------------- import argparse -from typing import Optional def _build_parser() -> argparse.ArgumentParser: @@ -277,14 +275,25 @@ def _create_store(env: ChannelState, instance_key: str) -> KvClient: # ------------------- Local verification and cleanup ------------------- def clean_etcd() -> None: - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: - etcd_client.delete_prefix("/mpmc_channels") - etcd_client.delete_prefix("/channels") - etcd_client.delete_prefix("/test_mpmc_stop_consumer") - etcd_client.delete_prefix("/test_mpmc_consumer") - etcd_client.delete_prefix("/test_mpmc_stop_producer") - etcd_client.delete_prefix(PRODUCER_PAUSE_KEY) - etcd_client.delete_prefix("/test_mpmc_rebind") + _etcd_delete_prefix("/mpmc_channels") + _etcd_delete_prefix("/channels") + _etcd_delete_prefix("/test_mpmc_stop_consumer") + _etcd_delete_prefix("/test_mpmc_consumer") + _etcd_delete_prefix("/test_mpmc_stop_producer") + _etcd_delete_prefix(PRODUCER_PAUSE_KEY) + _etcd_delete_prefix("/test_mpmc_rebind") + + +def _read_log_tail(path: str, *, max_lines: int = 80) -> str: + try: + lines = Path(path).read_text(encoding="utf-8", errors="replace").splitlines() + except FileNotFoundError: + return f"" + except OSError as exc: + return f"" + if not lines: + return "" + return "\n".join(lines[-max_lines:]) def verify_production_consumption_counts( @@ -407,12 +416,11 @@ def run_producer(env, args: argparse.Namespace) -> None: print(f"[Producer-{args.producer_id}] Started", flush=True) try: import uuid, random - etcd_client = producer.etcd_client index = 0 while True: # Check stop first - stop_flag, _ = etcd_client.get("/test_mpmc_stop_producer") + stop_flag = _etcd_get("/test_mpmc_stop_producer") if stop_flag: logging.info( f"[RBD-STOP] Producer-{args.producer_id} stop flag detected" @@ -422,7 +430,7 @@ def run_producer(env, args: argparse.Namespace) -> None: i=0 while True: i+=1 - pause_flag, _ = etcd_client.get(PRODUCER_PAUSE_KEY) + pause_flag = _etcd_get(PRODUCER_PAUSE_KEY) if not pause_flag: logging.info( f"[RBD-RESUME] Producer-{args.producer_id} resumed" @@ -430,7 +438,7 @@ def run_producer(env, args: argparse.Namespace) -> None: break logging.info(f"[RBD-PAUSE] Producer-{args.producer_id} paused, loop i {i}") # allow quick reaction to stop while paused - stop_flag, _ = etcd_client.get("/test_mpmc_stop_producer") + stop_flag = _etcd_get("/test_mpmc_stop_producer") if stop_flag: logging.info( f"[RBD-STOP] Producer-{args.producer_id} stop while paused" @@ -441,7 +449,7 @@ def run_producer(env, args: argparse.Namespace) -> None: break # Read current loop index to embed into message key for verification per loop try: - loop_val, _ = etcd_client.get(REBIND_LOOP_KEY) + loop_val = _etcd_get(REBIND_LOOP_KEY) loop_idx = int(loop_val.decode()) if loop_val else -1 except Exception: loop_idx = -1 @@ -466,7 +474,7 @@ def run_producer(env, args: argparse.Namespace) -> None: ) print(f"PRODUCE_MARKER: {args.producer_id}:{msg_id}") # Track production per loop in etcd for gating - etcd_client.put( + _etcd_put( f"/test_mpmc_rebind/produced/{loop_idx}/{args.producer_id}/{unique_id}", b"", ) @@ -528,11 +536,10 @@ def run_consumer(env, args: argparse.Namespace) -> None: f"[Consumer-{args.consumer_id}] Started with mpmc consumer {consumer.mpmc_channel.mpmc_member_id}", flush=True, ) - etcd_client = consumer.etcd_client - etcd_client.put( + _etcd_put( f"/test_mpmc_consumer/{args.consumer_id}", b"dummy_value", - consumer.mpmc_channel.mpmc_global_lease, + lease_id=int(consumer.mpmc_channel.mpmc_global_lease.id), ) logging.info( f"[RBD-REGISTER] Consumer-{args.consumer_id} registered in etcd" @@ -581,7 +588,7 @@ def run_consumer(env, args: argparse.Namespace) -> None: raise ValueError(f"Invalid loop index in message id: {unique_id_str}") li = int(li_str) if li >= 0: - etcd_client.put( + _etcd_put( f"/test_mpmc_rebind/consumed/{li}/{args.consumer_id}/{unique_id_str}", b"", ) @@ -621,9 +628,7 @@ def run_consumer(env, args: argparse.Namespace) -> None: ) break time.sleep(0.5) - stop_flag, _ = etcd_client.get( - f"/test_mpmc_stop_consumer/{args.consumer_id}" - ) + stop_flag = _etcd_get(f"/test_mpmc_stop_consumer/{args.consumer_id}") if stop_flag: # enter draining mode: keep getting until one timeout/no-data if not draining: @@ -680,11 +685,10 @@ def _once(prefetch: int) -> None: ) shutil.rmtree("logs", ignore_errors=True) clean_etcd() - with etcd3.client(ETCD_HOST, ETCD_PORT) as etcd_client: - if etcd_client.get("/test_mpmc_stop_producer")[0] is not None: - raise RuntimeError( - "precondition failed: /test_mpmc_stop_producer exists before test start" - ) + if _etcd_get("/test_mpmc_stop_producer") is not None: + raise RuntimeError( + "precondition failed: /test_mpmc_stop_producer exists before test start" + ) logging.info("[RBD-CTL-ETCD-CLEAN] cleared test prefixes") os.makedirs("logs", exist_ok=True) @@ -730,91 +734,90 @@ def spawn(process_type: str, cmd: List[str], identifier: str) -> None: ) # Repeatedly stop and restart a single consumer while producers keep producing - etcd_client = etcd3.client(ETCD_HOST, ETCD_PORT) # initialize loop index for producers to tag messages - etcd_client.put(REBIND_LOOP_KEY, b"0") + _etcd_put(REBIND_LOOP_KEY, b"0") logging.info("[RBD-CTL-LOOPKEY] set loop_idx=0") - try: - for i in range(LOOPS - 1): - logging.info(f"[RBD-CTL-LOOP] round={i} active_window={ACTIVE_WINDOW_SEC}s") - # Soft window to allow production - time.sleep(ACTIVE_WINDOW_SEC) - - # Pause producers and stop current consumer to drain until last get_data times out - etcd_client.put(PRODUCER_PAUSE_KEY, b"1") - logging.info("[RBD-CTL-PAUSE] producers paused") - etcd_client.put( - f"/test_mpmc_stop_consumer/{current_consumer}", b"dummy_value" - ) - logging.info( - f"[RBD-CTL-STOP-CONS] request stop consumer={current_consumer}" - ) - while True: - status, _ = etcd_client.get( - f"/test_mpmc_consumer/{current_consumer}" - ) - if not status: - break - time.sleep(0.5) - logging.info( - f"[RBD-CTL-WAIT-CONS] consumer exited id={current_consumer}" - ) - - # Switch to next loop index now that previous consumer fully drained and exited - etcd_client.put(REBIND_LOOP_KEY, str(i + 1).encode()) - logging.info(f"[RBD-CTL-LOOPKEY] set loop_idx={i+1}") + for i in range(LOOPS - 1): + logging.info(f"[RBD-CTL-LOOP] round={i} active_window={ACTIVE_WINDOW_SEC}s") + # Soft window to allow production + time.sleep(ACTIVE_WINDOW_SEC) - # Short gap, then start next consumer for next loop and resume producers - time.sleep(INACTIVE_GAP_SEC) - next_consumer = f"C{i+1}" - print( - f"[rebind_client] starting next consumer {next_consumer}", - flush=True, - ) - spawn( - "consumer", - _consumer_cmd( - env.backend_type, env.backend_ip, next_consumer, prefetch - ), - next_consumer, - ) - current_consumer = next_consumer - # Resume producers for next round - etcd_client.delete(PRODUCER_PAUSE_KEY) - logging.info("[RBD-CTL-RESUME] producers resumed") - - # After last loop index set, stop producers, then stop last consumer (which drains before exit) - etcd_client.put(PRODUCER_PAUSE_KEY, b"1") - logging.info("[RBD-CTL-FINAL-PAUSE] producers paused before shutdown") - etcd_client.put("/test_mpmc_stop_producer", b"dummy_value") - logging.info("[RBD-CTL-STOP-PROD] stop producers signaled") - for process_type, proc, log_file in subprocesses: - if process_type != "producer": - continue - logging.info(f"[RBD-CTL-WAIT-PROD] waiting producer log={log_file}") - proc.wait() - if proc.returncode != 0: - raise RuntimeError( - f"producer failed with return code {proc.returncode}. Check log: {log_file}" - ) - logging.info("[RBD-CTL-PROD-DONE] producers exited") - # Stop the last consumer and wait for consumers to exit (drains until last get timeout) - etcd_client.put( - f"/test_mpmc_stop_consumer/{current_consumer}", b"dummy_value" + # Pause producers and stop current consumer to drain until last get_data times out + _etcd_put(PRODUCER_PAUSE_KEY, b"1") + logging.info("[RBD-CTL-PAUSE] producers paused") + _etcd_put(f"/test_mpmc_stop_consumer/{current_consumer}", b"dummy_value") + logging.info( + f"[RBD-CTL-STOP-CONS] request stop consumer={current_consumer}" ) + while True: + status = _etcd_get(f"/test_mpmc_consumer/{current_consumer}") + if not status: + break + time.sleep(0.5) logging.info( - f"[RBD-CTL-STOP-LAST-CONS] request stop consumer={current_consumer}" + f"[RBD-CTL-WAIT-CONS] consumer exited id={current_consumer}" + ) + + # Switch to next loop index now that previous consumer fully drained and exited + _etcd_put(REBIND_LOOP_KEY, str(i + 1).encode()) + logging.info(f"[RBD-CTL-LOOPKEY] set loop_idx={i+1}") + + # Short gap, then start next consumer for next loop and resume producers + time.sleep(INACTIVE_GAP_SEC) + next_consumer = f"C{i+1}" + print( + f"[rebind_client] starting next consumer {next_consumer}", + flush=True, + ) + spawn( + "consumer", + _consumer_cmd( + env.backend_type, env.backend_ip, next_consumer, prefetch + ), + next_consumer, + ) + current_consumer = next_consumer + # Resume producers for next round + _etcd_call_with_retry( + f"delete producer pause key {PRODUCER_PAUSE_KEY}", + lambda client: client.delete(PRODUCER_PAUSE_KEY), ) - finally: - etcd_client.close() + logging.info("[RBD-CTL-RESUME] producers resumed") + + # After last loop index set, stop producers, then stop last consumer (which drains before exit) + _etcd_put(PRODUCER_PAUSE_KEY, b"1") + logging.info("[RBD-CTL-FINAL-PAUSE] producers paused before shutdown") + _etcd_put("/test_mpmc_stop_producer", b"dummy_value") + logging.info("[RBD-CTL-STOP-PROD] stop producers signaled") + for process_type, proc, log_file in subprocesses: + if process_type != "producer": + continue + logging.info(f"[RBD-CTL-WAIT-PROD] waiting producer log={log_file}") + proc.wait() + if proc.returncode != 0: + raise RuntimeError( + f"producer failed with return code {proc.returncode}. Check log: {log_file}\n" + "--- child log tail ---\n" + f"{_read_log_tail(log_file)}\n" + "--- end child log tail ---" + ) + logging.info("[RBD-CTL-PROD-DONE] producers exited") + # Stop the last consumer and wait for consumers to exit (drains until last get timeout) + _etcd_put(f"/test_mpmc_stop_consumer/{current_consumer}", b"dummy_value") + logging.info( + f"[RBD-CTL-STOP-LAST-CONS] request stop consumer={current_consumer}" + ) for process_type, proc, log_file in subprocesses: logging.info(f"[RBD-CTL-WAIT] waiting {process_type} log={log_file}") proc.wait() if proc.returncode != 0: raise RuntimeError( - f"{process_type} failed with return code {proc.returncode}. Check log: {log_file}" + f"{process_type} failed with return code {proc.returncode}. Check log: {log_file}\n" + "--- child log tail ---\n" + f"{_read_log_tail(log_file)}\n" + "--- end child log tail ---" ) logging.info("[RBD-CTL-ALL-DONE] all subprocesses exited") diff --git a/fluxon_py/tests/test_lib.py b/fluxon_py/tests/test_lib.py index 9be7003..9700f69 100644 --- a/fluxon_py/tests/test_lib.py +++ b/fluxon_py/tests/test_lib.py @@ -8,6 +8,7 @@ from logging import Logger from fluxon_py.logging import init_logger, update_log_level import multiprocessing +import threading from typing import List from fluxon_py.kvclient.kvclient_interface import KvClient from fluxon_py import FluxonKvClientConfig @@ -15,7 +16,7 @@ from fluxon_py import ChanType, ChanRole, chan_new, chan_bind, MPSCChanConsumer, MPMCChanConsumer, MPSCChanProducer, MPMCChanProducer from typing import Optional, Dict, Union from fluxon_py import api_ext_chan -from typing import Any, Callable, Iterable +from typing import Any, Callable, Iterable, TypeVar import signal from typing import Tuple import subprocess @@ -123,6 +124,91 @@ def load_test_chan_config(*, config_path: Optional[Path] = None) -> Dict[str, in TEST_ARGMATRIX: Dict[str, Iterable[Any]] = { "prefetch": (0,), } +T = TypeVar("T") +_TEST_ETCD_CLIENT_LOCK = threading.Lock() +_TEST_ETCD_CLIENT: Optional[Any] = None + + +def etcd_control_client() -> Any: + """Return the cached PyO3 etcd client for process-level test control-plane ops.""" + global _TEST_ETCD_CLIENT + with _TEST_ETCD_CLIENT_LOCK: + if _TEST_ETCD_CLIENT is None: + from fluxon_py.tool import import_fluxon_pyo3_local + + fluxon_pyo3 = import_fluxon_pyo3_local() + _TEST_ETCD_CLIENT = fluxon_pyo3.EtcdKvClient([f"{ETCD_HOST}:{ETCD_PORT}"]) + return _TEST_ETCD_CLIENT + + +def is_transient_etcd_control_failure(exc: BaseException) -> bool: + rendered = f"{type(exc).__name__}: {exc}" + lowered = rendered.lower() + return ( + "unavailable" in lowered + or "etcdserver: request timed out" in rendered + or "timed out" in lowered + or "timeout" in lowered + or "connection" in lowered + or "transport" in lowered + or "broken pipe" in lowered + or "closed" in lowered + ) + + +def etcd_control_call_with_retry( + context: str, + operation: Callable[[Any], T], + *, + max_attempts: int = 5, +) -> T: + if max_attempts <= 0: + raise ValueError(f"max_attempts must be positive, got {max_attempts!r}") + + last_exc: Optional[BaseException] = None + for attempt in range(1, max_attempts + 1): + try: + client = etcd_control_client() + return operation(client) + except Exception as exc: # noqa: BLE001 + last_exc = exc + if (not is_transient_etcd_control_failure(exc)) or attempt == max_attempts: + raise + sleep_s = min(0.2 * attempt, 2.0) + logging.warning( + "transient etcd failure during %s; retrying in %.1fs " + "(attempt %d/%d): %s", + context, + sleep_s, + attempt, + max_attempts, + exc, + ) + time.sleep(sleep_s) + + assert last_exc is not None + raise RuntimeError(f"etcd operation {context!r} failed") from last_exc + + +def etcd_control_get(key: str) -> Optional[bytes]: + return etcd_control_call_with_retry(f"get {key}", lambda client: client.get(key)) + + +def etcd_control_put(key: str, value: bytes, lease_id: Optional[int] = None) -> None: + if lease_id is None: + etcd_control_call_with_retry(f"put {key}", lambda client: client.put(key, value)) + return + etcd_control_call_with_retry( + f"put {key} with lease", + lambda client: client.put(key, value, lease_id=lease_id), + ) + + +def etcd_control_delete_prefix(prefix: str) -> int: + return etcd_control_call_with_retry( + f"delete_prefix {prefix}", + lambda client: client.delete_prefix(prefix), + ) def run_with_argmatrix( diff --git a/fluxon_py/tests/test_pyo3_etcd.py b/fluxon_py/tests/test_pyo3_etcd.py new file mode 100644 index 0000000..4c79451 --- /dev/null +++ b/fluxon_py/tests/test_pyo3_etcd.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +import os +import sys +import time +import unittest +import uuid +from pathlib import Path +from typing import Any + + +REPO_ROOT = Path(__file__).resolve().parents[2] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from setup_and_pack.utils.repo_config_utils import ( # noqa: E402 + _verify_host_port, + load_test_etcd_address_from_test_config, +) +from fluxon_py.tool import import_fluxon_pyo3_local # noqa: E402 + + +_ETCD_ADDRESS = load_test_etcd_address_from_test_config() +ETCD_HOST, _ETCD_PORT = _verify_host_port(_ETCD_ADDRESS, field="test_config.yaml.etcd_address") +ETCD_PORT = int(_ETCD_PORT) + + +class TestPyO3EtcdKvClient(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.endpoint = f"{ETCD_HOST}:{ETCD_PORT}" + cls.fluxon_pyo3 = import_fluxon_pyo3_local() + + def setUp(self) -> None: + self.prefix = f"/fluxon_py_tests/pyo3_etcd/{os.getpid()}/{uuid.uuid4().hex}/" + self.client = self.fluxon_pyo3.EtcdKvClient([self.endpoint]) + self.addCleanup(self._cleanup_prefix) + + def _cleanup_prefix(self) -> None: + self.client.delete_prefix(self.prefix) + + def _wait_key_deleted(self, key: str, *, timeout_s: float = 10.0) -> None: + deadline = time.time() + timeout_s + while time.time() < deadline: + if self.client.get(key) is None: + return + time.sleep(0.2) + self.fail(f"timed out waiting for etcd key to be deleted: {key}") + + def _assert_lease_not_live(self, lease_id: int) -> None: + try: + ttl = self.client.lease_ttl(lease_id) + except RuntimeError as exc: + if "not found" in str(exc).lower(): + return + raise + self.assertLessEqual(ttl, 0) + + def test_kv_prefix_and_delete_roundtrip(self) -> None: + key_a = self.prefix + "a" + key_b = self.prefix + "nested/b" + + self.assertIsNone(self.client.get(key_a)) + self.client.put(key_a, b"value-a") + self.client.put(key_b, b"value-b") + + self.assertEqual(self.client.get(key_a), b"value-a") + other_client = self.fluxon_pyo3.EtcdKvClient([self.endpoint]) + self.assertEqual(other_client.get(key_b), b"value-b") + + rows = sorted( + (key.decode("utf-8"), value.decode("utf-8")) + for key, value in self.client.get_prefix(self.prefix) + ) + self.assertEqual(rows, [(key_a, "value-a"), (key_b, "value-b")]) + + self.assertTrue(self.client.delete(key_a)) + self.assertFalse(self.client.delete(key_a)) + self.assertIsNone(self.client.get(key_a)) + + self.assertEqual(self.client.delete_prefix(self.prefix), 1) + self.assertEqual(self.client.delete_prefix(self.prefix), 0) + self.assertIsNone(self.client.get(key_b)) + + def test_put_with_lease_and_revoke_deletes_key(self) -> None: + lease_mgr = self.fluxon_pyo3.LeaseManagerHandle() + lease: Any = lease_mgr.allocate_etcd_lease([self.endpoint], 30, False) + lease_id = int(lease.id) + key = self.prefix + "leased" + + self.client.put(key, b"leased-value", lease_id=lease_id) + self.assertEqual(self.client.get(key), b"leased-value") + self.assertGreater(self.client.lease_ttl(lease_id), 0) + + self.client.revoke_lease(lease_id) + self._wait_key_deleted(key) + self._assert_lease_not_live(lease_id) + + def test_lock_exclusive_and_context_manager_release(self) -> None: + lock_name = self.prefix + "lock" + lock_a = self.fluxon_pyo3.EtcdLock([self.endpoint], lock_name, 10, 1.0) + lock_b = self.fluxon_pyo3.EtcdLock([self.endpoint], lock_name, 10, 0.5) + + self.assertFalse(lock_a.held) + self.assertIsNone(lock_a.lease_id) + + self.assertTrue(lock_a.acquire()) + self.assertTrue(lock_a.held) + self.assertIsInstance(lock_a.lease_id, int) + + self.assertFalse(lock_b.acquire()) + self.assertFalse(lock_b.held) + self.assertIsNone(lock_b.lease_id) + + first_lease_id = int(lock_a.lease_id) + self.assertTrue(lock_a.release()) + self.assertFalse(lock_a.held) + self.assertIsNone(lock_a.lease_id) + self.assertFalse(lock_a.release()) + self._assert_lease_not_live(first_lease_id) + + with self.fluxon_pyo3.EtcdLock([self.endpoint], lock_name, 10, 1.0) as held_lock: + self.assertTrue(held_lock.held) + context_lease_id = int(held_lock.lease_id) + self._assert_lease_not_live(context_lease_id) + + +if __name__ == "__main__": + raise SystemExit(unittest.main()) diff --git a/fluxon_release/test_rsc/source/prepare.yaml b/fluxon_release/test_rsc/source/prepare.yaml index fb0c5ac..d9c842f 100644 --- a/fluxon_release/test_rsc/source/prepare.yaml +++ b/fluxon_release/test_rsc/source/prepare.yaml @@ -21,6 +21,10 @@ python_runtime: source: wheel - pinned: pytest==8.3.5 source: wheel + - pinned: tomli==2.2.1 + source: wheel + - pinned: exceptiongroup==1.3.0 + source: wheel zerorpc: requirements: - pinned: zerorpc==0.6.3 diff --git a/fluxon_rs/fluxon_pyo3/build.rs b/fluxon_rs/fluxon_pyo3/build.rs index 3ff8c5b..90c419b 100644 --- a/fluxon_rs/fluxon_pyo3/build.rs +++ b/fluxon_rs/fluxon_pyo3/build.rs @@ -1,6 +1,7 @@ use std::{ - env, + env, fs, path::{Path, PathBuf}, + process::Command, }; const DEFAULT_RUNTIME_SEARCH_SUBDIRS: &[&str] = &[ @@ -13,7 +14,53 @@ const DEFAULT_RUNTIME_SEARCH_SUBDIRS: &[&str] = &[ ]; const CLOSED_SDK_RUNTIME_ROOT_DIR_NAMES: &[&str] = &["native_runtime", "vendor_runtime"]; +const PYTHON_TEST_EMBED_LINK_ARGS_SCRIPT: &str = r#" +import sysconfig + +args = [] +seen = set() + +def add(arg): + arg = str(arg or "").strip() + if arg and arg not in seen: + seen.add(arg) + args.append(arg) + +for key in ("LIBPL", "LIBDIR"): + path = sysconfig.get_config_var(key) + if path: + add("-L" + path) + +libname = "" +ldlibrary = sysconfig.get_config_var("LDLIBRARY") or sysconfig.get_config_var("LIBRARY") or "" +if ldlibrary.startswith("libpython"): + stem = ldlibrary[3:] + for suffix in (".so", ".a", ".dylib"): + pos = stem.find(suffix) + if pos >= 0: + stem = stem[:pos] + break + libname = stem + +if not libname: + version = sysconfig.get_config_var("VERSION") or "" + abiflags = sysconfig.get_config_var("ABIFLAGS") or "" + if version: + libname = "python" + version + abiflags + +if libname: + add("-l" + libname) + +for key in ("LIBS", "SYSLIBS"): + for arg in (sysconfig.get_config_var(key) or "").split(): + add(arg) + +print("\n".join(args)) +"#; + fn main() { + emit_python_test_embed_link_args(); + let target_dir = get_target_dir(); let runtime_search_subdirs = load_runtime_search_subdirs(); let runtime_root_dir_names = runtime_root_dir_names(); @@ -59,6 +106,105 @@ fn main() { println!("cargo:rerun-if-changed=../target/release/"); } +fn emit_python_test_embed_link_args() { + println!("cargo:rerun-if-env-changed=PYTHON"); + + let python = env::var("PYTHON") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .unwrap_or_else(|| "python3".to_string()); + + let output = match Command::new(&python) + .arg("-c") + .arg(PYTHON_TEST_EMBED_LINK_ARGS_SCRIPT) + .output() + { + Ok(output) => output, + Err(err) => { + write_python_test_link_source( + None, + &format!( + "failed to query Python embed link args with {}: {}", + python, err + ), + ); + println!( + "cargo:warning=failed to query Python embed link args with {}: {}", + python, err + ); + return; + } + }; + + if !output.status.success() { + write_python_test_link_source( + None, + &format!( + "failed to query Python embed link args with {}: status={} stderr={}", + python, + output.status, + String::from_utf8_lossy(&output.stderr).trim() + ), + ); + println!( + "cargo:warning=failed to query Python embed link args with {}: status={} stderr={}", + python, + output.status, + String::from_utf8_lossy(&output.stderr).trim() + ); + return; + } + + let mut python_link_lib = None; + for arg in String::from_utf8_lossy(&output.stdout) + .lines() + .map(str::trim) + .filter(|arg| !arg.is_empty()) + { + if let Some(path) = arg.strip_prefix("-L") { + if !path.is_empty() { + println!("cargo:rustc-link-search=native={path}"); + } + continue; + } + + if let Some(lib) = arg.strip_prefix("-l") { + if lib.starts_with("python") { + python_link_lib = Some(lib.to_string()); + } + } + } + + if let Some(lib) = python_link_lib { + write_python_test_link_source(Some(&lib), ""); + } else { + let message = format!( + "Python embed link args from {} did not include a libpython entry", + python + ); + write_python_test_link_source(None, &message); + println!( + "cargo:warning=Python embed link args from {} did not include a libpython entry", + python + ); + } +} + +fn write_python_test_link_source(python_link_lib: Option<&str>, message: &str) { + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + let path = out_dir.join("python_test_link.rs"); + let source = match python_link_lib { + Some(lib) => format!("#[link(name = {lib:?})]\nunsafe extern \"C\" {{}}\n"), + None => format!("compile_error!({:?});\n", message), + }; + fs::write(&path, source).expect("write generated Python test link source"); + println!( + "cargo:rustc-env=FLUXON_PYO3_TEST_PYTHON_LINK_RS={}", + path.display() + ); +} + fn get_target_dir() -> PathBuf { if let Ok(target_dir) = env::var("CARGO_TARGET_DIR") { let path = PathBuf::from(target_dir); diff --git a/fluxon_rs/fluxon_pyo3/src/etcd.rs b/fluxon_rs/fluxon_pyo3/src/etcd.rs index 2ceace8..f3c27d7 100644 --- a/fluxon_rs/fluxon_pyo3/src/etcd.rs +++ b/fluxon_rs/fluxon_pyo3/src/etcd.rs @@ -1,12 +1,417 @@ use etcd_client as etcd; +use fluxon_util::auto_clean_map::{AutoCleanMap, AutoCleanMapEntry}; use fluxon_util::run_async_from_sync::SyncAsyncBridge; use pyo3::prelude::*; +use pyo3::pybacked::PyBackedBytes; +use pyo3::types::{PyBytes, PyList, PyTuple}; use pyo3::{PyErr, PyObject}; -use std::sync::Arc; +use std::future::Future; +use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; use tokio::runtime::Runtime; use tracing::debug; +fn normalize_raw_endpoint(endpoint: &str) -> PyResult { + let endpoint = endpoint.trim(); + if endpoint.is_empty() { + return Err(PyErr::new::( + "etcd endpoint must be non-empty raw host:port", + )); + } + if endpoint.contains("://") { + return Err(PyErr::new::(format!( + "etcd endpoint must be raw host:port without scheme, got: {}", + endpoint + ))); + } + Ok(format!("http://{}", endpoint)) +} + +fn normalize_raw_endpoints(endpoints: Vec, component: &str) -> PyResult> { + if endpoints.is_empty() { + return Err(PyErr::new::(format!( + "{} requires at least one endpoint", + component + ))); + } + let mut normalized = Vec::with_capacity(endpoints.len()); + for endpoint in endpoints { + normalized.push(normalize_raw_endpoint(&endpoint)?); + } + Ok(normalized) +} + +struct EtcdKvBackend { + endpoints: Vec, + client: tokio::sync::RwLock>, +} + +impl EtcdKvBackend { + fn new(endpoints: Vec) -> Self { + Self { + endpoints, + client: tokio::sync::RwLock::new(None), + } + } + + async fn client(&self) -> anyhow::Result { + { + let guard = self.client.read().await; + if let Some(client) = guard.as_ref() { + return Ok(client.clone()); + } + } + + let mut guard = self.client.write().await; + if let Some(client) = guard.as_ref() { + return Ok(client.clone()); + } + + let client = etcd::Client::connect(self.endpoints.clone(), None) + .await + .map_err(|e| { + anyhow::anyhow!( + "failed to connect etcd endpoints={:?}: {:?}", + self.endpoints, + e + ) + })?; + *guard = Some(client.clone()); + Ok(client) + } + + async fn clear_client(&self) { + let mut guard = self.client.write().await; + *guard = None; + } +} + +fn etcd_kv_backend_map() -> &'static AutoCleanMap, EtcdKvBackend> { + static MAP: OnceLock, EtcdKvBackend>> = OnceLock::new(); + MAP.get_or_init(|| AutoCleanMap::new()) +} + +fn is_reconnectable_etcd_error(err: &etcd::Error) -> bool { + is_reconnectable_etcd_error_text(&format!("{:?}", err)) +} + +fn is_reconnectable_etcd_error_text(msg: &str) -> bool { + let msg = msg.to_ascii_lowercase(); + msg.contains("unavailable") + || msg.contains("connection") + || msg.contains("transport") + || msg.contains("timed out") + || msg.contains("timeout") + || msg.contains("broken pipe") + || msg.contains("closed") +} + +async fn run_etcd_op( + backend: AutoCleanMapEntry, EtcdKvBackend>, + context: String, + mut op: F, +) -> anyhow::Result +where + F: FnMut(etcd::Client) -> Fut, + Fut: Future>, +{ + let mut last_err = None; + for attempt in 1..=2 { + let client = backend.client().await?; + match op(client).await { + Ok(value) => return Ok(value), + Err(err) => { + let should_retry = attempt == 1 && is_reconnectable_etcd_error(&err); + last_err = Some(err); + if should_retry { + backend.clear_client().await; + continue; + } + let err = last_err.take().expect("etcd error must be recorded"); + return Err(anyhow::anyhow!("{}: {:?}", context, err)); + } + } + } + + let err = last_err.expect("etcd retry loop must record the last error"); + Err(anyhow::anyhow!("{}: {:?}", context, err)) +} + +#[pyclass(name = "EtcdKvClient")] +pub struct PyEtcdKvClient { + rt: Arc, + endpoints: Vec, + backend: AutoCleanMapEntry, EtcdKvBackend>, +} + +#[pymethods] +impl PyEtcdKvClient { + #[new] + fn new(endpoints: Vec) -> PyResult { + let endpoints = normalize_raw_endpoints(endpoints, "EtcdKvClient")?; + let backend = etcd_kv_backend_map() + .get_or_init(endpoints.clone(), || EtcdKvBackend::new(endpoints.clone())); + Ok(Self { + rt: crate::mpsc::get_global_runtime(), + endpoints, + backend, + }) + } + + fn get(&self, py: Python<'_>, key: String) -> PyResult>> { + if key.is_empty() { + return Err(PyErr::new::( + "etcd get key must not be empty", + )); + } + + let backend = self.backend.clone(); + let key_for_op = key.clone(); + let value = py + .allow_threads(|| { + self.rt.run_async_from_sync(async move { + let resp = run_etcd_op( + backend, + format!("etcd get failed for key={}", key), + move |mut client| { + let key = key_for_op.clone(); + async move { client.get(key, None).await } + }, + ) + .await?; + Ok::>, anyhow::Error>( + resp.kvs().first().map(|kv| kv.value().to_vec()), + ) + }) + }) + .map_err(|e| anyhow::anyhow!("runtime bridge failed in EtcdKvClient.get: {}", e)) + .map_err(|e| PyErr::new::(e.to_string()))? + .map_err(|e| PyErr::new::(e.to_string()))?; + + Ok(value.map(|raw| PyBytes::new_bound(py, &raw).into())) + } + + fn get_prefix(&self, py: Python<'_>, prefix: String) -> PyResult { + if prefix.is_empty() { + return Err(PyErr::new::( + "etcd get_prefix prefix must not be empty", + )); + } + + let backend = self.backend.clone(); + let prefix_for_op = prefix.clone(); + let rows = py + .allow_threads(|| { + self.rt.run_async_from_sync(async move { + let resp = run_etcd_op( + backend, + format!("etcd get_prefix failed for prefix={}", prefix), + move |mut client| { + let prefix = prefix_for_op.clone(); + async move { + client + .get(prefix, Some(etcd::GetOptions::new().with_prefix())) + .await + } + }, + ) + .await?; + Ok::, Vec)>, anyhow::Error>( + resp.kvs() + .iter() + .map(|kv| (kv.key().to_vec(), kv.value().to_vec())) + .collect(), + ) + }) + }) + .map_err(|e| anyhow::anyhow!("runtime bridge failed in EtcdKvClient.get_prefix: {}", e)) + .map_err(|e| PyErr::new::(e.to_string()))? + .map_err(|e| PyErr::new::(e.to_string()))?; + + let out = PyList::empty_bound(py); + for (key, value) in rows { + let item = PyTuple::new_bound( + py, + [ + PyBytes::new_bound(py, &key).into_py(py), + PyBytes::new_bound(py, &value).into_py(py), + ], + ); + out.append(item)?; + } + Ok(out.into_any().into_py(py)) + } + + #[pyo3(signature = (key, value, lease_id=None))] + fn put( + &self, + py: Python<'_>, + key: String, + value: PyBackedBytes, + lease_id: Option, + ) -> PyResult<()> { + if key.is_empty() { + return Err(PyErr::new::( + "etcd put key must not be empty", + )); + } + if let Some(lease_id) = lease_id { + if lease_id <= 0 { + return Err(PyErr::new::(format!( + "lease_id must be positive, got {}", + lease_id + ))); + } + } + + let backend = self.backend.clone(); + let key_for_op = key.clone(); + let value = value.as_ref().to_vec(); + py.allow_threads(|| { + self.rt.run_async_from_sync(async move { + run_etcd_op( + backend, + format!("etcd put failed for key={}", key), + move |mut client| { + let key = key_for_op.clone(); + let value = value.clone(); + async move { + let opts = lease_id.map(|id| etcd::PutOptions::new().with_lease(id)); + client.put(key, value, opts).await.map(|_| ()) + } + }, + ) + .await?; + Ok::<(), anyhow::Error>(()) + }) + }) + .map_err(|e| anyhow::anyhow!("runtime bridge failed in EtcdKvClient.put: {}", e)) + .map_err(|e| PyErr::new::(e.to_string()))? + .map_err(|e| PyErr::new::(e.to_string())) + } + + fn delete(&self, py: Python<'_>, key: String) -> PyResult { + if key.is_empty() { + return Err(PyErr::new::( + "etcd delete key must not be empty", + )); + } + + let backend = self.backend.clone(); + let key_for_op = key.clone(); + py.allow_threads(|| { + self.rt.run_async_from_sync(async move { + run_etcd_op( + backend, + format!("etcd delete failed for key={}", key), + move |mut client| { + let key = key_for_op.clone(); + async move { + client + .delete(key, None) + .await + .map(|resp| resp.deleted() > 0) + } + }, + ) + .await + }) + }) + .map_err(|e| anyhow::anyhow!("runtime bridge failed in EtcdKvClient.delete: {}", e)) + .map_err(|e| PyErr::new::(e.to_string()))? + .map_err(|e| PyErr::new::(e.to_string())) + } + + fn delete_prefix(&self, py: Python<'_>, prefix: String) -> PyResult { + if prefix.is_empty() { + return Err(PyErr::new::( + "etcd delete_prefix prefix must not be empty", + )); + } + + let backend = self.backend.clone(); + let prefix_for_op = prefix.clone(); + py.allow_threads(|| { + self.rt.run_async_from_sync(async move { + run_etcd_op( + backend, + format!("etcd delete_prefix failed for prefix={}", prefix), + move |mut client| { + let prefix = prefix_for_op.clone(); + async move { + client + .delete(prefix, Some(etcd::DeleteOptions::new().with_prefix())) + .await + .map(|resp| resp.deleted()) + } + }, + ) + .await + }) + }) + .map_err(|e| anyhow::anyhow!("runtime bridge failed in EtcdKvClient.delete_prefix: {}", e)) + .map_err(|e| PyErr::new::(e.to_string()))? + .map_err(|e| PyErr::new::(e.to_string())) + } + + fn lease_ttl(&self, py: Python<'_>, lease_id: i64) -> PyResult { + if lease_id <= 0 { + return Err(PyErr::new::(format!( + "lease_id must be positive, got {}", + lease_id + ))); + } + + let backend = self.backend.clone(); + py.allow_threads(|| { + self.rt.run_async_from_sync(async move { + run_etcd_op( + backend, + format!("etcd lease_ttl failed for lease_id={}", lease_id), + move |mut client| async move { + client + .lease_time_to_live(lease_id, None) + .await + .map(|resp| resp.ttl()) + }, + ) + .await + }) + }) + .map_err(|e| anyhow::anyhow!("runtime bridge failed in EtcdKvClient.lease_ttl: {}", e)) + .map_err(|e| PyErr::new::(e.to_string()))? + .map_err(|e| PyErr::new::(e.to_string())) + } + + fn revoke_lease(&self, py: Python<'_>, lease_id: i64) -> PyResult<()> { + if lease_id <= 0 { + return Err(PyErr::new::(format!( + "lease_id must be positive, got {}", + lease_id + ))); + } + + let backend = self.backend.clone(); + py.allow_threads(|| { + self.rt.run_async_from_sync(async move { + run_etcd_op( + backend, + format!("etcd revoke_lease failed for lease_id={}", lease_id), + move |mut client| async move { client.lease_revoke(lease_id).await.map(|_| ()) }, + ) + .await + }) + }) + .map_err(|e| anyhow::anyhow!("runtime bridge failed in EtcdKvClient.revoke_lease: {}", e)) + .map_err(|e| PyErr::new::(e.to_string()))? + .map_err(|e| PyErr::new::(e.to_string())) + } + + fn __repr__(&self) -> String { + format!("", self.endpoints) + } +} + #[pyclass(name = "EtcdLock")] pub struct PyEtcdLock { rt: Arc, @@ -28,11 +433,7 @@ impl PyEtcdLock { ttl_seconds: i64, timeout_seconds: Option, ) -> PyResult { - if endpoints.is_empty() { - return Err(PyErr::new::( - "EtcdLock requires at least one endpoint", - )); - } + let endpoints = normalize_raw_endpoints(endpoints, "EtcdLock")?; if ttl_seconds <= 0 { return Err(PyErr::new::(format!( "EtcdLock ttl_seconds must be > 0, got {}", @@ -266,3 +667,119 @@ impl PyEtcdLock { ) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + static TEST_KEY_SEQ: AtomicUsize = AtomicUsize::new(1); + + fn unique_test_endpoints() -> Vec { + let seq = TEST_KEY_SEQ.fetch_add(1, Ordering::Relaxed); + vec![format!("http://unit-test-etcd-backend-{}", seq)] + } + + #[test] + fn normalize_raw_endpoint_accepts_raw_host_port() { + assert_eq!( + normalize_raw_endpoint(" 127.0.0.1:2379 ").unwrap(), + "http://127.0.0.1:2379" + ); + } + + #[test] + fn normalize_raw_endpoint_rejects_empty_or_schemed_endpoint() { + assert!(normalize_raw_endpoint("").is_err()); + assert!(normalize_raw_endpoint(" ").is_err()); + assert!(normalize_raw_endpoint("http://127.0.0.1:2379").is_err()); + assert!(normalize_raw_endpoint("https://127.0.0.1:2379").is_err()); + } + + #[test] + fn normalize_raw_endpoints_requires_at_least_one_endpoint() { + assert!(normalize_raw_endpoints(Vec::new(), "EtcdKvClient").is_err()); + assert_eq!( + normalize_raw_endpoints( + vec!["127.0.0.1:2379".to_string(), "localhost:2380".to_string()], + "EtcdKvClient", + ) + .unwrap(), + vec![ + "http://127.0.0.1:2379".to_string(), + "http://localhost:2380".to_string() + ] + ); + } + + #[test] + fn etcd_kv_client_constructor_normalizes_raw_endpoints() { + let client = PyEtcdKvClient::new(vec!["127.0.0.1:2379".to_string()]).unwrap(); + assert_eq!(client.endpoints, vec!["http://127.0.0.1:2379"]); + } + + #[test] + fn etcd_lock_constructor_normalizes_raw_endpoints() { + let lock = PyEtcdLock::new( + vec!["127.0.0.1:2379".to_string()], + "/unit-test/lock".to_string(), + 10, + Some(1.0), + ) + .unwrap(); + assert_eq!(lock.endpoints, vec!["http://127.0.0.1:2379"]); + } + + #[test] + fn etcd_lock_constructor_rejects_schemed_endpoints() { + assert!( + PyEtcdLock::new( + vec!["http://127.0.0.1:2379".to_string()], + "/unit-test/lock".to_string(), + 10, + Some(1.0), + ) + .is_err() + ); + } + + #[test] + fn etcd_kv_backend_map_reuses_and_auto_cleans_live_entries() { + let endpoints = unique_test_endpoints(); + let map = etcd_kv_backend_map(); + assert!(map.with_existing(&endpoints, |_| ()).is_none()); + + { + let entry_a = + map.get_or_init(endpoints.clone(), || EtcdKvBackend::new(endpoints.clone())); + assert!(map.with_existing(&endpoints, |_| ()).is_some()); + + { + let entry_b = map.get_or_init(endpoints.clone(), || { + panic!("live backend entry should be reused") + }); + assert!(std::ptr::eq(&*entry_a, &*entry_b)); + } + + assert!(map.with_existing(&endpoints, |_| ()).is_some()); + } + + assert!(map.with_existing(&endpoints, |_| ()).is_none()); + } + + #[test] + fn reconnectable_error_text_matches_transient_transport_failures() { + assert!(is_reconnectable_etcd_error_text("StatusCode::UNAVAILABLE")); + assert!(is_reconnectable_etcd_error_text( + "etcdserver: request timed out" + )); + assert!(is_reconnectable_etcd_error_text("transport error")); + assert!(is_reconnectable_etcd_error_text("connection closed")); + assert!(is_reconnectable_etcd_error_text("broken pipe")); + + assert!(!is_reconnectable_etcd_error_text( + "requested lease not found" + )); + assert!(!is_reconnectable_etcd_error_text("permission denied")); + } +} diff --git a/fluxon_rs/fluxon_pyo3/src/lib.rs b/fluxon_rs/fluxon_pyo3/src/lib.rs index a73591f..631f1ea 100644 --- a/fluxon_rs/fluxon_pyo3/src/lib.rs +++ b/fluxon_rs/fluxon_pyo3/src/lib.rs @@ -50,6 +50,11 @@ use std::os::fd::IntoRawFd; use std::time::Duration; use tokio::runtime::Runtime; +// Unit tests build a native test binary, so PyO3's extension-module mode must +// link libpython there. The cdylib/wheel build keeps Python symbols unresolved. +#[cfg(test)] +include!(env!("FLUXON_PYO3_TEST_PYTHON_LINK_RS")); + mod memholder; pub use memholder::{ExternalMemHolder, MemHolder}; mod flatdict_zerocopy; @@ -58,7 +63,7 @@ pub use kvfuture::KvFuture; mod error; mod etcd; mod mpsc; // Python ApiError constructors and MPSC error mapping -pub use etcd::PyEtcdLock; +pub use etcd::{PyEtcdKvClient, PyEtcdLock}; pub use mpsc::{MpscConsumerHandle, MpscContext, MpscProducerHandle}; mod lease_manager; pub use lease_manager::{LeaseManagerHandle, PyGeneralLease, PyLeaseBackendUid}; @@ -4154,6 +4159,7 @@ fn fluxon_pyo3(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; // English note: keep the `from fluxon_pyo3 import LeaseManagerHandle` import path stable for Python users. m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/fluxon_test_stack/tests/test_test_rsc_prepare_yaml.py b/fluxon_test_stack/tests/test_test_rsc_prepare_yaml.py index f723fb9..d81b59e 100644 --- a/fluxon_test_stack/tests/test_test_rsc_prepare_yaml.py +++ b/fluxon_test_stack/tests/test_test_rsc_prepare_yaml.py @@ -26,6 +26,8 @@ def test_base_dependency_set_includes_pytest(self) -> None: self.assertIsInstance(requirements, list) pinned = {item.get("pinned") for item in requirements if isinstance(item, dict)} self.assertIn("pytest==8.3.5", pinned) + self.assertIn("tomli==2.2.1", pinned) + self.assertIn("exceptiongroup==1.3.0", pinned) if __name__ == "__main__": diff --git a/fluxon_test_stack/tests/test_top_attention_index_helper.py b/fluxon_test_stack/tests/test_top_attention_index_helper.py index dcec087..7857366 100644 --- a/fluxon_test_stack/tests/test_top_attention_index_helper.py +++ b/fluxon_test_stack/tests/test_top_attention_index_helper.py @@ -62,6 +62,12 @@ def test_top_attention_scene_id_uses_stable_prefix(self) -> None: "ci_top_attention_bin_kvtest", ) + def test_kv_py_core_keeps_pyo3_etcd_integration_test(self) -> None: + entry_text = ( + REPO_ROOT / "fluxon_test_stack" / "top_attention_test_index" / "_kv_py_core.py" + ).read_text(encoding="utf-8") + self.assertIn("fluxon_py/tests/test_pyo3_etcd.py", entry_text) + if __name__ == "__main__": unittest.main() diff --git a/fluxon_test_stack/top_attention_test_index/_kv_py_core.py b/fluxon_test_stack/top_attention_test_index/_kv_py_core.py index 924ca7a..6459407 100755 --- a/fluxon_test_stack/top_attention_test_index/_kv_py_core.py +++ b/fluxon_test_stack/top_attention_test_index/_kv_py_core.py @@ -11,6 +11,7 @@ def main() -> int: return run_pytest( "Flat index entry for Python KV backend core smoke tests.", [ + "fluxon_py/tests/test_pyo3_etcd.py", "fluxon_py/tests/test_backend.py", "fluxon_py/tests/test_backend_fallback_close.py", ], From 258f2b4f0bf8c0924eadd474d5d5c67191fad73c Mon Sep 17 00:00:00 2001 From: ActivePeter <1020401660@qq.com> Date: Thu, 2 Jul 2026 20:18:26 +0800 Subject: [PATCH 2/2] test --- .../test_api_chan_mpmc_base.py | 89 ++++++++++++++++--- 1 file changed, 76 insertions(+), 13 deletions(-) diff --git a/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_base.py b/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_base.py index ce2d56b..ad6f11e 100644 --- a/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_base.py +++ b/fluxon_py/tests/test_api_chan_mpmc/test_api_chan_mpmc_base.py @@ -116,6 +116,35 @@ def _atomic_stdout_write_line(line: str) -> None: os.write(sys.stdout.fileno(), payload) +def _read_log_tail(path: str, *, max_lines: int = 120) -> str: + try: + with open(path, "r", encoding="utf-8", errors="replace") as handle: + lines = handle.readlines() + except FileNotFoundError: + return f"" + except Exception as exc: # noqa: BLE001 + return f"" + return "".join(lines[-max_lines:]).rstrip() + + +def _format_subprocess_failure( + process_type: str, + identifier: str, + rc: int, + log_file: str, + *, + early: bool, +) -> str: + phase = "exited early" if early else "failed" + return ( + f"{process_type} {identifier} {phase} with return code {rc}. " + f"Log file: {log_file}\n" + "--- child log tail ---\n" + f"{_read_log_tail(log_file)}\n" + "--- end child log tail ---" + ) + + INITIAL_PRODUCERS_COUNT = 3 INITIAL_CONSUMERS_COUNT = 2 NEW_PRODUCERS_MIN_COUNT = 1 @@ -289,11 +318,22 @@ def run_producer(env: "ChannelState", args: argparse.Namespace) -> None: except Exception as exc: # noqa: BLE001 print(f"[Producer-{args.producer_id}] Error: {exc}") _atomic_stdout_write_line(f"{PRODUCER_CRASH_MARKER} {args.producer_id}") + try: + producer.close().unwrap() + except Exception as close_exc: # noqa: BLE001 + print( + f"[Producer-{args.producer_id}] close after error failed: " + f"{close_exc}" + ) raise - finally: - print(f"[Producer-{args.producer_id}] Finished") - _atomic_stdout_write_line(f"{PRODUCER_NORMAL_EXIT_MARKER} {args.producer_id}") + try: producer.close().unwrap() + except Exception as exc: # noqa: BLE001 + print(f"[Producer-{args.producer_id}] close failed: {exc}") + _atomic_stdout_write_line(f"{PRODUCER_CRASH_MARKER} {args.producer_id}") + raise + print(f"[Producer-{args.producer_id}] Finished") + _atomic_stdout_write_line(f"{PRODUCER_NORMAL_EXIT_MARKER} {args.producer_id}") finally: configure_backend(env, backend_type=prev_type, backend_ip=prev_ip) release(env, store_key) @@ -423,19 +463,30 @@ def run_consumer(env: "ChannelState", args: argparse.Namespace) -> None: except Exception as exc: # noqa: BLE001 print(f"[Consumer-{args.consumer_id}] Error: {exc}") _atomic_stdout_write_line(f"{CONSUMER_CRASH_MARKER} {args.consumer_id}") + try: + consumer.close().unwrap() + except Exception as close_exc: # noqa: BLE001 + print( + f"[Consumer-{args.consumer_id}] close after error failed: " + f"{close_exc}" + ) raise - finally: - print( - f"[Consumer-{args.consumer_id}] Finished, consumed" - f" {consumed_count} messages" - ) - _atomic_stdout_write_line(f"{CONSUMER_NORMAL_EXIT_MARKER} {args.consumer_id}") + try: _etcd_call_with_retry( f"delete consumer presence for consumer_id={args.consumer_id}", lambda client: client.delete(f"/test_mpmc_consumer/{args.consumer_id}"), max_attempts=3, ) consumer.close().unwrap() + except Exception as exc: # noqa: BLE001 + print(f"[Consumer-{args.consumer_id}] cleanup failed: {exc}") + _atomic_stdout_write_line(f"{CONSUMER_CRASH_MARKER} {args.consumer_id}") + raise + print( + f"[Consumer-{args.consumer_id}] Finished, consumed" + f" {consumed_count} messages" + ) + _atomic_stdout_write_line(f"{CONSUMER_NORMAL_EXIT_MARKER} {args.consumer_id}") finally: configure_backend(env, backend_type=prev_type, backend_ip=prev_ip) release(env, store_key) @@ -680,8 +731,13 @@ def fail_fast_on_subprocess_error(*, process_type_filter: Optional[str] = None) continue if rc != 0: raise RuntimeError( - f"{process_type} {identifier} exited early with return code {rc}. " - f"Check log file for details: {log_file}" + _format_subprocess_failure( + process_type, + identifier, + rc, + log_file, + early=True, + ) ) def wait_all_of_type(process_type: str, *, timeout_s: int) -> None: @@ -703,8 +759,13 @@ def wait_all_of_type(process_type: str, *, timeout_s: int) -> None: print(f"Log file: {log_file}") continue raise RuntimeError( - f"{ptype} {identifier} failed with return code {proc.returncode}." - f" Check log file for details: {log_file}" + _format_subprocess_failure( + ptype, + identifier, + proc.returncode, + log_file, + early=False, + ) ) if not running: @@ -1114,6 +1175,7 @@ def get_chans() -> List[int]: start_processes() time.sleep(5) + fail_fast_on_subprocess_error(process_type_filter="producer") _assert_test_mpmc_id_stable() for phase in range(CONSUMER_CRASH_RECOVER_PHASES): @@ -1305,6 +1367,7 @@ def debug_all_ready_channels() -> None: recovered_consumers.append(consumer_id) start_processes() time.sleep(5) + fail_fast_on_subprocess_error(process_type_filter="producer") _assert_test_mpmc_id_stable() join_timeout_s = int(TEST_TIMEOUT_SECONDS)